import ray
import cv2 as cv
import numpy as np
from scipy.optimize import linear_sum_assignment
from ..utils import contours
def linear_assignment(cost_matrix):
    """
    Solve the linear assignment problem, maximising the total score.

    Parameters
    ----------
    cost_matrix : array
        2-D score matrix; larger entries are preferred pairings.

    Returns
    -------
    array
        Integer array of shape (k, 2) whose rows are the (row, column)
        index pairs of the optimal assignment. The shape stays (0, 2) when
        nothing can be assigned.
    """
    rows, cols = linear_sum_assignment(cost_matrix, maximize=True)
    # Stack column-wise instead of zipping through a Python list, so that an
    # empty solution keeps its two-column layout rather than collapsing to
    # shape (0,).
    return np.stack((rows, cols), axis=1)
def iou_batch(trackers, detections):
    """
    Calculate the IoU cost matrix for trackers and detections.

    Parameters
    ----------
    trackers : array
        Tracker data, one rotated rectangle (5 values) per row.
    detections : array
        Detection data, one rotated rectangle (5 values) per row.

    Returns
    -------
    array
        IoU cost matrix of shape (len(trackers), len(detections)) for all
        tracker and detection combinations. The matrix is empty when either
        input is empty.
    """
    trackers = np.asarray(trackers)
    detections = np.asarray(detections)
    n_trk, n_det = len(trackers), len(detections)

    # Without at least one rectangle on each side there is nothing to
    # compare, and the pairing below cannot be built from empty input.
    if n_trk == 0 or n_det == 0:
        return np.zeros((n_trk, n_det))

    # pairs[i, j] holds tracker i followed by detection j (10 values).
    trks = np.repeat(trackers[:, np.newaxis, :], n_det, axis=1)
    dets = np.repeat(detections[np.newaxis, :, :], n_trk, axis=0)
    pairs = np.concatenate((trks, dets), axis=-1)

    # Submit one ray task per tracker row, then gather the results in order.
    refs = [calculate_row.remote(row) for row in pairs]
    return np.array(ray.get(refs))
def euclid_batch(trackers, detections, distance):
    """
    Calculate the Euclidean cost matrix for trackers and detections.

    The score is ``1 - min(d2 / distance, 1)``, where ``d2`` is the *squared*
    Euclidean distance between the first two values (the centre coordinates)
    of a tracker row and a detection row. A score of 1 means identical
    centres; a score of 0 means the squared distance reached ``distance``.

    Parameters
    ----------
    trackers : array
        Tracker data; the first two columns are used as coordinates.
    detections : array
        Detection data; the first two columns are used as coordinates.
    distance : int
        Cutoff value the squared distance is normalised by.

    Returns
    -------
    array
        Cost matrix of shape (len(trackers), len(detections)) for all
        tracker and detection combinations. The matrix is empty when either
        input is empty.
    """
    # Work in floating point so integer-typed input neither wraps around on
    # subtraction nor breaks the normalisation.
    trackers = np.asarray(trackers, dtype=float)
    detections = np.asarray(detections, dtype=float)
    n_trk, n_det = len(trackers), len(detections)

    if n_trk == 0 or n_det == 0:
        return np.zeros((n_trk, n_det))

    # Broadcast to every (tracker, detection) pair of centre coordinates.
    diff = trackers[:, np.newaxis, :2] - detections[np.newaxis, :, :2]
    normalised = np.sum(diff ** 2, axis=2) / distance
    # Anything beyond the cutoff is clipped so its score becomes 0.
    return 1 - np.minimum(normalised, 1)
@ray.remote
def calculate_row(array):
    """
    Calculate the IoU values for one row of the cost matrix as a ray task.

    Parameters
    ----------
    array : array
        2-D array; `calculate_iou` is applied to each of its rows.

    Returns
    -------
    array
        One IoU value per row of ``array``.
    """
    return np.apply_along_axis(calculate_iou, 1, array)
def calculate_iou(array):
    """
    Calculate IoU between two rotated rectangles in OpenCV's format.

    Parameters
    ----------
    array : array
        The two rotated rectangles concatenated (5 values each). Values 2-3
        of each rectangle are taken as its width and height.

    Returns
    -------
    float
        The IoU of the rectangles; 0 when they do not overlap.
    """
    first, second = array[:5], array[5:]

    # Identical rectangles overlap completely; skip the OpenCV round trip.
    if np.array_equal(first, second):
        return 1.

    r_ht = contours.convert_array_to_rectangle(first)
    r_gt = contours.convert_array_to_rectangle(second)
    status, region = cv.rotatedRectangleIntersection(r_ht, r_gt)

    # There are two ways to overlap: intersection (=1) and enclosed (=2).
    if status > 0:
        intersection = cv.contourArea(region)
        # Union = area(first) + area(second) - intersection, where each area
        # is that rectangle's own width * height.
        union = array[2] * array[3] + array[7] * array[8] - intersection
        # Degenerate (zero-area) rectangles would otherwise divide by zero.
        if union <= 0:
            return 0.
        return intersection / union
    return 0.
