Source code for DiatomTrack.core.tracker

import copy
import ray

import numpy as np
import pandas as pd

from filterpy.kalman import KalmanFilter

from scipy.spatial.distance import sqeuclidean as euclid
from tqdm.tk import tqdm_tk as tqdm

from ..core.assignment import associate, associate_secondary


[docs]class KalmanTracker: """ This class contains the Kalman filter of individual tracked objects observed as a rotated bounding box and the history of that object and its ancestors. """ tree = 0 count = 0 def __init__(self, rect, start, area, dist=200, boundary=(2048,1536)): """ Initialize the object and Kalman filter with the rotated bounding box. Parameters ---------- rect : array The box parameters. start : int The current frame. area : float The box area. dist : int, optional The cutoff distance for predictions. The default is 200. boundary : tuple, optional Dimensions of the image. The default is (2048,1536). Returns ------- None. """ # Initialize filter with constant velocity model self.kf = KalmanFilter(dim_x=12, dim_z=5) d = 1 a = 0.5 self.kf.F = np.array([ [1,0,0,0,0,d,0,0,0,0,a,0], [0,1,0,0,0,0,d,0,0,0,0,a], [0,0,1,0,0,0,0,d,0,0,0,0], [0,0,0,1,0,0,0,0,d,0,0,0], [0,0,0,0,1,0,0,0,0,d,0,0], [0,0,0,0,0,1,0,0,0,0,d,0], [0,0,0,0,0,0,1,0,0,0,0,d], [0,0,0,0,0,0,0,1,0,0,0,0], [0,0,0,0,0,0,0,0,1,0,0,0], [0,0,0,0,0,0,0,0,0,1,0,0], [0,0,0,0,0,0,0,0,0,0,1,0], [0,0,0,0,0,0,0,0,0,0,0,1] ]) self.kf.H = np.array([ [1,0,0,0,0,0,0,0,0,0,0,0], [0,1,0,0,0,0,0,0,0,0,0,0], [0,0,1,0,0,0,0,0,0,0,0,0], [0,0,0,1,0,0,0,0,0,0,0,0], [0,0,0,0,1,0,0,0,0,0,0,0] ]) self.kf.R[2:,2:] *= 10 self.kf.P[5:,5:] *= 10. self.kf.P *= 10 self.kf.P[10:,10:] *= 10. self.kf.Q *= 0.1 self.kf.Q[5:,5:] *= 0.1 self.kf.x[:5] = rect # KalmanBoxTracker parameters self.distance = dist self.bound = boundary self.time_since_update = 0 self.tree_id = KalmanTracker.tree KalmanTracker.tree += 1 self.id = KalmanTracker.count KalmanTracker.count += 1 self.parent = -1 self.history = [] self.prediction_history = [] self.hits = 0 self.age = 0 self.start = start self.last_state = rect self.reverse_state = None self.detections = [np.concatenate(([start], rect.flatten()))] self.areas = [[area]] self.division = -1 self.summary = None
[docs] def update(self, rect, frame, area): """ Update the filter and filter history. """ self.time_since_update = 0 self.reverse_state = self.last_state self.last_state = rect self.prediction_history.extend(self.history) self.history = [] self.hits += 1 self.kf.P *= 1.01 self.kf.update(rect) self.detections.append(np.concatenate(([frame], rect.flatten()))) self.areas.append([area])
[docs] def predict(self): """ Predict next state from filter while keeping boundary conditions of image, maximum distance, rotation angle < pi, and area > 0. """ self.age += 1 self.time_since_update += 1 if self.time_since_update > 5: self.history.append(self.history[-1]) return self.history[-1] if self.time_since_update > 1: if np.sum( (self.last_state[:2].reshape(-1) - self.history[-1][:2].reshape(-1))**2 ) > self.distance**2: self.history.append(self.history[-1]) return self.history[-1] if ((self.kf.x[4]+self.kf.x[9])<=0): self.kf.x[4] += np.pi elif ((self.kf.x[4]+self.kf.x[9])>np.pi): self.kf.x[4] -= np.pi if ((self.kf.x[2]+self.kf.x[7])<=0): self.kf.x[7] *= 0.0 if ((self.kf.x[3]+self.kf.x[8])<=0): self.kf.x[8] *= 0.0 self.kf.predict() if ((0<=self.kf.x[0]<self.bound[0]) and (0<=self.kf.x[1]<self.bound[1]) and np.sum( (self.last_state[:2].reshape(-1) - self.kf.x[:2].reshape(-1))**2) < self.distance**2 ): self.history.append(self.kf.x) else: if len(self.history) > 0: self.history.append(self.history[-1]) else: x = np.concatenate( [self.last_state, np.zeros(self.kf.x[5:].shape)]) self.history.append(x) return self.history[-1]
[docs] def get_state(self): """ Return the current bounding box estimate. """ return self.kf.x
[docs] def get_predictions(self): """ Return prediciton history. """ return self.prediction_history
[docs] def summarize(self): """ Compiles summary of the tracker object with all relevant information to transform into a dataframe. Perform the side tracking based on minimum difference of adjacent rotation angles. Returns ------- array Summary of the tracker. """ dets = np.stack(self.detections) rot = dets[:,5] dist = np.abs(np.arctan2(np.sin(rot[:-1]-rot[1:]), np.cos(rot[:-1]-rot[1:]))) side = np.empty_like(rot) side[0] = rot[0] side = rot.reshape((-1)) mask = dist > np.pi/2 mask = np.cumsum(mask.astype(int))*np.pi side[1:] += mask side[side > np.pi] %= 2*np.pi length = len(dets[dets[:,0]>=self.start]) general_columns = np.tile( [self.tree_id, self.id, self.start, self.parent, self.division, 0], (length,1)) self.summary = np.concatenate(( general_columns, side[-length:].reshape((-1,1)), dets[-length:], self.areas[-length:] ), axis=1) return self.summary
[docs]class DiatomTrack: """ This class performs the spatial tracking of diatom objects using Kalman filter objects and also tracks the long sides / thecae of the objects. """ def __init__( self, max_age=200, division_age=1000, gap_distance=200, split_distance=50, offset=80): """ Initalize the tracking parameters. Parameters ---------- max_age : int, optional Maximum frames an object may be undetected. The default is 200. division_age : int, optional The minimal frames an object has to be tracked before division. The default is 1000. gap_distance : int, optional The maximum distance for matching after detection gap. The default is 200. split_distance : int, optional The maximum distance to consider splitting. The default is 50. offset : int, optional The maximum distance in Euclidean tracking. The default is 80. Returns ------- None. """ self.max_age = max_age self.division_age = division_age self.gap_distance = gap_distance self.split_dist = split_distance self.offset = offset self.trackers = [] self.expired_trackers = [] self.frame_count = 0 self.summary = None
[docs] def update(self, dets=np.empty((0, 5)), areas=[]): """ Perform the tracking on the data from the next frame and archive expired trackers. Parameters ---------- dets : array, optional The data of detected bounding boxes. The default is np.empty((0, 5)). areas : lilst, optional The areas of the detections. The default is []. Returns ------- None. """ self.frame_count += 1 trks = np.zeros((len(self.trackers), 5)) trks_remain = [] taken = [] for t, trk in enumerate(trks): pos = self.trackers[t].predict().reshape((-1)) trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]] if 0 < sum(abs(self.trackers[t].get_state()[5:7])) < 4: trks_remain.append(trk) taken.append(t) trks_remain = np.array(trks_remain) matched, unmatched_dets, unmatched_trks = associate( dets, trks_remain) matched[:,1] = np.array(taken)[matched[:,1]] unmatched_trks = np.delete(np.arange(len(trks)), matched[:,1]) i = 0 while len(unmatched_trks) > 0 and len(unmatched_dets) > 0: func, distance = self._secondary_match(i) if func: matched, unmatched_dets, unmatched_trks = func( dets, unmatched_dets, matched, unmatched_trks, distance, trks, areas=areas) else: break i += 1 for m in matched: self.trackers[m[1]].update( dets[m[0]].reshape((5,1)), self.frame_count, areas[m[0]]) for i in unmatched_dets: trk = KalmanTracker( dets[i].reshape((5,1)), self.frame_count, areas[i], self.gap_distance) self.trackers.append(trk) self._remove_trackers()
def _secondary_match(self, i): """ Iterate over the different matching functions. """ funcs = [ self._match_prediction, self._match_last_state, self._match_reversed, self._match_prediction, self._match_last_state, self._match_reversed, self._match_split, self._match_last_state] dist = [ None, None, None, self.offset**2, self.offset**2, self.offset**2, None, self.gap_distance**2] if i >= len(dist): return False, None return funcs[i], dist[i] def _match_prediction( self, dets, unmatched_dets, matched, unmatched_trks, distance, trks, areas): """ Match predictions with detections. """ to_match_trks = trks[unmatched_trks] return associate_secondary( matched, dets, unmatched_dets, to_match_trks, unmatched_trks, distance) def _match_last_state(self, dets, unmatched_dets, matched, unmatched_trks, distance, trks, areas): """ Match last updated locations with detections """ last_pos_trks = np.zeros((len(unmatched_trks), 5)) for t, trk in zip(unmatched_trks, last_pos_trks): pos = self.trackers[t].last_state.reshape((-1)) trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]] return associate_secondary( matched, dets, unmatched_dets, last_pos_trks, unmatched_trks, distance) def _match_reversed(self, dets, unmatched_dets, matched, unmatched_trks, distance, trks, areas): """ Match second last updated location with detections. """ reversed_trks = np.zeros((len(unmatched_trks), 5)) for t, trk in zip(unmatched_trks, reversed_trks): pos = self.trackers[t].reverse_state if pos is None: trk[:] = [0, 0, 0, 0, 0] else: pos = pos.reshape((-1)) trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]] return associate_secondary( matched, dets, unmatched_dets, reversed_trks, unmatched_trks, distance) def _match_split(self, dets, unmatched_dets, matched, unmatched_trks, distance, trks, areas): """ Match duplicates of trackers with detections. Archive matched old trackers and create two duplicates in place for successful division. """ split_trks = np.zeros((len(self.trackers), 5)) for t, trk in enumerate(split_trks): if ( self.trackers[t].age > self.division_age or self.trackers[t].parent == -1 ): pos = self.trackers[t].last_state.reshape((-1)) trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]] else: trk[:] = [-self.split_dist, -self.split_dist, 0, 0, 0] split_matched, split_unmatched_dets, _ = associate( dets[unmatched_dets], split_trks, distance=self.split_dist**2) for m in split_matched: t = copy.deepcopy(self.trackers[m[1]]) t.division = self.frame_count parent = t.id self.expired_trackers.append(t) self.trackers[m[1]].parent = parent self.trackers[m[1]].start = self.frame_count self.trackers[m[1]].hits = 0 self.trackers[m[1]].age = 0 self.trackers[m[1]].id = KalmanTracker.count KalmanTracker.count += 1 split_trackers = [] for i, m in enumerate(split_matched): split_trackers.append( copy.deepcopy(self.trackers[m[1]])) for m, t in zip(split_matched, split_trackers): t.update( dets[unmatched_dets[m[0]]].reshape((5,1)), self.frame_count, areas[m[0]]) t.id = KalmanTracker.count KalmanTracker.count += 1 self.trackers.append(t) return matched, unmatched_dets[split_unmatched_dets], unmatched_trks def _remove_trackers(self): """ Remove expired or short-lived trackers. """ i = len(self.trackers) for trk in reversed(self.trackers): i -= 1 if ( (trk.time_since_update > self.max_age) or (trk.hits < 20 and trk.time_since_update > 20) or (100 < trk.hits < trk.time_since_update) ): self.expired_trackers.append(self.trackers[i]) self.trackers.pop(i) def _match_rect_long_side(self): """ Match the long side / thecae over splitting events by comparing the center location of the next generation with the center location of the previous generation and then estimating the outside of the closer object with the relative shift. Subsequently assign the other objects older theca. """ self.summary.sort(key = lambda x: x[0][1]) divisions = { f'{int(t[0][3])}': [] for t in self.summary if t[0][3] != -1} for t in self.summary: if t[0][3] != -1: divisions[str(int(t[0][3]))].append(int(t[0][1])) for parent, children in divisions.items(): c1, c2 = children parent = int(parent) if len(self.summary[parent])>20: ppos = np.mean( self.summary[parent][-110:-10, 7:9], axis=0).reshape((-1)) pre_angle = np.mean(self.summary[parent][-110:-10, -2]) else: ppos = np.mean( self.summary[parent][:, 7:9], axis=0).reshape((-1)) pre_angle = np.mean(self.summary[parent][:, -2]) pre_vec = np.array([np.cos(pre_angle), np.sin(pre_angle)]) cpos1 = self.summary[c1][0, 7:9] cpos2 = self.summary[c2][0, 7:9] dist = [euclid(cpos1, ppos), euclid(cpos2, ppos)] ref = np.argmin(dist) dist_vec = (cpos1, cpos2)[ref] - ppos dot_dist = np.dot(dist_vec, pre_vec) if (ref == 0 and dot_dist >= 0) or (dot_dist < 0 and ref != 0): self.summary[c2][:,6] += np.pi self.summary[c2][:,6] %= 2*np.pi else: self.summary[c1][:,6] += np.pi self.summary[c1][:,6] %= 2*np.pi if self.summary[parent][0][3] != -1: angle = self.summary[children[ref]][0,6] ortho_vec = np.array([np.cos(angle), np.sin(angle)]) dot_rot = np.dot(ortho_vec, pre_vec) if (ref == 0 and dot_rot >= 0) or (dot_rot < 0 and ref != 0): self.summary[c2][:,5] = 2 self.summary[c1][:,5] = 1 else: self.summary[c1][:,5] = 2 self.summary[c2][:,5] = 1
[docs] def get_state(self): """ Return current states of all active trackers. Returns ------- state : list States of all active trackers. """ state = [] for trk in self.trackers: state.append((trk.get_state(), trk.time_since_update, trk.tree_id)) return state
[docs] def summarize(self): """ Compile the tracking results and match the long sides / theca over generations. Returns ------- list Summary of all tracking results. """ self.summary = [t.summarize() for t in self.trackers] summary = [t.summarize() for t in self.expired_trackers] self.summary.extend(summary) self._match_rect_long_side() return self.summary
[docs]def track(data, age, offset, split, gap): """ Track diatoms and theca over multiple generations using DiatomTrack. Parameters ---------- data : list The rotated rectangle data for each frame. age : int Maximum frames an object may be undetected. offset : int The maximum distance in Euclidean tracking. split : int The maximum distance to consider splitting. gap : int The maximum distance for matching after detection gap. Returns ------- output : tuple Track data and trackers summarys. Tracker : DiatomTrack object The DiatomTrack object used to track the objects. """ Tracker = DiatomTrack(max_age=age, gap_distance=gap, split_distance=split, offset=offset) tracks = [] output = None ray.init() try: for frame in tqdm(range(len(data)), desc='Tracking frame to frame'): name, frame_data, frame_areas = data[frame] rects = np.array([d[:5] for d in frame_data]) Tracker.update(rects, frame_areas) tracks.append([name, Tracker.get_state()]) finally: ray.shutdown() output = (tracks, Tracker.summarize()) return output, Tracker
[docs]def tracks_to_dataframe(array_list): """ Transform list of DiatomTrack output into a readable Pandas dataframe. Parameters ---------- array_list : list Arrays containing tracking data. Returns ------- dataframe : Pandas dataframe The data in dataframe format. """ array = np.vstack(array_list) dataframe = pd.DataFrame(array, columns=[ 'tree', 'id', 'start', 'parent', 'division', 'age', 'side', 'frame', 'x', 'y', 'width', 'height', 'rotation', 'area']) dataframe['show'] = 1 dataframe['frame'] -= 1 dataframe = dataframe.astype({ 'frame': 'int', 'tree': 'int', 'id': 'int', 'start': 'int', 'parent': 'int', 'division': 'int'}) dataframe.sort_values(by=['frame', 'id'], axis=0, inplace=True) dataframe.set_index('frame', inplace=True, drop=False) dataframe.rename_axis('f', inplace=True) dataframe['row'] = range(len(dataframe)) dataframe.set_index('row', inplace=True, drop=True, append=True) return dataframe