Skip to content

Add GrassiaIIGeometric Distribution #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pymc_extras/distributions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
BetaNegativeBinomial,
GeneralizedPoisson,
Skellam,
GrassiaIIGeometric,
)
from pymc_extras.distributions.histogram_utils import histogram_approximation
from pymc_extras.distributions.multivariate import R2D2M2CP
Expand All @@ -38,5 +39,6 @@
"R2D2M2CP",
"Skellam",
"histogram_approximation",
"GrassiaIIGeometric",
"PartialOrder",
]
217 changes: 217 additions & 0 deletions pymc_extras/distributions/discrete.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pymc as pm

from pymc.distributions.dist_math import betaln, check_parameters, factln, logpow
from pymc.distributions.distribution import Discrete
from pymc.distributions.shape_utils import rv_size_is_none
from pytensor import tensor as pt
from pytensor.tensor.random.op import RandomVariable
Expand Down Expand Up @@ -399,3 +400,219 @@ def dist(cls, mu1, mu2, **kwargs):
class_name="Skellam",
**kwargs,
)


class GrassiaIIGeometricRV(RandomVariable):
name = "g2g"
signature = "(),(),()->()"

dtype = "int64"
_print_name = ("GrassiaIIGeometric", "\\operatorname{GrassiaIIGeometric}")

def __call__(self, r, alpha, time_covariate_vector=None, size=None, **kwargs):
return super().__call__(r, alpha, time_covariate_vector, size=size, **kwargs)

@classmethod
def rng_fn(cls, rng, r, alpha, time_covariate_vector, size):
# Handle None case for time_covariate_vector
if time_covariate_vector is None:
time_covariate_vector = 0.0

# Convert inputs to numpy arrays - these should be concrete values
r = np.asarray(r, dtype=np.float64)
alpha = np.asarray(alpha, dtype=np.float64)
time_covariate_vector = np.asarray(time_covariate_vector, dtype=np.float64)

# Determine output size
if size is None:
size = np.broadcast_shapes(r.shape, alpha.shape, time_covariate_vector.shape)

# Broadcast parameters to the output size
r = np.broadcast_to(r, size)
alpha = np.broadcast_to(alpha, size)
time_covariate_vector = np.broadcast_to(time_covariate_vector, size)

# Calculate exp(time_covariate_vector) for all samples
exp_time_covar_sum = np.exp(time_covariate_vector)

# Use a simpler approach: generate from a geometric distribution with transformed parameters
# This is an approximation but should be much faster and more reliable
lam = rng.gamma(shape=r, scale=1 / alpha, size=size)
lam_covar = lam * exp_time_covar_sum

# Handle numerical stability for very small lambda values
p = np.where(
lam_covar < 0.0001,
lam_covar, # For small values, set this to p
1 - np.exp(-lam_covar),
)

# Ensure p is in valid range for geometric distribution
p = np.clip(p, np.finfo(float).tiny, 1.0)

# Generate geometric samples
return rng.geometric(p)


g2g = GrassiaIIGeometricRV()


class GrassiaIIGeometric(Discrete):
r"""Grassia(II)-Geometric distribution.

This distribution is a flexible alternative to the Geometric distribution for the number of trials until a
discrete event, and can be extended to support both static and time-varying covariates.

Hardie and Fader describe this distribution with the following PMF and survival functions in [1]_:

.. math::
\mathbb{P}T=t|r,\alpha,\beta;Z(t)) = (\frac{\alpha}{\alpha+C(t-1)})^{r} - (\frac{\alpha}{\alpha+C(t)})^{r} \\
\begin{align}
\mathbb{S}(t|r,\alpha,\beta;Z(t)) = (\frac{\alpha}{\alpha+C(t)})^{r} \\
\end{align}

.. plot::
:context: close-figs

import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as st
import arviz as az
plt.style.use('arviz-darkgrid')
t = np.arange(1, 11)
alpha_vals = [1., 1., 2., 2.]
r_vals = [.1, .25, .5, 1.]
for alpha, r in zip(alpha_vals, r_vals):
pmf = (alpha/(alpha + t - 1))**r - (alpha/(alpha+t))**r
plt.plot(t, pmf, '-o', label=r'$\alpha$ = {}, $r$ = {}'.format(alpha, r))
plt.xlabel('t', fontsize=12)
plt.ylabel('p(t)', fontsize=12)
plt.legend(loc=1)
plt.show()

======== ===============================================
Support :math:`t \in \mathbb{N}_{>0}`
======== ===============================================

Parameters
----------
r : tensor_like of float
Shape parameter (r > 0).
alpha : tensor_like of float
Scale parameter (alpha > 0).
time_covariate_vector : tensor_like of float, optional
Optional vector of dot product of time-varying covariates and their coefficients by time period.

References
----------
.. [1] Fader, Peter & G. S. Hardie, Bruce (2020).
"Incorporating Time-Varying Covariates in a Simple Mixture Model for Discrete-Time Duration Data."
https://www.brucehardie.com/notes/037/time-varying_covariates_in_BG.pdf
"""