def associate(detections, trackers, distance=None):
    """
    Assign detections to trackers.

    A score matrix (rows: detections, columns: trackers) is built with
    `iou_batch`, or with `euclid_batch` when ``distance`` is truthy. If every
    remaining detection has exactly one tracker scoring above 0.5 and no
    tracker is claimed twice, those pairs are used directly. Otherwise the
    pairing comes from `linear_assignment`. Pairs with a score of 0 are
    never reported as matches.

    Parameters
    ----------
    detections : array
        The detection data.
    trackers : array
        The tracker data.
    distance : int, optional
        The cutoff Euclidean distance. The default is None.

    Returns
    -------
    matched : array
        Indices of matched detections and trackers, shape (k, 2).
    unmatched_dets : array
        Indices of unmatched detections.
    unmatched_trks : array
        Indices of unmatched trackers.
    """
    n_dets, n_trks = len(detections), len(trackers)

    # With nothing on one side there can be no match: everything on the
    # other side is unmatched. Returning here also keeps empty input away
    # from the cost-matrix builders.
    if n_dets == 0 or n_trks == 0:
        return (
            np.empty((0, 2), dtype=np.uint16),
            np.arange(n_dets, dtype=np.uint16),
            np.arange(n_trks, dtype=np.uint16))

    if not distance:
        cost_full = iou_batch(detections, trackers)
    else:
        cost_full = euclid_batch(detections, trackers, distance)

    # Sort out rows and columns that are all 0 and lead to wrong
    # assignments; remember the original index of everything that is kept.
    keep_det = np.flatnonzero(np.any(cost_full != 0, axis=1))
    keep_trk = np.flatnonzero(np.any(cost_full != 0, axis=0))
    cost = cost_full[np.ix_(keep_det, keep_trk)]

    if cost.size > 0:
        good = cost > 0.5
        # Check whether there is only one good overlap pairwise.
        one_to_one = (
            good.sum(axis=1).max() == 1 and good.sum(axis=0).max() == 1
            and good.sum() == good.shape[0])
        if one_to_one:
            pairs = np.argwhere(good)
        else:
            pairs = linear_assignment(cost)
        # Drop assignments that have no overlap at all.
        pairs = pairs[cost[pairs[:, 0], pairs[:, 1]] > 0]
    else:
        pairs = np.empty((0, 2), dtype=int)

    # Translate indices of the reduced matrix back to the original ones.
    matches = np.column_stack((keep_det[pairs[:, 0]], keep_trk[pairs[:, 1]]))
    unmatched_detections = np.setdiff1d(np.arange(n_dets), matches[:, 0])
    unmatched_trackers = np.setdiff1d(np.arange(n_trks), matches[:, 1])

    return (
        matches.astype(np.uint16),
        unmatched_detections.astype(np.uint16),
        unmatched_trackers.astype(np.uint16))
def associate_secondary(
        matched, dets, unmatched_dets, tracks, unmatched_trks, distance=None):
    """
    Assign detections to trackers after filtering for already matched data.

    Only the detections listed in ``unmatched_dets`` are passed to
    `associate`. The indices it returns are translated back through
    ``unmatched_dets`` and ``unmatched_trks``.

    Parameters
    ----------
    matched : array
        Indices of matched detections and trackers.
    dets : array
        The detection data.
    unmatched_dets : array
        Indices of unmatched detections.
    tracks : array
        The tracker data. NOTE(review): ``tracks`` is passed on unfiltered,
        while results are mapped through ``unmatched_trks``, so its rows
        presumably already correspond to ``unmatched_trks`` - confirm
        against the caller.
    unmatched_trks : array
        Indices of unmatched trackers.
    distance : int, optional
        The cutoff Euclidean distance. The default is None.

    Returns
    -------
    matched : array
        Updated indices of matched detections and trackers.
    unmatched_dets : array
        Updated indices of unmatched detections.
    unmatched_trks : array
        Updated indices of unmatched trackers.
    """
    new_matched, left_dets, left_trks = associate(
        dets[unmatched_dets], tracks, distance)

    # Index arrays must be one-dimensional; flattening keeps the returned
    # index arrays 1-D even if an empty result arrives with extra columns.
    left_dets = np.ravel(left_dets)
    left_trks = np.ravel(left_trks)

    # Translate positions within the filtered data back to original indices.
    new_matched[:, 0] = unmatched_dets[new_matched[:, 0]]
    new_matched[:, 1] = unmatched_trks[new_matched[:, 1]]
    matched = np.concatenate((matched, new_matched))

    return matched, unmatched_dets[left_dets], unmatched_trks[left_trks]