Skip to content

Commit

Permalink
refactor motion models
Browse files Browse the repository at this point in the history
  • Loading branch information
kharitonov-ivan committed Oct 25, 2023
1 parent 443e893 commit 7a3af07
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 174 deletions.
222 changes: 67 additions & 155 deletions src/motion_models.py
Original file line number Diff line number Diff line change
@@ -1,170 +1,59 @@
import numpy as np

from src.common.state import Gaussian


class MotionModel:
def __init__(self, random_state=None, *args, **kwargs):
def __init__(self, random_state: int, d: int, *args, **kwargs):
self._generator = np.random.RandomState(random_state)
self.d = d

def f(self, state_vector: np.ndarray, dt: float) -> np.ndarray:
"""Calculates next state vector from current"""
return self.F(state_vector, dt) @ state_vector

def f(self, state_vector):
def F(self, state_vector: np.ndarray, dt: float) -> np.ndarray:
"""Represents transition matrix"""
raise NotImplementedError

def move(self, params):
def Q(self, dt: float) -> np.ndarray:
"""Represents motion noise covariance"""
raise NotImplementedError

def __repr__(self) -> str:
raise NotImplementedError


class ConstantVelocityMotionModel(MotionModel):
def __init__(self, dt: float, sigma_q: float, *args, **kwargs):
"""Creates a 2D nearly constant velocity model
Note:
the motion model assumes that the state vector x consists of the following states:
px -> X position
py -> Y position
vx -> X velocity
vy -> Y velocity
Attributes:
d (scalar): object state dimension
F (2 x 2 matrix): function handle return a motion transition function
Q (4 x 4 matrix): motion noise covariance
f (4 x 1 vector): function handle return state prediction
Args:
T (scalar): sampling time
sigma (scalar): standart deviation of motion noise
"""

self.d = 4
self.dt = dt
def __init__(self, random_state: int, sigma_q: float, *args, **kwargs):
super().__init__(random_state, d=4) # 4 states: x, y, vx, vy
self.sigma = sigma_q
super(ConstantVelocityMotionModel, self).__init__(*args, **kwargs)

def f(self, state_vector: np.ndarray, dt: float):
# TODO assert on state
return self._transition_matrix(dt=dt) @ state_vector

def F(self, state_vector: np.ndarray, dt: float):
return self._transition_matrix(dt=dt)

def Q(self, dt):
return self._get_motion_noise_covariance(dt=dt, sigma=self.sigma)

def move(self, state: Gaussian, dt: float = None, if_noisy: bool = False) -> Gaussian:
dt = self.dt if dt is None else dt
assert isinstance(dt, float)
assert isinstance(state, Gaussian), f"Argument of wrong type! Type ={type}"
if if_noisy:
next_state = Gaussian(
x=self._generator.multivariate_normal(
mean=self._transition_matrix(dt=dt) @ state.x,
cov=self._get_motion_noise_covariance(dt=dt),
),
P=self._get_motion_noise_covariance(),
)
else:
next_state = Gaussian(
x=self._transition_matrix(dt=dt) @ state.x,
P=self._get_motion_noise_covariance(),
)
return next_state

def _transition_matrix(self, dt=None):
dt = self.dt if dt is None else dt
assert isinstance(dt, float)
F = np.array(
[
[1.0, 0.0, dt, 0.0],
[0.0, 1.0, 0.0, dt],
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 1.0],
]
)
return F

def _get_motion_noise_covariance(self, dt=None, sigma=None):
dt = self.dt if dt is None else dt
assert isinstance(dt, float)

sigma = self.sigma if sigma is None else sigma
assert isinstance(sigma, float)
def F(self, state_vector: np.ndarray, dt: float) -> np.ndarray:
"""Transition matrix for constant velocity model"""
return np.array([[1.0, 0.0, dt, 0.0], [0.0, 1.0, 0.0, dt], [0.0, 0.0, 1.0, 0.0], [0.0, 0.0, 0.0, 1.0]])

