Skip to content

create standard dgp for metric aggregation #705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 303 additions & 0 deletions econml/data/dgps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
from scipy.special import expit

_ihdp_sim_file = os.path.join(os.path.dirname(__file__), "ihdp", "sim.csv")
_ihdp_sim_data = pd.read_csv(_ihdp_sim_file)
Expand Down Expand Up @@ -93,3 +94,305 @@ def _process_ihdp_sim_data():
# Append a column of ones as intercept
X = np.insert(X, 0, np.ones(X.shape[0]), axis=1)
return T, X


class StandardDGP:
    """
    A class to generate synthetic causal datasets

    The data are generated as

    - ``X`` : drawn from the ``x_noise`` sampler
    - ``Z`` : Bernoulli(0.5) if ``discrete_instrument`` else standard normal (only when ``d_z`` is set)
    - ``T`` : ``nuisance_T(X) + nuisance_TZ(Z) + t_noise``; when ``discrete_treatment`` is True this
      quantity is passed through ``expit`` and used as the success probability of a Bernoulli draw
    - ``Y`` : ``theta(X) * y_of_t(T) + nuisance_Y(X) + y_noise``

    Parameters
    ----------
    n: int
        Number of observations to generate

    d_t: int
        Dimensionality of treatment

    d_y: int
        Dimensionality of outcome

    d_x: int
        Dimensionality of features

    d_z: int or None
        Dimensionality of instrument. If falsy, no instrument is generated.

    discrete_treatment: bool
        Whether the treatment is binary (drawn from a Bernoulli distribution)

    discrete_instrument: bool
        Whether the instrument is binary (drawn from a Bernoulli(0.5) distribution)

    squeeze_T: bool
        Whether to squeeze the final T array on output

    squeeze_Y: bool
        Whether to squeeze the final Y array on output

    nuisance_Y: func or dict
        Nuisance function. Describes how the covariates affect the outcome.
        If a function, this function will be used on features X to partially generate Y.
        If a dict, its entries (e.g. 'support', 'degree') override the default keyword
        arguments passed to `gen_nuisance`.

    nuisance_T: func or dict
        Nuisance function. Describes how the covariates affect the treatment.
        If a function, this function will be used on features X to partially generate T.
        If a dict, its entries (e.g. 'support', 'degree') override the default keyword
        arguments passed to `gen_nuisance`.

    nuisance_TZ: func or dict
        Nuisance function. Describes how the instrument affects the treatment.
        If a function, this function will be used on instrument Z to partially generate T.
        If a dict, its entries (e.g. 'support', 'degree') override the default keyword
        arguments passed to `gen_nuisance`. Ignored when `d_z` is falsy.

    theta: func or dict
        Describes how the features affect the treatment effect heterogeneity.
        If a function, this function will be used on features X to calculate treatment effect heterogeneity.
        If a dict, its entries (e.g. 'support', 'degree') override the default keyword
        arguments passed to `gen_nuisance`.

    y_of_t: func or dict
        Describes how the treatment affects the outcome.
        If a function, this function will be used directly.
        If a dict, its entries (e.g. 'support', 'degree') override the default keyword
        arguments passed to `gen_nuisance`.

    x_noise: str
        Type of noise to use for covariate generation. Must be a method of np.random.RandomState()

    y_noise: str
        Type of noise to use for outcome generation. Must be a method of np.random.RandomState()

    t_noise: str
        Type of noise to use for treatment generation. Must be a method of np.random.RandomState()

    x_noise_params: dict, optional
        Parameters to pass to x noise function. The 'size' entry is always set to (n, d_x).

    y_noise_params: dict, optional
        Parameters to pass to y noise function. The 'size' entry is always set to (n, d_y).

    t_noise_params: dict, optional
        Parameters to pass to t noise function. The 'size' entry is always set to (n, d_t).

    random_state: optional
        Passed to `check_random_state` to obtain the random number generator used for
        coefficient generation and all sampling.
    """

    def __init__(self,
                 n=1000,
                 d_t=1,
                 d_y=1,
                 d_x=5,
                 d_z=None,
                 discrete_treatment=False,
                 discrete_instrument=False,
                 squeeze_T=False,
                 squeeze_Y=False,
                 nuisance_Y=None,
                 nuisance_T=None,
                 nuisance_TZ=None,
                 theta=None,
                 y_of_t=None,
                 x_noise='normal',
                 y_noise='normal',
                 t_noise='normal',
                 x_noise_params=None,
                 y_noise_params=None,
                 t_noise_params=None,
                 random_state=None
                 ):
        self._random_state = check_random_state(random_state)
        self.n = n
        self.d_t = d_t
        self.d_y = d_y
        self.d_x = d_x
        self.d_z = d_z

        self.discrete_treatment = discrete_treatment
        self.discrete_instrument = discrete_instrument
        self.squeeze_T = squeeze_T
        self.squeeze_Y = squeeze_Y

        # The components are resolved in a fixed order (Y, T, TZ, theta, y_of_t) so that the
        # sequence of random draws, and therefore the coefficients for a given seed, is stable.
        self._init_component('nuisance_Y', nuisance_Y,
                             {'k': self.d_x, 'support': self.d_x, 'degree': 1})
        self._init_component('nuisance_T', nuisance_T,
                             {'k': self.d_x, 'support': self.d_x, 'degree': 1})
        if self.d_z:
            self._init_component('nuisance_TZ', nuisance_TZ,
                                 {'k': self.d_z, 'support': self.d_z, 'degree': 1})
        else:
            # No instrument: its contribution to the treatment is identically zero.
            self.nuisance_TZ = lambda x: 0
        self._init_component('theta', theta,
                             {'k': self.d_x, 'support': self.d_x,
                              'degree': 1, 'bounds': [1, 2], 'intercept': [1, 2]})
        self._init_component('y_of_t', y_of_t,
                             {'k': self.d_t, 'support': self.d_t, 'degree': 1, 'bounds': [1, 1]})

        self.x_noise = x_noise
        self.y_noise = y_noise
        self.t_noise = t_noise

        # Copy the user dicts (never mutate caller-owned objects) and force the sample shapes.
        self.x_noise_params = {**(x_noise_params or {}), 'size': (self.n, self.d_x)}
        self.y_noise_params = {**(y_noise_params or {}), 'size': (self.n, self.d_y)}
        self.t_noise_params = {**(t_noise_params or {}), 'size': (self.n, self.d_t)}

    def _init_component(self, name, spec, defaults):
        """
        Resolve one structural function (nuisance, theta or y_of_t) from its user specification.

        If `spec` is callable it is stored as ``self.<name>`` unchanged. Otherwise `defaults`,
        updated with `spec` when it is a non-empty dict, is stored as ``self.<name>_params`` and
        passed to `gen_nuisance`; the resulting function and coefficients are stored as
        ``self.<name>`` and ``self.<name>_coefs``.
        """
        if callable(spec):
            setattr(self, name, spec)
            return

        params = dict(defaults)
        setattr(self, f"{name}_params", params)
        if spec:
            assert isinstance(spec, dict), f"{name} must be a callable or dict, but got {type(spec)}"
            params.update(spec)

        func, coefs = self.gen_nuisance(**params)
        setattr(self, name, func)
        setattr(self, f"{name}_coefs", coefs)

    def gen_Y(self):
        """
        Generate the outcome ``Y = theta(X) * y_of_t(T) + nuisance_Y(X) + noise``.

        Requires ``self.X`` and ``self.T`` to have been generated. The sampled noise is stored in
        ``self.Y_noise`` so that ``self.y_noise`` keeps holding the sampler name and the method
        can be called repeatedly.
        """
        # NOTE(review): a reviewer observed that y_of_t looks like a treatment featurizer and
        # suggested renaming it accordingly.
        noise = getattr(self._random_state, self.y_noise)(**self.y_noise_params)
        self.Y_noise = noise
        self.Y = self.theta(self.X) * self.y_of_t(self.T) + self.nuisance_Y(self.X) + noise
        return self.Y

    def gen_X(self):
        """Sample the feature matrix X of shape (n, d_x) from the `x_noise` sampler."""
        self.X = getattr(self._random_state, self.x_noise)(**self.x_noise_params)
        return self.X

    def gen_T(self):
        """
        Generate the treatment from X, Z and treatment noise.

        Requires ``self.X`` and ``self.Z`` to have been generated. The sampled noise is stored in
        ``self.T_noise``.
        """
        self.T_noise = getattr(self._random_state, self.t_noise)(**self.t_noise_params)
        index = self.nuisance_T(self.X) + self.nuisance_TZ(self.Z) + self.T_noise

        if self.discrete_treatment:
            # Squash the linear index to a probability and draw a binary treatment.
            self.T = self._random_state.binomial(1, expit(index))
        else:
            self.T = index
        return self.T

    def gen_Z(self):
        """Sample the instrument Z of shape (n, d_z), or set it to None when `d_z` is falsy."""
        if not self.d_z:
            self.Z = None
        elif self.discrete_instrument:
            self.Z = self._random_state.binomial(1, 0.5, size=(self.n, self.d_z))
        else:
            self.Z = self._random_state.normal(size=(self.n, self.d_z), loc=0, scale=1)
        return self.Z

    def gen_nuisance(self, k=None, support=1, bounds=(1, 2), degree=1, intercept=None):
        """
        A function to generate nuisance functions. Returns a nuisance function and corresponding coefs.

        The returned function maps an (m, k) array ``W`` to the (m, 1) array
        ``(W ** degree).dot(coefs) + intercept``.

        Parameters
        ----------
        k: int
            Dimension of input for nuisance function. Defaults to `d_x` when falsy.

        support: int
            Number of non-zero coefficients

        bounds: int list
            Bounds for coefficients which will be generated uniformly. Represented as [low, high]

        degree: int
            Input will be raised to this degree before multiplying with coefficients

        intercept:
            Bounds for intercept which will be generated uniformly. Represented as [low, high].
            If falsy, the intercept is 0.

        Returns
        -------
        calculate_nuisance: callable
            The generated function
        coefs: array of shape (k,)
            The coefficients, with all but `support` randomly chosen entries set to zero
        """
        if not k:
            k = self.d_x

        # Draw order (coefficients, support, intercept) is kept fixed for seed reproducibility.
        coefs = self._random_state.uniform(low=bounds[0], high=bounds[1], size=k)
        supports = self._random_state.choice(k, size=support, replace=False)
        mask = np.zeros(shape=k)
        mask[supports] = 1
        coefs = coefs * mask

        orders = np.full(k, degree, dtype=float)  # enforce all to be the degree for now

        if intercept:
            assert len(intercept) == 2, 'intercept must be a list of 2 numbers, representing lower and upper bounds'
            intercept = self._random_state.uniform(low=intercept[0], high=intercept[1])
        else:
            intercept = 0

        def calculate_nuisance(W):
            # Vectorized power; the result is floating point so integer inputs are not truncated.
            powered = np.power(np.asarray(W), orders)
            return powered.dot(coefs).reshape(-1, 1) + intercept

        return calculate_nuisance, coefs

    def effect(self, X, T0, T1):
        """
        Compute the true effect ``theta(X) * y_of_t(T1) - theta(X) * y_of_t(T0)``.

        Parameters
        ----------
        X: array
            Features at which to evaluate the effect
        T0: array, scalar or None
            Base treatment. None is treated as 0; a scalar is broadcast to shape (rows of T1, d_t).
        T1: array or scalar
            Target treatment. A scalar is broadcast to shape (rows of X, d_t).
        """
        if np.ndim(T1) == 0:
            T1 = np.full((np.shape(X)[0], self.d_t), T1, dtype=float)

        # Only None/scalars are special-cased: comparing an array with 0 inside an `if` is ambiguous.
        if T0 is None:
            T0 = 0
        if np.ndim(T0) == 0:
            T0 = np.full((np.shape(T1)[0], self.d_t), T0, dtype=float)

        effect_t1 = self.theta(X) * self.y_of_t(T1)
        effect_t0 = self.theta(X) * self.y_of_t(T0)
        return effect_t1 - effect_t0

    def const_marginal_effect(self, X):
        """Return ``theta(X)``, the heterogeneous effect coefficient evaluated at X."""
        return self.theta(X)

    def gen_data(self):
        """
        Generate a full dataset.

        Returns
        -------
        data_dict: dict
            Contains 'Y', 'T' and 'X', plus 'Z' when `d_z` is set. 'T' and 'Y' are squeezed
            when `squeeze_T` / `squeeze_Y` are True.
        """
        # Order matters: T depends on X and Z, and Y depends on X and T.
        X = self.gen_X()
        Z = self.gen_Z()
        T = self.gen_T()
        Y = self.gen_Y()

        if self.squeeze_T:
            T = T.squeeze()
        if self.squeeze_Y:
            Y = Y.squeeze()

        data_dict = {
            'Y': Y,
            'T': T,
            'X': X
        }

        if self.d_z:
            data_dict['Z'] = Z

        return data_dict
Loading