Skip to content

Commit e8b9d8b

Browse files
committed
feat(dynamical-system): add first order dynamical system
#103
1 parent 6a09e0f commit e8b9d8b

File tree

6 files changed

+253
-0
lines changed

6 files changed

+253
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Collections of dynamical systems."""
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Base classes for first order dynamical systems.
2+
3+
In this module, we define the base classes for first order dynamical systems.
4+
For example, we define the base class for first order dynamical systems called
5+
`FirstOrderDynamicsBase`.
6+
"""
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""Base classes for first order dynamical systems.
2+
3+
In this module, we introduce the base classes for
4+
first order dynamical systems, and the corresponding
5+
abstract classes for system and initial condition.
6+
"""
7+
8+
from abc import ABC, abstractmethod
9+
from functools import cached_property
10+
from typing import Any
11+
12+
import numpy as np
13+
import pandas as pd
14+
from numpy import typing as npt
15+
from pydantic import BaseModel, Field
16+
17+
from hamilflow.models.utils.typing import TypeTime
18+
19+
20+
class FirstOrderSystem(BaseModel, ABC):
21+
"""The base params for a first order dynamical system.
22+
23+
Create your own system by inheriting from this class
24+
and adding your own params.
25+
26+
```python
27+
class MyCustomSystem(FirstOrderSystem):
28+
# your fields here
29+
omega: float
30+
```
31+
"""
32+
33+
34+
class FirstOrderIC(BaseModel, ABC):
35+
"""The base initial condition for a first order dynamical system.
36+
37+
:cvar x0: the initial state of the system
38+
:cvar t0: the initial time (default to 0.0)
39+
"""
40+
41+
x0: float | list[float] = Field(...)
42+
t0: float = Field(default=0.0)
43+
44+
45+
class FirstOrderDynamicsBase(ABC):
46+
"""Base class to generate time series data for a first order dynamical system.
47+
48+
:param system: all the params that defines the system.
49+
:param initial_condition: the initial condition of the system.
50+
"""
51+
52+
def __init__(
53+
self,
54+
system: FirstOrderSystem,
55+
initial_condition: FirstOrderIC,
56+
) -> None:
57+
58+
self.system = system
59+
self.ic = initial_condition
60+
61+
@cached_property
62+
def definition(self) -> dict[str, dict[str, Any]]:
63+
"""Model params and initial conditions defined as a dictionary.
64+
65+
:return: dictionary containing system and initial condition parameters.
66+
"""
67+
return {
68+
"system": self.system.model_dump(),
69+
"initial_condition": self.ic.model_dump(),
70+
}
71+
72+
@abstractmethod
73+
def derivatives(
74+
self,
75+
state: npt.NDArray[np.float64],
76+
t: float,
77+
) -> npt.NDArray[np.float64]:
78+
"""Return the derivative of the state with respect to time.
79+
80+
:param state: the current state of the system.
81+
:param t: the current time.
82+
:return: the derivative of the state with respect to time.
83+
"""
84+
85+
def step(
86+
self,
87+
state: npt.NDArray[np.float64],
88+
t: float,
89+
dt: float,
90+
) -> tuple[npt.NDArray[np.float64], float]:
91+
"""Advances the system by one time step (dt) using
92+
4th-Order Runge-Kutta (RK4) integration.
93+
94+
:param state: the current state of the system.
95+
:param t: the current time.
96+
:param dt: the time step size.
97+
:return: the new state of the system and the new time.
98+
"""
99+
k1 = self.derivatives(state, t)
100+
k2 = self.derivatives(state + 0.5 * dt * k1, t + 0.5 * dt)
101+
k3 = self.derivatives(state + 0.5 * dt * k2, t + 0.5 * dt)
102+
k4 = self.derivatives(state + dt * k3, t + dt)
103+
104+
new_state = state + (dt / 6.0) * (k1 + 2 * k2 + 2 * k3 + k4)
105+
new_t = t + dt
106+
107+
return new_state, new_t
108+
109+
def __call__(self, t: TypeTime) -> pd.DataFrame:
110+
"""Generate time series data for the dynamical system.
111+
112+
:param t: sequence of time steps.
113+
:return: values of the variables including time `t` and state `x`.
114+
"""
115+
t_arr = np.asarray(t)
116+
117+
current_state = np.asarray(self.ic.x0, dtype=np.float64)
118+
current_t = self.ic.t0
119+
120+
effective_t = [current_t]
121+
history = [current_state]
122+
for target_t in t_arr:
123+
dt = target_t - current_t
124+
if dt > 0:
125+
current_state, current_t = self.step(current_state, current_t, dt)
126+
effective_t.append(current_t)
127+
history.append(current_state)
128+
129+
data = np.asarray(history)
130+
131+
if data.ndim == 1:
132+
columns = ["x"]
133+
else:
134+
columns = [f"x{i+1}" for i in range(data.shape[1])]
135+
136+
return (
137+
pd.DataFrame(data, columns=columns).assign(t=effective_t).sort_index(axis=1)
138+
)

pyproject.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,16 @@ target-version = "py310"
6666
select = ["ALL"]
6767
ignore = [
6868
"D107", # undocumented-public-init: we document the class instead
69+
"D203", # one-blank-line-before-class: we use D211 instead
70+
"D213", # multi-line-summary-second-line: we use D212 instead
6971
"E501", # line-too-long: we use black instead
7072
"TID252", # relative-imports
7173
"PD901", # pandas-df-variable-name
7274
"FBT",
7375
"FIX",
7476
"TD",
7577
"RET",
78+
"D205",
7679
]
7780

7881
[tool.ruff.lint.per-file-ignores]
@@ -84,6 +87,15 @@ ignore = [
8487
"tests/**/*.py" = [
8588
"S101", # assert: Fine in tests
8689
"SLF001", # private-member-access: find in tests
90+
"D104", # missing-module-docstring
91+
"INP001", # implicit-namespace-package: tutorials are not packages
92+
"D101", # missing-class-docstring
93+
"D102", # missing-method-docstring
94+
"D103", # missing-function-docstring
95+
"D100", # missing-module-docstring
96+
"ARG002", # unused-method-argument
97+
"ANN201", # missing-return-type-annotation
98+
"PLR2004", # magic-value
8799
]
88100

89101
[tool.ruff.lint.isort]

tests/models/test_dynamical_systems/base/__init__.py

Whitespace-only changes.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import numpy as np
2+
from numpy import typing as npt
3+
from pydantic import Field
4+
5+
from hamilflow.models.dynamical_systems.base.first_order_dynamics import (
6+
FirstOrderDynamicsBase,
7+
FirstOrderIC,
8+
FirstOrderSystem,
9+
)
10+
11+
12+
class ExponentialDecaySystem(FirstOrderSystem):
13+
k: float = Field(default=1.0)
14+
15+
16+
class ExpontialDecayIC(FirstOrderIC):
17+
x0: float | list[float] = Field(...)
18+
t0: float = Field(default=0.0)
19+
20+
21+
class ExponentialDecayModel(FirstOrderDynamicsBase):
22+
def derivatives(
23+
self,
24+
state: npt.NDArray[np.float64],
25+
t: float,
26+
) -> npt.NDArray[np.float64]:
27+
# dy/dt = -k * y
28+
return -self.system.k * state # type: ignore[attr-defined]
29+
30+
31+
def test_first_order_dynamics_instantiation():
32+
model = ExponentialDecayModel(
33+
system=ExponentialDecaySystem(k=2.0),
34+
initial_condition=ExpontialDecayIC(x0=5.0, t0=0.0),
35+
)
36+
assert model.system.k == 2.0
37+
assert model.ic.x0 == 5.0
38+
assert model.ic.t0 == 0.0
39+
40+
definition = model.definition
41+
assert definition["system"]["k"] == 2.0
42+
assert definition["initial_condition"]["x0"] == 5.0
43+
44+
45+
def test_first_order_dynamics_step():
46+
model = ExponentialDecayModel(
47+
system=ExponentialDecaySystem(k=1.0),
48+
initial_condition=ExpontialDecayIC(x0=1.0),
49+
)
50+
state = np.array([1.0], dtype=np.float64)
51+
dt = 0.1
52+
t = 0.0
53+
54+
new_state, new_t = model.step(state, t, dt)
55+
assert new_t == 0.1
56+
57+
# For dx/dt = -x, RK4 step starting at x=1.0 with dt=0.1
58+
# RK4 match to 4th order Taylor expansion
59+
expected = 1.0 - 0.1 + (0.1**2) / 2.0 - (0.1**3) / 6.0 + (0.1**4) / 24.0
60+
np.testing.assert_allclose(new_state, expected, rtol=1e-5)
61+
62+
63+
def test_first_order_dynamics_call_scalar_x0():
64+
model = ExponentialDecayModel(
65+
system=ExponentialDecaySystem(k=1.0),
66+
initial_condition=ExpontialDecayIC(x0=3.0, t0=0.0),
67+
)
68+
69+
t_arr = np.linspace(0.0, 1.0, 101)
70+
df = model(t_arr)
71+
72+
assert list(df.columns) == ["t", "x"]
73+
np.testing.assert_allclose(df["t"].values, t_arr)
74+
75+
# Check values against analytical: x = 3 * e^{-t}
76+
expected_x = 3.0 * np.exp(-1.0 * t_arr)
77+
np.testing.assert_allclose(df["x"].values, expected_x, rtol=1e-4)
78+
79+
80+
def test_first_order_dynamics_call_array_x0():
81+
model = ExponentialDecayModel(
82+
system=ExponentialDecaySystem(k=2.0),
83+
initial_condition=ExpontialDecayIC(x0=[2.0, 5.0], t0=0.0),
84+
)
85+
86+
t_arr = np.linspace(0.0, 0.5, 101)
87+
df = model(t_arr)
88+
89+
assert list(df.columns) == ["t", "x1", "x2"]
90+
91+
# Check values against analytical: x = x0 * e^{-2t}
92+
expected_x1 = 2.0 * np.exp(-2.0 * t_arr)
93+
expected_x2 = 5.0 * np.exp(-2.0 * t_arr)
94+
95+
np.testing.assert_allclose(df["x1"].values, expected_x1, rtol=1e-4)
96+
np.testing.assert_allclose(df["x2"].values, expected_x2, rtol=1e-4)

0 commit comments

Comments
 (0)