Q = sigma**2 * np.array(
def Q(self, dt: float) -> np.ndarray:
"""Motion noise covariance for constant velocity model"""
return self.sigma**2 * np.array(
[
[dt**4 / 4.0, 0.0, dt**3 / 2.0, 0.0],
[0.0, dt**4.0 / 4.0, 0.0, dt**3.0 / 2.0],
[dt**3.0 / 2.0, 0.0, dt**2.0, 0.0],
[0.0, dt**3.0 / 2.0, 0.0, dt**2.0],
]
)
return Q

def __repr__(self) -> str:
return f"Constant velocity motion model with dt = {self.dt} siqma = {self.sigma} "
return f"Constant velocity motion model with dt = {self.dt} sigma = {self.sigma}"


class CoordinateTurnMotionModel(MotionModel):
def __init__(self, dt: float, sigma_V: float, sigma_omega: float, *args, **kwargs):
"""Creates a 2D coordinate turn model with nearly constant polar velocity and turn rate
Note:
the motion model assumes that the state vactor x consists of the following states:
px -> X position
py -> Y position
v -> velocity
phi -> heading
omega -> turn rate
Attributes:
d (scalar): object state dimension
F (5 x 5 matrix): function handle return a motion transition function
Q (5 x 5 matrix): motion noise covariance
f (5 x 1 vector): function handle return state prediction
Args:
T (scalar): sampling time
sigma_V (scalar): standart deviation of motion noise added to polar velocity
sigma_omega (scalar): standard deviation of motion noise added to turn rate
"""
self.d = 5
self.dt = dt
self.sigma_V = sigma_V
def __init__(self, random_state: int, sigma_v: float, sigma_omega: float, *args, **kwargs):
super().__init__(random_state, d=5) # 5 states: x, y, v, phi, omega
self.sigma_v = sigma_v
self.sigma_omega = sigma_omega

def __repr__(self) -> str:
return self.__class__.__name__ + (f"(d={self.d}, " f"dt={self.dt}, " f"sigma_V={self.sigma_V}, " f"sigma_omega={self.sigma_omega}, ")

def move(self, state: Gaussian, dt: float = None, if_noisy: bool = False) -> Gaussian:
dt = self.dt if dt is None else dt
assert isinstance(dt, float)
assert isinstance(state, Gaussian), f"Argument of wrong type! Type ={type}"

if if_noisy:
next_state = Gaussian(
x=self._generator.multivariate_normal(
mean=self._f(state, dt=dt),
cov=self.Q(dt=dt),
),
P=self.get_motion_noise_covariance(),
)
else:
next_state = Gaussian(
x=self._f(state, dt=dt),
P=self.Q(dt=dt),
)
return next_state

def f(self, state_vector, dt):
# TODO assert on state
pos_x, pos_y, v, phi, omega = state_vector
next_state = np.array([dt * v * np.cos(phi), dt * v * np.sin(phi), 0, dt * omega, 0])
return state_vector + next_state

def F(self, state_vector, dt):
def F(self, state_vector: np.ndarray, dt: float) -> np.ndarray:
"""Transition matrix for coordinate turn model"""
pos_x, pos_y, v, phi, omega = state_vector
return np.array(
[
Expand All @@ -176,28 +65,51 @@ def F(self, state_vector, dt):
]
)

def _f(self, state, dt):
pos_x, pos_y, v, phi, omega = state.x
def Q(self, dt: float) -> np.ndarray:
"""Motion noise covariance for coordinate turn model"""
return np.diag([0, 0, self.sigma_v**2, 0, self.sigma_omega**2])

next_state = np.array(
[
pos_x + ((2 * v) / omega) * np.sin(omega * dt / 2) * np.cos(phi + omega * dt / 2),
pos_y + ((2 * v) / omega) * np.sin(omega * dt / 2) * np.sin(phi + omega * dt / 2),
v,
phi + omega * dt,
omega,
]
)
return next_state
def __repr__(self) -> str:
return self.__class__.__name__ + (f"(d={self.d}, " f"dt={self.dt}, " f"sigma_v={self.sigma_v}, " f"sigma_omega={self.sigma_omega}, ")


class ConstantAccelerationMotionModel(MotionModel):
def __init__(self, random_state: int, sigma_a: float, *args, **kwargs):
super().__init__(random_state, d=6) # 6 states: x, y, vx, vy, ax, ay
self.sigma_a = sigma_a

def Q(self, dt=None):
"""Motion noise covariance"""
def F(self, state_vector: np.ndarray, dt: float) -> np.ndarray:
"""Transition matrix for constant acceleration model"""
return np.array(
[
[0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, self.sigma_V**2, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, self.sigma_omega**2],
[1.0, 0.0, dt, 0.0, 0.5 * dt**2, 0.0],
[0.0, 1.0, 0.0, dt, 0.0, 0.5 * dt**2],
[0.0, 0.0, 1.0, 0.0, dt, 0.0],
[0.0, 0.0, 0.0, 1.0, 0.0, dt],
[0.0, 0.0, 0.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]
) # yapf: disable
)

def Q(self, dt: float) -> np.ndarray:
"""Motion noise covariance for constant acceleration model"""
sigma_a2 = self.sigma_a**2
dt2 = dt**2
dt3 = dt**3
dt4 = dt**4
return (
np.array(
[
[dt4 / 4, 0, dt3 / 2, 0, dt2 / 2, 0],
[0, dt4 / 4, 0, dt3 / 2, 0, dt2 / 2],
[dt3 / 2, 0, dt2, 0, dt, 0],
[0, dt3 / 2, 0, dt2, 0, dt],
[dt2 / 2, 0, dt, 0, 1, 0],
[0, dt2 / 2, 0, dt, 0, 1],
]
)
* sigma_a2
)

def __repr__(self) -> str:
return f"Constant acceleration motion model with dt = {self.dt} sigma_a = {self.sigma_a}"
5 changes: 3 additions & 2 deletions src/scenarios/scenario_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ class ScenarioConfig:
sigma_r: float
object_data_noisy: bool
sigma_q: float = None
sigma_V: float = 0.0
sigma_v: float = 0.0
sigma_omega: float = 0.0
sigma_b: float = 0.0
sensor_pos: List = field(default_factory=lambda: np.array([0.0, 0.0]))
random_state: int = field(default_factory=lambda: 42)
P_S: float = None


Expand All @@ -48,7 +49,7 @@ class ScenarioConfig:
total_time=100,
object_configs=nonlinear_sot_object_life_params,
dt=1.0,
sigma_V=1.0,
sigma_v=1.0,
sigma_omega=0.1 * np.pi / 180,
P_D=0.9,
lambda_c=10.0,
Expand Down
10 changes: 7 additions & 3 deletions src/simulator/object_data_generator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy

from ..configs.ground_truth_config import GroundTruthConfig
from ..motion_models import MotionModel

Expand Down Expand Up @@ -43,9 +45,11 @@ def generate_objects_data(self):
state = object_config.initial_state
for timestep in range(self._ground_truth_config.total_time):
if timestep in range(object_config.t_birth, object_config.t_death):
object_state_history[timestep][object_config.id] = state
next_state = self._motion_model.move(state, if_noisy=self._if_noisy)
state = next_state
object_state_history[timestep][object_config.id] = copy.copy(state)
dt = 1.0
next_mean = self._motion_model.f(state.x, dt)
state.P = self._motion_model.Q(dt)
state.x = self._motion_model._generator.multivariate_normal(next_mean, state.P) if self._if_noisy else next_mean
return tuple(object_state_history)

def __repr__(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion tests/MOT/n_mot_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_tracker(config, motion_model, meas_model, name, tracker, tracker_initia
visulaize(object_data, meas_data, tracker_estimations, filepath)
gospa = get_gospa(object_data, tracker_estimations)
motmetrics = get_motmetrics(object_data, tracker_estimations) # noqa F841
assert np.mean(gospa) < 300
assert np.mean(gospa) < 500

if os.getenv("ANIMATE", "False") == "True":
animate(object_data, meas_data, tracker_estimations, filepath)
3 changes: 1 addition & 2 deletions tests/PMBM/bernoulli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ def initial_bernoulli():

@pytest.fixture
def cv_motion_model():
dt = 1.0
sigma_q = 5.0
return ConstantVelocityMotionModel(dt, sigma_q)
return ConstantVelocityMotionModel(42, sigma_q)


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions tests/PMBM/pmbm_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ def test_synthetic_scenario(
):
name, object_motion_fixture = object_motion_fixture
# Create linear motion model
dt = 1.0
sigma_q = 10.0
motion_model = ConstantVelocityMotionModel(dt, sigma_q)
random_state = 42
motion_model = ConstantVelocityMotionModel(random_state, sigma_q)

# Create linear measurement model
sigma_r = 10.0
Expand Down
7 changes: 4 additions & 3 deletions tests/PMBM/poisson_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def test_PPP_predict_linear_motion(initial_PPP_intensity_linear, clutter_intensi

dt = 1.0
sigma_q = 10.0
motion_model = ConstantVelocityMotionModel(dt, sigma_q)
random_state = 42
motion_model = ConstantVelocityMotionModel(random_state, sigma_q)

# Set Poisson RFS
PPP = PoissonRFS(intensity=initial_PPP_intensity_linear)
Expand Down Expand Up @@ -101,9 +102,9 @@ def test_PPP_detected_update(initial_PPP_intensity_linear):
clutter_intensity = sensor_model.intensity_c

# Create nlinear motion model
dt = 1.0
sigma_q = 10.0
motion_model = ConstantVelocityMotionModel(dt, sigma_q) # noqa F841
random_state = 42
motion_model = ConstantVelocityMotionModel(random_state, sigma_q) # noqa F841

# Create linear measurement model
sigma_r = 10.0
Expand Down
4 changes: 2 additions & 2 deletions tests/common/ellipsoidal_gating_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def test_ellipsoidal_gating():
]

grount_truth_config = GroundTruthConfig(object_configs=objects, total_time=total_time)
test_dt = 1.0
test_sigma_q = 2.0
motion_model = ConstantVelocityMotionModel(dt=test_dt, sigma_q=test_sigma_q)
random_seed = 42
motion_model = ConstantVelocityMotionModel(random_seed, test_sigma_q)

test_P_D = 1.0
test_lambda_c = 60.0
Expand Down
4 changes: 2 additions & 2 deletions tests/motion_modeling/measurement_data_generator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def test_function_name(self):
ground_truth = GroundTruthConfig(object_configs=objects_configs, total_time=total_time)

# Linear motion model
dt = 1.0
sigma_q = 5.0
motion_model = ConstantVelocityMotionModel(dt=dt, sigma_q=sigma_q)
random_state = 42
motion_model = ConstantVelocityMotionModel(sigma_q=sigma_q, random_state=random_state)

# Generate true object data (noisy or noiseless) and measurement
object_data = ObjectData(ground_truth_config=ground_truth, motion_model=motion_model, if_noisy=False)
Expand Down
Loading

0 comments on commit 7a3af07

Please sign in to comment.