Support Realigning Data Frames and Video Frames (#317)
* Renamed `DynamicProcessor` to `Preprocessor`. (#308)

* Renamed `StaticProcessor` to `Processor`. (#308)

* Bug fixed: unit cache entries missed by `erase_unit_cache()`. (#308)

* Supported reversing datasets. (#308)

* Code reformatted. (#308)

* Added `realign_visual_data()` (not yet complete). (#308)

* Bug fixed: dataset may not be loaded. (#308)

* Removed `reverse()` because Pandas does not support it. (#308)

* Supported specifying the required header in the constructor of `Inference`. (#308)

* Code reformatted. (#308)

* Bug fixed: wrong header. (#305) (#308)

* Added `VisualDataRealignmentByLatency`. (#308)
ATATC authored Jul 25, 2024
1 parent 731c270 commit f8dc1f8
Showing 5 changed files with 45 additions and 14 deletions.
2 changes: 1 addition & 1 deletion leads/data_persistence/analyzer/__init__.py
@@ -1,4 +1,4 @@
-from leads.data_persistence.analyzer.dynamic import *
 from leads.data_persistence.analyzer.inference import *
 from leads.data_persistence.analyzer.jarvis import *
+from leads.data_persistence.analyzer.preprocess import *
 from leads.data_persistence.analyzer.utils import *
42 changes: 35 additions & 7 deletions leads/data_persistence/analyzer/inference.py
@@ -1,25 +1,31 @@
 from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod
-from typing import Any as _Any, override as _override, Generator as _Generator
+from typing import Any as _Any, override as _override, Generator as _Generator, Literal as _Literal
 
 from leads.data_persistence.analyzer.utils import time_invalid, speed_invalid, acceleration_invalid, \
     mileage_invalid, latitude_invalid, longitude_invalid, distance_between
-from leads.data_persistence.core import CSVDataset, DEFAULT_HEADER
+from leads.data_persistence.core import CSVDataset, DEFAULT_HEADER, VISUAL_HEADER_ONLY
 
 
 class Inference(object, metaclass=_ABCMeta):
-    def __init__(self, required_depth: tuple[int, int] = (0, 0)) -> None:
+    def __init__(self, required_depth: tuple[int, int] = (0, 0),
+                 required_header: tuple[str, ...] = DEFAULT_HEADER) -> None:
         """
         Declare the scale of data this inference requires.
         :param required_depth: (-depth backward, depth forward)
+        :param required_header: the necessary header that the dataset must contain for this inference to work
         """
         self._required_depth: tuple[int, int] = required_depth
+        self._required_header: tuple[str, ...] = required_header
 
     def depth(self) -> tuple[int, int]:
         """
         :return: (-depth backward, depth forward)
         """
         return self._required_depth
 
+    def header(self) -> tuple[str, ...]:
+        return self._required_header
+
     @_abstractmethod
     def complete(self, *rows: dict[str, _Any], backward: bool = False) -> dict[str, _Any] | None:
         """
@@ -45,7 +51,7 @@ class SafeSpeedInference(SpeedInferenceBase):
"""

def __init__(self) -> None:
super().__init__((0, 0))
super().__init__()

@_override
def complete(self, *rows: dict[str, _Any], backward: bool = False) -> dict[str, _Any] | None:
@@ -111,7 +117,7 @@ class SpeedInferenceByGPSGroundSpeed(SpeedInferenceBase):
"""

def __init__(self) -> None:
super().__init__((0, 0))
super().__init__()

@_override
def complete(self, *rows: dict[str, _Any], backward: bool = False) -> dict[str, _Any] | None:
@@ -225,6 +231,27 @@ def complete(self, *rows: dict[str, _Any], backward: bool = False) -> dict[str, _Any] | None:
         }
 
 
+class VisualDataRealignmentByLatency(Inference):
+    def __init__(self, *channels: _Literal["front", "left", "right", "rear"]) -> None:
+        super().__init__((0, 1), VISUAL_HEADER_ONLY)
+        self._channels: tuple[_Literal["front", "left", "right", "rear"], ...] = channels if channels else (
+            "front", "left", "right", "rear")
+
+    @_override
+    def complete(self, *rows: dict[str, _Any], backward: bool = False) -> dict[str, _Any] | None:
+        if backward:
+            return None
+        target, base = rows
+        original_target = target.copy()
+        t_0, t = target["t"], base["t"]
+        for channel in self._channels:
+            if (new_latency := t_0 - t + base[f"{channel}_view_latency"]) > 0:
+                continue
+            target[f"{channel}_view_base64"] = base[f"{channel}_view_base64"]
+            target[f"{channel}_view_latency"] = new_latency
+        return None if target == original_target else target
+
+
 class InferredDataset(CSVDataset):
     def __init__(self, file: str, chunk_size: int = 100) -> None:
         super().__init__(file, chunk_size)
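
Note: `VisualDataRealignmentByLatency` declares a depth of `(0, 1)`, so `complete()` receives the target row and the row after it. For each channel, `t_0 - t + base[f"{channel}_view_latency"]` estimates when the next row's frame was actually captured relative to the target row's timestamp `t_0`; a non-positive result means that frame was already available at `t_0`, so the frame and its corrected latency are copied back into the target row. A minimal usage sketch (the file path is hypothetical):

    # Realign only the front and rear channels of a dataset recorded with the visual header.
    dataset = InferredDataset("data/2024-07-25.csv")
    dataset.complete(VisualDataRealignmentByLatency("front", "rear"))
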
@@ -291,8 +318,9 @@ def complete(self, *inferences: Inference, enhanced: bool = False, assume_initial_zeros
         :param enhanced: True: use inferred data to infer other data; False: use only raw data to infer other data
         :param assume_initial_zeros: True: reasonably set any missing data in the first row to zero; False: no change
         """
-        if DEFAULT_HEADER in self.read_header():
-            raise KeyError("Your dataset must include the default header")
+        for inference in inferences:
+            if not set(rh := inference.header()).issubset(ah := self.read_header()):
+                raise KeyError(f"Inference {inference} requires header {rh} but the dataset only contains {ah}")
         if assume_initial_zeros:
             self.assume_initial_zeros()
         self._complete(inferences, enhanced, False)
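
Note: the removed check compared the whole `DEFAULT_HEADER` tuple against the dataset header; the new loop validates each inference's own requirement with a subset test. An illustration with made-up column names:

    set(("t", "speed")).issubset(("t", "speed", "voltage"))  # True: requirement satisfied
    set(("rear_view_latency",)).issubset(("t", "speed"))     # False: complete() raises KeyError
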
leads/data_persistence/analyzer/preprocess.py
@@ -4,7 +4,7 @@
 from .._computational import array as _array, ndarray as _ndarray
 
 
-class DynamicProcessor(object):
+class Preprocessor(object):
     def __init__(self, data_seq: _Sequence[dict[str, _Any]]) -> None:
         self._data_seq: _Sequence[dict[str, _Any]] = data_seq
 
@@ -15,7 +15,7 @@
 from .._computational import sqrt as _sqrt
 
 
-class StaticProcessor(object):
+class Processor(object):
     def __init__(self, dataset: CSVDataset) -> None:
         if DEFAULT_HEADER in dataset.read_header():
             raise KeyError("Your dataset must include the default header")
@@ -54,6 +54,7 @@ def __init__(self, dataset: CSVDataset) -> None:
         self._lap_d: list[float] = []
         self._max_lap_x: float | None = None
         self._max_lap_y: float | None = None
+        self._required_time: int = 0
 
     def dataset(self) -> CSVDataset:
         return self._dataset
@@ -119,7 +120,7 @@ def baking_results(self) -> tuple[str, str, str, str, str, str, str, str, str, s
         return (
             f"Baked {self._valid_rows_count} / {self._read_rows_count} ROWS",
             f"Baking Rate: {100 * self._valid_rows_count / self._read_rows_count:.2f}%",
-            f"Skipped Rows: {StaticProcessor._hide_others(self._invalid_rows, 5)}",
+            f"Skipped Rows: {Processor._hide_others(self._invalid_rows, 5)}",
             f"Start Time: {_datetime.fromtimestamp(self._start_time * .001).strftime("%Y-%m-%d %H:%M:%S")}",
             f"End Time: {_datetime.fromtimestamp(self._end_time * .001).strftime("%Y-%m-%d %H:%M:%S")}",
             f"Duration: {format_duration(self._duration * .001)}",
@@ -128,7 +129,7 @@
f"v\u2098\u2090\u2093: {self._max_speed:.2f} KM / H",
f"v\u2090\u1D65\u1D4D: {self._avg_speed:.2f} KM / H",
f"GPS Hit Rate: {100 * self._gps_valid_count / self._valid_rows_count:.2f}%",
f"GPS Skipped Rows: {StaticProcessor._hide_others(self._gps_invalid_rows, 5)}"
f"GPS Skipped Rows: {Processor._hide_others(self._gps_invalid_rows, 5)}"
)

def erase_unit_cache(self) -> None:
@@ -138,6 +139,8 @@
         self._lap_x.clear()
         self._lap_y.clear()
         self._lap_d.clear()
+        self._max_lap_x = None
+        self._max_lap_y = None
 
     def foreach(self, do: _Callable[[dict[str, _Any], int], None], skip_invalid_rows: bool = True,
                 skip_gps_invalid_rows: bool = False) -> None:
4 changes: 2 additions & 2 deletions leads/data_persistence/core.py
@@ -186,7 +186,7 @@ def write_frame(self, *data: _Any) -> None:
             frame[self._header[i]] = d = data[i]
             if column := self._columns[i]:
                 column.append(d)
-        _DataFrame(data=frame, index=[self._i]).to_csv(self._file, mode="a", header=False)
+        _DataFrame(frame, [self._i]).to_csv(self._file, mode="a", header=False)
         self._i += 1
 
     def close(self) -> None:
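
Note: `_DataFrame(frame, [self._i])` passes `data` and `index` positionally, so the behavior is unchanged from the keyword form. A standalone sketch of the single-row append pattern (column names and file path hypothetical):

    from pandas import DataFrame

    # A one-row frame with an explicit index appends a single CSV line without repeating the header.
    DataFrame({"t": 1721900000000, "speed": 42.0}, [0]).to_csv("frames.csv", mode="a", header=False)
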
@@ -261,7 +261,7 @@ def close(self) -> None:
 )
 VISUAL_HEADER_ONLY: tuple[str, str, str, str, str, str, str, str] = (
     "front_view_base64", "front_view_latency", "left_view_base64", "left_view_latency", "right_view_base64",
-    "front_view_latency", "rear_view_base64", "rear_view_latency"
+    "right_view_latency", "rear_view_base64", "rear_view_latency"
 )
 VISUAL_HEADER: _VisualHeader = DEFAULT_HEADER + VISUAL_HEADER_ONLY
 VISUAL_HEADER_FULL: _VisualHeaderFull = DEFAULT_HEADER_FULL + VISUAL_HEADER_ONLY
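
Note: `VISUAL_HEADER_ONLY` previously listed `"front_view_latency"` twice and omitted `"right_view_latency"`. A quick sanity check of the corrected tuple:

    assert len(set(VISUAL_HEADER_ONLY)) == len(VISUAL_HEADER_ONLY)  # no duplicated columns
    assert "right_view_latency" in VISUAL_HEADER_ONLY
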
