Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cln(utils): split utils into plotting and data #17

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/gen_experiments/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@
import numpy as np
import pysindy as ps

from gen_experiments.utils import (
FullTrialData,
NestedDict,
SeriesDef,
SeriesList,
_PlotPrefs,
_signal_avg_power,
)
from gen_experiments.data import _signal_avg_power
from gen_experiments.plotting import _PlotPrefs
from gen_experiments.utils import FullTrialData, NestedDict, SeriesDef, SeriesList

T = TypeVar("T")
U = TypeVar("U")
Expand Down
235 changes: 235 additions & 0 deletions src/gen_experiments/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from math import ceil
from pathlib import Path
from typing import Callable
from warnings import warn

import mitosis
import numpy as np
import scipy

from gen_experiments.utils import GridsearchResultDetails

INTEGRATOR_KEYWORDS = {"rtol": 1e-12, "method": "LSODA", "atol": 1e-12}
TRIALS_FOLDER = Path(__file__).parent.absolute() / "trials"


def gen_data(
rhs_func,
n_coord,
seed=None,
n_trajectories=1,
x0_center=None,
ic_stdev=3,
noise_abs=None,
noise_rel=None,
nonnegative=False,
dt=0.01,
t_end=10,
):
"""Generate random training and test data

Note that test data has no noise.

Arguments:
rhs_func (Callable): the function to integrate
n_coord (int): number of coordinates needed for rhs_func
seed (int): the random seed for number generation
n_trajectories (int): number of trajectories of training data
x0_center (np.array): center of random initial conditions
ic_stdev (float): standard deviation for generating initial
conditions
noise_abs (float): measurement noise standard deviation.
Defaults to .1 if noise_rel is None.
noise_rel (float): measurement noise-to-signal power ratio.
Either noise_abs or noise_rel must be None. Defaults to
None.
nonnegative (bool): Whether x0 must be nonnegative, such as for
population models. If so, a gamma distribution is
used, rather than a normal distribution.

Returns:
dt, t_train, x_train, x_test, x_dot_test, x_train_true
"""
if noise_abs is not None and noise_rel is not None:
raise ValueError("Cannot specify both noise_abs and noise_rel")
elif noise_abs is None and noise_rel is None:
noise_abs = 0.1
rng = np.random.default_rng(seed)
if x0_center is None:
x0_center = np.zeros((n_coord))
t_train = np.arange(0, t_end, dt)
t_train_span = (t_train[0], t_train[-1])
if nonnegative:
shape = ((x0_center + 1) / ic_stdev) ** 2
scale = ic_stdev**2 / (x0_center + 1)
x0_train = np.array(
[rng.gamma(k, theta, n_trajectories) for k, theta in zip(shape, scale)]
).T
x0_test = np.array([
rng.gamma(k, theta, ceil(n_trajectories / 2))
for k, theta in zip(shape, scale)
]).T
else:
x0_train = ic_stdev * rng.standard_normal((n_trajectories, n_coord)) + x0_center
x0_test = (
ic_stdev * rng.standard_normal((ceil(n_trajectories / 2), n_coord))
+ x0_center
)
x_train = []
for traj in range(n_trajectories):
x_train.append(
scipy.integrate.solve_ivp(
rhs_func,
t_train_span,
x0_train[traj, :],
t_eval=t_train,
**INTEGRATOR_KEYWORDS,
).y.T
)

def _drop_and_warn(arrs):
maxlen = max(arr.shape[0] for arr in arrs)

def _alert_short(arr):
if arr.shape[0] < maxlen:
warn(message="Dropping simulation due to blow-up")
return False
return True

arrs = list(filter(_alert_short, arrs))
if len(arrs) == 0:
raise ValueError(
"Simulations failed due to blow-up. System is too stiff for solver's"
" numerical tolerance"
)
return arrs

x_train = _drop_and_warn(x_train)
x_train = np.stack(x_train)
x_test = []
for traj in range(ceil(n_trajectories / 2)):
x_test.append(
scipy.integrate.solve_ivp(
rhs_func,
t_train_span,
x0_test[traj, :],
t_eval=t_train,
**INTEGRATOR_KEYWORDS,
).y.T
)
x_test = _drop_and_warn(x_test)
x_test = np.array(x_test)
x_dot_test = np.array([[rhs_func(0, xij) for xij in xi] for xi in x_test])
x_train_true = np.copy(x_train)
if noise_rel is not None:
noise_abs = np.sqrt(_signal_avg_power(x_test) * noise_rel)
x_train = x_train + noise_abs * rng.standard_normal(x_train.shape)
x_train = list(x_train)
x_test = list(x_test)
x_dot_test = list(x_dot_test)
return dt, t_train, x_train, x_test, x_dot_test, x_train_true


