Skip to content

create standard dgp for metric aggregation #705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions econml/data/dgps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
from scipy.special import expit

_ihdp_sim_file = os.path.join(os.path.dirname(__file__), "ihdp", "sim.csv")
_ihdp_sim_data = pd.read_csv(_ihdp_sim_file)
Expand Down Expand Up @@ -93,3 +94,182 @@ def _process_ihdp_sim_data():
# Append a column of ones as intercept
X = np.insert(X, 0, np.ones(X.shape[0]), axis=1)
return T, X


class StandardDGP():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(very minor)

Suggested change
class StandardDGP():
class StandardDGP:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring, especially since some arguments can be a function or a dict.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably support generating and using W as well.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably support using a user-specified featurizer on X.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests to at least verify that the API can actually be used and that you get arrays of the right dimensions out, etc., even if there aren't good checks for the actual data (but perhaps there are useful checks there, too)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect users to use this class to test things? If so, perhaps also add a DGP section to the documentation (and at least add it to the module docs). But if this is mainly for internal use, add a comment to the docstring indicating that instead.

def __init__(self,
n=1000,
d_t=1,
d_y=1,
d_x=5,
d_z=None,
discrete_treatment=False,
discrete_instrument=False,
squeeze_T=False,
squeeze_Y=False,
nuisance_Y=None,
nuisance_T=None,
nuisance_TZ=None,
theta=None,
y_of_t=None,
x_eps=1,
y_eps=1,
t_eps=1
):
self.n = n
self.d_t = d_t
self.d_y = d_y
self.d_x = d_x
self.d_z = d_z

self.discrete_treatment = discrete_treatment
self.discrete_instrument = discrete_instrument
self.squeeze_T = squeeze_T
self.squeeze_Y = squeeze_Y

if callable(nuisance_Y):
self.nuisance_Y = nuisance_Y
else: # else must be dict
if nuisance_Y is None:
nuisance_Y = {'support': self.d_x, 'degree': 1}
nuisance_Y['k'] = self.d_x
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider asserting the expected type here, something like:

Suggested change
nuisance_Y['k'] = self.d_x
assert(isinstance(nuisance_Y, dict), f"nuisance_Y must be a callable or dict, but got {type(nuisance_Y)}")
nuisance_Y['k'] = self.d_x

(and likewise for the other similar arguments)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting nuisance_Y['k'] is not strictly necessary since the default behavior of gen_nuisance will do this for you, but perhaps it's worth being explicit.

More importantly, making this assignment will mutate the dictionary that was passed in, which is possibly undesirable; probably better to do something like:

Suggested change
nuisance_Y['k'] = self.d_x
nuisance_Y = { **nuisance_Y, 'k':self.d_x }

which will create a new dictionary instead of altering the old one.

Furthermore, what if the dictionary already had a 'k' entry - do you really want to ignore and override it?

self.nuisance_Y, self.nuisance_Y_coefs = self.gen_nuisance(**nuisance_Y)

if callable(nuisance_T):
self.nuisance_T = nuisance_T
else: # else must be dict
if nuisance_T is None:
nuisance_T = {'support': self.d_x, 'degree': 1}
nuisance_T['k'] = self.d_x
self.nuisance_T, self.nuisance_T_coefs = self.gen_nuisance(**nuisance_T)

if self.d_z:
if callable(nuisance_TZ):
self.nuisance_TZ = nuisance_TZ
else: # else must be dict
if nuisance_TZ is None:
nuisance_TZ = {'support': self.d_z, 'degree': 1}
nuisance_TZ['k'] = self.d_z
self.nuisance_TZ, self.nuisance_TZ_coefs = self.gen_nuisance(**nuisance_TZ)
else:
self.nuisance_TZ = lambda x: 0

if callable(theta):
self.theta = theta
else: # else must be dict
if theta is None:
theta = {'support': self.d_x, 'degree': 1, 'bounds': [1, 2], 'intercept': True}
theta['k'] = self.d_x
self.theta, self.theta_coefs = self.gen_nuisance(**theta)

