Skip to content

Commit

Permalink
wrap tests with underflow and overflow warning only
Browse files Browse the repository at this point in the history
  • Loading branch information
xjing76 committed Jul 9, 2020
1 parent 00fdd69 commit ef4b0f3
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 98 deletions.
90 changes: 48 additions & 42 deletions tests/test_sampling_seasonality.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,56 +22,62 @@ def gen_design_matrix(N):


def test_seasonality_sampling(N: int = 200, off_param=1):
np.random.seed(2032)
with np.errstate(over="warn", under="warn"):
np.random.seed(2032)

X_t = gen_design_matrix(N)
betas_intercept = np.random.normal(np.log(3000), 1, size=1)
betas_hour = np.sort(np.random.normal(1, 0.5, size=23))
betas_week = np.sort(np.random.normal(1, 0.5, size=6))
X_t = gen_design_matrix(N)
betas_intercept = np.random.normal(np.log(3000), 1, size=1)
betas_hour = np.sort(np.random.normal(1, 0.5, size=23))
betas_week = np.sort(np.random.normal(1, 0.5, size=6))

betas = tt.concatenate([betas_intercept, betas_hour, betas_week])
eta_r = tt.exp(tt.dot(X_t, betas))
betas = tt.concatenate([betas_intercept, betas_hour, betas_week])
eta_r = tt.exp(tt.dot(X_t, betas))

kwargs = {
"N": N,
"mus": np.r_[eta_r],
"pi_0_a": np.r_[1, 1],
"Gamma": np.r_["0,2,1", [10, 1], [5, 5]],
}
simulation, _ = simulate_poiszero_hmm(**kwargs)
kwargs = {
"N": N,
"mus": np.r_[eta_r],
"pi_0_a": np.r_[1, 1],
"Gamma": np.r_["0,2,1", [10, 1], [5, 5]],
}
simulation, _ = simulate_poiszero_hmm(**kwargs)

with pm.Model() as test_model:
p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1])
p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1])
with pm.Model() as test_model:
p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1])
p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1])

P_tt = tt.stack([p_0_rv, p_1_rv])
P_rv = pm.Deterministic("P_tt", P_tt)
P_tt = tt.stack([p_0_rv, p_1_rv])
P_rv = pm.Deterministic("P_tt", P_tt)

pi_0_tt = simulation["pi_0"]
y_test = simulation["Y_t"]
pi_0_tt = simulation["pi_0"]
y_test = simulation["Y_t"]

S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt)
S_rv.tag.test_value = (y_test > 0).astype(np.int)
S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt)
S_rv.tag.test_value = (y_test > 0).astype(np.int)

X = gen_design_matrix(N)
beta_s_intercept = pm.Normal("beta_s_intercept", 8, 1, shape=(1,))
beta_s_hour = pm.Normal("beta_s_hour", 1, 0.5, shape=(23,))
beta_s_week = pm.Normal("beta_s_week", 1, 0.5, shape=(6,))
X = gen_design_matrix(N)
beta_s_intercept = pm.Normal(
"beta_s_intercept", np.log(3000), 1, shape=(1,)
)
beta_s_hour = pm.Normal("beta_s_hour", 1, 0.5, shape=(23,))
beta_s_week = pm.Normal("beta_s_week", 1, 0.5, shape=(6,))

beta_s = pm.Deterministic(
"beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week])
)
mu = tt.exp(tt.dot(X, beta_s))
beta_s = pm.Deterministic(
"beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week])
)
mu = tt.exp(tt.dot(X, beta_s))

Y_rv = SwitchingProcess(
"Y_t", [pm.Constant.dist(0), pm.Poisson.dist(mu)], S_rv, observed=y_test,
)
with test_model:
mu_step = pm.NUTS([mu, beta_s])
ffbs = FFBSStep([S_rv])
transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv)
steps = [ffbs, mu_step, transitions]
trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1)
posterior = pm.sample_posterior_predictive(trace_.posterior)
Y_rv = SwitchingProcess(
"Y_t",
[pm.Constant.dist(0), pm.Poisson.dist(mu)],
S_rv,
observed=y_test,
)
with test_model:
mu_step = pm.NUTS([mu, beta_s])
ffbs = FFBSStep([S_rv])
transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv)
steps = [ffbs, mu_step, transitions]
trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1)
posterior = pm.sample_posterior_predictive(trace_.posterior)

check_metrics_for_sampling(trace_, posterior, simulation)
check_metrics_for_sampling(trace_, posterior, simulation)
115 changes: 59 additions & 56 deletions tests/test_ztp.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,59 +70,62 @@ def gen_design_matrix(N):


def test_seasonality_ztp_sampling(N: int = 200, off_param=1):
np.random.seed(2032)