def gen_pde_data(
rhs_func: Callable,
init_cond: np.ndarray,
args: tuple,
dimension: int,
seed: int | None = None,
noise_abs: float | None = None,
noise_rel: float | None = None,
dt: float = 0.01,
t_end: int = 100,
):
"""Generate PDE measurement data for training

For simplicity, Trajectories have been removed,
Test data is the same as Train data.

Arguments:
rhs_func: the function to integrate
init_cond: Initial Conditions for the PDE
args: Arguments for rhsfunc
dimension: Number of spatial dimensions (1, 2, or 3)
seed (int): the random seed for number generation
noise_abs (float): measurement noise standard deviation.
Defaults to .1 if noise_rel is None.
noise_rel (float): measurement noise relative to amplitude of
true data. Amplitude of data is calculated as the max value
of the power spectrum. Either noise_abs or noise_rel must
be None. Defaults to None.
dt (float): time step for the PDE simulation
t_end (int): total time for the PDE simulation

Returns:
dt, t_train, x_train, x_test, x_dot_test, x_train_true
"""
if noise_abs is not None and noise_rel is not None:
raise ValueError("Cannot specify both noise_abs and noise_rel")
elif noise_abs is None and noise_rel is None:
noise_abs = 0.1
rng = np.random.default_rng(seed)
t_train = np.arange(0, t_end, dt)
t_train_span = (t_train[0], t_train[-1])
x_train = []
x_train.append(
scipy.integrate.solve_ivp(
rhs_func,
t_train_span,
init_cond,
t_eval=t_train,
args=args,
**INTEGRATOR_KEYWORDS,
).y.T
)
t, x = x_train[0].shape
x_train = np.stack(x_train, axis=-1)
if dimension == 1:
pass
elif dimension == 2:
x_train = np.reshape(x_train, (t, int(np.sqrt(x)), int(np.sqrt(x)), 1))
elif dimension == 3:
x_train = np.reshape(
x_train, (t, int(np.cbrt(x)), int(np.cbrt(x)), int(np.cbrt(x)), 1)
)
x_test = x_train
x_test = np.moveaxis(x_test, -1, 0)
x_dot_test = np.array(
[[rhs_func(0, xij, args[0], args[1]) for xij in xi] for xi in x_test]
)
if dimension == 1:
x_dot_test = [np.moveaxis(x_dot_test, [0, 1], [-1, -2])]
pass
elif dimension == 2:
x_dot_test = np.reshape(x_dot_test, (t, int(np.sqrt(x)), int(np.sqrt(x)), 1))
x_dot_test = [np.moveaxis(x_dot_test, 0, -2)]
elif dimension == 3:
x_dot_test = np.reshape(
x_dot_test, (t, int(np.cbrt(x)), int(np.cbrt(x)), int(np.cbrt(x)), 1)
)
x_dot_test = [np.moveaxis(x_dot_test, 0, -2)]
x_train_true = np.copy(x_train)
if noise_rel is not None:
noise_abs = _max_amplitude(x_test) * noise_rel
x_train = x_train + noise_abs * rng.standard_normal(x_train.shape)
x_train = [np.moveaxis(x_train, 0, -2)]
x_train_true = np.moveaxis(x_train_true, 0, -2)
x_test = [np.moveaxis(x_test, [0, 1], [-1, -2])]
return dt, t_train, x_train, x_test, x_dot_test, x_train_true


def _max_amplitude(signal: np.ndarray):
return np.abs(scipy.fft.rfft(signal, axis=0)[1:]).max() / np.sqrt(len(signal))


def _signal_avg_power(signal: np.ndarray) -> float:
return np.square(signal).mean()


def load_results(hexstr: str) -> GridsearchResultDetails:
"""Load the results that mitosis saves

Args:
hexstr: randomly-assigned identifier for the results to open
"""
return mitosis.load_trial_data(hexstr, trials_folder=TRIALS_FOLDER)
2 changes: 1 addition & 1 deletion src/gen_experiments/gridsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import gen_experiments
from gen_experiments import config
from gen_experiments.odes import plot_ode_panel
from gen_experiments.plotting import _PlotPrefs
from gen_experiments.utils import (
GridsearchResult,
GridsearchResultDetails,
Expand All @@ -23,7 +24,6 @@
_amax_to_full_inds,
_argopt,
_grid_locator_match,
_PlotPrefs,
simulate_test_data,
)

Expand Down
22 changes: 16 additions & 6 deletions src/gen_experiments/odes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
import pysindy as ps

from . import config
from .data import gen_data
from .plotting import (
compare_coefficient_plots,
plot_test_trajectories,
plot_training_data,
)
from .utils import (
FullTrialData,
TrialData,
_make_model,
coeff_metrics,
compare_coefficient_plots,
gen_data,
integration_metrics,
plot_test_trajectories,
plot_training_data,
simulate_test_data,
unionize_coeff_matrices,
)
Expand Down Expand Up @@ -220,8 +222,8 @@ def plot_ode_panel(trial_data: FullTrialData):
compare_coefficient_plots(
trial_data["coeff_fit"],
trial_data["coeff_true"],
input_features=trial_data["input_features"],
feature_names=trial_data["feature_names"],
input_features=[_texify(feat) for feat in trial_data["input_features"]],
feature_names=[_texify(feat) for feat in trial_data["feature_names"]],
)
plot_test_trajectories(
trial_data["x_test"],
Expand All @@ -230,3 +232,11 @@ def plot_ode_panel(trial_data: FullTrialData):
trial_data["t_sim"],
)
plt.show()


def _texify(input: str) -> str:
if input[0] != "$":
input = "$" + input
if input[-1] != "$":
input = input + "$"
return input
5 changes: 2 additions & 3 deletions src/gen_experiments/pdes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
from pysindy.differentiation import SpectralDerivative

from . import config
from .data import gen_pde_data
from .plotting import compare_coefficient_plots, plot_pde_training_data
from .utils import (
FullTrialData,
TrialData,
_make_model,
coeff_metrics,
compare_coefficient_plots,
gen_pde_data,
integration_metrics,
plot_pde_training_data,
simulate_test_data,
unionize_coeff_matrices,
)
Expand Down
Loading
Loading