import os
import sys
import torch
import logging
import numpy as np
import pandas as pd
import numpy as np
from PIL import Image
from tqdm import tqdm
from pathlib import Path
from argparse import Namespace
from scipy.signal import medfilt, savgol_filter
from scipy.interpolate import interp1d
from .base_dataset import BaseDataset, EmptyDatasetError, RasterizedMap
from ..utils.homography import calc_homography_mat, affine_transformation, image_to_world
from typing import List
_logger = logging.getLogger(__name__)
[docs]
class SDDDataset(BaseDataset):
"""
Stanford Drone Dataset (SDD) loader.
This dataset contains top-down drone footage with multiple categories such
as pedestrians, bicycles, skaters, and vehicles.
"""
raw_fps = 30
[docs]
@classmethod
def load_data(cls, args: Namespace, data_path: str) -> "SDDDataset":
"""
Load data for a single SDD video scene.
Read `annotations.txt`, convert coordinates from pixels to meters,
apply nontrivial data-cleaning logic such as abnormal-speed removal and
trajectory smoothing, and load `map.png` if it exists; otherwise create
a blank map.
Args:
args (Namespace): Global arguments.
data_path (str): Path to `annotations.txt`.
Returns:
SDDDataset: Initialized dataset instance.
"""
data_path = Path(data_path)
name = (
data_path.parent.parent.name
+ "-"
+ data_path.parent.name.removeprefix("video")
)
## Check cache.
cache_path = cls._make_cache_path(args, str(data_path), name)
if args.cache_dataset and os.path.exists(cache_path):
_logger.info(f"Loading cached dataset from {cache_path}")
try:
dataset = cls.load_cache(cache_path)
if len(dataset) == 0: # If it can be read but is empty, regenerating it would still be empty, so fail early.
raise EmptyDatasetError(f"Cached dataset {cache_path} is empty.")
cls.collate_fn([dataset[0]]) # Sanity-check that the dataset is usable.
return dataset
except Exception as e:
_logger.error(f"Failed to use cached dataset {cache_path}: {e}")
if not data_path.exists():
raise FileNotFoundError(f"Data path {data_path} not found.")
## Read raw data.
df_data = pd.read_csv(
data_path,
sep=" ",
names=[
"id", "xmin", "ymin", "xmax", "ymax",
"frame", "lost", "occluded", "generated", "label",
],
usecols=["id", "xmin", "ymin", "xmax", "ymax", "frame", "label"],
)
df_data["x"] = (df_data["xmin"] + df_data["xmax"]) / 2
df_data["y"] = (df_data["ymin"] + df_data["ymax"]) / 2
df_data = df_data.rename(columns={"frame": "f", "label": "type"})
df_data['type'] = df_data['type'].replace({
'Pedestrian': 'pedestrian',
'Skater': 'vehicle',
'Biker': 'vehicle',
'Cart': 'vehicle',
'Car': 'vehicle',
'Bus': 'vehicle',
})
df_data = df_data[['f', 'id', 'x', 'y', 'type']] # First dimension points right, second dimension points down.
## Apply the affine transformation.
H = cls.get_homography_mat(data_path=data_path)
df_data[['x', 'y']] = affine_transformation(df_data[['x', 'y']].values, H)
## Clean data.
df_list = []
for pid, group in df_data.groupby('id'):
if group['type'].nunique() > 1:
raise ValueError(f"ID {pid} has multiple types: {group['type'].unique()}")
if group.iloc[0]['type'] != 'pedestrian':
df_list.append(group.assign(id=len(df_list))) # Reassign IDs.
else:
traj = group[['f', 'x', 'y']].values
traj[:, 0] /= args.fps # convert to seconds
cleaned_segs = cls.clean_trajectory(
traj,
median_k=5,
do_savgol=True, sg_win=9, sg_poly=2,
do_loess=False, loess_frac=0.08,
speed_window=11, speed_k=5.0,
angle_window=11, angle_k=5.0,
theta_thresh=np.deg2rad(90.0),
split_angle_thresh=np.deg2rad(120.0),
split_speed_delta=0.5,
split_time_thresh=1.0
)
for seg in cleaned_segs:
seg[:, 0] *= args.fps # convert back to frame index
df_list.append(pd.DataFrame(seg, columns=['f', 'x', 'y']).assign(id=len(df_list), type='pedestrian'))
df_data = pd.concat(df_list, ignore_index=True)
## Resample data.
df_data = cls.resample_dataframe(df_data, raw_fps=cls.raw_fps, target_fps=args.fps)
## Load map.
map_path = data_path.parent / f"map.png"
if map_path.exists():
image = np.array(Image.open(map_path).convert('L')) / 255.0 # (H, W), first dimension downward and second dimension rightward.
image_ = image.T # Transpose so the first dimension points right and the second points down, aligning with the dataframe coordinates.
map, xmin, xmax, ymin, ymax = image_to_world(image_, H, dot_per_meter=args.dot_per_meter) # First dimension rightward and second upward, i.e. xy coordinates.
else:
dot_per_meter = args.dot_per_meter
xmin, xmax = df_data['x'].min(), df_data['x'].max()
ymin, ymax = df_data['y'].min(), df_data['y'].max()
len_x = len(np.arange(xmin, xmax+1/dot_per_meter, 1/dot_per_meter)[:-1])
len_y = len(np.arange(ymin, ymax+1/dot_per_meter, 1/dot_per_meter)[:-1])
map = np.full((len_x, len_y), np.nan)
map_data = RasterizedMap(map=map, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax)
## Normalize coordinates.
df_data, map_data = cls.normalize_xy(df_data, map_data)
## Build dataset object.
dataset = cls(name=name, args=args, df_data=df_data, map_data=map_data)
## Save cache.
cache_path = cls._make_cache_path(args, str(data_path), name)
_logger.info(f"Caching dataset to {cache_path}")
cls.save_cache(dataset, cache_path)
return dataset
[docs]
@classmethod
def load_data_batch(cls, args: Namespace, data_path: str, show_tqdm=True) -> List["SDDDataset"]:
"""Batch-load SDD datasets."""
name = '-'.join(Path(data_path).relative_to('./data').parts)
cache_path = Path('./data/.cache') / f"{name}.pkl"
try:
assert args.cache_dataset, f"Cache disabled"
assert cache_path.exists(), f"Cache {cache_path} not found"
_logger.info(f"Loading cached dataset-list from {cache_path}")
files = cls.load_cache(cache_path)
except Exception as e:
_logger.info(f"Failed to load cached dataset-list from {cache_path} since: {e}")
data_path = Path(data_path)
if data_path.is_dir():
files = list(sorted(data_path.glob("**/annotations.txt")))
elif "*" in str(data_path):
if data_path.is_absolute():
data_path = data_path.relative_to(".")
files = list(sorted(Path(".").glob(data_path)))
else:
files = [data_path]
_logger.info(f"Caching dataset-list to {cache_path}")
cls.save_cache(files, cache_path)
datasets = []
pbar = tqdm(files, disable=not show_tqdm, desc="Loading SDD datasets")
for file in pbar:
pbar.set_postfix_str(file.parent.parent.name + "/" + file.parent.name)
datasets.append(cls.load_data(args, file))
datasets[-1].path = str(file)
return datasets
[docs]
@staticmethod
def get_homography_mat(data_path):
"""
Get the homography matrix from the dataset directory structure,
effectively capturing the pixel-to-meter ratio.
Different SDD scenes such as `bookstore` and `deathCircle` use
different scale factors.
Args:
data_path (Path): Data file path, used to infer the scene name.
Returns:
np.ndarray: 3x3 scaling matrix, effectively diagonal in this case.
"""
image0 = np.array(Image.open(data_path.parent / 'reference.jpg'))
meter_per_pixel_dict = {
'bookstore': {
'video0': 36 / 1080, 'video1': 36 / 1080, 'video2': 39 / 1080,
'video3': 53 / 1080, 'video4': 54 / 1080,
'video5': 34 / 1080, 'video6': 49 / 1080,
},
'coupa': {
'video0': 30 / 1080, 'video1': 31 / 1080,
'video2': 33 / 1080, 'video3': 33 / 1080,
},
'deathCircle': {
'video0': 60 / 904, 'video1': 77 / 1080, 'video2': 82 / 1080,
'video3': 55 / 1080, 'video4': 77 / 1080,
},
'gates': {
'video0': 32 / 728, 'video1': 35 / 780, 'video2': 43 / 728,
'video3': 31 / 772, 'video4': 40 / 784, 'video5': 51 / 1080,
'video6': 32 / 712, 'video7': 37 / 728, 'video8': 32 / 728,
},
'hyang': {
'video0': 45 / 816, 'video1': 60 / 780, 'video2': 43 / 842,
'video3': 70 / 1434, 'video4': 41 / 836, 'video5': 40 / 788,
'video6': 71 / 1416, 'video7': 46 / 808, 'video8': 41 / 752,
'video9': 62 / 1080, 'video10': 36 / 748, 'video11': 36 / 748,
'video12': 44 / 848, 'video13': 39 / 748, 'video14': 39 / 748,
},
'little': {
'video0': 42/1080, 'video1': 40/1080,
'video2': 40/1080, 'video3': 40/1080,
},
'nexus': {
'video0': 60 / 740, 'video1': 45 / 796, 'video2': 43 / 716,
'video3': 37 / 716, 'video4': 39 / 740, 'video5': 38 / 728,
'video6': 42 / 788, 'video7': 38 / 728, 'video8': 36 / 732,
'video9': 37 / 788, 'video10': 33 / 732, 'video11': 42 / 772,
},
'quad': {
'video0': 56 / 1968, 'video1': 58 / 1968,
'video2': 58 / 1968, 'video3': 60 / 1968,
}
}
ratio = meter_per_pixel_dict[data_path.parent.parent.name][data_path.parent.name]
h, w, _ = image0.shape
H = calc_homography_mat(
np.array([[0, h], [w, h], [0, 0], [w, 0]]),
np.array([[0, 0], [w, 0], [0, h], [w, h]]) * ratio,
)
return H
[docs]
@staticmethod
def clean_trajectory(traj,
median_k=5,
do_savgol=True, sg_win=9, sg_poly=2,
do_loess=False, loess_frac=0.08,
speed_window=11, speed_k=5.0, # for MAD threshold
angle_window=11, angle_k=5.0, # for MAD threshold
theta_thresh=np.deg2rad(90.0),
split_angle_thresh=np.deg2rad(120.0),
split_speed_delta=0.5,
split_time_thresh=1.0):
"""
Static utility method for trajectory cleaning and smoothing.
Denoise and smooth the raw trajectory, using Savitzky-Golay or LOESS,
detect speed and angle anomalies, and split the trajectory into
reasonable segments.
Args:
traj (np.ndarray): Raw trajectory `(N, 3)` with columns `[t, x, y]`.
median_k (int): Median-filter window size.
do_savgol (bool): Whether to use Savitzky-Golay filtering.
... (other smoothing and splitting threshold parameters)
Returns:
List[np.ndarray]: List of cleaned trajectory segments.
"""
# 1. sort & unique
traj = np.array(traj)
order = np.argsort(traj[:,0])
traj = traj[order]
# unique by time
_, idx = np.unique(traj[:,0], return_index=True)
traj = traj[idx]
t = traj[:,0]
x = traj[:,1].copy()
y = traj[:,2].copy()
N = len(t)
if N < 4:
return []
# 2. median filter to remove spikes (operate on x,y separately)
if median_k is not None and median_k >= 3:
# medfilt requires odd kernel and pads; use same length
k = median_k if median_k%2==1 else median_k+1
x = medfilt(x, kernel_size=k)
y = medfilt(y, kernel_size=k)
# 3. regularize if needed: SG requires at least window length points and evenly sampled
if do_savgol:
if sg_win >= N:
sg_win = N-1 if (N-1)%2==1 else N-2
if sg_win < 3:
do_savgol = False
if do_savgol:
try:
x = savgol_filter(x, window_length=sg_win, polyorder=sg_poly, mode='interp')
y = savgol_filter(y, window_length=sg_win, polyorder=sg_poly, mode='interp')
except Exception:
do_savgol = False
# 4. LOESS as alternative (overrides SG if requested and available)
if do_loess:
try:
from statsmodels.nonparametric.smoothers_lowess import lowess
x = lowess(x, t, frac=loess_frac, return_sorted=False)
y = lowess(y, t, frac=loess_frac, return_sorted=False)
except Exception:
pass
# 5. compute velocities and directions
dt = np.diff(t)
vxs = np.diff(x) / dt
vys = np.diff(y) / dt
speeds = np.sqrt(vxs**2 + vys**2) # length N-1
# pad to length N for per-point measures (associate speed[i] as speed from i->i+1)
speeds_p = np.concatenate(([speeds[0]], speeds))
angles = np.arctan2(np.concatenate(([vys[0]], vys)), np.concatenate(([vxs[0]], vxs)))
# 6. local median & MAD on speed and on angle differences
def rolling_median_mad(arr, window):
# returns center-aligned median and MAD arrays (nan at edges)
n = len(arr)
med = np.full(n, np.nan)
mad = np.full(n, np.nan)
hw = window // 2
for i in range(n):
L = max(0, i - hw)
R = min(n, i + hw + 1)
w = arr[L:R]
med[i] = np.median(w)
mad[i] = np.median(np.abs(w - med[i])) if len(w)>0 else np.nan
return med, mad
med_speed, mad_speed = rolling_median_mad(speeds_p, speed_window)
# robust threshold
mad_eps = 1.4826 * mad_speed
# indicator of speed anomaly (use k * MAD)
speed_diff = np.abs(speeds_p - med_speed)
is_speed_anom = (speed_diff > speed_k * mad_eps) | np.isnan(speed_diff)
# angle change anomaly: look at difference between local median angles
def circular_diff(a, b):
d = a - b
d = (d + np.pi) % (2*np.pi) - np.pi
return np.abs(d)
# compute local median angle
med_angle, mad_angle = rolling_median_mad(angles, angle_window)
# angle difference to median
angle_diff = circular_diff(angles, med_angle)
# indicator of speed anomaly (use k * MAD)
mad_eps = 1.4826 * mad_angle
# Mark points with excessively large turning angles as anomalies.
delta_theta = circular_diff(angles[:-2], angles[2:]) # length N-1
delta_theta = np.concatenate(([0.0], delta_theta, [0.0])) # pad to length N
is_angle_anom = (angle_diff > angle_k * mad_eps) | np.isnan(angle_diff) | (delta_theta > theta_thresh)
# combine anomalies
is_point_anom = is_speed_anom | is_angle_anom
# 7. group consecutive anomalies into segments
segments = []
i = 0
while i < N:
if not is_point_anom[i]:
i += 1
continue
# start a candidate anomaly region
j = i
while j+1 < N and is_point_anom[j+1]:
j += 1
# region [i..j]
L = max(0, i-3) # context window
R = min(N-1, j+3)
# compute med speed/angle left vs right
left_idx = range(L, i)
right_idx = range(j+1, R+1)
if len(left_idx) == 0 or len(right_idx) == 0:
# edge case near start/end -> prefer split or trim
# if small region, remove by interpolation
do_split = False if (j-i)<=1 else True
else:
v_left = np.median(speeds_p[list(left_idx)])
v_right = np.median(speeds_p[list(right_idx)])
theta_left = np.median(angles[list(left_idx)])
theta_right = np.median(angles[list(right_idx)])
# decide
do_split = (abs(v_left - v_right) > split_speed_delta) or (circular_diff(theta_left, theta_right) > split_angle_thresh) or ((t[j] - t[i]) > split_time_thresh)
if do_split:
# cut trajectory: flush segment before i (if any) and start new segment after j
# we add the clean chunk from previous pointer up to i-1 as a segment
segments.append(('split', i, j))
else:
segments.append(('interp', i, j))
i = j+1
# 8. apply interpolation / splitting to produce final segments
# Strategy: accumulate indices to include in each output segment
cut_points = set()
to_remove_intervals = []
for typ, a, b in segments:
if typ == 'split':
# indicate a split between a-1 and a, and between b and b+1
cut_points.add(a) # split before index a
cut_points.add(b+1) # split before b+1 (i.e., after b)
else:
# interpolation: mark indices a..b for removal and interpolation
to_remove_intervals.append((a,b))
# build segments by cutting at cut_points (sorted)
cuts = sorted(list(cut_points))
# always include start 0 and end N
boundaries = [0] + cuts + [N]
final_segments = []
for sidx in range(len(boundaries)-1):
st = boundaries[sidx]
ed = boundaries[sidx+1]
# segment indices [st, ed)
if ed - st < 2:
continue
seg_t = t[st:ed].copy()
seg_x = x[st:ed].copy()
seg_y = y[st:ed].copy()
final_segments.append(np.stack((seg_t, seg_x, seg_y), axis=1))
# now apply interpolation removals within each final segment
def apply_interps(segment):
seg_t = segment[:,0]
seg_x = segment[:,1]
seg_y = segment[:,2]
# find intervals fully contained in this segment
base_start = seg_t[0]
base_end = seg_t[-1]
keep_mask = np.ones(len(seg_t), dtype=bool)
for a,b in to_remove_intervals:
# check if [a..b] indices map into this segment
# map by time: if any times in segment correspond to indices a..b in original
# original times are t
idxs = np.where((t >= t[a]) & (t <= t[b]))[0]
if len(idxs)==0:
continue
# mark these indices for removal
rel = np.where((seg_t >= t[a]) & (seg_t <= t[b]))[0]
keep_mask[rel] = False
if np.all(keep_mask):
return segment
# else, build new t/x/y with interpolation across removed points
new_t = seg_t[keep_mask]
new_x = seg_x[keep_mask]
new_y = seg_y[keep_mask]
# But we must ensure we can interpolate across gaps: check if gap endpoints exist
# Use linear interpolation on time
f_x = interp1d(new_t, new_x, kind='linear', bounds_error=False, fill_value="extrapolate")
f_y = interp1d(new_t, new_y, kind='linear', bounds_error=False, fill_value="extrapolate")
full_x = f_x(seg_t)
full_y = f_y(seg_t)
return np.stack((seg_t, full_x, full_y), axis=1)
cleaned = [apply_interps(seg) for seg in final_segments]
# filter tiny segments (<2 points)
cleaned = [c for c in cleaned if len(c) >= 3]
cleaned = [c for c in cleaned if np.sum(np.sqrt(np.sum(np.square(np.diff(c[:, 1:], axis=0)), axis=1))) > 3.0] # remove near-constant segments
return cleaned