diff --git a/tests/test_sampling_seasonality.py b/tests/test_sampling_seasonality.py index 0b77361..a73f3c8 100644 --- a/tests/test_sampling_seasonality.py +++ b/tests/test_sampling_seasonality.py @@ -22,56 +22,62 @@ def gen_design_matrix(N): def test_seasonality_sampling(N: int = 200, off_param=1): - np.random.seed(2032) + with np.errstate(over="warn", under="warn"): + np.random.seed(2032) - X_t = gen_design_matrix(N) - betas_intercept = np.random.normal(np.log(3000), 1, size=1) - betas_hour = np.sort(np.random.normal(1, 0.5, size=23)) - betas_week = np.sort(np.random.normal(1, 0.5, size=6)) + X_t = gen_design_matrix(N) + betas_intercept = np.random.normal(np.log(3000), 1, size=1) + betas_hour = np.sort(np.random.normal(1, 0.5, size=23)) + betas_week = np.sort(np.random.normal(1, 0.5, size=6)) - betas = tt.concatenate([betas_intercept, betas_hour, betas_week]) - eta_r = tt.exp(tt.dot(X_t, betas)) + betas = tt.concatenate([betas_intercept, betas_hour, betas_week]) + eta_r = tt.exp(tt.dot(X_t, betas)) - kwargs = { - "N": N, - "mus": np.r_[eta_r], - "pi_0_a": np.r_[1, 1], - "Gamma": np.r_["0,2,1", [10, 1], [5, 5]], - } - simulation, _ = simulate_poiszero_hmm(**kwargs) + kwargs = { + "N": N, + "mus": np.r_[eta_r], + "pi_0_a": np.r_[1, 1], + "Gamma": np.r_["0,2,1", [10, 1], [5, 5]], + } + simulation, _ = simulate_poiszero_hmm(**kwargs) - with pm.Model() as test_model: - p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1]) - p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1]) + with pm.Model() as test_model: + p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1]) + p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1]) - P_tt = tt.stack([p_0_rv, p_1_rv]) - P_rv = pm.Deterministic("P_tt", P_tt) + P_tt = tt.stack([p_0_rv, p_1_rv]) + P_rv = pm.Deterministic("P_tt", P_tt) - pi_0_tt = simulation["pi_0"] - y_test = simulation["Y_t"] + pi_0_tt = simulation["pi_0"] + y_test = simulation["Y_t"] - S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt) - S_rv.tag.test_value = (y_test > 0).astype(np.int) + S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt) + S_rv.tag.test_value = (y_test > 0).astype(np.int) - X = gen_design_matrix(N) - beta_s_intercept = pm.Normal("beta_s_intercept", 8, 1, shape=(1,)) - beta_s_hour = pm.Normal("beta_s_hour", 1, 0.5, shape=(23,)) - beta_s_week = pm.Normal("beta_s_week", 1, 0.5, shape=(6,)) + X = gen_design_matrix(N) + beta_s_intercept = pm.Normal( + "beta_s_intercept", np.log(3000), 1, shape=(1,) + ) + beta_s_hour = pm.Normal("beta_s_hour", 1, 0.5, shape=(23,)) + beta_s_week = pm.Normal("beta_s_week", 1, 0.5, shape=(6,)) - beta_s = pm.Deterministic( - "beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week]) - ) - mu = tt.exp(tt.dot(X, beta_s)) + beta_s = pm.Deterministic( + "beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week]) + ) + mu = tt.exp(tt.dot(X, beta_s)) - Y_rv = SwitchingProcess( - "Y_t", [pm.Constant.dist(0), pm.Poisson.dist(mu)], S_rv, observed=y_test, - ) - with test_model: - mu_step = pm.NUTS([mu, beta_s]) - ffbs = FFBSStep([S_rv]) - transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv) - steps = [ffbs, mu_step, transitions] - trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1) - posterior = pm.sample_posterior_predictive(trace_.posterior) + Y_rv = SwitchingProcess( + "Y_t", + [pm.Constant.dist(0), pm.Poisson.dist(mu)], + S_rv, + observed=y_test, + ) + with test_model: + mu_step = pm.NUTS([mu, beta_s]) + ffbs = FFBSStep([S_rv]) + transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv) + steps = [ffbs, mu_step, transitions] + trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1) + posterior = pm.sample_posterior_predictive(trace_.posterior) - check_metrics_for_sampling(trace_, posterior, simulation) + check_metrics_for_sampling(trace_, posterior, simulation) diff --git a/tests/test_ztp.py b/tests/test_ztp.py index 4f1866c..011d1cb 100644 --- a/tests/test_ztp.py +++ b/tests/test_ztp.py @@ -70,59 +70,62 @@ def gen_design_matrix(N): def test_seasonality_ztp_sampling(N: int = 200, off_param=1): - np.random.seed(2032) - - X_t = gen_design_matrix(N) - betas_intercept = np.random.normal(np.log(3000), 1, size=1) - betas_hour = np.sort(np.random.normal(0.5, 0.1, size=23)) - betas_week = np.sort(np.random.normal(1, 0.1, size=6)) - - betas = tt.concatenate([betas_intercept, betas_hour, betas_week]) - eta_r = tt.exp(tt.dot(X_t, betas)) - - cls = ZeroTruncatedPoisson - - kwargs = { - "N": N, - "mus": np.r_[eta_r], - "pi_0_a": np.r_[1, 1], - "Gamma": np.r_["0,2,1", [10, 1], [5, 5]], - "cls": cls, - } - simulation, _ = simulate_poiszero_hmm(**kwargs) - - with pm.Model() as test_model: - p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1]) - p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1]) - - P_tt = tt.stack([p_0_rv, p_1_rv]) - P_rv = pm.Deterministic("P_tt", P_tt) - - pi_0_tt = simulation["pi_0"] - y_test = simulation["Y_t"] - - S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt) - S_rv.tag.test_value = (y_test > 0).astype(np.int) - - X = gen_design_matrix(N) - beta_s_intercept = pm.Normal("beta_s_intercept", np.log(3000), 1, shape=(1,)) - beta_s_hour = pm.Normal("beta_s_hour", 0.5, 0.1, shape=(23,)) - beta_s_week = pm.Normal("beta_s_week", 1, 0.1, shape=(6,)) - - beta_s = pm.Deterministic( - "beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week]) - ) - mu = tt.exp(tt.dot(X, beta_s)) - - Y_rv = SwitchingProcess( - "Y_t", [pm.Constant.dist(0), cls.dist(mu)], S_rv, observed=y_test, - ) - with test_model: - mu_step = pm.NUTS([mu, beta_s]) - ffbs = FFBSStep([S_rv]) - transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv) - steps = [ffbs, mu_step, transitions] - trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1) - posterior = pm.sample_posterior_predictive(trace_.posterior) - - check_metrics_for_sampling(trace_, posterior, simulation) + with np.errstate(over="warn", under="warn"): + np.random.seed(2032) + + X_t = gen_design_matrix(N) + betas_intercept = np.random.normal(np.log(3000), 1, size=1) + betas_hour = np.sort(np.random.normal(0.5, 0.1, size=23)) + betas_week = np.sort(np.random.normal(1, 0.1, size=6)) + + betas = tt.concatenate([betas_intercept, betas_hour, betas_week]) + eta_r = tt.exp(tt.dot(X_t, betas)) + + cls = ZeroTruncatedPoisson + + kwargs = { + "N": N, + "mus": np.r_[eta_r], + "pi_0_a": np.r_[1, 1], + "Gamma": np.r_["0,2,1", [10, 1], [5, 5]], + "cls": cls, + } + simulation, _ = simulate_poiszero_hmm(**kwargs) + + with pm.Model() as test_model: + p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1]) + p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1]) + + P_tt = tt.stack([p_0_rv, p_1_rv]) + P_rv = pm.Deterministic("P_tt", P_tt) + + pi_0_tt = simulation["pi_0"] + y_test = simulation["Y_t"] + + S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt) + S_rv.tag.test_value = (y_test > 0).astype(np.int) + + X = gen_design_matrix(N) + beta_s_intercept = pm.Normal( + "beta_s_intercept", np.log(3000), 1, shape=(1,) + ) + beta_s_hour = pm.Normal("beta_s_hour", 0.5, 0.1, shape=(23,)) + beta_s_week = pm.Normal("beta_s_week", 1, 0.1, shape=(6,)) + + beta_s = pm.Deterministic( + "beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week]) + ) + mu = tt.exp(tt.dot(X, beta_s)) + + Y_rv = SwitchingProcess( + "Y_t", [pm.Constant.dist(0), cls.dist(mu)], S_rv, observed=y_test, + ) + with test_model: + mu_step = pm.NUTS([mu, beta_s]) + ffbs = FFBSStep([S_rv]) + transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv) + steps = [ffbs, mu_step, transitions] + trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1) + posterior = pm.sample_posterior_predictive(trace_.posterior) + + check_metrics_for_sampling(trace_, posterior, simulation)