init commit

2025-11-08 19:15:39 +01:00
parent ecffcb08e8
commit c7adacf53b
470 changed files with 73751 additions and 0 deletions

@@ -0,0 +1,295 @@
<a href="https://www.ultralytics.com/" target="_blank"><img src="https://raw.githubusercontent.com/ultralytics/assets/main/logo/Ultralytics_Logotype_Original.svg" width="320" alt="Ultralytics logo"></a>
# Multi-Object Tracking with Ultralytics YOLO
<img width="1024" src="https://user-images.githubusercontent.com/26833433/243418637-1d6250fd-1515-4c10-a844-a32818ae6d46.png" alt="Ultralytics YOLO trackers visualization">
[Object tracking](https://www.ultralytics.com/glossary/object-tracking), a key aspect of [video analytics](https://en.wikipedia.org/wiki/Video_content_analysis), involves identifying the location and class of objects within video frames and assigning a unique ID to each detected object as it moves. This capability enables a wide range of applications, from surveillance and security systems to [real-time](https://www.ultralytics.com/glossary/real-time-inference) sports analysis and autonomous vehicle navigation. Learn more about tracking on our [tracking documentation page](https://docs.ultralytics.com/modes/track/).
## 🎯 Why Choose Ultralytics YOLO for Object Tracking?
Ultralytics YOLO trackers provide output consistent with standard [object detection](https://docs.ultralytics.com/tasks/detect/) but add persistent object IDs. This simplifies the process of tracking objects in video streams and performing subsequent analyses. Here's why Ultralytics YOLO is an excellent choice for your object tracking needs:
- **Efficiency:** Process video streams in real-time without sacrificing accuracy.
- **Flexibility:** Supports multiple robust tracking algorithms and configurations.
- **Ease of Use:** Offers straightforward [Python API](https://docs.ultralytics.com/usage/python/) and [CLI](https://docs.ultralytics.com/usage/cli/) options for rapid integration and deployment.
- **Customizability:** Easily integrates with [custom-trained YOLO models](https://docs.ultralytics.com/modes/train/), enabling deployment in specialized, domain-specific applications.
**Watch:** Object Detection and Tracking with Ultralytics YOLOv8.
[![Watch the video](https://user-images.githubusercontent.com/26833433/244171528-66a4a68d-cb85-466a-984a-34301616b7a3.png)](https://www.youtube.com/watch?v=hHyHmOtmEgs)
## ✨ Features at a Glance
Ultralytics YOLO extends its powerful object detection features to deliver robust and versatile object tracking:
- **Real-Time Tracking:** Seamlessly track objects in high-frame-rate videos.
- **Multiple Tracker Support:** Choose from a selection of established tracking algorithms.
- **Customizable Tracker Configurations:** Adapt the tracking algorithm to specific requirements by adjusting various parameters.
## 🛠️ Available Trackers
Ultralytics YOLO supports the following tracking algorithms. Enable them by passing the relevant YAML configuration file, such as `tracker=tracker_type.yaml`:
- **BoT-SORT:** Use [`botsort.yaml`](https://github.com/ultralytics/ultralytics/blob/main/ultralytics/cfg/trackers/botsort.yaml) to enable this tracker. Based on the [BoT-SORT paper](https://arxiv.org/abs/2206.14651) and its official [code implementation](https://github.com/NirAharon/BoT-SORT).
- **ByteTrack:** Use [`bytetrack.yaml`](https://github.com/ultralytics/ultralytics/blob/main/ultralytics/cfg/trackers/bytetrack.yaml) to enable this tracker. Based on the [ByteTrack paper](https://arxiv.org/abs/2110.06864) and its official [code implementation](https://github.com/FoundationVision/ByteTrack).
The default tracker is **BoT-SORT**.
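Each tracker is configured by a small YAML file. As a reference point, the sketch below mirrors the general shape of the shipped `bytetrack.yaml`; the values are indicative defaults, so consult the linked configuration files for the authoritative settings.
```yaml
# Illustrative sketch of a ByteTrack tracker config
tracker_type: bytetrack # tracker type, ['botsort', 'bytetrack']
track_high_thresh: 0.25 # threshold for the first association
track_low_thresh: 0.1 # threshold for the second association
new_track_thresh: 0.25 # threshold to start a new track if the detection matches no existing tracks
track_buffer: 30 # buffer controlling how long lost tracks are kept before removal
match_thresh: 0.8 # threshold for matching tracks
fuse_score: True # whether to fuse confidence scores with the IoU distances before matching
```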
## ⚙️ Usage
To run the tracker on video streams, use a trained Detect, Segment, or Pose model like [Ultralytics YOLO11n](https://docs.ultralytics.com/models/yolo11/), YOLO11n-seg, or YOLO11n-pose.
```python
# Python
from ultralytics import YOLO
# Load an official or custom model
model = YOLO("yolo11n.pt") # Load an official Detect model
# model = YOLO("yolo11n-seg.pt") # Load an official Segment model
# model = YOLO("yolo11n-pose.pt") # Load an official Pose model
# model = YOLO("path/to/best.pt") # Load a custom trained model
# Perform tracking with the model
results = model.track(source="https://youtu.be/LNwODJXcvt4", show=True) # Tracking with default tracker
# results = model.track(source="https://youtu.be/LNwODJXcvt4", show=True, tracker="bytetrack.yaml") # Tracking with ByteTrack tracker
```
```bash
# CLI
# Perform tracking with various models using the command line interface
yolo track model=yolo11n.pt source="https://youtu.be/LNwODJXcvt4" # Official Detect model
# yolo track model=yolo11n-seg.pt source="https://youtu.be/LNwODJXcvt4" # Official Segment model
# yolo track model=yolo11n-pose.pt source="https://youtu.be/LNwODJXcvt4" # Official Pose model
# yolo track model=path/to/best.pt source="https://youtu.be/LNwODJXcvt4" # Custom trained model
# Track using ByteTrack tracker
# yolo track model=path/to/best.pt tracker="bytetrack.yaml"
```
As shown above, tracking is available for all [Detect](https://docs.ultralytics.com/tasks/detect/), [Segment](https://docs.ultralytics.com/tasks/segment/), and [Pose](https://docs.ultralytics.com/tasks/pose/) models when run on videos or streaming sources.
## 🔧 Configuration
### Tracking Arguments
Tracking configuration shares properties with the Predict mode, such as `conf` (confidence threshold), `iou` ([Intersection over Union](https://www.ultralytics.com/glossary/intersection-over-union-iou) threshold), and `show` (display results). For additional configurations, refer to the [Predict mode documentation](https://docs.ultralytics.com/modes/predict/).
```python
# Python
from ultralytics import YOLO
# Configure the tracking parameters and run the tracker
model = YOLO("yolo11n.pt")
results = model.track(source="https://youtu.be/LNwODJXcvt4", conf=0.3, iou=0.5, show=True)
```
```bash
# CLI
# Configure tracking parameters and run the tracker using the command line interface
yolo track model=yolo11n.pt source="https://youtu.be/LNwODJXcvt4" conf=0.3 iou=0.5 show
```
### Tracker Selection
Ultralytics allows you to use a modified tracker configuration file. Create a copy of a tracker config file (e.g., `custom_tracker.yaml`) from [ultralytics/cfg/trackers](https://github.com/ultralytics/ultralytics/tree/main/ultralytics/cfg/trackers) and adjust any configurations (except `tracker_type`) according to your needs.
```python
# Python
from ultralytics import YOLO
# Load the model and run the tracker with a custom configuration file
model = YOLO("yolo11n.pt")
results = model.track(source="https://youtu.be/LNwODJXcvt4", tracker="custom_tracker.yaml")
```
```bash
# CLI
# Load the model and run the tracker with a custom configuration file using the command line interface
yolo track model=yolo11n.pt source="https://youtu.be/LNwODJXcvt4" tracker='custom_tracker.yaml'
```
For a comprehensive list of tracking arguments, consult the [Tracking Configuration files](https://github.com/ultralytics/ultralytics/tree/main/ultralytics/cfg/trackers) in the repository.
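For example, a `custom_tracker.yaml` derived from `botsort.yaml` might only tighten a couple of values. The excerpt below is hypothetical and shows illustrative settings only; all remaining fields should be copied unchanged from the original file, and `tracker_type` must stay the same.
```yaml
# Excerpt of a hypothetical custom_tracker.yaml
tracker_type: botsort # must remain 'botsort' for a BoT-SORT-derived config
match_thresh: 0.85 # illustrative: stricter association than the default
track_buffer: 60 # illustrative: keep lost tracks alive for longer
# ...all other fields copied unchanged from botsort.yaml
```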
## 🐍 Python Examples
### Persisting Tracks Loop
This Python script uses [OpenCV (`cv2`)](https://opencv.org/) and Ultralytics YOLO11 to perform object tracking on video frames. Ensure you have installed the necessary packages (`opencv-python` and `ultralytics`). The [`persist=True`](https://docs.ultralytics.com/modes/predict/#tracking) argument indicates that the current frame is the next in a sequence, allowing the tracker to maintain track continuity from the previous frame.
```python
# Python
import cv2
from ultralytics import YOLO
# Load the YOLO11 model
model = YOLO("yolo11n.pt")
# Open the video file
video_path = "path/to/video.mp4"
cap = cv2.VideoCapture(video_path)
# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    if success:
        # Run YOLO11 tracking on the frame, persisting tracks between frames
        results = model.track(frame, persist=True)
        # Visualize the results on the frame
        annotated_frame = results[0].plot()
        # Display the annotated frame
        cv2.imshow("YOLO11 Tracking", annotated_frame)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        # Break the loop if the end of the video is reached
        break
# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
```
Note the use of `model.track(frame)` instead of `model(frame)`, which specifically enables object tracking. This script processes each video frame, visualizes the tracking results, and displays them. Press 'q' to exit the loop.
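If you need the numeric IDs rather than the rendered overlay, they are exposed on the `boxes` attribute of each result. The minimal sketch below assumes the same loop as above; note that `boxes.id` is only populated while tracking is active.
```python
# Python
# Read persistent track IDs from a tracking result (sketch)
results = model.track(frame, persist=True)
boxes = results[0].boxes
if boxes is not None and boxes.is_track:
    track_ids = boxes.id.int().cpu().tolist()  # one ID per detected box
    print(track_ids)  # e.g. [1, 2, 5]
```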
### Plotting Tracks Over Time
Visualizing object tracks across consecutive frames offers valuable insights into movement patterns within a video. Ultralytics YOLO11 makes plotting these tracks efficient.
The following example demonstrates how to use YOLO11's tracking capabilities to plot the movement of detected objects. The script opens a video, reads it frame by frame, and uses the YOLO model built on [PyTorch](https://pytorch.org/) to identify and track objects. By storing the center points of the detected [bounding boxes](https://www.ultralytics.com/glossary/bounding-box) and connecting them, we can draw lines representing the paths of tracked objects using [NumPy](https://numpy.org/) for numerical operations.
```python
# Python
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO
# Load the YOLO11 model
model = YOLO("yolo11n.pt")
# Open the video file
video_path = "path/to/video.mp4"
cap = cv2.VideoCapture(video_path)
# Store the track history
track_history = defaultdict(lambda: [])
# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    if success:
        # Run YOLO11 tracking on the frame, persisting tracks between frames
        result = model.track(frame, persist=True)[0]
        # Get the boxes and track IDs
        if result.boxes and result.boxes.is_track:
            boxes = result.boxes.xywh.cpu()
            track_ids = result.boxes.id.int().cpu().tolist()
            # Visualize the result on the frame
            frame = result.plot()
            # Plot the tracks
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y)))  # x, y center point
                if len(track) > 30:  # retain up to 30 points (roughly 30 frames) per track
                    track.pop(0)
                # Draw the tracking lines
                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(frame, [points], isClosed=False, color=(230, 230, 230), thickness=10)
        # Display the annotated frame
        cv2.imshow("YOLO11 Tracking", frame)
        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
    else:
        # Break the loop if the end of the video is reached
        break
# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
```
### Multithreaded Tracking
Multithreaded tracking allows running object tracking on multiple video streams simultaneously, which is highly beneficial for systems handling inputs from several cameras, improving efficiency through concurrent processing.
This Python script utilizes Python's [`threading`](https://docs.python.org/3/library/threading.html) module for concurrent tracker execution. Each thread manages tracking for a single video file.
The `run_tracker_in_thread` function accepts a model name and a video source. It loads the model, runs tracking in streaming mode, and iterates over the results generator; with `save=True`, the annotated output is saved to disk rather than displayed.
This example uses two models, `yolo11n.pt` and `yolo11n-seg.pt`, tracking objects in a local video file and a webcam stream (source `"0"`), respectively.
Setting `daemon=True` in `threading.Thread` ensures threads exit when the main program finishes. Threads are started with `start()` and the main thread waits for their completion using `join()`.
Finally, `cv2.destroyAllWindows()` closes all OpenCV windows after the threads finish.
```python
# Python
import threading
import cv2
from ultralytics import YOLO
# Define model names and video sources
MODEL_NAMES = ["yolo11n.pt", "yolo11n-seg.pt"]
SOURCES = ["path/to/video.mp4", "0"] # local video, 0 for webcam
def run_tracker_in_thread(model_name, filename):
    """
    Run YOLO tracker in its own thread for concurrent processing.
    Args:
        model_name (str): Name or path of the YOLO11 model to load (e.g., 'yolo11n.pt').
        filename (str): The path to the video file or the identifier for the webcam/external camera source.
    """
    model = YOLO(model_name)
    results = model.track(filename, save=True, stream=True)
    for r in results:
        pass  # consume the generator so tracking runs to completion
# Create and start tracker threads using a for loop
tracker_threads = []
for video_file, model_name in zip(SOURCES, MODEL_NAMES):
    thread = threading.Thread(target=run_tracker_in_thread, args=(model_name, video_file), daemon=True)
    tracker_threads.append(thread)
    thread.start()
# Wait for all tracker threads to finish
for thread in tracker_threads:
    thread.join()
# Clean up and close windows
cv2.destroyAllWindows()
```
This setup can be easily scaled to handle more video streams by creating additional threads following the same pattern. Explore more applications in our [blog post on object tracking](https://www.ultralytics.com/blog/object-detection-and-tracking-with-ultralytics-yolov8).
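To make the scaling concrete, a hypothetical third stream just extends the two lists with another model/source pair (the RTSP URL below is a placeholder):
```python
# Python
MODEL_NAMES = ["yolo11n.pt", "yolo11n-seg.pt", "yolo11n-pose.pt"]
SOURCES = ["path/to/video.mp4", "0", "rtsp://example.com/stream"]  # files, webcam indices, or stream URLs
```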
## 🤝 Contribute New Trackers
Are you experienced in multi-object tracking and have implemented or adapted an algorithm with Ultralytics YOLO? We encourage you to contribute to our Trackers section in [ultralytics/cfg/trackers](https://github.com/ultralytics/ultralytics/tree/main/ultralytics/cfg/trackers)! Your contributions can help expand the tracking solutions available within the Ultralytics [ecosystem](https://docs.ultralytics.com/).
To contribute, please review our [Contributing Guide](https://docs.ultralytics.com/help/contributing/) for instructions on submitting a [Pull Request (PR)](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-pull-requests) 🛠️. We look forward to your contributions!
Let's work together to enhance the tracking capabilities of Ultralytics YOLO and provide more powerful tools for the [computer vision](https://www.ultralytics.com/glossary/computer-vision-cv) and [deep learning](https://www.ultralytics.com/glossary/deep-learning-dl) community 🙏!

@@ -0,0 +1,7 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from .bot_sort import BOTSORT
from .byte_tracker import BYTETracker
from .track import register_tracker
__all__ = "register_tracker", "BOTSORT", "BYTETracker" # allow simpler import

@@ -0,0 +1,117 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
"""Module defines the base classes and structures for object tracking in YOLO."""
from collections import OrderedDict
from typing import Any
import numpy as np
class TrackState:
"""
Enumeration class representing the possible states of an object being tracked.
Attributes:
New (int): State when the object is newly detected.
Tracked (int): State when the object is successfully tracked in subsequent frames.
Lost (int): State when the object is no longer tracked.
Removed (int): State when the object is removed from tracking.
Examples:
>>> state = TrackState.New
>>> if state == TrackState.New:
...     print("Object is newly detected.")
"""
New = 0
Tracked = 1
Lost = 2
Removed = 3
class BaseTrack:
"""
Base class for object tracking, providing foundational attributes and methods.
Attributes:
_count (int): Class-level counter for unique track IDs.
track_id (int): Unique identifier for the track.
is_activated (bool): Flag indicating whether the track is currently active.
state (TrackState): Current state of the track.
history (OrderedDict): Ordered history of the track's states.
features (list): List of features extracted from the object for tracking.
curr_feature (Any): The current feature of the object being tracked.
score (float): The confidence score of the tracking.
start_frame (int): The frame number where tracking started.
frame_id (int): The most recent frame ID processed by the track.
time_since_update (int): Frames passed since the last update.
location (tuple): The location of the object in the context of multi-camera tracking.
Methods:
end_frame: Returns the ID of the last frame where the object was tracked.
next_id: Increments and returns the next global track ID.
activate: Abstract method to activate the track.
predict: Abstract method to predict the next state of the track.
update: Abstract method to update the track with new data.
mark_lost: Marks the track as lost.
mark_removed: Marks the track as removed.
reset_id: Resets the global track ID counter.
Examples:
Initialize a new track and mark it as lost:
>>> track = BaseTrack()
>>> track.mark_lost()
>>> print(track.state) # Output: 2 (TrackState.Lost)
"""
_count = 0
def __init__(self):
"""Initialize a new track with a unique ID and foundational tracking attributes."""
self.track_id = 0
self.is_activated = False
self.state = TrackState.New
self.history = OrderedDict()
self.features = []
self.curr_feature = None
self.score = 0
self.start_frame = 0
self.frame_id = 0
self.time_since_update = 0
self.location = (np.inf, np.inf)
@property
def end_frame(self) -> int:
"""Return the ID of the most recent frame where the object was tracked."""
return self.frame_id
@staticmethod
def next_id() -> int:
"""Increment and return the next unique global track ID for object tracking."""
BaseTrack._count += 1
return BaseTrack._count
def activate(self, *args: Any) -> None:
"""Activate the track with provided arguments, initializing necessary attributes for tracking."""
raise NotImplementedError
def predict(self) -> None:
"""Predict the next state of the track based on the current state and tracking model."""
raise NotImplementedError
def update(self, *args: Any, **kwargs: Any) -> None:
"""Update the track with new observations and data, modifying its state and attributes accordingly."""
raise NotImplementedError
def mark_lost(self) -> None:
"""Mark the track as lost by updating its state to TrackState.Lost."""
self.state = TrackState.Lost
def mark_removed(self) -> None:
"""Mark the track as removed by setting its state to TrackState.Removed."""
self.state = TrackState.Removed
@staticmethod
def reset_id() -> None:
"""Reset the global track ID counter to its initial value."""
BaseTrack._count = 0

@@ -0,0 +1,274 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from __future__ import annotations
from collections import deque
from typing import Any
import numpy as np
import torch
from ultralytics.utils.ops import xywh2xyxy
from ultralytics.utils.plotting import save_one_box
from .basetrack import TrackState
from .byte_tracker import BYTETracker, STrack
from .utils import matching
from .utils.gmc import GMC
from .utils.kalman_filter import KalmanFilterXYWH
class BOTrack(STrack):
"""
An extended version of the STrack class for YOLO, adding object tracking features.
This class extends the STrack class to include additional functionalities for object tracking, such as feature
smoothing, Kalman filter prediction, and reactivation of tracks.
Attributes:
shared_kalman (KalmanFilterXYWH): A shared Kalman filter for all instances of BOTrack.
smooth_feat (np.ndarray): Smoothed feature vector.
curr_feat (np.ndarray): Current feature vector.
features (deque): A deque to store feature vectors with a maximum length defined by `feat_history`.
alpha (float): Smoothing factor for the exponential moving average of features.
mean (np.ndarray): The mean state of the Kalman filter.
covariance (np.ndarray): The covariance matrix of the Kalman filter.
Methods:
update_features: Update features vector and smooth it using exponential moving average.
predict: Predict the mean and covariance using Kalman filter.
re_activate: Reactivate a track with updated features and optionally new ID.
update: Update the track with new detection and frame ID.
tlwh: Property that gets the current position in tlwh format `(top left x, top left y, width, height)`.
multi_predict: Predict the mean and covariance of multiple object tracks using shared Kalman filter.
convert_coords: Convert tlwh bounding box coordinates to xywh format.
tlwh_to_xywh: Convert bounding box to xywh format `(center x, center y, width, height)`.
Examples:
Create a BOTrack instance and update its features
>>> bo_track = BOTrack(xywh=np.array([100, 50, 80, 40, 0]), score=0.9, cls=1, feat=np.random.rand(128))
>>> bo_track.predict()
>>> new_track = BOTrack(xywh=np.array([110, 60, 80, 40, 0]), score=0.85, cls=1, feat=np.random.rand(128))
>>> bo_track.update(new_track, frame_id=2)
"""
shared_kalman = KalmanFilterXYWH()
def __init__(
self, xywh: np.ndarray, score: float, cls: int, feat: np.ndarray | None = None, feat_history: int = 50
):
"""
Initialize a BOTrack object with temporal parameters, such as feature history, alpha, and current features.
Args:
xywh (np.ndarray): Bounding box coordinates in xywh format (center x, center y, width, height).
score (float): Confidence score of the detection.
cls (int): Class ID of the detected object.
feat (np.ndarray, optional): Feature vector associated with the detection.
feat_history (int): Maximum length of the feature history deque.
Examples:
Initialize a BOTrack object with bounding box, score, class ID, and feature vector
>>> xywh = np.array([100, 150, 60, 50, 0])  # (center x, center y, w, h, detection idx)
>>> score = 0.9
>>> cls = 1
>>> feat = np.random.rand(128)
>>> bo_track = BOTrack(xywh, score, cls, feat)
"""
super().__init__(xywh, score, cls)
self.smooth_feat = None
self.curr_feat = None
if feat is not None:
self.update_features(feat)
self.features = deque([], maxlen=feat_history)
self.alpha = 0.9
def update_features(self, feat: np.ndarray) -> None:
"""Update the feature vector and apply exponential moving average smoothing."""
feat /= np.linalg.norm(feat)
self.curr_feat = feat
if self.smooth_feat is None:
self.smooth_feat = feat
else:
self.smooth_feat = self.alpha * self.smooth_feat + (1 - self.alpha) * feat
self.features.append(feat)
self.smooth_feat /= np.linalg.norm(self.smooth_feat)
def predict(self) -> None:
"""Predict the object's future state using the Kalman filter to update its mean and covariance."""
mean_state = self.mean.copy()
if self.state != TrackState.Tracked:
mean_state[6] = 0
mean_state[7] = 0
self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)
def re_activate(self, new_track: BOTrack, frame_id: int, new_id: bool = False) -> None:
"""Reactivate a track with updated features and optionally assign a new ID."""
if new_track.curr_feat is not None:
self.update_features(new_track.curr_feat)
super().re_activate(new_track, frame_id, new_id)
def update(self, new_track: BOTrack, frame_id: int) -> None:
"""Update the track with new detection information and the current frame ID."""
if new_track.curr_feat is not None:
self.update_features(new_track.curr_feat)
super().update(new_track, frame_id)
@property
def tlwh(self) -> np.ndarray:
"""Return the current bounding box position in `(top left x, top left y, width, height)` format."""
if self.mean is None:
return self._tlwh.copy()
ret = self.mean[:4].copy()
ret[:2] -= ret[2:] / 2
return ret
@staticmethod
def multi_predict(stracks: list[BOTrack]) -> None:
"""Predict the mean and covariance for multiple object tracks using a shared Kalman filter."""
if len(stracks) <= 0:
return
multi_mean = np.asarray([st.mean.copy() for st in stracks])
multi_covariance = np.asarray([st.covariance for st in stracks])
for i, st in enumerate(stracks):
if st.state != TrackState.Tracked:
multi_mean[i][6] = 0
multi_mean[i][7] = 0
multi_mean, multi_covariance = BOTrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
stracks[i].mean = mean
stracks[i].covariance = cov
def convert_coords(self, tlwh: np.ndarray) -> np.ndarray:
"""Convert tlwh bounding box coordinates to xywh format."""
return self.tlwh_to_xywh(tlwh)
@staticmethod
def tlwh_to_xywh(tlwh: np.ndarray) -> np.ndarray:
"""Convert bounding box from tlwh (top-left-width-height) to xywh (center-x-center-y-width-height) format."""
ret = np.asarray(tlwh).copy()
ret[:2] += ret[2:] / 2
return ret
class BOTSORT(BYTETracker):
"""
An extended version of the BYTETracker class for YOLO, designed for object tracking with ReID and GMC algorithm.
Attributes:
proximity_thresh (float): Threshold for spatial proximity (IoU) between tracks and detections.
appearance_thresh (float): Threshold for appearance similarity (ReID embeddings) between tracks and detections.
encoder (Any): Object to handle ReID embeddings, set to None if ReID is not enabled.
gmc (GMC): An instance of the GMC algorithm for data association.
args (Any): Parsed command-line arguments containing tracking parameters.
Methods:
get_kalmanfilter: Return an instance of KalmanFilterXYWH for object tracking.
init_track: Initialize track with detections, scores, and classes.
get_dists: Get distances between tracks and detections using IoU and (optionally) ReID.
multi_predict: Predict and track multiple objects with a YOLO model.
reset: Reset the BOTSORT tracker to its initial state.
Examples:
Initialize BOTSORT and process detections
>>> bot_sort = BOTSORT(args, frame_rate=30)
>>> bot_sort.init_track(dets, scores, cls, img)
>>> bot_sort.multi_predict(tracks)
Note:
The class is designed to work with a YOLO object detection model and supports ReID only if enabled via args.
"""
def __init__(self, args: Any, frame_rate: int = 30):
"""
Initialize BOTSORT object with ReID module and GMC algorithm.
Args:
args (Any): Parsed command-line arguments containing tracking parameters.
frame_rate (int): Frame rate of the video being processed.
Examples:
Initialize BOTSORT with command-line arguments and a specified frame rate:
>>> args = parse_args()
>>> bot_sort = BOTSORT(args, frame_rate=30)
"""
super().__init__(args, frame_rate)
self.gmc = GMC(method=args.gmc_method)
# ReID module
self.proximity_thresh = args.proximity_thresh
self.appearance_thresh = args.appearance_thresh
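# Select the ReID encoder: reuse native detector features when model == 'auto', load a standalone YOLO embedder otherwise, or disable ReID entirely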
self.encoder = (
(lambda feats, s: [f.cpu().numpy() for f in feats]) # native features do not require any model
if args.with_reid and self.args.model == "auto"
else ReID(args.model)
if args.with_reid
else None
)
def get_kalmanfilter(self) -> KalmanFilterXYWH:
"""Return an instance of KalmanFilterXYWH for predicting and updating object states in the tracking process."""
return KalmanFilterXYWH()
def init_track(self, results, img: np.ndarray | None = None) -> list[BOTrack]:
"""Initialize object tracks using detection bounding boxes, scores, class labels, and optional ReID features."""
if len(results) == 0:
return []
bboxes = results.xywhr if hasattr(results, "xywhr") else results.xywh
bboxes = np.concatenate([bboxes, np.arange(len(bboxes)).reshape(-1, 1)], axis=-1)
if self.args.with_reid and self.encoder is not None:
features_keep = self.encoder(img, bboxes)
return [BOTrack(xywh, s, c, f) for (xywh, s, c, f) in zip(bboxes, results.conf, results.cls, features_keep)]
else:
return [BOTrack(xywh, s, c) for (xywh, s, c) in zip(bboxes, results.conf, results.cls)]
def get_dists(self, tracks: list[BOTrack], detections: list[BOTrack]) -> np.ndarray:
"""Calculate distances between tracks and detections using IoU and optionally ReID embeddings."""
dists = matching.iou_distance(tracks, detections)
dists_mask = dists > (1 - self.proximity_thresh)
if self.args.fuse_score:
dists = matching.fuse_score(dists, detections)
if self.args.with_reid and self.encoder is not None:
emb_dists = matching.embedding_distance(tracks, detections) / 2.0
emb_dists[emb_dists > (1 - self.appearance_thresh)] = 1.0
emb_dists[dists_mask] = 1.0
dists = np.minimum(dists, emb_dists)
return dists
def multi_predict(self, tracks: list[BOTrack]) -> None:
"""Predict the mean and covariance of multiple object tracks using a shared Kalman filter."""
BOTrack.multi_predict(tracks)
def reset(self) -> None:
"""Reset the BOTSORT tracker to its initial state, clearing all tracked objects and internal states."""
super().reset()
self.gmc.reset_params()
class ReID:
"""YOLO model as encoder for re-identification."""
def __init__(self, model: str):
"""
Initialize encoder for re-identification.
Args:
model (str): Path to the YOLO model for re-identification.
"""
from ultralytics import YOLO
self.model = YOLO(model)
self.model(embed=[len(self.model.model.model) - 2 if ".pt" in model else -1], verbose=False, save=False) # init
def __call__(self, img: np.ndarray, dets: np.ndarray) -> list[np.ndarray]:
"""Extract embeddings for detected objects."""
feats = self.model.predictor(
[save_one_box(det, img, save=False) for det in xywh2xyxy(torch.from_numpy(dets[:, :4]))]
)
if len(feats) != dets.shape[0] and feats[0].shape[0] == dets.shape[0]:
feats = feats[0] # batched prediction with non-PyTorch backend
return [f.cpu().numpy() for f in feats]

@@ -0,0 +1,485 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from __future__ import annotations
from typing import Any
import numpy as np
from ..utils import LOGGER
from ..utils.ops import xywh2ltwh
from .basetrack import BaseTrack, TrackState
from .utils import matching
from .utils.kalman_filter import KalmanFilterXYAH
class STrack(BaseTrack):
"""
Single object tracking representation that uses Kalman filtering for state estimation.
This class is responsible for storing all the information regarding individual tracklets and performs state updates
and predictions based on Kalman filter.
Attributes:
shared_kalman (KalmanFilterXYAH): Shared Kalman filter used across all STrack instances for prediction.
_tlwh (np.ndarray): Private attribute to store top-left corner coordinates and width and height of bounding box.
kalman_filter (KalmanFilterXYAH): Instance of Kalman filter used for this particular object track.
mean (np.ndarray): Mean state estimate vector.
covariance (np.ndarray): Covariance of state estimate.
is_activated (bool): Boolean flag indicating if the track has been activated.
score (float): Confidence score of the track.
tracklet_len (int): Length of the tracklet.
cls (Any): Class label for the object.
idx (int): Index or identifier for the object.
frame_id (int): Current frame ID.
start_frame (int): Frame where the object was first detected.
angle (float | None): Optional angle information for oriented bounding boxes.
Methods:
predict: Predict the next state of the object using Kalman filter.
multi_predict: Predict the next states for multiple tracks.
multi_gmc: Update multiple track states using a homography matrix.
activate: Activate a new tracklet.
re_activate: Reactivate a previously lost tracklet.
update: Update the state of a matched track.
convert_coords: Convert bounding box to x-y-aspect-height format.
tlwh_to_xyah: Convert tlwh bounding box to xyah format.
Examples:
Initialize and activate a new track
>>> track = STrack(xywh=np.array([100, 200, 50, 80, 0]), score=0.9, cls="person")
>>> track.activate(kalman_filter=KalmanFilterXYAH(), frame_id=1)
"""
shared_kalman = KalmanFilterXYAH()
def __init__(self, xywh: list[float], score: float, cls: Any):
"""
Initialize a new STrack instance.
Args:
xywh (list[float]): Bounding box coordinates and dimensions in the format (x, y, w, h, [a], idx), where
(x, y) is the center, (w, h) are width and height, [a] is optional aspect ratio, and idx is the id.
score (float): Confidence score of the detection.
cls (Any): Class label for the detected object.
Examples:
>>> xywh = np.array([100.0, 150.0, 50.0, 75.0, 1])
>>> score = 0.9
>>> cls = "person"
>>> track = STrack(xywh, score, cls)
"""
super().__init__()
# xywh+idx or xywha+idx
assert len(xywh) in {5, 6}, f"expected 5 or 6 values but got {len(xywh)}"
self._tlwh = np.asarray(xywh2ltwh(xywh[:4]), dtype=np.float32)
self.kalman_filter = None
self.mean, self.covariance = None, None
self.is_activated = False
self.score = score
self.tracklet_len = 0
self.cls = cls
self.idx = xywh[-1]
self.angle = xywh[4] if len(xywh) == 6 else None
def predict(self):
"""Predict the next state (mean and covariance) of the object using the Kalman filter."""
mean_state = self.mean.copy()
if self.state != TrackState.Tracked:
mean_state[7] = 0
self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)
@staticmethod
def multi_predict(stracks: list[STrack]):
"""Perform multi-object predictive tracking using Kalman filter for the provided list of STrack instances."""
if len(stracks) <= 0:
return
multi_mean = np.asarray([st.mean.copy() for st in stracks])
multi_covariance = np.asarray([st.covariance for st in stracks])
for i, st in enumerate(stracks):
if st.state != TrackState.Tracked:
multi_mean[i][7] = 0
multi_mean, multi_covariance = STrack.shared_kalman.multi_predict(multi_mean, multi_covariance)
for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
stracks[i].mean = mean
stracks[i].covariance = cov
@staticmethod
def multi_gmc(stracks: list[STrack], H: np.ndarray = np.eye(2, 3)):
"""Update state tracks positions and covariances using a homography matrix for multiple tracks."""
if stracks:
multi_mean = np.asarray([st.mean.copy() for st in stracks])
multi_covariance = np.asarray([st.covariance for st in stracks])
R = H[:2, :2]
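# Block-diagonal 8x8 matrix that applies the 2x2 camera rotation R to each pair of Kalman state dimensions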
R8x8 = np.kron(np.eye(4, dtype=float), R)
t = H[:2, 2]
for i, (mean, cov) in enumerate(zip(multi_mean, multi_covariance)):
mean = R8x8.dot(mean)
mean[:2] += t
cov = R8x8.dot(cov).dot(R8x8.transpose())
stracks[i].mean = mean
stracks[i].covariance = cov
def activate(self, kalman_filter: KalmanFilterXYAH, frame_id: int):
"""Activate a new tracklet using the provided Kalman filter and initialize its state and covariance."""
self.kalman_filter = kalman_filter
self.track_id = self.next_id()
self.mean, self.covariance = self.kalman_filter.initiate(self.convert_coords(self._tlwh))
self.tracklet_len = 0
self.state = TrackState.Tracked
if frame_id == 1:
self.is_activated = True
self.frame_id = frame_id
self.start_frame = frame_id
def re_activate(self, new_track: STrack, frame_id: int, new_id: bool = False):
"""Reactivate a previously lost track using new detection data and update its state and attributes."""
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.convert_coords(new_track.tlwh)
)
self.tracklet_len = 0
self.state = TrackState.Tracked
self.is_activated = True
self.frame_id = frame_id
if new_id:
self.track_id = self.next_id()
self.score = new_track.score
self.cls = new_track.cls
self.angle = new_track.angle
self.idx = new_track.idx
def update(self, new_track: STrack, frame_id: int):
"""
Update the state of a matched track.
Args:
new_track (STrack): The new track containing updated information.
frame_id (int): The ID of the current frame.
Examples:
Update the state of a track with new detection information
>>> track = STrack(np.array([100, 200, 50, 80, 0]), score=0.9, cls=1)
>>> new_track = STrack(np.array([105, 205, 55, 85, 0]), score=0.95, cls=1)
>>> track.update(new_track, 2)
"""
self.frame_id = frame_id
self.tracklet_len += 1
new_tlwh = new_track.tlwh
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.convert_coords(new_tlwh)
)
self.state = TrackState.Tracked
self.is_activated = True
self.score = new_track.score
self.cls = new_track.cls
self.angle = new_track.angle
self.idx = new_track.idx
def convert_coords(self, tlwh: np.ndarray) -> np.ndarray:
"""Convert a bounding box's top-left-width-height format to its x-y-aspect-height equivalent."""
return self.tlwh_to_xyah(tlwh)
@property
def tlwh(self) -> np.ndarray:
"""Get the bounding box in top-left-width-height format from the current state estimate."""
if self.mean is None:
return self._tlwh.copy()
ret = self.mean[:4].copy()
ret[2] *= ret[3]
ret[:2] -= ret[2:] / 2
return ret
@property
def xyxy(self) -> np.ndarray:
"""Convert bounding box from (top left x, top left y, width, height) to (min x, min y, max x, max y) format."""
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
@staticmethod
def tlwh_to_xyah(tlwh: np.ndarray) -> np.ndarray:
"""Convert bounding box from tlwh format to center-x-center-y-aspect-height (xyah) format."""
ret = np.asarray(tlwh).copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
@property
def xywh(self) -> np.ndarray:
"""Get the current position of the bounding box in (center x, center y, width, height) format."""
ret = np.asarray(self.tlwh).copy()
ret[:2] += ret[2:] / 2
return ret
@property
def xywha(self) -> np.ndarray:
"""Get position in (center x, center y, width, height, angle) format, warning if angle is missing."""
if self.angle is None:
LOGGER.warning("`angle` attr not found, returning `xywh` instead.")
return self.xywh
return np.concatenate([self.xywh, self.angle[None]])
@property
def result(self) -> list[float]:
"""Get the current tracking results in the appropriate bounding box format."""
coords = self.xyxy if self.angle is None else self.xywha
return coords.tolist() + [self.track_id, self.score, self.cls, self.idx]
def __repr__(self) -> str:
"""Return a string representation of the STrack object including start frame, end frame, and track ID."""
return f"OT_{self.track_id}_({self.start_frame}-{self.end_frame})"
class BYTETracker:
"""
BYTETracker: A tracking algorithm built on top of YOLOv8 for object detection and tracking.
This class encapsulates the functionality for initializing, updating, and managing the tracks for detected objects in a
video sequence. It maintains the state of tracked, lost, and removed tracks over frames, utilizes Kalman filtering for
predicting the new object locations, and performs data association.
Attributes:
tracked_stracks (list[STrack]): List of successfully activated tracks.
lost_stracks (list[STrack]): List of lost tracks.
removed_stracks (list[STrack]): List of removed tracks.
frame_id (int): The current frame ID.
args (Namespace): Command-line arguments.
max_time_lost (int): The maximum frames for a track to be considered as 'lost'.
kalman_filter (KalmanFilterXYAH): Kalman Filter object.
Methods:
update: Update object tracker with new detections.
get_kalmanfilter: Return a Kalman filter object for tracking bounding boxes.
init_track: Initialize object tracking with detections.
get_dists: Calculate the distance between tracks and detections.
multi_predict: Predict the location of tracks.
reset_id: Reset the ID counter of STrack.
reset: Reset the tracker by clearing all tracks.
joint_stracks: Combine two lists of stracks.
sub_stracks: Filter out the stracks present in the second list from the first list.
remove_duplicate_stracks: Remove duplicate stracks based on IoU.
Examples:
Initialize BYTETracker and update with detection results
>>> tracker = BYTETracker(args, frame_rate=30)
>>> results = yolo_model.detect(image)
>>> tracked_objects = tracker.update(results)
"""
def __init__(self, args, frame_rate: int = 30):
"""
Initialize a BYTETracker instance for object tracking.
Args:
args (Namespace): Command-line arguments containing tracking parameters.
frame_rate (int): Frame rate of the video sequence.
Examples:
Initialize BYTETracker with command-line arguments and a frame rate of 30
>>> args = Namespace(track_buffer=30)
>>> tracker = BYTETracker(args, frame_rate=30)
"""
self.tracked_stracks = [] # type: list[STrack]
self.lost_stracks = [] # type: list[STrack]
self.removed_stracks = [] # type: list[STrack]
self.frame_id = 0
self.args = args
self.max_time_lost = int(frame_rate / 30.0 * args.track_buffer)
self.kalman_filter = self.get_kalmanfilter()
self.reset_id()
def update(self, results, img: np.ndarray | None = None, feats: np.ndarray | None = None) -> np.ndarray:
"""Update the tracker with new detections and return the current list of tracked objects."""
self.frame_id += 1
activated_stracks = []
refind_stracks = []
lost_stracks = []
removed_stracks = []
scores = results.conf
remain_inds = scores >= self.args.track_high_thresh
inds_low = scores > self.args.track_low_thresh
inds_high = scores < self.args.track_high_thresh
inds_second = inds_low & inds_high
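# inds_second selects detections scoring between track_low_thresh and track_high_thresh (ByteTrack's second association)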
results_second = results[inds_second]
results = results[remain_inds]
feats_keep = feats_second = img
if feats is not None and len(feats):
feats_keep = feats[remain_inds]
feats_second = feats[inds_second]
detections = self.init_track(results, feats_keep)
# Add newly detected tracklets to tracked_stracks
unconfirmed = []
tracked_stracks = [] # type: list[STrack]
for track in self.tracked_stracks:
if not track.is_activated:
unconfirmed.append(track)
else:
tracked_stracks.append(track)
# Step 2: First association, with high score detection boxes
strack_pool = self.joint_stracks(tracked_stracks, self.lost_stracks)
# Predict the current location with KF
self.multi_predict(strack_pool)
if hasattr(self, "gmc") and img is not None:
# use try-except here to bypass errors from gmc module
try:
warp = self.gmc.apply(img, results.xyxy)
except Exception:
warp = np.eye(2, 3)
STrack.multi_gmc(strack_pool, warp)
STrack.multi_gmc(unconfirmed, warp)
dists = self.get_dists(strack_pool, detections)
matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.args.match_thresh)
for itracked, idet in matches:
track = strack_pool[itracked]
det = detections[idet]
if track.state == TrackState.Tracked:
track.update(det, self.frame_id)
activated_stracks.append(track)
else:
track.re_activate(det, self.frame_id, new_id=False)
refind_stracks.append(track)
# Step 3: Second association, matching remaining tracks to the low-score detection boxes
detections_second = self.init_track(results_second, feats_second)
r_tracked_stracks = [strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked]
# TODO
dists = matching.iou_distance(r_tracked_stracks, detections_second)
matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
for itracked, idet in matches:
track = r_tracked_stracks[itracked]
det = detections_second[idet]
if track.state == TrackState.Tracked:
track.update(det, self.frame_id)
activated_stracks.append(track)
else:
track.re_activate(det, self.frame_id, new_id=False)
refind_stracks.append(track)
for it in u_track:
track = r_tracked_stracks[it]
if track.state != TrackState.Lost:
track.mark_lost()
lost_stracks.append(track)
# Deal with unconfirmed tracks, usually tracks with only one beginning frame
detections = [detections[i] for i in u_detection]
dists = self.get_dists(unconfirmed, detections)
matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
for itracked, idet in matches:
unconfirmed[itracked].update(detections[idet], self.frame_id)
activated_stracks.append(unconfirmed[itracked])
for it in u_unconfirmed:
track = unconfirmed[it]
track.mark_removed()
removed_stracks.append(track)
# Step 4: Init new stracks
for inew in u_detection:
track = detections[inew]
if track.score < self.args.new_track_thresh:
continue
track.activate(self.kalman_filter, self.frame_id)
activated_stracks.append(track)
# Step 5: Update state
for track in self.lost_stracks:
if self.frame_id - track.end_frame > self.max_time_lost:
track.mark_removed()
removed_stracks.append(track)
self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
self.tracked_stracks = self.joint_stracks(self.tracked_stracks, activated_stracks)
self.tracked_stracks = self.joint_stracks(self.tracked_stracks, refind_stracks)
self.lost_stracks = self.sub_stracks(self.lost_stracks, self.tracked_stracks)
self.lost_stracks.extend(lost_stracks)
self.lost_stracks = self.sub_stracks(self.lost_stracks, self.removed_stracks)
self.tracked_stracks, self.lost_stracks = self.remove_duplicate_stracks(self.tracked_stracks, self.lost_stracks)
self.removed_stracks.extend(removed_stracks)
if len(self.removed_stracks) > 1000:
self.removed_stracks = self.removed_stracks[-999:]  # cap the removed-track history at the most recent 999 entries
return np.asarray([x.result for x in self.tracked_stracks if x.is_activated], dtype=np.float32)
def get_kalmanfilter(self) -> KalmanFilterXYAH:
"""Return a Kalman filter object for tracking bounding boxes using KalmanFilterXYAH."""
return KalmanFilterXYAH()
def init_track(self, results, img: np.ndarray | None = None) -> list[STrack]:
"""Initialize object tracking with given detections, scores, and class labels using the STrack algorithm."""
if len(results) == 0:
return []
bboxes = results.xywhr if hasattr(results, "xywhr") else results.xywh
bboxes = np.concatenate([bboxes, np.arange(len(bboxes)).reshape(-1, 1)], axis=-1)
return [STrack(xywh, s, c) for (xywh, s, c) in zip(bboxes, results.conf, results.cls)]
def get_dists(self, tracks: list[STrack], detections: list[STrack]) -> np.ndarray:
"""Calculate the distance between tracks and detections using IoU and optionally fuse scores."""
dists = matching.iou_distance(tracks, detections)
if self.args.fuse_score:
dists = matching.fuse_score(dists, detections)
return dists
def multi_predict(self, tracks: list[STrack]):
"""Predict the next states for multiple tracks using Kalman filter."""
STrack.multi_predict(tracks)
@staticmethod
def reset_id():
"""Reset the ID counter for STrack instances to ensure unique track IDs across tracking sessions."""
STrack.reset_id()
def reset(self):
"""Reset the tracker by clearing all tracked, lost, and removed tracks and reinitializing the Kalman filter."""
self.tracked_stracks = [] # type: list[STrack]
self.lost_stracks = [] # type: list[STrack]
self.removed_stracks = [] # type: list[STrack]
self.frame_id = 0
self.kalman_filter = self.get_kalmanfilter()
self.reset_id()
@staticmethod
def joint_stracks(tlista: list[STrack], tlistb: list[STrack]) -> list[STrack]:
"""Combine two lists of STrack objects into a single list, ensuring no duplicates based on track IDs."""
exists = {}
res = []
for t in tlista:
exists[t.track_id] = 1
res.append(t)
for t in tlistb:
tid = t.track_id
if not exists.get(tid, 0):
exists[tid] = 1
res.append(t)
return res
@staticmethod
def sub_stracks(tlista: list[STrack], tlistb: list[STrack]) -> list[STrack]:
"""Filter out the stracks present in the second list from the first list."""
track_ids_b = {t.track_id for t in tlistb}
return [t for t in tlista if t.track_id not in track_ids_b]
@staticmethod
def remove_duplicate_stracks(stracksa: list[STrack], stracksb: list[STrack]) -> tuple[list[STrack], list[STrack]]:
"""Remove duplicate stracks from two lists based on Intersection over Union (IoU) distance."""
pdist = matching.iou_distance(stracksa, stracksb)
pairs = np.where(pdist < 0.15)
dupa, dupb = [], []
for p, q in zip(*pairs):
timep = stracksa[p].frame_id - stracksa[p].start_frame
timeq = stracksb[q].frame_id - stracksb[q].start_frame
if timep > timeq:
dupb.append(q)
else:
dupa.append(p)
resa = [t for i, t in enumerate(stracksa) if i not in dupa]
resb = [t for i, t in enumerate(stracksb) if i not in dupb]
return resa, resb

@@ -0,0 +1,119 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from functools import partial
from pathlib import Path
import torch
from ultralytics.utils import YAML, IterableSimpleNamespace
from ultralytics.utils.checks import check_yaml
from .bot_sort import BOTSORT
from .byte_tracker import BYTETracker
# A mapping of tracker types to corresponding tracker classes
TRACKER_MAP = {"bytetrack": BYTETracker, "botsort": BOTSORT}
def on_predict_start(predictor: object, persist: bool = False) -> None:
"""
Initialize trackers for object tracking during prediction.
Args:
predictor (ultralytics.engine.predictor.BasePredictor): The predictor object to initialize trackers for.
persist (bool, optional): Whether to persist the trackers if they already exist.
Examples:
Initialize trackers for a predictor object
>>> predictor = SomePredictorClass()
>>> on_predict_start(predictor, persist=True)
"""
if predictor.args.task == "classify":
raise ValueError("❌ Classification doesn't support 'mode=track'")
if hasattr(predictor, "trackers") and persist:
return
tracker = check_yaml(predictor.args.tracker)
cfg = IterableSimpleNamespace(**YAML.load(tracker))
if cfg.tracker_type not in {"bytetrack", "botsort"}:
raise AssertionError(f"Only 'bytetrack' and 'botsort' are supported for now, but got '{cfg.tracker_type}'")
predictor._feats = None # reset in case used earlier
if hasattr(predictor, "_hook"):
predictor._hook.remove()
if cfg.tracker_type == "botsort" and cfg.with_reid and cfg.model == "auto":
from ultralytics.nn.modules.head import Detect
if not (
isinstance(predictor.model.model, torch.nn.Module)
and isinstance(predictor.model.model.model[-1], Detect)
and not predictor.model.model.model[-1].end2end
):
cfg.model = "yolo11n-cls.pt"
else:
# Register hook to extract input of Detect layer
def pre_hook(module, input):
predictor._feats = list(input[0]) # unroll to new list to avoid mutation in forward
predictor._hook = predictor.model.model.model[-1].register_forward_pre_hook(pre_hook)
trackers = []
for _ in range(predictor.dataset.bs):
tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
trackers.append(tracker)
if predictor.dataset.mode != "stream": # only need one tracker for other modes
break
predictor.trackers = trackers
predictor.vid_path = [None] * predictor.dataset.bs # for determining when to reset tracker on new video
def on_predict_postprocess_end(predictor: object, persist: bool = False) -> None:
"""
Postprocess detected boxes and update with object tracking.
Args:
predictor (object): The predictor object containing the predictions.
persist (bool, optional): Whether to persist the trackers if they already exist.
Examples:
Postprocess predictions and update with tracking
>>> predictor = YourPredictorClass()
>>> on_predict_postprocess_end(predictor, persist=True)
"""
is_obb = predictor.args.task == "obb"
is_stream = predictor.dataset.mode == "stream"
for i, result in enumerate(predictor.results):
tracker = predictor.trackers[i if is_stream else 0]
vid_path = predictor.save_dir / Path(result.path).name
if not persist and predictor.vid_path[i if is_stream else 0] != vid_path:
tracker.reset()
predictor.vid_path[i if is_stream else 0] = vid_path
det = (result.obb if is_obb else result.boxes).cpu().numpy()
tracks = tracker.update(det, result.orig_img, getattr(result, "feats", None))
if len(tracks) == 0:
continue
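# The last column of each track row is the original detection index appended in init_track, used to reorder results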
idx = tracks[:, -1].astype(int)
predictor.results[i] = result[idx]
update_args = {"obb" if is_obb else "boxes": torch.as_tensor(tracks[:, :-1])}
predictor.results[i].update(**update_args)
def register_tracker(model: object, persist: bool) -> None:
"""
Register tracking callbacks to the model for object tracking during prediction.
Args:
model (object): The model object to register tracking callbacks for.
persist (bool): Whether to persist the trackers if they already exist.
Examples:
Register tracking callbacks to a YOLO model
>>> model = YOLOModel()
>>> register_tracker(model, persist=True)
"""
model.add_callback("on_predict_start", partial(on_predict_start, persist=persist))
model.add_callback("on_predict_postprocess_end", partial(on_predict_postprocess_end, persist=persist))

@@ -0,0 +1 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license

@@ -0,0 +1,350 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from __future__ import annotations
import copy
import cv2
import numpy as np
from ultralytics.utils import LOGGER
class GMC:
"""
Generalized Motion Compensation (GMC) class for tracking and object detection in video frames.
This class provides methods for tracking and detecting objects based on several tracking algorithms including ORB,
SIFT, ECC, and Sparse Optical Flow. It also supports downscaling of frames for computational efficiency.
Attributes:
method (str): The tracking method to use. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
downscale (int): Factor by which to downscale the frames for processing.
prevFrame (np.ndarray): Previous frame for tracking.
prevKeyPoints (list): Keypoints from the previous frame.
prevDescriptors (np.ndarray): Descriptors from the previous frame.
initializedFirstFrame (bool): Flag indicating if the first frame has been processed.
Methods:
apply: Apply the chosen method to a raw frame and optionally use provided detections.
apply_ecc: Apply the ECC algorithm to a raw frame.
apply_features: Apply feature-based methods like ORB or SIFT to a raw frame.
apply_sparseoptflow: Apply the Sparse Optical Flow method to a raw frame.
reset_params: Reset the internal parameters of the GMC object.
Examples:
Create a GMC object and apply it to a frame
>>> gmc = GMC(method="sparseOptFlow", downscale=2)
>>> frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
>>> warp_matrix = gmc.apply(frame)
>>> print(warp_matrix.shape)
(2, 3)
"""
def __init__(self, method: str = "sparseOptFlow", downscale: int = 2) -> None:
"""
Initialize a Generalized Motion Compensation (GMC) object with tracking method and downscale factor.
Args:
method (str): The tracking method to use. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
downscale (int): Downscale factor for processing frames.
Examples:
Initialize a GMC object with the 'sparseOptFlow' method and a downscale factor of 2
>>> gmc = GMC(method="sparseOptFlow", downscale=2)
"""
super().__init__()
self.method = method
self.downscale = max(1, downscale)
if self.method == "orb":
self.detector = cv2.FastFeatureDetector_create(20)
self.extractor = cv2.ORB_create()
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
elif self.method == "sift":
self.detector = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.extractor = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.matcher = cv2.BFMatcher(cv2.NORM_L2)
elif self.method == "ecc":
number_of_iterations = 5000
termination_eps = 1e-6
self.warp_mode = cv2.MOTION_EUCLIDEAN
self.criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, number_of_iterations, termination_eps)
elif self.method == "sparseOptFlow":
self.feature_params = dict(
maxCorners=1000, qualityLevel=0.01, minDistance=1, blockSize=3, useHarrisDetector=False, k=0.04
)
elif self.method in {"none", "None", None}:
self.method = None
else:
raise ValueError(f"Unknown GMC method: {method}")
self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None
self.initializedFirstFrame = False
def apply(self, raw_frame: np.ndarray, detections: list | None = None) -> np.ndarray:
"""
Apply object detection on a raw frame using the specified method.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
detections (list, optional): List of detections to be used in the processing.
Returns:
(np.ndarray): Transformation matrix with shape (2, 3).
Examples:
>>> gmc = GMC(method="sparseOptFlow")
>>> raw_frame = np.random.rand(480, 640, 3)
>>> transformation_matrix = gmc.apply(raw_frame)
>>> print(transformation_matrix.shape)
(2, 3)
"""
if self.method in {"orb", "sift"}:
return self.apply_features(raw_frame, detections)
elif self.method == "ecc":
return self.apply_ecc(raw_frame)
elif self.method == "sparseOptFlow":
return self.apply_sparseoptflow(raw_frame)
else:
return np.eye(2, 3)
def apply_ecc(self, raw_frame: np.ndarray) -> np.ndarray:
"""
Apply the ECC (Enhanced Correlation Coefficient) algorithm to a raw frame for motion compensation.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
Returns:
(np.ndarray): Transformation matrix with shape (2, 3).
Examples:
>>> gmc = GMC(method="ecc")
>>> processed_frame = gmc.apply_ecc(np.array([[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]))
>>> print(processed_frame)
[[1. 0. 0.]
[0. 1. 0.]]
"""
height, width, c = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY) if c == 3 else raw_frame
H = np.eye(2, 3, dtype=np.float32)
# Downscale image for computational efficiency
if self.downscale > 1.0:
frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
# Handle first frame initialization
if not self.initializedFirstFrame:
self.prevFrame = frame.copy()
self.initializedFirstFrame = True
return H
# Run the ECC algorithm to find transformation matrix
try:
(_, H) = cv2.findTransformECC(self.prevFrame, frame, H, self.warp_mode, self.criteria, None, 1)
except Exception as e:
LOGGER.warning(f"find transform failed. Set warp as identity {e}")
return H
def apply_features(self, raw_frame: np.ndarray, detections: list | None = None) -> np.ndarray:
"""
Apply feature-based methods like ORB or SIFT to a raw frame.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
detections (list, optional): List of detections to be used in the processing.
Returns:
(np.ndarray): Transformation matrix with shape (2, 3).
Examples:
>>> gmc = GMC(method="orb")
>>> raw_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
>>> transformation_matrix = gmc.apply_features(raw_frame)
>>> print(transformation_matrix.shape)
(2, 3)
"""
height, width, c = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY) if c == 3 else raw_frame
H = np.eye(2, 3)
# Downscale image for computational efficiency
if self.downscale > 1.0:
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
width = width // self.downscale
height = height // self.downscale
# Create mask for keypoint detection, excluding border regions
mask = np.zeros_like(frame)
mask[int(0.02 * height) : int(0.98 * height), int(0.02 * width) : int(0.98 * width)] = 255
# Exclude detection regions from mask to avoid tracking detected objects
if detections is not None:
for det in detections:
tlbr = (det[:4] / self.downscale).astype(np.int_)
mask[tlbr[1] : tlbr[3], tlbr[0] : tlbr[2]] = 0
# Find keypoints and compute descriptors
keypoints = self.detector.detect(frame, mask)
keypoints, descriptors = self.extractor.compute(frame, keypoints)
# Handle first frame initialization
if not self.initializedFirstFrame:
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
self.initializedFirstFrame = True
return H
# Match descriptors between previous and current frame
knnMatches = self.matcher.knnMatch(self.prevDescriptors, descriptors, 2)
# Filter matches based on spatial distance constraints
matches = []
spatialDistances = []
maxSpatialDistance = 0.25 * np.array([width, height])
# Handle empty matches case
if len(knnMatches) == 0:
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
return H
# Apply Lowe's ratio test and spatial distance filtering
for m, n in knnMatches:
if m.distance < 0.9 * n.distance:
prevKeyPointLocation = self.prevKeyPoints[m.queryIdx].pt
currKeyPointLocation = keypoints[m.trainIdx].pt
spatialDistance = (
prevKeyPointLocation[0] - currKeyPointLocation[0],
prevKeyPointLocation[1] - currKeyPointLocation[1],
)
if (np.abs(spatialDistance[0]) < maxSpatialDistance[0]) and (
np.abs(spatialDistance[1]) < maxSpatialDistance[1]
):
spatialDistances.append(spatialDistance)
matches.append(m)
# Filter outliers using statistical analysis
meanSpatialDistances = np.mean(spatialDistances, 0)
stdSpatialDistances = np.std(spatialDistances, 0)
inliers = (spatialDistances - meanSpatialDistances) < 2.5 * stdSpatialDistances
# Extract good matches and corresponding points
goodMatches = []
prevPoints = []
currPoints = []
for i in range(len(matches)):
if inliers[i, 0] and inliers[i, 1]:
goodMatches.append(matches[i])
prevPoints.append(self.prevKeyPoints[matches[i].queryIdx].pt)
currPoints.append(keypoints[matches[i].trainIdx].pt)
prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)
# Estimate transformation matrix using RANSAC
if prevPoints.shape[0] > 4:
            H, inliers = cv2.estimateAffinePartial2D(prevPoints, currPoints, method=cv2.RANSAC)
# Scale translation components back to original resolution
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
LOGGER.warning("not enough matching points")
# Store current frame data for next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
return H
def apply_sparseoptflow(self, raw_frame: np.ndarray) -> np.ndarray:
"""
Apply Sparse Optical Flow method to a raw frame.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
Returns:
(np.ndarray): Transformation matrix with shape (2, 3).
Examples:
            >>> gmc = GMC(method="sparseOptFlow")
            >>> result = gmc.apply_sparseoptflow(np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8))
            >>> print(result)
            [[1. 0. 0.]
             [0. 1. 0.]]
"""
height, width, c = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY) if c == 3 else raw_frame
H = np.eye(2, 3)
# Downscale image for computational efficiency
if self.downscale > 1.0:
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
# Find good features to track
keypoints = cv2.goodFeaturesToTrack(frame, mask=None, **self.feature_params)
# Handle first frame initialization
if not self.initializedFirstFrame or self.prevKeyPoints is None:
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.initializedFirstFrame = True
return H
# Calculate optical flow using Lucas-Kanade method
matchedKeypoints, status, _ = cv2.calcOpticalFlowPyrLK(self.prevFrame, frame, self.prevKeyPoints, None)
# Extract successfully tracked points
prevPoints = []
currPoints = []
for i in range(len(status)):
if status[i]:
prevPoints.append(self.prevKeyPoints[i])
currPoints.append(matchedKeypoints[i])
prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)
# Estimate transformation matrix using RANSAC
if (prevPoints.shape[0] > 4) and (prevPoints.shape[0] == currPoints.shape[0]):
            H, _ = cv2.estimateAffinePartial2D(prevPoints, currPoints, method=cv2.RANSAC)
# Scale translation components back to original resolution
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
LOGGER.warning("not enough matching points")
# Store current frame data for next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
return H
def reset_params(self) -> None:
"""Reset the internal parameters including previous frame, keypoints, and descriptors."""
self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None
self.initializedFirstFrame = False
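# Minimal usage sketch (illustrative, not part of the module API): drive a GMC
# instance frame by frame on synthetic images. `method` and `downscale` are the
# real constructor arguments; the random frames stand in for video frames.
if __name__ == "__main__":
    gmc = GMC(method="sparseOptFlow", downscale=2)
    for _ in range(3):
        frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        warp = gmc.apply(frame)  # (2, 3) affine warp; identity on the first frame
        print(warp.shape)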

View File

@@ -0,0 +1,493 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
import numpy as np
import scipy.linalg
class KalmanFilterXYAH:
"""
A KalmanFilterXYAH class for tracking bounding boxes in image space using a Kalman filter.
Implements a simple Kalman filter for tracking bounding boxes in image space. The 8-dimensional state space
(x, y, a, h, vx, vy, va, vh) contains the bounding box center position (x, y), aspect ratio a, height h, and their
respective velocities. Object motion follows a constant velocity model, and bounding box location (x, y, a, h) is
taken as a direct observation of the state space (linear observation model).
Attributes:
_motion_mat (np.ndarray): The motion matrix for the Kalman filter.
_update_mat (np.ndarray): The update matrix for the Kalman filter.
_std_weight_position (float): Standard deviation weight for position.
_std_weight_velocity (float): Standard deviation weight for velocity.
Methods:
initiate: Create a track from an unassociated measurement.
predict: Run the Kalman filter prediction step.
project: Project the state distribution to measurement space.
multi_predict: Run the Kalman filter prediction step (vectorized version).
update: Run the Kalman filter correction step.
gating_distance: Compute the gating distance between state distribution and measurements.
Examples:
Initialize the Kalman filter and create a track from a measurement
>>> kf = KalmanFilterXYAH()
>>> measurement = np.array([100, 200, 1.5, 50])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
>>> print(covariance)
"""
def __init__(self):
"""
Initialize Kalman filter model matrices with motion and observation uncertainty weights.
The Kalman filter is initialized with an 8-dimensional state space (x, y, a, h, vx, vy, va, vh), where (x, y)
represents the bounding box center position, 'a' is the aspect ratio, 'h' is the height, and their respective
velocities are (vx, vy, va, vh). The filter uses a constant velocity model for object motion and a linear
observation model for bounding box location.
Examples:
Initialize a Kalman filter for tracking:
>>> kf = KalmanFilterXYAH()
"""
ndim, dt = 4, 1.0
# Create Kalman filter model matrices
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
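        # With dt = 1 the transition matrix takes the block form [[I, I], [0, I]]:
        # position integrates velocity once per step while velocity stays constant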
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current state estimate
self._std_weight_position = 1.0 / 20
self._std_weight_velocity = 1.0 / 160
def initiate(self, measurement: np.ndarray):
"""
Create a track from an unassociated measurement.
Args:
measurement (np.ndarray): Bounding box coordinates (x, y, a, h) with center position (x, y), aspect ratio a,
and height h.
Returns:
mean (np.ndarray): Mean vector (8-dimensional) of the new track. Unobserved velocities are initialized to 0 mean.
covariance (np.ndarray): Covariance matrix (8x8 dimensional) of the new track.
Examples:
>>> kf = KalmanFilterXYAH()
>>> measurement = np.array([100, 50, 1.5, 200])
>>> mean, covariance = kf.initiate(measurement)
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[3],
1e-2,
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[3],
1e-5,
10 * self._std_weight_velocity * measurement[3],
]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean: np.ndarray, covariance: np.ndarray):
"""
Run Kalman filter prediction step.
Args:
mean (np.ndarray): The 8-dimensional mean vector of the object state at the previous time step.
covariance (np.ndarray): The 8x8-dimensional covariance matrix of the object state at the previous time step.
Returns:
mean (np.ndarray): Mean vector of the predicted state. Unobserved velocities are initialized to 0 mean.
covariance (np.ndarray): Covariance matrix of the predicted state.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> predicted_mean, predicted_covariance = kf.predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3],
]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3],
]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean: np.ndarray, covariance: np.ndarray):
"""
Project state distribution to measurement space.
Args:
mean (np.ndarray): The state's mean vector (8 dimensional array).
covariance (np.ndarray): The state's covariance matrix (8x8 dimensional).
Returns:
mean (np.ndarray): Projected mean of the given state estimate.
covariance (np.ndarray): Projected covariance matrix of the given state estimate.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> projected_mean, projected_covariance = kf.project(mean, covariance)
"""
std = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3],
]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean: np.ndarray, covariance: np.ndarray):
"""
Run Kalman filter prediction step for multiple object states (Vectorized version).
Args:
mean (np.ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (np.ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns:
mean (np.ndarray): Mean matrix of the predicted states with shape (N, 8).
covariance (np.ndarray): Covariance matrix of the predicted states with shape (N, 8, 8).
Examples:
            >>> kf = KalmanFilterXYAH()
            >>> mean = np.random.rand(10, 8)  # 10 object states
            >>> covariance = np.random.rand(10, 8, 8)  # covariance matrices for 10 object states
            >>> predicted_mean, predicted_covariance = kf.multi_predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[:, 3],
self._std_weight_position * mean[:, 3],
1e-2 * np.ones_like(mean[:, 3]),
self._std_weight_position * mean[:, 3],
]
std_vel = [
self._std_weight_velocity * mean[:, 3],
self._std_weight_velocity * mean[:, 3],
1e-5 * np.ones_like(mean[:, 3]),
self._std_weight_velocity * mean[:, 3],
]
sqr = np.square(np.r_[std_pos, std_vel]).T
motion_cov = [np.diag(sqr[i]) for i in range(len(mean))]
motion_cov = np.asarray(motion_cov)
mean = np.dot(mean, self._motion_mat.T)
left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2))
covariance = np.dot(left, self._motion_mat.T) + motion_cov
return mean, covariance
def update(self, mean: np.ndarray, covariance: np.ndarray, measurement: np.ndarray):
"""
Run Kalman filter correction step.
Args:
mean (np.ndarray): The predicted state's mean vector (8 dimensional).
covariance (np.ndarray): The state's covariance matrix (8x8 dimensional).
measurement (np.ndarray): The 4 dimensional measurement vector (x, y, a, h), where (x, y) is the center
position, a the aspect ratio, and h the height of the bounding box.
Returns:
new_mean (np.ndarray): Measurement-corrected state mean.
new_covariance (np.ndarray): Measurement-corrected state covariance.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurement = np.array([1, 1, 1, 1])
>>> new_mean, new_covariance = kf.update(mean, covariance, measurement)
"""
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T, check_finite=False
).T
innovation = measurement - projected_mean
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
def gating_distance(
self,
mean: np.ndarray,
covariance: np.ndarray,
measurements: np.ndarray,
only_position: bool = False,
metric: str = "maha",
) -> np.ndarray:
"""
Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If `only_position` is False, the chi-square
distribution has 4 degrees of freedom, otherwise 2.
Args:
mean (np.ndarray): Mean vector over the state distribution (8 dimensional).
covariance (np.ndarray): Covariance of the state distribution (8x8 dimensional).
measurements (np.ndarray): An (N, 4) matrix of N measurements, each in format (x, y, a, h) where (x, y) is the
bounding box center position, a the aspect ratio, and h the height.
only_position (bool, optional): If True, distance computation is done with respect to box center position only.
metric (str, optional): The metric to use for calculating the distance. Options are 'gaussian' for the squared
Euclidean distance and 'maha' for the squared Mahalanobis distance.
Returns:
(np.ndarray): Returns an array of length N, where the i-th element contains the squared distance between
(mean, covariance) and `measurements[i]`.
Examples:
Compute gating distance using Mahalanobis metric:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurements = np.array([[1, 1, 1, 1], [2, 2, 1, 1]])
>>> distances = kf.gating_distance(mean, covariance, measurements, only_position=False, metric="maha")
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
d = measurements - mean
if metric == "gaussian":
return np.sum(d * d, axis=1)
elif metric == "maha":
cholesky_factor = np.linalg.cholesky(covariance)
z = scipy.linalg.solve_triangular(cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True)
return np.sum(z * z, axis=0) # square maha
else:
raise ValueError("Invalid distance metric")
class KalmanFilterXYWH(KalmanFilterXYAH):
"""
A KalmanFilterXYWH class for tracking bounding boxes in image space using a Kalman filter.
Implements a Kalman filter for tracking bounding boxes with state space (x, y, w, h, vx, vy, vw, vh), where
(x, y) is the center position, w is the width, h is the height, and vx, vy, vw, vh are their respective velocities.
The object motion follows a constant velocity model, and the bounding box location (x, y, w, h) is taken as a direct
observation of the state space (linear observation model).
Attributes:
_motion_mat (np.ndarray): The motion matrix for the Kalman filter.
_update_mat (np.ndarray): The update matrix for the Kalman filter.
_std_weight_position (float): Standard deviation weight for position.
_std_weight_velocity (float): Standard deviation weight for velocity.
Methods:
initiate: Create a track from an unassociated measurement.
predict: Run the Kalman filter prediction step.
project: Project the state distribution to measurement space.
multi_predict: Run the Kalman filter prediction step in a vectorized manner.
update: Run the Kalman filter correction step.
Examples:
Create a Kalman filter and initialize a track
>>> kf = KalmanFilterXYWH()
>>> measurement = np.array([100, 50, 20, 40])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
>>> print(covariance)
"""
def initiate(self, measurement: np.ndarray):
"""
Create track from unassociated measurement.
Args:
measurement (np.ndarray): Bounding box coordinates (x, y, w, h) with center position (x, y), width, and height.
Returns:
mean (np.ndarray): Mean vector (8 dimensional) of the new track. Unobserved velocities are initialized to 0 mean.
covariance (np.ndarray): Covariance matrix (8x8 dimensional) of the new track.
Examples:
>>> kf = KalmanFilterXYWH()
>>> measurement = np.array([100, 50, 20, 40])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
            [100.  50.  20.  40.   0.   0.   0.   0.]
            >>> print(covariance)
            [[ 4.      0.      0.      0.      0.      0.      0.      0.    ]
             [ 0.     16.      0.      0.      0.      0.      0.      0.    ]
             [ 0.      0.      4.      0.      0.      0.      0.      0.    ]
             [ 0.      0.      0.     16.      0.      0.      0.      0.    ]
             [ 0.      0.      0.      0.      1.5625  0.      0.      0.    ]
             [ 0.      0.      0.      0.      0.      6.25    0.      0.    ]
             [ 0.      0.      0.      0.      0.      0.      1.5625  0.    ]
             [ 0.      0.      0.      0.      0.      0.      0.      6.25  ]]
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean: np.ndarray, covariance: np.ndarray):
"""
Run Kalman filter prediction step.
Args:
mean (np.ndarray): The 8-dimensional mean vector of the object state at the previous time step.
covariance (np.ndarray): The 8x8-dimensional covariance matrix of the object state at the previous time step.
Returns:
mean (np.ndarray): Mean vector of the predicted state. Unobserved velocities are initialized to 0 mean.
covariance (np.ndarray): Covariance matrix of the predicted state.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> predicted_mean, predicted_covariance = kf.predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
]
std_vel = [
self._std_weight_velocity * mean[2],
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[2],
self._std_weight_velocity * mean[3],
]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean: np.ndarray, covariance: np.ndarray):
"""
Project state distribution to measurement space.
Args:
mean (np.ndarray): The state's mean vector (8 dimensional array).
covariance (np.ndarray): The state's covariance matrix (8x8 dimensional).
Returns:
mean (np.ndarray): Projected mean of the given state estimate.
covariance (np.ndarray): Projected covariance matrix of the given state estimate.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> projected_mean, projected_cov = kf.project(mean, covariance)
"""
std = [
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean: np.ndarray, covariance: np.ndarray):
"""
Run Kalman filter prediction step (Vectorized version).
Args:
mean (np.ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (np.ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns:
mean (np.ndarray): Mean matrix of the predicted states with shape (N, 8).
covariance (np.ndarray): Covariance matrix of the predicted states with shape (N, 8, 8).
Examples:
>>> mean = np.random.rand(5, 8) # 5 objects with 8-dimensional state vectors
>>> covariance = np.random.rand(5, 8, 8) # 5 objects with 8x8 covariance matrices
>>> kf = KalmanFilterXYWH()
>>> predicted_mean, predicted_covariance = kf.multi_predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[:, 2],
self._std_weight_position * mean[:, 3],
self._std_weight_position * mean[:, 2],
self._std_weight_position * mean[:, 3],
]
std_vel = [
self._std_weight_velocity * mean[:, 2],
self._std_weight_velocity * mean[:, 3],
self._std_weight_velocity * mean[:, 2],
self._std_weight_velocity * mean[:, 3],
]
sqr = np.square(np.r_[std_pos, std_vel]).T
motion_cov = [np.diag(sqr[i]) for i in range(len(mean))]
motion_cov = np.asarray(motion_cov)
mean = np.dot(mean, self._motion_mat.T)
left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2))
covariance = np.dot(left, self._motion_mat.T) + motion_cov
return mean, covariance
def update(self, mean: np.ndarray, covariance: np.ndarray, measurement: np.ndarray):
"""
Run Kalman filter correction step.
Args:
mean (np.ndarray): The predicted state's mean vector (8 dimensional).
covariance (np.ndarray): The state's covariance matrix (8x8 dimensional).
measurement (np.ndarray): The 4 dimensional measurement vector (x, y, w, h), where (x, y) is the center
position, w the width, and h the height of the bounding box.
Returns:
new_mean (np.ndarray): Measurement-corrected state mean.
new_covariance (np.ndarray): Measurement-corrected state covariance.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurement = np.array([0.5, 0.5, 1.2, 1.2])
>>> new_mean, new_covariance = kf.update(mean, covariance, measurement)
"""
return super().update(mean, covariance, measurement)
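# Minimal usage sketch (illustrative): one predict/update cycle with the XYAH
# filter, where measurements are (center x, center y, aspect ratio, height).
if __name__ == "__main__":
    kf = KalmanFilterXYAH()
    mean, cov = kf.initiate(np.array([320.0, 240.0, 0.5, 120.0]))
    mean, cov = kf.predict(mean, cov)  # constant-velocity extrapolation
    mean, cov = kf.update(mean, cov, np.array([324.0, 242.0, 0.5, 121.0]))
    print(mean[:4])  # measurement-corrected (x, y, a, h)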

View File

@@ -0,0 +1,157 @@
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
import numpy as np
import scipy
from scipy.spatial.distance import cdist
from ultralytics.utils.metrics import batch_probiou, bbox_ioa
try:
import lap # for linear_assignment
assert lap.__version__ # verify package is not directory
except (ImportError, AssertionError, AttributeError):
from ultralytics.utils.checks import check_requirements
check_requirements("lap>=0.5.12") # https://github.com/gatagat/lap
import lap
def linear_assignment(cost_matrix: np.ndarray, thresh: float, use_lap: bool = True):
"""
Perform linear assignment using either the scipy or lap.lapjv method.
Args:
cost_matrix (np.ndarray): The matrix containing cost values for assignments, with shape (N, M).
thresh (float): Threshold for considering an assignment valid.
use_lap (bool): Use lap.lapjv for the assignment. If False, scipy.optimize.linear_sum_assignment is used.
Returns:
matched_indices (np.ndarray): Array of matched indices of shape (K, 2), where K is the number of matches.
        unmatched_a (np.ndarray): Array of indices from the first set that were left unmatched.
        unmatched_b (np.ndarray): Array of indices from the second set that were left unmatched.
Examples:
>>> cost_matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
>>> thresh = 5.0
>>> matched_indices, unmatched_a, unmatched_b = linear_assignment(cost_matrix, thresh, use_lap=True)
"""
if cost_matrix.size == 0:
return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
if use_lap:
# Use lap.lapjv
# https://github.com/gatagat/lap
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
unmatched_a = np.where(x < 0)[0]
unmatched_b = np.where(y < 0)[0]
else:
# Use scipy.optimize.linear_sum_assignment
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
x, y = scipy.optimize.linear_sum_assignment(cost_matrix) # row x, col y
matches = np.asarray([[x[i], y[i]] for i in range(len(x)) if cost_matrix[x[i], y[i]] <= thresh])
if len(matches) == 0:
unmatched_a = list(np.arange(cost_matrix.shape[0]))
unmatched_b = list(np.arange(cost_matrix.shape[1]))
else:
unmatched_a = list(frozenset(np.arange(cost_matrix.shape[0])) - frozenset(matches[:, 0]))
unmatched_b = list(frozenset(np.arange(cost_matrix.shape[1])) - frozenset(matches[:, 1]))
return matches, unmatched_a, unmatched_b
def iou_distance(atracks: list, btracks: list) -> np.ndarray:
"""
Compute cost based on Intersection over Union (IoU) between tracks.
Args:
atracks (list[STrack] | list[np.ndarray]): List of tracks 'a' or bounding boxes.
btracks (list[STrack] | list[np.ndarray]): List of tracks 'b' or bounding boxes.
Returns:
(np.ndarray): Cost matrix computed based on IoU with shape (len(atracks), len(btracks)).
Examples:
Compute IoU distance between two sets of tracks
>>> atracks = [np.array([0, 0, 10, 10]), np.array([20, 20, 30, 30])]
>>> btracks = [np.array([5, 5, 15, 15]), np.array([25, 25, 35, 35])]
>>> cost_matrix = iou_distance(atracks, btracks)
"""
    if (atracks and isinstance(atracks[0], np.ndarray)) or (btracks and isinstance(btracks[0], np.ndarray)):
atlbrs = atracks
btlbrs = btracks
else:
atlbrs = [track.xywha if track.angle is not None else track.xyxy for track in atracks]
btlbrs = [track.xywha if track.angle is not None else track.xyxy for track in btracks]
ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float32)
if len(atlbrs) and len(btlbrs):
if len(atlbrs[0]) == 5 and len(btlbrs[0]) == 5:
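            # Five-element rows are rotated boxes (x, y, w, h, angle), scored with batched ProbIoU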
ious = batch_probiou(
np.ascontiguousarray(atlbrs, dtype=np.float32),
np.ascontiguousarray(btlbrs, dtype=np.float32),
).numpy()
else:
ious = bbox_ioa(
np.ascontiguousarray(atlbrs, dtype=np.float32),
np.ascontiguousarray(btlbrs, dtype=np.float32),
iou=True,
)
return 1 - ious # cost matrix
def embedding_distance(tracks: list, detections: list, metric: str = "cosine") -> np.ndarray:
"""
Compute distance between tracks and detections based on embeddings.
Args:
tracks (list[STrack]): List of tracks, where each track contains embedding features.
detections (list[BaseTrack]): List of detections, where each detection contains embedding features.
metric (str): Metric for distance computation. Supported metrics include 'cosine', 'euclidean', etc.
Returns:
(np.ndarray): Cost matrix computed based on embeddings with shape (N, M), where N is the number of tracks
and M is the number of detections.
Examples:
Compute the embedding distance between tracks and detections using cosine metric
>>> tracks = [STrack(...), STrack(...)] # List of track objects with embedding features
>>> detections = [BaseTrack(...), BaseTrack(...)] # List of detection objects with embedding features
>>> cost_matrix = embedding_distance(tracks, detections, metric="cosine")
"""
cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
if cost_matrix.size == 0:
return cost_matrix
    det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float32)
    track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float32)
    cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric))  # clip tiny negative values
return cost_matrix
def fuse_score(cost_matrix: np.ndarray, detections: list) -> np.ndarray:
"""
Fuse cost matrix with detection scores to produce a single similarity matrix.
Args:
cost_matrix (np.ndarray): The matrix containing cost values for assignments, with shape (N, M).
detections (list[BaseTrack]): List of detections, each containing a score attribute.
Returns:
(np.ndarray): Fused similarity matrix with shape (N, M).
Examples:
Fuse a cost matrix with detection scores
>>> cost_matrix = np.random.rand(5, 10) # 5 tracks and 10 detections
>>> detections = [BaseTrack(score=np.random.rand()) for _ in range(10)]
>>> fused_matrix = fuse_score(cost_matrix, detections)
"""
if cost_matrix.size == 0:
return cost_matrix
iou_sim = 1 - cost_matrix
det_scores = np.array([det.score for det in detections])
det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
fuse_sim = iou_sim * det_scores
return 1 - fuse_sim # fuse_cost
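# Minimal usage sketch (illustrative): associate two sets of axis-aligned xyxy
# boxes. iou_distance accepts plain ndarrays, so no STrack objects are needed.
if __name__ == "__main__":
    tracks = [np.array([0, 0, 10, 10], dtype=np.float32), np.array([20, 20, 30, 30], dtype=np.float32)]
    dets = [np.array([1, 1, 11, 11], dtype=np.float32), np.array([100, 100, 110, 110], dtype=np.float32)]
    cost = iou_distance(tracks, dets)  # (2, 2) matrix of 1 - IoU
    matches, unmatched_tracks, unmatched_dets = linear_assignment(cost, thresh=0.8)
    print(matches, unmatched_tracks, unmatched_dets)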