X_t = gen_design_matrix(N)
betas_intercept = np.random.normal(np.log(3000), 1, size=1)
betas_hour = np.sort(np.random.normal(0.5, 0.1, size=23))
betas_week = np.sort(np.random.normal(1, 0.1, size=6))

betas = tt.concatenate([betas_intercept, betas_hour, betas_week])
eta_r = tt.exp(tt.dot(X_t, betas))

cls = ZeroTruncatedPoisson

kwargs = {
"N": N,
"mus": np.r_[eta_r],
"pi_0_a": np.r_[1, 1],
"Gamma": np.r_["0,2,1", [10, 1], [5, 5]],
"cls": cls,
}
simulation, _ = simulate_poiszero_hmm(**kwargs)

with pm.Model() as test_model:
p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1])
p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1])

P_tt = tt.stack([p_0_rv, p_1_rv])
P_rv = pm.Deterministic("P_tt", P_tt)

pi_0_tt = simulation["pi_0"]
y_test = simulation["Y_t"]

S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt)
S_rv.tag.test_value = (y_test > 0).astype(np.int)

X = gen_design_matrix(N)
beta_s_intercept = pm.Normal("beta_s_intercept", np.log(3000), 1, shape=(1,))
beta_s_hour = pm.Normal("beta_s_hour", 0.5, 0.1, shape=(23,))
beta_s_week = pm.Normal("beta_s_week", 1, 0.1, shape=(6,))

beta_s = pm.Deterministic(
"beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week])
)
mu = tt.exp(tt.dot(X, beta_s))

Y_rv = SwitchingProcess(
"Y_t", [pm.Constant.dist(0), cls.dist(mu)], S_rv, observed=y_test,
)
with test_model:
mu_step = pm.NUTS([mu, beta_s])
ffbs = FFBSStep([S_rv])
transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv)
steps = [ffbs, mu_step, transitions]
trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1)
posterior = pm.sample_posterior_predictive(trace_.posterior)

check_metrics_for_sampling(trace_, posterior, simulation)
with np.errstate(over="warn", under="warn"):
np.random.seed(2032)

X_t = gen_design_matrix(N)
betas_intercept = np.random.normal(np.log(3000), 1, size=1)
betas_hour = np.sort(np.random.normal(0.5, 0.1, size=23))
betas_week = np.sort(np.random.normal(1, 0.1, size=6))

betas = tt.concatenate([betas_intercept, betas_hour, betas_week])
eta_r = tt.exp(tt.dot(X_t, betas))

cls = ZeroTruncatedPoisson

kwargs = {
"N": N,
"mus": np.r_[eta_r],
"pi_0_a": np.r_[1, 1],
"Gamma": np.r_["0,2,1", [10, 1], [5, 5]],
"cls": cls,
}
simulation, _ = simulate_poiszero_hmm(**kwargs)

with pm.Model() as test_model:
p_0_rv = pm.Dirichlet("p_0", np.r_[1, 1])
p_1_rv = pm.Dirichlet("p_1", np.r_[1, 1])

P_tt = tt.stack([p_0_rv, p_1_rv])
P_rv = pm.Deterministic("P_tt", P_tt)

pi_0_tt = simulation["pi_0"]
y_test = simulation["Y_t"]

S_rv = HMMStateSeq("S_t", y_test.shape[0], P_rv, pi_0_tt)
S_rv.tag.test_value = (y_test > 0).astype(np.int)

X = gen_design_matrix(N)
beta_s_intercept = pm.Normal(
"beta_s_intercept", np.log(3000), 1, shape=(1,)
)
beta_s_hour = pm.Normal("beta_s_hour", 0.5, 0.1, shape=(23,))
beta_s_week = pm.Normal("beta_s_week", 1, 0.1, shape=(6,))

beta_s = pm.Deterministic(
"beta_s", tt.concatenate([beta_s_intercept, beta_s_hour, beta_s_week])
)
mu = tt.exp(tt.dot(X, beta_s))

Y_rv = SwitchingProcess(
"Y_t", [pm.Constant.dist(0), cls.dist(mu)], S_rv, observed=y_test,
)
with test_model:
mu_step = pm.NUTS([mu, beta_s])
ffbs = FFBSStep([S_rv])
transitions = TransMatConjugateStep([p_0_rv, p_1_rv], S_rv)
steps = [ffbs, mu_step, transitions]
trace_ = pm.sample(N, step=steps, return_inferencedata=True, chains=1)
posterior = pm.sample_posterior_predictive(trace_.posterior)

check_metrics_for_sampling(trace_, posterior, simulation)

0 comments on commit ef4b0f3

Please sign in to comment.