Source code for DiatomTrack.core.assignment

import ray

import cv2 as cv
import numpy as np

from scipy.optimize import linear_sum_assignment
from ..utils import contours


[docs]def linear_assignment(cost_matrix): # Solve LAP x, y = linear_sum_assignment(cost_matrix, maximize=True) return np.array(list(zip(x, y)))
[docs]def iou_batch(trackers, detections): """ Calculate the IoU cost matrix for trackers and detections. Parameters ---------- trackers : array Tracker data. detections : array Detection data. Returns ------- array IoU cost matrix for all tracker and detection combinations. """ trks = np.repeat(trackers, len(detections), axis=0).reshape( (-1,len(detections),5)) dets = np.tile(detections, (len(trackers),1)).reshape((len(trackers),-1,5)) input_matrix = np.concatenate((trks, dets), axis=-1) # calculate row-wise via ray refs = [] for row in input_matrix: row_ref = calculate_row.remote(row) refs.append(row_ref) iou_matrix = [] for row in ray.get(refs): iou_matrix.append(row) return np.array(iou_matrix)
[docs]def euclid_batch(trackers, detections, distance): """ Calculate the Euclidean cost matrix for trackers and detections. Parameters ---------- trackers : array Tracker data. detections : array Detection data. distance : int Cutoff distance. Returns ------- array Euclidean cost matrix for all tracker and detection combinations. """ trks = np.repeat(trackers, len(detections), axis=0).reshape( (-1,len(detections),5)) dets = np.tile(detections, (len(trackers),1)).reshape((len(trackers),-1,5)) input_matrix = np.concatenate((trks, dets), axis=-1) trk_coord = input_matrix[:,:,[0,1]] det_coord = input_matrix[:,:,[5,6]] iou_matrix = np.sum((trk_coord - det_coord)**2, axis=2) iou_matrix /= distance iou_matrix[iou_matrix > 1] = 1 return 1 - iou_matrix
@ray.remote def calculate_row(array): # Caculate IoU along a row in parallel with ray row = np.apply_along_axis(calculate_iou, axis=1, arr=array) return row
[docs]def calculate_iou(array): """ Caculate IoU between two rotated rectangles in OpenCV's format. Parameters ---------- array : array The two rotated rectangles concatenated. Returns ------- float The IoU of the rectangles. """ if np.array_equal(array[:5], array[5:]): return 1. r_ht = contours.convert_array_to_rectangle(array[:5]) r_gt = contours.convert_array_to_rectangle(array[5:]) intersect = cv.rotatedRectangleIntersection(r_ht, r_gt) # there are two possibilities to intersect: intersection(=1) and enclosed(=2) if intersect[0] > 0: intersection = cv.contourArea(intersect[1]) union = sum(array[2:4]*array[7:9]) - intersection return intersection / union return 0.
[docs]def associate(detections, trackers, distance=None): """ Assign detections to trackers. Parameters ---------- detections : array The detection data. trackers : array The tracker data. distance : int, optional The cutoff Euclidean distance. The default is None. Returns ------- matched : array Indices of matched detections and trackers. unmatched_dets : array Indices of unmatched detections. unmatched_trks : array Indices of unmatched trackers. """ if(len(trackers)==0): return ( np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)) if not distance: iou_matrix_full = iou_batch(detections, trackers) else: iou_matrix_full = euclid_batch(detections, trackers, distance) # sort out rows and columns that are 0 and lead to wrong assignments trk_idx = np.argwhere(np.all(iou_matrix_full[...,:] == 0, axis=0)) det_idx = np.argwhere(np.all(iou_matrix_full[...,:] == 0, axis=1)) iou_matrix = np.delete(np.delete(iou_matrix_full, det_idx, axis=0), trk_idx, axis=1) new_det_idx = [i for i in range(iou_matrix_full.shape[0]) if i not in det_idx] new_trk_idx = [i for i in range(iou_matrix_full.shape[1]) if i not in trk_idx] if min(iou_matrix.shape) > 0: a = (iou_matrix > 0.5).astype(np.int32) # check whether there is only one good overlap pairwise if ( a.sum(1).max() == 1 and a.sum(0).max() == 1 and a.sum() == a.shape[0] ): matched_indices = np.stack(np.where(a), axis=1) else: matched_indices = linear_assignment(iou_matrix) else: matched_indices = np.empty(shape=(0,2)).astype(int) matched_indices = matched_indices[iou_matrix[tuple(matched_indices.T)] > 0] matched_indices[:, 0] = np.array(new_det_idx)[matched_indices[:, 0]] matched_indices[:, 1] = np.array(new_trk_idx)[matched_indices[:, 1]] unmatched_detections = [] for d in range(len(detections)): if (d not in matched_indices[:,0]): unmatched_detections.append(d) unmatched_trackers = [] for t, trk in enumerate(trackers): if (t not in matched_indices[:,1]): unmatched_trackers.append(t) matches = [] for m in matched_indices: matches.append(m.reshape(1,2)) if (len(matches)==0): matches = np.empty((0,2),dtype=int) else: matches = np.concatenate(matches,axis=0) return ( matches.astype(np.uint16), np.array(unmatched_detections, dtype=np.uint16), np.array(unmatched_trackers, dtype=np.uint16))
[docs]def associate_secondary( matched, dets, unmatched_dets, tracks, unmatched_trks, distance=None): """ Assign detections to trackers after filtering for already matched data. Parameters ---------- matched : array Indices of matched detections and trackers. dets : array The detection data. unmatched_dets : array Indices of unmatched detections. tracks : array The tracker data. unmatched_trks : array Indices of unmatched trackers. distance : int, optional The cutoff Euclidean distance. The default is None. Returns ------- matched : array Updated indices of matched detections and trackers. unmatched_dets : array Updated indices of unmatched detections. unmatched_trks : array Updated indices of unmatched trackers. """ to_match_dets = dets[unmatched_dets] new_matched, unmatched_to_match_dets, unmatched_to_match_trks = associate( to_match_dets, tracks, distance) new_matched[:,0] = unmatched_dets[new_matched[:,0]] new_matched[:,1] = unmatched_trks[new_matched[:,1]] matched = np.concatenate((matched, new_matched)) unmatched_dets = unmatched_dets[unmatched_to_match_dets] unmatched_trks = unmatched_trks[unmatched_to_match_trks] return matched, unmatched_dets, unmatched_trks