import copy
import ray
import numpy as np
import pandas as pd
from filterpy.kalman import KalmanFilter
from scipy.spatial.distance import sqeuclidean as euclid
from tqdm.tk import tqdm_tk as tqdm
from ..core.assignment import associate, associate_secondary
[docs]class KalmanTracker:
"""
This class contains the Kalman filter of individual tracked objects
observed as a rotated bounding box and the history of that object and its
ancestors.
"""
tree = 0
count = 0
def __init__(self, rect, start, area, dist=200, boundary=(2048,1536)):
"""
Initialize the object and Kalman filter with the rotated bounding box.
Parameters
----------
rect : array
The box parameters.
start : int
The current frame.
area : float
The box area.
dist : int, optional
The cutoff distance for predictions. The default is 200.
boundary : tuple, optional
Dimensions of the image. The default is (2048,1536).
Returns
-------
None.
"""
# Initialize filter with constant velocity model
self.kf = KalmanFilter(dim_x=12, dim_z=5)
d = 1
a = 0.5
self.kf.F = np.array([
[1,0,0,0,0,d,0,0,0,0,a,0],
[0,1,0,0,0,0,d,0,0,0,0,a],
[0,0,1,0,0,0,0,d,0,0,0,0],
[0,0,0,1,0,0,0,0,d,0,0,0],
[0,0,0,0,1,0,0,0,0,d,0,0],
[0,0,0,0,0,1,0,0,0,0,d,0],
[0,0,0,0,0,0,1,0,0,0,0,d],
[0,0,0,0,0,0,0,1,0,0,0,0],
[0,0,0,0,0,0,0,0,1,0,0,0],
[0,0,0,0,0,0,0,0,0,1,0,0],
[0,0,0,0,0,0,0,0,0,0,1,0],
[0,0,0,0,0,0,0,0,0,0,0,1]
])
self.kf.H = np.array([
[1,0,0,0,0,0,0,0,0,0,0,0],
[0,1,0,0,0,0,0,0,0,0,0,0],
[0,0,1,0,0,0,0,0,0,0,0,0],
[0,0,0,1,0,0,0,0,0,0,0,0],
[0,0,0,0,1,0,0,0,0,0,0,0]
])
self.kf.R[2:,2:] *= 10
self.kf.P[5:,5:] *= 10.
self.kf.P *= 10
self.kf.P[10:,10:] *= 10.
self.kf.Q *= 0.1
self.kf.Q[5:,5:] *= 0.1
self.kf.x[:5] = rect
# KalmanBoxTracker parameters
self.distance = dist
self.bound = boundary
self.time_since_update = 0
self.tree_id = KalmanTracker.tree
KalmanTracker.tree += 1
self.id = KalmanTracker.count
KalmanTracker.count += 1
self.parent = -1
self.history = []
self.prediction_history = []
self.hits = 0
self.age = 0
self.start = start
self.last_state = rect
self.reverse_state = None
self.detections = [np.concatenate(([start], rect.flatten()))]
self.areas = [[area]]
self.division = -1
self.summary = None
[docs] def update(self, rect, frame, area):
"""
Update the filter and filter history.
"""
self.time_since_update = 0
self.reverse_state = self.last_state
self.last_state = rect
self.prediction_history.extend(self.history)
self.history = []
self.hits += 1
self.kf.P *= 1.01
self.kf.update(rect)
self.detections.append(np.concatenate(([frame], rect.flatten())))
self.areas.append([area])
[docs] def predict(self):
"""
Predict next state from filter while keeping boundary conditions of
image, maximum distance, rotation angle < pi, and area > 0.
"""
self.age += 1
self.time_since_update += 1
if self.time_since_update > 5:
self.history.append(self.history[-1])
return self.history[-1]
if self.time_since_update > 1:
if np.sum(
(self.last_state[:2].reshape(-1) -
self.history[-1][:2].reshape(-1))**2
) > self.distance**2:
self.history.append(self.history[-1])
return self.history[-1]
if ((self.kf.x[4]+self.kf.x[9])<=0):
self.kf.x[4] += np.pi
elif ((self.kf.x[4]+self.kf.x[9])>np.pi):
self.kf.x[4] -= np.pi
if ((self.kf.x[2]+self.kf.x[7])<=0):
self.kf.x[7] *= 0.0
if ((self.kf.x[3]+self.kf.x[8])<=0):
self.kf.x[8] *= 0.0
self.kf.predict()
if ((0<=self.kf.x[0]<self.bound[0]) and (0<=self.kf.x[1]<self.bound[1])
and np.sum(
(self.last_state[:2].reshape(-1) -
self.kf.x[:2].reshape(-1))**2) < self.distance**2
):
self.history.append(self.kf.x)
else:
if len(self.history) > 0:
self.history.append(self.history[-1])
else:
x = np.concatenate(
[self.last_state, np.zeros(self.kf.x[5:].shape)])
self.history.append(x)
return self.history[-1]
[docs] def get_state(self):
"""
Return the current bounding box estimate.
"""
return self.kf.x
[docs] def get_predictions(self):
"""
Return prediciton history.
"""
return self.prediction_history
[docs] def summarize(self):
"""
Compiles summary of the tracker object with all relevant information to
transform into a dataframe. Perform the side tracking based on minimum
difference of adjacent rotation angles.
Returns
-------
array
Summary of the tracker.
"""
dets = np.stack(self.detections)
rot = dets[:,5]
dist = np.abs(np.arctan2(np.sin(rot[:-1]-rot[1:]), np.cos(rot[:-1]-rot[1:])))
side = np.empty_like(rot)
side[0] = rot[0]
side = rot.reshape((-1))
mask = dist > np.pi/2
mask = np.cumsum(mask.astype(int))*np.pi
side[1:] += mask
side[side > np.pi] %= 2*np.pi
length = len(dets[dets[:,0]>=self.start])
general_columns = np.tile(
[self.tree_id, self.id, self.start, self.parent, self.division, 0],
(length,1))
self.summary = np.concatenate((
general_columns, side[-length:].reshape((-1,1)), dets[-length:],
self.areas[-length:]
), axis=1)
return self.summary
[docs]class DiatomTrack:
"""
This class performs the spatial tracking of diatom objects using Kalman
filter objects and also tracks the long sides / thecae of the objects.
"""
def __init__(
self, max_age=200, division_age=1000, gap_distance=200,
split_distance=50, offset=80):
"""
Initalize the tracking parameters.
Parameters
----------
max_age : int, optional
Maximum frames an object may be undetected. The default is 200.
division_age : int, optional
The minimal frames an object has to be tracked before division. The
default is 1000.
gap_distance : int, optional
The maximum distance for matching after detection gap. The default
is 200.
split_distance : int, optional
The maximum distance to consider splitting. The default is 50.
offset : int, optional
The maximum distance in Euclidean tracking. The default is 80.
Returns
-------
None.
"""
self.max_age = max_age
self.division_age = division_age
self.gap_distance = gap_distance
self.split_dist = split_distance
self.offset = offset
self.trackers = []
self.expired_trackers = []
self.frame_count = 0
self.summary = None
[docs] def update(self, dets=np.empty((0, 5)), areas=[]):
"""
Perform the tracking on the data from the next frame and archive
expired trackers.
Parameters
----------
dets : array, optional
The data of detected bounding boxes. The default is
np.empty((0, 5)).
areas : lilst, optional
The areas of the detections. The default is [].
Returns
-------
None.
"""
self.frame_count += 1
trks = np.zeros((len(self.trackers), 5))
trks_remain = []
taken = []
for t, trk in enumerate(trks):
pos = self.trackers[t].predict().reshape((-1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]]
if 0 < sum(abs(self.trackers[t].get_state()[5:7])) < 4:
trks_remain.append(trk)
taken.append(t)
trks_remain = np.array(trks_remain)
matched, unmatched_dets, unmatched_trks = associate(
dets, trks_remain)
matched[:,1] = np.array(taken)[matched[:,1]]
unmatched_trks = np.delete(np.arange(len(trks)), matched[:,1])
i = 0
while len(unmatched_trks) > 0 and len(unmatched_dets) > 0:
func, distance = self._secondary_match(i)
if func:
matched, unmatched_dets, unmatched_trks = func(
dets, unmatched_dets, matched, unmatched_trks, distance,
trks, areas=areas)
else:
break
i += 1
for m in matched:
self.trackers[m[1]].update(
dets[m[0]].reshape((5,1)), self.frame_count, areas[m[0]])
for i in unmatched_dets:
trk = KalmanTracker(
dets[i].reshape((5,1)), self.frame_count, areas[i],
self.gap_distance)
self.trackers.append(trk)
self._remove_trackers()
def _secondary_match(self, i):
"""
Iterate over the different matching functions.
"""
funcs = [
self._match_prediction, self._match_last_state,
self._match_reversed, self._match_prediction,
self._match_last_state, self._match_reversed,
self._match_split, self._match_last_state]
dist = [
None, None, None,
self.offset**2, self.offset**2, self.offset**2,
None, self.gap_distance**2]
if i >= len(dist):
return False, None
return funcs[i], dist[i]
def _match_prediction(
self, dets, unmatched_dets, matched, unmatched_trks, distance,
trks, areas):
"""
Match predictions with detections.
"""
to_match_trks = trks[unmatched_trks]
return associate_secondary(
matched, dets, unmatched_dets, to_match_trks, unmatched_trks,
distance)
def _match_last_state(self, dets, unmatched_dets, matched, unmatched_trks,
distance, trks, areas):
"""
Match last updated locations with detections
"""
last_pos_trks = np.zeros((len(unmatched_trks), 5))
for t, trk in zip(unmatched_trks, last_pos_trks):
pos = self.trackers[t].last_state.reshape((-1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]]
return associate_secondary(
matched, dets, unmatched_dets, last_pos_trks, unmatched_trks,
distance)
def _match_reversed(self, dets, unmatched_dets, matched, unmatched_trks,
distance, trks, areas):
"""
Match second last updated location with detections.
"""
reversed_trks = np.zeros((len(unmatched_trks), 5))
for t, trk in zip(unmatched_trks, reversed_trks):
pos = self.trackers[t].reverse_state
if pos is None:
trk[:] = [0, 0, 0, 0, 0]
else:
pos = pos.reshape((-1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]]
return associate_secondary(
matched, dets, unmatched_dets, reversed_trks, unmatched_trks, distance)
def _match_split(self, dets, unmatched_dets, matched, unmatched_trks,
distance, trks, areas):
"""
Match duplicates of trackers with detections. Archive matched old
trackers and create two duplicates in place for successful division.
"""
split_trks = np.zeros((len(self.trackers), 5))
for t, trk in enumerate(split_trks):
if (
self.trackers[t].age > self.division_age or
self.trackers[t].parent == -1
):
pos = self.trackers[t].last_state.reshape((-1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4]]
else:
trk[:] = [-self.split_dist, -self.split_dist, 0, 0, 0]
split_matched, split_unmatched_dets, _ = associate(
dets[unmatched_dets], split_trks, distance=self.split_dist**2)
for m in split_matched:
t = copy.deepcopy(self.trackers[m[1]])
t.division = self.frame_count
parent = t.id
self.expired_trackers.append(t)
self.trackers[m[1]].parent = parent
self.trackers[m[1]].start = self.frame_count
self.trackers[m[1]].hits = 0
self.trackers[m[1]].age = 0
self.trackers[m[1]].id = KalmanTracker.count
KalmanTracker.count += 1
split_trackers = []
for i, m in enumerate(split_matched):
split_trackers.append(
copy.deepcopy(self.trackers[m[1]]))
for m, t in zip(split_matched, split_trackers):
t.update(
dets[unmatched_dets[m[0]]].reshape((5,1)), self.frame_count,
areas[m[0]])
t.id = KalmanTracker.count
KalmanTracker.count += 1
self.trackers.append(t)
return matched, unmatched_dets[split_unmatched_dets], unmatched_trks
def _remove_trackers(self):
"""
Remove expired or short-lived trackers.
"""
i = len(self.trackers)
for trk in reversed(self.trackers):
i -= 1
if (
(trk.time_since_update > self.max_age) or
(trk.hits < 20 and trk.time_since_update > 20) or
(100 < trk.hits < trk.time_since_update)
):
self.expired_trackers.append(self.trackers[i])
self.trackers.pop(i)
def _match_rect_long_side(self):
"""
Match the long side / thecae over splitting events by comparing the
center location of the next generation with the center location of the
previous generation and then estimating the outside of the closer
object with the relative shift. Subsequently assign the other objects
older theca.
"""
self.summary.sort(key = lambda x: x[0][1])
divisions = {
f'{int(t[0][3])}': [] for t in self.summary if t[0][3] != -1}
for t in self.summary:
if t[0][3] != -1:
divisions[str(int(t[0][3]))].append(int(t[0][1]))
for parent, children in divisions.items():
c1, c2 = children
parent = int(parent)
if len(self.summary[parent])>20:
ppos = np.mean(
self.summary[parent][-110:-10, 7:9], axis=0).reshape((-1))
pre_angle = np.mean(self.summary[parent][-110:-10, -2])
else:
ppos = np.mean(
self.summary[parent][:, 7:9], axis=0).reshape((-1))
pre_angle = np.mean(self.summary[parent][:, -2])
pre_vec = np.array([np.cos(pre_angle), np.sin(pre_angle)])
cpos1 = self.summary[c1][0, 7:9]
cpos2 = self.summary[c2][0, 7:9]
dist = [euclid(cpos1, ppos), euclid(cpos2, ppos)]
ref = np.argmin(dist)
dist_vec = (cpos1, cpos2)[ref] - ppos
dot_dist = np.dot(dist_vec, pre_vec)
if (ref == 0 and dot_dist >= 0) or (dot_dist < 0 and ref != 0):
self.summary[c2][:,6] += np.pi
self.summary[c2][:,6] %= 2*np.pi
else:
self.summary[c1][:,6] += np.pi
self.summary[c1][:,6] %= 2*np.pi
if self.summary[parent][0][3] != -1:
angle = self.summary[children[ref]][0,6]
ortho_vec = np.array([np.cos(angle), np.sin(angle)])
dot_rot = np.dot(ortho_vec, pre_vec)
if (ref == 0 and dot_rot >= 0) or (dot_rot < 0 and ref != 0):
self.summary[c2][:,5] = 2
self.summary[c1][:,5] = 1
else:
self.summary[c1][:,5] = 2
self.summary[c2][:,5] = 1
[docs] def get_state(self):
"""
Return current states of all active trackers.
Returns
-------
state : list
States of all active trackers.
"""
state = []
for trk in self.trackers:
state.append((trk.get_state(), trk.time_since_update, trk.tree_id))
return state
[docs] def summarize(self):
"""
Compile the tracking results and match the long sides / theca over
generations.
Returns
-------
list
Summary of all tracking results.
"""
self.summary = [t.summarize() for t in self.trackers]
summary = [t.summarize() for t in self.expired_trackers]
self.summary.extend(summary)
self._match_rect_long_side()
return self.summary
[docs]def track(data, age, offset, split, gap):
"""
Track diatoms and theca over multiple generations using DiatomTrack.
Parameters
----------
data : list
The rotated rectangle data for each frame.
age : int
Maximum frames an object may be undetected.
offset : int
The maximum distance in Euclidean tracking.
split : int
The maximum distance to consider splitting.
gap : int
The maximum distance for matching after detection gap.
Returns
-------
output : tuple
Track data and trackers summarys.
Tracker : DiatomTrack object
The DiatomTrack object used to track the objects.
"""
Tracker = DiatomTrack(max_age=age, gap_distance=gap, split_distance=split, offset=offset)
tracks = []
output = None
ray.init()
try:
for frame in tqdm(range(len(data)), desc='Tracking frame to frame'):
name, frame_data, frame_areas = data[frame]
rects = np.array([d[:5] for d in frame_data])
Tracker.update(rects, frame_areas)
tracks.append([name, Tracker.get_state()])
finally:
ray.shutdown()
output = (tracks, Tracker.summarize())
return output, Tracker
[docs]def tracks_to_dataframe(array_list):
"""
Transform list of DiatomTrack output into a readable Pandas dataframe.
Parameters
----------
array_list : list
Arrays containing tracking data.
Returns
-------
dataframe : Pandas dataframe
The data in dataframe format.
"""
array = np.vstack(array_list)
dataframe = pd.DataFrame(array, columns=[
'tree', 'id', 'start', 'parent', 'division', 'age', 'side',
'frame', 'x', 'y', 'width', 'height', 'rotation', 'area'])
dataframe['show'] = 1
dataframe['frame'] -= 1
dataframe = dataframe.astype({
'frame': 'int', 'tree': 'int', 'id': 'int', 'start': 'int',
'parent': 'int', 'division': 'int'})
dataframe.sort_values(by=['frame', 'id'], axis=0, inplace=True)
dataframe.set_index('frame', inplace=True, drop=False)
dataframe.rename_axis('f', inplace=True)
dataframe['row'] = range(len(dataframe))
dataframe.set_index('row', inplace=True, drop=True, append=True)
return dataframe