if callable(y_of_t):
self.y_of_t = y_of_t
else: # else must be dict
if y_of_t is None:
y_of_t = {'support': self.d_t, 'degree': 1, 'bounds': [1, 1]}
y_of_t['k'] = self.d_t
self.y_of_t, self.y_of_t_coefs = self.gen_nuisance(**y_of_t)

self.x_eps = x_eps
self.y_eps = y_eps
self.t_eps = t_eps

def gen_Y(self):
self.y_noise = np.random.normal(size=(self.n, self.d_y), scale=self.y_eps)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider allowing the user to specify a seed so that results are reproducible.

self.Y = self.theta(self.X) * self.y_of_t(self.T) + self.nuisance_Y(self.X) + self.y_noise
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like y_of_t is really a treatment featurizer, is that right? If so, change the name accordingly.

return self.Y

def gen_X(self):
self.X = np.random.normal(size=(self.n, self.d_x), scale=self.x_eps)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider allowing heavier-tailed distributions than just normal (or incorporate outliers in some other way), here and wherever else you're using randomness.

return self.X

def gen_T(self):
noise = np.random.normal(size=(self.n, self.d_t), scale=self.t_eps)
self.T_noise = noise

if self.discrete_treatment:
prob_T = expit(self.nuisance_T(self.X) + self.nuisance_TZ(self.Z) + self.T_noise)
self.T = np.random.binomial(1, prob_T)
return self.T

else:
self.T = self.nuisance_T(self.X) + self.nuisance_TZ(self.Z) + self.T_noise
return self.T

def gen_Z(self):
if self.d_z:
if self.discrete_instrument:
# prob_Z = expit(np.random.normal(size=(self.n, self.d_z)))
# self.Z = np.random.binomial(1, prob_Z, size=(self.n, 1))
# self.Z = np.random.binomial(1, prob_Z)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove dead code unless it has a purpose (like presenting an alternative we might want to use in some specific scenario, in which case also document what that purpose is), in which case also document what that purpose is

self.Z = np.random.binomial(1, 0.5, size=(self.n, self.d_z))
return self.Z

else:
Z_noise = np.random.normal(size=(self.n, self.d_z), loc=3, scale=3)
self.Z = Z_noise
return self.Z

else:
self.Z = None
return self.Z

def gen_nuisance(self, k=None, support=1, bounds=[-1, 1], degree=1, intercept=False):
if not k:
k = self.d_x

coefs = np.random.uniform(low=bounds[0], high=bounds[1], size=k)
supports = np.random.choice(k, size=support, replace=False)
mask = np.zeros(shape=k)
mask[supports] = 1
coefs = coefs * mask

# orders = np.random.randint(1, degree, k) if degree!=1 else np.ones(shape=(k,))
orders = np.ones(shape=(k,)) * degree # enforce all to be the degree for now

if intercept:
intercept = np.random.uniform(low=1, high=2)
else:
intercept = 0

def calculate_nuisance(W):
W2 = np.copy(W)
for i in range(0, k):
W2[:, i] = W[:, i]**orders[i]
out = W2.dot(coefs)
return out.reshape(-1, 1) + intercept

return calculate_nuisance, coefs

def effect(self, X, T0, T1):
if T0 is None or T0 == 0:
T0 = np.zeros(shape=(T1.shape[0], self.d_t))

effect_t1 = self.theta(X) * self.y_of_t(T1)
effect_t0 = self.theta(X) * self.y_of_t(T0)
return effect_t1 - effect_t0

def const_marginal_effect(self, X):
return self.theta(X)

def gen_data(self):
X = self.gen_X()
Z = self.gen_Z()
T = self.gen_T()
Y = self.gen_Y()

if self.squeeze_T:
T = T.squeeze()
if self.squeeze_Y:
Y = Y.squeeze()

data_dict = {
'Y': Y,
'T': T,
'X': X
}

if self.d_z:
data_dict['Z'] = Z

return data_dict