rv_op = g2g

@classmethod
def dist(cls, r, alpha, time_covariate_vector=None, *args, **kwargs):
r = pt.as_tensor_variable(r)
alpha = pt.as_tensor_variable(alpha)
if time_covariate_vector is None:
time_covariate_vector = pt.constant(0.0)
time_covariate_vector = pt.as_tensor_variable(time_covariate_vector)
return super().dist([r, alpha, time_covariate_vector], *args, **kwargs)

def logp(value, r, alpha, time_covariate_vector=None):
if time_covariate_vector is None:
time_covariate_vector = pt.constant(0.0)
time_covariate_vector = pt.as_tensor_variable(time_covariate_vector)

def C_t(t):
# Aggregate time_covariate_vector over active time periods
if t == 0:
return pt.constant(1.0)
# Handle case where time_covariate_vector is a scalar
if time_covariate_vector.ndim == 0:
return t * pt.exp(time_covariate_vector)
else:
# For vector time_covariate_vector, we need to handle symbolic indexing
# Since we can't slice with symbolic indices, we'll use a different approach
# For now, we'll use the first element multiplied by t
# This is a simplification but should work for basic cases
return t * pt.exp(time_covariate_vector[:t])

# Calculate the PMF on log scale
logp = pt.log(
pt.pow(alpha / (alpha + C_t(value - 1)), r) - pt.pow(alpha / (alpha + C_t(value)), r)
)

# Handle invalid values
logp = pt.switch(
pt.or_(
value < 1, # Value must be >= 1
pt.isnan(logp), # Handle NaN cases
),
-np.inf,
logp,
)

return check_parameters(
logp,
r > 0,
alpha > 0,
msg="r > 0, alpha > 0",
)

def logcdf(value, r, alpha, time_covariate_vector=None):
if time_covariate_vector is None:
time_covariate_vector = pt.constant(0.0)
time_covariate_vector = pt.as_tensor_variable(time_covariate_vector)

# Calculate CDF on log scale
# For the GrassiaIIGeometric, the CDF is 1 - survival function
# S(t) = (alpha/(alpha + C(t)))^r
# CDF(t) = 1 - S(t)

def C_t(t):
if t == 0:
return pt.constant(1.0)
if time_covariate_vector.ndim == 0:
return t * pt.exp(time_covariate_vector)
else:
return t * pt.exp(time_covariate_vector[:t])

survival = pt.pow(alpha / (alpha + C_t(value)), r)
logcdf = pt.log(1 - survival)

return check_parameters(
logcdf,
r > 0,
alpha > 0,
msg="r > 0, alpha > 0",
)

def support_point(rv, size, r, alpha, time_covariate_vector=None):
"""Calculate a reasonable starting point for sampling.

For the GrassiaIIGeometric distribution, we use a point estimate based on
the expected value of the mixing distribution. Since the mixing distribution
is Gamma(r, 1/alpha), its mean is r/alpha. We then transform this through
the geometric link function and round to ensure an integer value.

When time_covariate_vector is provided, it affects the expected value through
the exponential link function: exp(time_covariate_vector).
"""
# Base mean without covariates
mean = pt.exp(alpha / r)

# Apply time-varying covariates if provided
if time_covariate_vector is None:
time_covariate_vector = pt.constant(0.0)
mean = mean * pt.exp(time_covariate_vector)

# Round up to nearest integer
mean = pt.ceil(mean)

if not rv_size_is_none(size):
mean = pt.full(size, mean)

return mean
Loading
Loading