diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index c2d05835..2b6b39af 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -21,7 +21,7 @@ jobs:
         python-version: ${{ matrix.python-version }}
     - if: matrix.os == 'windows-latest'
       name: Install dependencies - Windows
-      run: pip install 'torch>=1,<2' -f https://download.pytorch.org/whl/torch_stable.html
+      run: pip install 'torch>=1,<1.8' -f https://download.pytorch.org/whl/torch_stable.html
     - name: Install package
       run: pip install invoke .[dev]
     - name: invoke lint
@@ -58,7 +58,7 @@ jobs:
         python-version: ${{ matrix.python-version }}
     - if: matrix.os == 'windows-latest'
       name: Install dependencies - Windows
-      run: pip install 'torch>=1,<2' -f https://download.pytorch.org/whl/torch_stable.html
+      run: pip install 'torch>=1,<1.8' -f https://download.pytorch.org/whl/torch_stable.html
     - name: Install package and dependencies
       run: pip install invoke .[test]
     - name: invoke pytest
@@ -105,7 +105,7 @@ jobs:
     - if: matrix.os == 'windows-latest'
      name: Install dependencies - Windows
       run: |
-        pip install 'torch>=1,<2' -f https://download.pytorch.org/whl/torch_stable.html
+        pip install 'torch>=1,<1.8' -f https://download.pytorch.org/whl/torch_stable.html
         choco install graphviz
     - name: Install package and dependencies
       run: pip install invoke jupyter .[ctgan]
diff --git a/conda/meta.yaml b/conda/meta.yaml
index a4af70ff..99c2f407 100644
--- a/conda/meta.yaml
+++ b/conda/meta.yaml
@@ -25,8 +25,8 @@ requirements:
     - pomegranate >=0.13.4,<0.14.2
     - pytorch >=1.4,<2
     - sktime >=0.4,<0.6
+    - copulas >=0.5.0,<0.6
     - rdt >=0.4.0,<0.5
   run:
     - python >=3.6,<3.9
     - scikit-learn >=0.23,<1
@@ -36,6 +36,7 @@ requirements:
     - pomegranate >=0.13.4,<0.14.2
     - pytorch >=1.4,<2
     - sktime >=0.4,<0.6
+    - copulas >=0.5.0,<0.6
     - rdt >=0.4.0,<0.5
 about:
diff --git a/sdmetrics/single_table/README.md b/sdmetrics/single_table/README.md
index 973789e2..82b7baf5 100644
--- a/sdmetrics/single_table/README.md
+++ b/sdmetrics/single_table/README.md
@@ -44,6 +44,30 @@ Implemented metrics:
 * `MLEfficacy`: Generic ML Efficacy metric that detects the type of ML Problem associated
   with the dataset by analyzing the target column type and then applies all the metrics
   that are compatible with it.
+* Privacy Metrics: Metrics that fit an adversarial attacker model on the synthetic data and
+  then evaluate its accuracy (or probability of making the correct attack) on the real
+  data. A short usage sketch follows this list.
+  * `CategoricalCAP`: Privacy Metric for categorical columns, based
+    on the Correct Attribution Probability method.
+  * `CategoricalZeroCAP`: Privacy Metric for categorical columns, based on the Correct
+    Attribution Probability method, scoring an attack as 0 when its key is unmatched.
+  * `CategoricalGeneralizedCAP`: Privacy Metric for categorical columns, based on the Correct
+    Attribution Probability method, matching keys by the closest Hamming distance.
+  * `NumericalMLP`: Privacy Metric for numerical columns, based
+    on MLPRegressor from scikit-learn.
+  * `NumericalLR`: Privacy Metric for numerical columns, based
+    on LinearRegression from scikit-learn.
+  * `NumericalSVR`: Privacy Metric for numerical columns, based
+    on SVR from scikit-learn.
+  * `CategoricalKNN`: Privacy Metric for categorical columns, based
+    on KNeighborsClassifier from scikit-learn.
+  * `CategoricalNB`: Privacy Metric for categorical columns, based
+    on CategoricalNB from scikit-learn.
+  * `CategoricalRF`: Privacy Metric for categorical columns, based
+    on RandomForestClassifier from scikit-learn.
+  * `CategoricalEnsemble`: Privacy Metric for categorical columns, based
+    on an 'ensemble' of other categorical Privacy Metrics.
+  * `NumericalRadiusNearestNeighbor`: Privacy Metric for numerical columns, based
+    on an implementation of the Radius Nearest Neighbor method.
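+
+  For example, a privacy metric can be computed by passing the real and synthetic
+  tables together with the key and sensitive column names. This is a minimal
+  sketch; the dataframes and column names are illustrative:
+
+  ```python
+  import pandas as pd
+
+  from sdmetrics.single_table.privacy import CategoricalCAP
+
+  real = pd.DataFrame({
+      'key': ['a', 'b', 'a', 'b'],
+      'sensitive': ['x', 'y', 'x', 'y'],
+  })
+  synthetic = pd.DataFrame({
+      'key': ['a', 'a', 'b', 'b'],
+      'sensitive': ['x', 'y', 'x', 'y'],
+  })
+
+  score = CategoricalCAP.compute(
+      real, synthetic, key_fields=['key'], sensitive_fields=['sensitive'])
+  ```
+
+  Higher scores mean better privacy: the returned value is 1 minus the average
+  probability of a correct attack, bounded between 0 and 1.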
 * MultiSingleColumn Metrics: Metrics that apply a Single Column metric on each column from
   the table that is compatible with it and then compute the average across all the columns.
   * `CSTest`: MultiSingleColumn metric based on applying the Single Column CSTest on all
@@ -86,7 +110,18 @@ Out[2]:
  'KSTest': sdmetrics.single_table.multi_single_column.KSTest,
  'KSTestExtended': sdmetrics.single_table.multi_single_column.KSTestExtended,
  'ContinuousKLDivergence': sdmetrics.single_table.multi_column_pairs.ContinuousKLDivergence,
- 'DiscreteKLDivergence': sdmetrics.single_table.multi_column_pairs.DiscreteKLDivergence}
+ 'DiscreteKLDivergence': sdmetrics.single_table.multi_column_pairs.DiscreteKLDivergence,
+ 'CategoricalCAP': sdmetrics.single_table.privacy.cap.CategoricalCAP,
+ 'CategoricalGeneralizedCAP': sdmetrics.single_table.privacy.cap.CategoricalGeneralizedCAP,
+ 'CategoricalZeroCAP': sdmetrics.single_table.privacy.cap.CategoricalZeroCAP,
+ 'CategoricalKNN': sdmetrics.single_table.privacy.categorical_sklearn.CategoricalKNN,
+ 'CategoricalNB': sdmetrics.single_table.privacy.categorical_sklearn.CategoricalNB,
+ 'CategoricalRF': sdmetrics.single_table.privacy.categorical_sklearn.CategoricalRF,
+ 'CategoricalEnsemble': sdmetrics.single_table.privacy.ensemble.CategoricalEnsemble,
+ 'NumericalLR': sdmetrics.single_table.privacy.numerical_sklearn.NumericalLR,
+ 'NumericalMLP': sdmetrics.single_table.privacy.numerical_sklearn.NumericalMLP,
+ 'NumericalSVR': sdmetrics.single_table.privacy.numerical_sklearn.NumericalSVR,
+ 'NumericalRadiusNearestNeighbor': sdmetrics.single_table.privacy.radius_nearest_neighbor.NumericalRadiusNearestNeighbor}
 ```
 
 ## Single Table Inputs and Outputs
diff --git a/sdmetrics/single_table/privacy/__init__.py b/sdmetrics/single_table/privacy/__init__.py
new file mode 100644
index 00000000..03a8a986
--- /dev/null
+++ b/sdmetrics/single_table/privacy/__init__.py
@@ -0,0 +1,25 @@
+from sdmetrics.single_table.privacy.base import CategoricalPrivacyMetric, NumericalPrivacyMetric
+from sdmetrics.single_table.privacy.cap import (
+    CategoricalCAP, CategoricalGeneralizedCAP, CategoricalZeroCAP)
+from sdmetrics.single_table.privacy.categorical_sklearn import (
+    CategoricalKNN, CategoricalNB, CategoricalRF)
+from sdmetrics.single_table.privacy.ensemble import CategoricalEnsemble
+from sdmetrics.single_table.privacy.numerical_sklearn import (
+    NumericalLR, NumericalMLP, NumericalSVR)
+from sdmetrics.single_table.privacy.radius_nearest_neighbor import NumericalRadiusNearestNeighbor
+
+__all__ = [
+    'CategoricalCAP',
+    'CategoricalZeroCAP',
+    'CategoricalGeneralizedCAP',
+    'NumericalMLP',
+    'NumericalLR',
+    'NumericalSVR',
+    'CategoricalKNN',
+    'CategoricalNB',
+    'CategoricalRF',
+    'CategoricalPrivacyMetric',
+    'NumericalPrivacyMetric',
+    'CategoricalEnsemble',
+    'NumericalRadiusNearestNeighbor'
+]
diff --git a/sdmetrics/single_table/privacy/base.py b/sdmetrics/single_table/privacy/base.py
new file mode 100644
index 00000000..54ce376e
--- /dev/null
+++ b/sdmetrics/single_table/privacy/base.py
@@ -0,0 +1,348 @@
+"""Base classes for privacy metrics on single table datasets."""
+from enum import Enum
+
+import numpy as np
+
+from sdmetrics.goal import Goal
+from sdmetrics.single_table.base import SingleTableMetric
+from sdmetrics.single_table.privacy.loss import InverseCDFDistance
+
+
+class CategoricalType(Enum):
+    """Enumerates the encoding types that an attacker can require for categorical data.
+
+    The values can be one-hot encoded, or encoded as class numbers.
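+
+    For example, given the categories ``['a', 'b', 'c']``, the value ``'b'`` is
+    encoded as ``1`` under ``CLASS_NUM`` and as ``[0, 1, 0]`` under ``ONE_HOT``.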
+    """
+
+    CLASS_NUM = "Class_num"
+    ONE_HOT = "One_hot"
+
+
+class CategoricalPrivacyMetric(SingleTableMetric):
+    """Base class for Categorical Privacy metrics on single tables.
+
+    These metrics fit an adversarial attacker model on the synthetic data and
+    then evaluate its accuracy (or probability of making the correct attack)
+    on the real data.
+
+    Attributes:
+        name (str):
+            Name to use when reports about this metric are printed.
+        goal (sdmetrics.goal.Goal):
+            The goal of this metric.
+        min_value (Union[float, tuple[float]]):
+            Minimum value or values that this metric can take.
+        max_value (Union[float, tuple[float]]):
+            Maximum value or values that this metric can take.
+        model:
+            Model class to use for the prediction.
+        model_kwargs:
+            Keyword arguments to use to create the model instance.
+        accuracy_base (bool):
+            True if the privacy score should be based on the accuracy of the attacker,
+            False if it should be based on the probability of making the correct attack.
+    """
+
+    name = None
+    goal = Goal.MAXIMIZE
+    min_value = 0
+    max_value = 1
+    MODEL = None
+    MODEL_KWARGS = {}
+    ACCURACY_BASE = None
+
+    @classmethod
+    def _fit(cls, synthetic_data, key_fields, sensitive_fields, model_kwargs):
+        if model_kwargs is None:
+            model_kwargs = cls.MODEL_KWARGS.copy() if cls.MODEL_KWARGS else {}
+
+        model = cls.MODEL(**model_kwargs)
+        model.fit(synthetic_data, key_fields, sensitive_fields)
+        return model
+
+    @classmethod
+    def _validate_inputs(cls, real_data, synthetic_data, metadata, key_fields, sensitive_fields):
+        metadata = super()._validate_inputs(real_data, synthetic_data, metadata)
+        if 'key_fields' in metadata:
+            key_fields = metadata['key_fields']
+        elif key_fields is None:
+            raise TypeError('`key_fields` must be passed either directly or inside `metadata`')
+
+        if 'sensitive_fields' in metadata:
+            sensitive_fields = metadata['sensitive_fields']
+        elif sensitive_fields is None:
+            raise TypeError(
+                '`sensitive_fields` must be passed either directly or inside `metadata`')
+
+        return key_fields, sensitive_fields, metadata
+
+    @classmethod
+    def compute(cls, real_data, synthetic_data, metadata=None, key_fields=None,
+                sensitive_fields=None, model_kwargs=None):
+        """Compute this metric.
+
+        This fits an adversarial attacker model on the synthetic data and
+        then evaluates it making predictions on the real data.
+
+        The ``key_fields`` column name(s) must be given, either directly or as a first level
+        entry in the ``metadata`` dict; they will be used as the key column(s) for the
+        attack.
+
+        The ``sensitive_fields`` column name(s) must be given, either directly or as a first
+        level entry in the ``metadata`` dict; they will be used as the sensitive column(s)
+        for the attack.
+
+        Args:
+            real_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the real dataset.
+            synthetic_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the synthetic dataset.
+            metadata (dict):
+                Table metadata dict. If not passed, it is built based on the
+                real_data fields and dtypes.
+            key_fields (list(str)):
+                Name of the column(s) to use as the key attributes.
+            sensitive_fields (list(str)):
+                Name of the column(s) to use as the sensitive attributes.
+            model_kwargs (dict):
+                Keyword arguments for the attacker model. ``cls.MODEL_KWARGS`` will be
+                used if none are provided.
+
+        Returns:
+            Union[float, tuple[float]]:
+                Scores obtained by the attackers when evaluated on the real data.
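+
+        Example:
+            The ``metadata`` dict below is illustrative; any single table metadata
+            dict that contains ``key_fields`` and ``sensitive_fields`` as first level
+            entries works the same way::
+
+                metadata = {
+                    'fields': {
+                        'zip_code': {'type': 'categorical'},
+                        'disease': {'type': 'categorical'},
+                    },
+                    'key_fields': ['zip_code'],
+                    'sensitive_fields': ['disease'],
+                }
+
+                score = CategoricalCAP.compute(real_data, synthetic_data, metadata)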
+        """
+        key_fields, sensitive_fields, metadata = cls._validate_inputs(
+            real_data,
+            synthetic_data,
+            metadata,
+            key_fields,
+            sensitive_fields
+        )
+
+        if len(key_fields) == 0 or len(sensitive_fields) == 0:
+            return np.nan
+
+        for col in key_fields + sensitive_fields:
+            data_type = metadata['fields'][col]
+            # skip the attack if any column is not integer, categorical or boolean
+            if data_type not in (cls._DTYPES_TO_TYPES['i'], cls._DTYPES_TO_TYPES['O'],
+                                 cls._DTYPES_TO_TYPES['b']):
+                return np.nan
+
+        model = cls._fit(synthetic_data, key_fields, sensitive_fields, model_kwargs)
+
+        if cls.ACCURACY_BASE:  # calculate privacy score based on prediction accuracy
+            count = len(real_data)
+            match = 0
+            for idx in range(count):
+                key_data = tuple(real_data[key_fields].iloc[idx])
+                sensitive_data = tuple(real_data[sensitive_fields].iloc[idx])
+                pred_sensitive = model.predict(key_data)
+                if pred_sensitive == sensitive_data:
+                    match += 1
+
+            return 1.0 - match / count
+
+        else:  # calculate privacy score based on posterior prob of the correct sensitive data
+            count = 0
+            score = 0
+            for idx in range(len(real_data)):
+                key_data = tuple(real_data[key_fields].iloc[idx])
+                sensitive_data = tuple(real_data[sensitive_fields].iloc[idx])
+                row_score = model.score(key_data, sensitive_data)
+                if row_score is not None:
+                    count += 1
+                    score += row_score
+
+            if count == 0:
+                return 0
+
+            return 1.0 - score / count
+
+
+class NumericalPrivacyMetric(SingleTableMetric):
+    """Base class for Numerical Privacy metrics on single tables.
+
+    These metrics fit an adversarial attacker model on the synthetic data and
+    then evaluate its accuracy (or probability of making the correct attack)
+    on the real data.
+
+    Attributes:
+        name (str):
+            Name to use when reports about this metric are printed.
+        goal (sdmetrics.goal.Goal):
+            The goal of this metric.
+        min_value (Union[float, tuple[float]]):
+            Minimum value or values that this metric can take.
+        max_value (Union[float, tuple[float]]):
+            Maximum value or values that this metric can take.
+        model (Class):
+            Model class to use for the prediction.
+        model_kwargs (dict):
+            Keyword arguments to use to create the model instance.
+        loss_function (Class):
+            Loss function to use when evaluating the accuracy of the privacy attack.
+        loss_function_kwargs (dict):
+            Keyword arguments to use to create the loss function instance.
+    """
+
+    name = None
+    goal = Goal.MAXIMIZE
+    min_value = 0
+    max_value = np.inf
+    MODEL = None
+    MODEL_KWARGS = {}
+    LOSS_FUNCTION = InverseCDFDistance
+    LOSS_FUNCTION_KWARGS = {'p': 2}
+
+    @classmethod
+    def _fit(cls, synthetic_data, key_fields, sensitive_fields, model_kwargs):
+        if model_kwargs is None:
+            model_kwargs = cls.MODEL_KWARGS.copy() if cls.MODEL_KWARGS else {}
+
+        model = cls.MODEL(**model_kwargs)
+        model.fit(synthetic_data, key_fields, sensitive_fields)
+
+        return model
+
+    @classmethod
+    def _validate_inputs(cls, real_data, synthetic_data, metadata, key_fields, sensitive_fields):
+        metadata = super()._validate_inputs(real_data, synthetic_data, metadata)
+        if 'key_fields' in metadata:
+            key_fields = metadata['key_fields']
+        elif key_fields is None:
+            raise TypeError('`key_fields` must be passed either directly or inside `metadata`')
+
+        if 'sensitive_fields' in metadata:
+            sensitive_fields = metadata['sensitive_fields']
+        elif sensitive_fields is None:
+            raise TypeError(
+                '`sensitive_fields` must be passed either directly or inside `metadata`')
+
+        return key_fields, sensitive_fields, metadata
+
+    @classmethod
+    def compute(cls, real_data, synthetic_data, metadata=None, key_fields=None,
+                sensitive_fields=None, model_kwargs=None, loss_function=None,
+                loss_function_kwargs=None):
+        """Compute this metric.
+
+        This fits an adversarial attacker model on the synthetic data and
+        then evaluates it making predictions on the real data.
+
+        The ``key_fields`` column name(s) must be given, either directly or as a first level
+        entry in the ``metadata`` dict; they will be used as the key column(s) for the
+        attack.
+
+        The ``sensitive_fields`` column name(s) must be given, either directly or as a first
+        level entry in the ``metadata`` dict; they will be used as the sensitive column(s)
+        for the attack.
+
+        Args:
+            real_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the real dataset.
+            synthetic_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the synthetic dataset.
+            metadata (dict):
+                Table metadata dict. If not passed, it is built based on the
+                real_data fields and dtypes.
+            key_fields (list(str)):
+                Name of the column(s) to use as the key attributes.
+            sensitive_fields (list(str)):
+                Name of the column(s) to use as the sensitive attributes.
+            model_kwargs (dict):
+                Keyword arguments for the attacker model. ``cls.MODEL_KWARGS`` will be
+                used if none are provided.
+            loss_function (Class):
+                The loss function to use. ``cls.LOSS_FUNCTION`` will be used if none is
+                provided.
+            loss_function_kwargs (dict):
+                Keyword arguments for the loss function. ``cls.LOSS_FUNCTION_KWARGS`` will
+                be used if none are provided.
+
+        Returns:
+            Union[float, tuple[float]]:
+                Scores obtained by the attackers when evaluated on the real data.
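+
+        Example:
+            The call below is illustrative (the column names are made up); it scores
+            ``NumericalLR`` with the ``InverseCDFDistance`` loss using the L1 norm
+            instead of the default L2::
+
+                score = NumericalLR.compute(
+                    real_data,
+                    synthetic_data,
+                    key_fields=['age'],
+                    sensitive_fields=['income'],
+                    loss_function=InverseCDFDistance,
+                    loss_function_kwargs={'p': 1},
+                )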
+        """
+        key_fields, sensitive_fields, metadata = (
+            cls._validate_inputs(real_data, synthetic_data, metadata, key_fields, sensitive_fields)
+        )
+
+        if len(key_fields) == 0 or len(sensitive_fields) == 0:
+            return np.nan
+
+        for col in key_fields + sensitive_fields:
+            data_type = metadata['fields'][col]
+
+            # skip the attack if any column is not integer or float
+            if data_type not in (cls._DTYPES_TO_TYPES['i'], cls._DTYPES_TO_TYPES['f']):
+                return np.nan
+
+        model = cls._fit(synthetic_data, key_fields, sensitive_fields, model_kwargs)
+
+        if loss_function_kwargs is None:
+            loss_function_kwargs = cls.LOSS_FUNCTION_KWARGS
+
+        if loss_function is None:
+            loss_function = cls.LOSS_FUNCTION(**loss_function_kwargs)
+        else:
+            loss_function = loss_function(**loss_function_kwargs)
+
+        loss_function.fit(real_data, sensitive_fields)
+
+        count = len(real_data)
+        score = 0
+        for idx in range(count):
+            key_data = tuple(real_data[key_fields].iloc[idx])
+            sensitive_data = tuple(real_data[sensitive_fields].iloc[idx])
+            pred_sensitive = model.predict(key_data)
+            score += loss_function.measure(pred_sensitive, sensitive_data)
+
+        return score / count
+
+
+class PrivacyAttackerModel():
+    """Train and evaluate a privacy attacker model.
+
+    Train a model to predict sensitive attributes from key attributes
+    using the synthetic data. Then, evaluate the privacy of the model by
+    trying to predict the sensitive attributes of the real data.
+    """
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the attacker on the synthetic data.
+
+        Args:
+            synthetic_data (pandas.DataFrame):
+                The synthetic data table used for adversarial learning.
+            key_fields (list[str]):
+                The names of the key columns.
+            sensitive_fields (list[str]):
+                The names of the sensitive columns.
+        """
+        raise NotImplementedError('Please implement the fit method of the attacker!')
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        raise NotImplementedError('Please implement the predict method of the attacker!')
+
+    def score(self, key_data, sensitive_data):
+        """Score based on the belief of the attacker, in the form P(sensitive_data | key_data).
+
+        Args:
+            key_data (tuple):
+                The key data.
+            sensitive_data (tuple):
+                The sensitive data.
+        """
+        raise NotImplementedError('Posterior probability based scoring not supported '
+                                  'for this attacker!')
diff --git a/sdmetrics/single_table/privacy/cap.py b/sdmetrics/single_table/privacy/cap.py
new file mode 100644
index 00000000..2c402a14
--- /dev/null
+++ b/sdmetrics/single_table/privacy/cap.py
@@ -0,0 +1,169 @@
+from sdmetrics.single_table.privacy.base import CategoricalPrivacyMetric, PrivacyAttackerModel
+from sdmetrics.single_table.privacy.util import closest_neighbors, count_frequency, majority
+
+
+class CAPAttacker(PrivacyAttackerModel):
+    """The CAP (Correct Attribution Probability) privacy attacker.
+
+    It will find all rows in the synthetic table that match the target key attributes, and
+    predict the sensitive entry that appears most frequently among them. The privacy score will
+    be the frequency with which the correct sensitive entry appears among all such entries. In
+    the case that no such row is found, the attack will be ignored and not counted towards the
+    privacy score.
+    """
+
+    def __init__(self):
+        self.synthetic_dict = {}  # {key attribute: [sensitive attribute]}
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the attacker on the synthetic data.
+
+        Args:
+            synthetic_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the synthetic dataset.
+            key_fields (list(str)):
+                Name of the column(s) to use as the key attributes.
+            sensitive_fields (list(str)):
+                Name of the column(s) to use as the sensitive attributes.
+        """
+        for idx in range(len(synthetic_data)):
+            key_value = tuple(synthetic_data[key_fields].iloc[idx])
+            sensitive_value = tuple(synthetic_data[sensitive_fields].iloc[idx])
+            if key_value in self.synthetic_dict:
+                self.synthetic_dict[key_value].append(sensitive_value)
+            else:
+                self.synthetic_dict[key_value] = [sensitive_value]
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        if key_data not in self.synthetic_dict:
+            return None  # target key attribute not found in synthetic table
+
+        return majority(self.synthetic_dict[key_data])
+
+    def score(self, key_data, sensitive_data):
+        """Score based on the belief of the attacker, in the form P(sensitive_data | key_data).
+
+        Args:
+            key_data (tuple):
+                The key data.
+            sensitive_data (tuple):
+                The sensitive data.
+
+        Returns:
+            float or None:
+                The frequency of the correct sensitive entry.
+                Returns `None` if the key is not in the data.
+        """
+        if key_data in self.synthetic_dict:
+            return count_frequency(self.synthetic_dict[key_data], sensitive_data)
+        else:
+            return None
+
+
+class CategoricalCAP(CategoricalPrivacyMetric):
+    """The Categorical CAP privacy metric. Scored based on the CAPAttacker."""
+
+    name = 'CategoricalCAP'
+    MODEL = CAPAttacker
+    ACCURACY_BASE = False
+
+
+class ZeroCAPAttacker(CAPAttacker):
+    """The 0CAP privacy attacker, which operates in the same way as CAP does.
+
+    The difference is that when a match in key attribute is not found, the attack will
+    be classified as failed and a score of 0 will be recorded.
+    """
+
+    def score(self, key_data, sensitive_data):
+        """Score based on the belief of the attacker, in the form P(sensitive_data | key_data).
+
+        Args:
+            key_data (tuple):
+                The key data.
+            sensitive_data (tuple):
+                The sensitive data.
+
+        Returns:
+            float:
+                The frequency of the correct sensitive entry.
+                Returns `0` if the key is not in the data.
+        """
+        if key_data in self.synthetic_dict:
+            return count_frequency(self.synthetic_dict[key_data], sensitive_data)
+        else:
+            return 0
+
+
+class CategoricalZeroCAP(CategoricalPrivacyMetric):
+    """The Categorical 0CAP privacy metric. Scored based on the ZeroCAPAttacker."""
+
+    name = '0CAP'
+    MODEL = ZeroCAPAttacker
+    ACCURACY_BASE = False
+
+
+class GeneralizedCAPAttacker(CAPAttacker):
+    """The GeneralizedCAP privacy attacker.
+
+    It will find all rows in the synthetic table that are closest (in Hamming distance) to the
+    target key attributes, and predict the sensitive entry that appears most frequently among
+    them. The privacy score for each row in the real table will be calculated as the frequency
+    with which the true sensitive attribute appears among all rows in the synthetic table with
+    the closest key attributes.
+    """
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        ref_key_attributes = closest_neighbors(self.synthetic_dict.keys(), key_data)
+        ref_sensitive_attributes = []
+        for key in ref_key_attributes:
+            ref_sensitive_attributes.extend(self.synthetic_dict[key])
+
+        return majority(ref_sensitive_attributes)
+
+    def score(self, key_data, sensitive_data):
+        """Score based on the belief of the attacker, in the form P(sensitive_data | key_data).
+
+        Args:
+            key_data (tuple):
+                The key data.
+            sensitive_data (tuple):
+                The sensitive data.
+
+        Returns:
+            float:
+                The frequency of the correct sensitive entry among the closest matches.
+        """
+        ref_key_attributes = closest_neighbors(self.synthetic_dict.keys(), key_data)
+        ref_sensitive_attributes = []
+        for key in ref_key_attributes:
+            ref_sensitive_attributes.extend(self.synthetic_dict[key])
+
+        return count_frequency(ref_sensitive_attributes, sensitive_data)
+
+
+class CategoricalGeneralizedCAP(CategoricalPrivacyMetric):
+    """The GeneralizedCAP privacy metric. Scored based on the GeneralizedCAPAttacker."""
+
+    name = 'Categorical GeneralizedCAP'
+    MODEL = GeneralizedCAPAttacker
+    ACCURACY_BASE = False
diff --git a/sdmetrics/single_table/privacy/categorical_sklearn.py b/sdmetrics/single_table/privacy/categorical_sklearn.py
new file mode 100644
index 00000000..a56182a9
--- /dev/null
+++ b/sdmetrics/single_table/privacy/categorical_sklearn.py
@@ -0,0 +1,242 @@
+import numpy as np
+import sklearn.naive_bayes
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.neighbors import KNeighborsClassifier
+from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
+from sklearn.svm import SVC
+
+from sdmetrics.single_table.privacy.base import (
+    CategoricalPrivacyMetric, CategoricalType, PrivacyAttackerModel)
+from sdmetrics.single_table.privacy.util import allow_nan, allow_nan_array
+
+
+class CategoricalSklearnAttacker(PrivacyAttackerModel):
+    """Base class for the categorical attackers based on sklearn models.
+
+    It is used to train a model to predict sensitive attributes from key attributes
+    using the synthetic data. Then, evaluate the privacy of the model by
+    trying to predict the sensitive attributes of the real data.
+
+    Attributes:
+        key_type (CategoricalType):
+            The key attribute encoding (class_num or one_hot) required by the learner.
+        sensitive_type (CategoricalType):
+            The sensitive attribute encoding (class_num or one_hot) required by the learner.
+        skl_learner (Class):
+            A (wrapped) sklearn classifier class that can be called with no arguments.
+    """
+
+    KEY_TYPE = None
+    SENSITIVE_TYPE = None
+    SKL_LEARNER = None
+
+    def __init__(self):
+        self.predictor = self.SKL_LEARNER()
+        self.key_processor = (
+            OrdinalEncoder() if self.KEY_TYPE == CategoricalType.CLASS_NUM
+            else OneHotEncoder()
+        )
+        self.sensitive_processor = (
+            OrdinalEncoder() if self.SENSITIVE_TYPE == CategoricalType.CLASS_NUM
+            else OneHotEncoder()
+        )
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the CategoricalSklearnAttacker on the synthetic data.
+
+        Args:
+            synthetic_data (pandas.DataFrame):
+                The synthetic data table used for adversarial learning.
+            key_fields (list[str]):
+                The names of the key columns.
+            sensitive_fields (list[str]):
+                The names of the sensitive columns.
+        """
+        key_table = allow_nan(synthetic_data[key_fields])
+        sensitive_table = allow_nan(synthetic_data[sensitive_fields])
+        self.key_processor.fit(key_table)
+        self.sensitive_processor.fit(sensitive_table)
+
+        key_train = self.key_processor.transform(key_table)
+        sensitive_train = self.sensitive_processor.transform(sensitive_table)
+        self.predictor.fit(key_train, sensitive_train)
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        keys = allow_nan_array(key_data)  # replace nan entries in the key attributes
+        try:
+            # key attributes in ML ready format
+            keys_transform = self.key_processor.transform([keys])
+        except ValueError:  # some attributes of the input haven't appeared in synthetic tables
+            return None
+
+        sensitive_pred = self.predictor.predict(keys_transform)
+        if len(np.array(sensitive_pred).shape) == 1:
+            sensitive_pred = [sensitive_pred]
+
+        # predicted sensitive attributes in original format
+        sensitives = self.sensitive_processor.inverse_transform(sensitive_pred)
+        return tuple(sensitives[0])
+
+
+class SVCWrapper():
+    """A wrapper around `sklearn.svm.SVC` to support multidimensional y."""
+
+    def __init__(self):
+        self.predictors = []
+
+    def fit(self, X, Y):
+        """Fit the classifier to training data X and labels Y.
+
+        Arguments:
+            X (np.array):
+                training data matrix of shape (n_samples, n_features)
+            Y (np.array):
+                label matrix of shape (n_samples, n_labels)
+        """
+        n_labels = Y.shape[1]
+        for idx in range(n_labels):
+            Y_col = Y[:, idx]
+            predictor = SVC()
+            predictor.fit(X, Y_col)
+            self.predictors.append(predictor)
+
+    def predict(self, X):
+        """Predict the labels corresponding to data X.
+
+        Arguments:
+            X (np.array):
+                training data matrix of shape (n_samples, n_features)
+
+        Returns:
+            np.array: label matrix of shape (n_samples, n_labels)
+        """
+        Y = []
+        for predictor in self.predictors:
+            Y.append(predictor.predict(X))
+
+        Y = np.array(Y).T
+        return Y
+
+
+class NBWrapper():
+    """A wrapper around `sklearn.naive_bayes.CategoricalNB` to support multidimensional y."""
+
+    def __init__(self):
+        self.predictors = []
+
+    def fit(self, X, Y):
+        """Fit the classifier to training data X and labels Y.
+
+        Arguments:
+            X (np.array):
+                training data matrix of shape (n_samples, n_features)
+            Y (np.array):
+                label matrix of shape (n_samples, n_labels)
+        """
+        n_labels = Y.shape[1]
+        for idx in range(n_labels):
+            Y_col = Y[:, idx]
+            predictor = sklearn.naive_bayes.CategoricalNB()
+            predictor.fit(X, Y_col)
+            self.predictors.append(predictor)
+
+    def predict(self, X):
+        """Predict the labels corresponding to data X.
+
+        Arguments:
+            X (np.array): training data matrix of shape (n_samples, n_features)
+
+        Returns:
+            np.array: label matrix of shape (n_samples, n_labels)
+        """
+        Y = []
+        for predictor in self.predictors:
+            Y.append(predictor.predict(X))
+        Y = np.array(Y).T
+        return Y
+
+
+class CategoricalNBAttacker(CategoricalSklearnAttacker):
+    """The Categorical NaiveBayesian privacy attacker.
+
+    Uses a naive Bayes classifier to calculate the score based on prediction accuracy.
+    """
+
+    KEY_TYPE = CategoricalType.CLASS_NUM
+    SENSITIVE_TYPE = CategoricalType.CLASS_NUM
+    SKL_LEARNER = NBWrapper
+
+
+class CategoricalNB(CategoricalPrivacyMetric):
+    """The Categorical NaiveBayesian privacy metric.
+
+    Scored based on the CategoricalNBAttacker.
+    """
+
+    name = 'Categorical NaiveBayesian'
+    MODEL = CategoricalNBAttacker
+    ACCURACY_BASE = True
+
+
+class CategoricalKNNAttacker(CategoricalSklearnAttacker):
+    """The Categorical KNN (k nearest neighbors) privacy attacker.
+
+    Uses a KNN classifier to calculate the score based on prediction accuracy.
+    """
+
+    KEY_TYPE = CategoricalType.ONE_HOT
+    SENSITIVE_TYPE = CategoricalType.CLASS_NUM
+    SKL_LEARNER = KNeighborsClassifier
+
+
+class CategoricalKNN(CategoricalPrivacyMetric):
+    """The Categorical KNN privacy metric. Scored based on the CategoricalKNNAttacker."""
+
+    name = 'K-Nearest Neighbors'
+    MODEL = CategoricalKNNAttacker
+    ACCURACY_BASE = True
+
+
+class CategoricalRFAttacker(CategoricalSklearnAttacker):
+    """The Categorical RF (Random Forest) privacy attacker.
+
+    Uses an RF classifier to calculate the score based on prediction accuracy.
+    """
+
+    KEY_TYPE = CategoricalType.ONE_HOT
+    SENSITIVE_TYPE = CategoricalType.CLASS_NUM
+    SKL_LEARNER = RandomForestClassifier
+
+
+class CategoricalRF(CategoricalPrivacyMetric):
+    """The Categorical RF privacy metric. Scored based on the CategoricalRFAttacker."""
+
+    name = 'Categorical Random Forest'
+    MODEL = CategoricalRFAttacker
+    ACCURACY_BASE = True
+
+
+class CategoricalSVMAttacker(CategoricalSklearnAttacker):
+    """The Categorical SVM (Support Vector Machine) privacy attacker.
+
+    Uses an SVM classifier to calculate the score based on prediction accuracy.
+    """
+
+    KEY_TYPE = CategoricalType.ONE_HOT
+    SENSITIVE_TYPE = CategoricalType.CLASS_NUM
+    SKL_LEARNER = SVCWrapper
+
+
+class CategoricalSVM(CategoricalPrivacyMetric):
+    """The Categorical SVM privacy metric. Scored based on the CategoricalSVMAttacker."""
+
+    name = 'Support Vector Classifier'
+    MODEL = CategoricalSVMAttacker
+    ACCURACY_BASE = True
diff --git a/sdmetrics/single_table/privacy/ensemble.py b/sdmetrics/single_table/privacy/ensemble.py
new file mode 100644
index 00000000..5f2a19fc
--- /dev/null
+++ b/sdmetrics/single_table/privacy/ensemble.py
@@ -0,0 +1,109 @@
+import numpy as np
+
+from sdmetrics.single_table.privacy.base import CategoricalPrivacyMetric, PrivacyAttackerModel
+from sdmetrics.single_table.privacy.util import majority
+
+
+class CategoricalEnsembleAttacker(PrivacyAttackerModel):
+    """The Categorical ENS (ensemble 'majority vote' classifier) privacy attacker.
+
+    It will predict the majority of the specified sub-attackers' predictions, and the privacy
+    score will be calculated based on the accuracy of its prediction.
+    """
+
+    def __init__(self, attackers=None):
+        # avoid a mutable default argument; an empty ensemble is rejected upstream
+        self.attackers = [attacker() for attacker in (attackers or [])]
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the CategoricalEnsembleAttacker on the synthetic data.
+
+        Args:
+            synthetic_data (pandas.DataFrame):
+                The synthetic data table used for adversarial learning.
+            key_fields (list[str]):
+                The names of the key columns.
+            sensitive_fields (list[str]):
+                The names of the sensitive columns.
+        """
+        for attacker in self.attackers:
+            attacker.fit(synthetic_data, key_fields, sensitive_fields)
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        predictions = [attacker.predict(key_data) for attacker in self.attackers]
+        return majority(predictions)
+
+
+class CategoricalEnsemble(CategoricalPrivacyMetric):
+    """The Categorical Ensemble privacy metric. Scored based on the CategoricalEnsembleAttacker.
+
+    When calling ``cls.compute``, make sure to pass
+    ``model_kwargs={'attackers': list[PrivacyAttackerModel]}``.
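+
+    Example:
+        The attacker classes below mirror the ones used in the integration tests;
+        any list of categorical attacker classes works the same way::
+
+            from sdmetrics.single_table.privacy.categorical_sklearn import (
+                CategoricalKNNAttacker, CategoricalNBAttacker, CategoricalRFAttacker)
+
+            score = CategoricalEnsemble.compute(
+                real_data,
+                synthetic_data,
+                key_fields=['key1', 'key2'],
+                sensitive_fields=['sensitive1', 'sensitive2'],
+                model_kwargs={'attackers': [
+                    CategoricalNBAttacker, CategoricalRFAttacker, CategoricalKNNAttacker]},
+            )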
+    """
+
+    name = 'Ensemble'
+    MODEL = CategoricalEnsembleAttacker
+    ACCURACY_BASE = True
+
+    @classmethod
+    def compute(cls, real_data, synthetic_data, metadata=None, key_fields=None,
+                sensitive_fields=None, model_kwargs=None):
+        """Compute this metric.
+
+        This fits the CategoricalEnsembleAttacker on the synthetic data and
+        then evaluates it making predictions on the real data.
+
+        The ``key_fields`` column name(s) must be given, either directly or as a first level
+        entry in the ``metadata`` dict; they will be used as the key column(s) for the
+        attack.
+
+        The ``sensitive_fields`` column name(s) must be given, either directly or as a first
+        level entry in the ``metadata`` dict; they will be used as the sensitive column(s)
+        for the attack.
+
+        Args:
+            real_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the real dataset.
+            synthetic_data (Union[numpy.ndarray, pandas.DataFrame]):
+                The values from the synthetic dataset.
+            metadata (dict):
+                Table metadata dict. If not passed, it is built based on the
+                real_data fields and dtypes.
+            key_fields (list(str)):
+                Name of the column(s) to use as the key attributes.
+            sensitive_fields (list(str)):
+                Name of the column(s) to use as the sensitive attributes.
+            model_kwargs (dict):
+                Keyword arguments for the attacker model. ``cls.MODEL_KWARGS`` will be
+                used if none are provided.
+
+        Returns:
+            Union[float, tuple[float]]:
+                Score obtained by the CategoricalEnsembleAttacker when evaluated on the
+                real data.
+        """
+        if model_kwargs is None:
+            model_kwargs = cls.MODEL_KWARGS
+
+        if 'attackers' not in model_kwargs:  # no attackers specified
+            return np.nan
+        elif (not isinstance(model_kwargs['attackers'], list)
+                or len(model_kwargs['attackers']) == 0):  # zero attackers specified
+            return np.nan
+
+        return super().compute(
+            real_data,
+            synthetic_data,
+            metadata,
+            key_fields,
+            sensitive_fields,
+            model_kwargs
+        )
diff --git a/sdmetrics/single_table/privacy/loss.py b/sdmetrics/single_table/privacy/loss.py
new file mode 100644
index 00000000..26b08e98
--- /dev/null
+++ b/sdmetrics/single_table/privacy/loss.py
@@ -0,0 +1,86 @@
+import numpy as np
+from copulas.univariate.base import Univariate
+
+
+class LossFunction():
+    """Base class for a loss function."""
+
+    def fit(self, data, cols):
+        """Learn the metric on the value space.
+
+        Args:
+            data (pandas.DataFrame):
+                The real data table.
+            cols (list[str]):
+                The names of the target columns (usually the sensitive columns).
+        """
+
+    def measure(self, pred, real):
+        """Calculate the loss of a single prediction.
+
+        Args:
+            pred (tuple):
+                The predicted value.
+            real (tuple):
+                The actual value.
+        """
+        raise NotImplementedError('Please implement the loss measuring algorithm!')
+
+
+class InverseCDFDistance(LossFunction):
+    """Measure the distance between continuous fields.
+
+    This loss function first applies the fitted cdfs to every single entry (i.e. turning
+    the numerical values into their respective percentiles) and then measures the Lp distance,
+    raised to the pth power, between the predicted value and the real value. That is,
+    ``d(pred, real) = sum_i |F_i(pred_i) - F_i(real_i)| ** p``.
+    """
+
+    def __init__(self, p=2):
+        """
+        Args:
+            p (float):
+                The p parameter in the L_p metric. Must be positive.
+        """
+        self.p = p
+        self.cdfs = []
+
+    def fit(self, data, cols):
+        """Fit automatically-selected univariate distributions to the given columns.
+
+        Args:
+            data (DataFrame):
+                Data, where each column in `cols` is a continuous column.
+            cols (list[str]):
+                Column names.
+        """
+        for col in cols:
+            col_data = np.array(data[col])
+            dist_model = Univariate()
+            dist_model.fit(col_data)
+            self.cdfs.append(dist_model)
+
+    def measure(self, pred, real):
+        """Compute the distance (L_p norm, raised to the p-th power) between pred and real.
+
+        This uses the probability integral transform to map the pred/real values
+        to a CDF value (between 0.0 and 1.0). Then, it computes the L_p norm,
+        raised to the p-th power, between CDF(pred) and CDF(real).
+
+        Args:
+            pred (tuple):
+                Predicted value(s) corresponding to the columns specified in fit.
+            real (tuple):
+                Real value(s) corresponding to the columns specified in fit.
+
+        Returns:
+            float:
+                The sum over all columns of ``|CDF(pred) - CDF(real)| ** p``.
+        """
+        assert len(pred) == len(real)
+
+        dist = 0
+        for idx in range(len(real)):
+            percentiles = self.cdfs[idx].cdf(np.array([pred[idx], real[idx]]))
+            dist += abs(percentiles[0] - percentiles[1])**self.p
+
+        return dist
diff --git a/sdmetrics/single_table/privacy/numerical_sklearn.py b/sdmetrics/single_table/privacy/numerical_sklearn.py
new file mode 100644
index 00000000..bc652ab7
--- /dev/null
+++ b/sdmetrics/single_table/privacy/numerical_sklearn.py
@@ -0,0 +1,134 @@
+import numpy as np
+from sklearn.linear_model import LinearRegression
+from sklearn.neural_network import MLPRegressor
+from sklearn.svm import SVR
+
+from sdmetrics.single_table.privacy.base import NumericalPrivacyMetric, PrivacyAttackerModel
+
+
+class NumericalSklearnAttacker(PrivacyAttackerModel):
+    """Base class for numerical attackers based on sklearn models.
+
+    It is used to train a model to predict sensitive attributes from key attributes
+    using the synthetic data. Then, evaluate the privacy of the model by
+    trying to predict the sensitive attributes of the real data.
+
+    Attributes:
+        skl_learner (Class):
+            A (wrapped) sklearn regressor class that can be called with no arguments.
+    """
+
+    SKL_LEARNER = None
+
+    def __init__(self):
+        self.predictor = self.SKL_LEARNER()
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the NumericalSklearnAttacker on the synthetic data.
+
+        Args:
+            synthetic_data (pandas.DataFrame):
+                The synthetic data table used for adversarial learning.
+            key_fields (list[str]):
+                The names of the key columns.
+            sensitive_fields (list[str]):
+                The names of the sensitive columns.
+        """
+        key_table = np.array(synthetic_data[key_fields])
+        sensitive_table = np.array(synthetic_data[sensitive_fields])
+
+        self.predictor.fit(key_table, sensitive_table)
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
+        """
+        sensitive_pred = self.predictor.predict([key_data])
+        if len(np.array(sensitive_pred).shape) == 1:
+            sensitive_pred = [sensitive_pred]
+
+        return tuple(sensitive_pred[0])
+
+
+class SVRWrapper():
+    """A wrapper around `sklearn.svm.SVR` to support multidimensional y."""
+
+    def __init__(self):
+        self.predictors = []
+
+    def fit(self, X, Y):
+        """Fit the regressor to training data X and labels Y.
+
+        Arguments:
+            X (np.array):
+                training data matrix of shape (n_samples, n_features).
+            Y (np.array):
+                label matrix of shape (n_samples, n_labels).
+        """
+        n_labels = Y.shape[1]
+        for idx in range(n_labels):
+            Y_col = Y[:, idx]
+            predictor = SVR()
+            predictor.fit(X, Y_col)
+            self.predictors.append(predictor)
+
+    def predict(self, X):
+        """Predict the labels corresponding to data X.
+
+        Arguments:
+            X (np.array):
+                training data matrix of shape (n_samples, n_features)
+
+        Returns:
+            np.array: label matrix of shape (n_samples, n_labels)
+        """
+        Y = []
+        for predictor in self.predictors:
+            Y.append(predictor.predict(X))
+
+        Y = np.array(Y).T
+        return Y
+
+
+class LRAttacker(NumericalSklearnAttacker):
+    """The privacy attacker based on the Linear Regression model."""
+
+    SKL_LEARNER = LinearRegression
+
+
+class NumericalLR(NumericalPrivacyMetric):
+    """The Numerical Linear Regression privacy metric. Scored based on the LRAttacker."""
+
+    name = 'Numerical Linear Regression'
+    MODEL = LRAttacker
+
+
+class MLPAttacker(NumericalSklearnAttacker):
+    """The privacy attacker based on the MLP (Multi-layer Perceptron) regression model."""
+
+    SKL_LEARNER = MLPRegressor
+
+
+class NumericalMLP(NumericalPrivacyMetric):
+    """The Multi-layer Perceptron regression privacy metric. Scored based on the MLPAttacker."""
+
+    name = 'Multi-layer Perceptron Regression'
+    MODEL = MLPAttacker
+
+
+class SVRAttacker(NumericalSklearnAttacker):
+    """The privacy attacker based on the SVR (Support-vector Regression) model."""
+
+    SKL_LEARNER = SVRWrapper
+
+
+class NumericalSVR(NumericalPrivacyMetric):
+    """The Numerical Support-vector Regression privacy metric. Scored based on the SVRAttacker."""
+
+    name = 'Numerical Support-vector Regression'
+    MODEL = SVRAttacker
diff --git a/sdmetrics/single_table/privacy/radius_nearest_neighbor.py b/sdmetrics/single_table/privacy/radius_nearest_neighbor.py
new file mode 100644
index 00000000..ba2b61ad
--- /dev/null
+++ b/sdmetrics/single_table/privacy/radius_nearest_neighbor.py
@@ -0,0 +1,135 @@
+import numpy as np
+
+from sdmetrics.single_table.privacy.base import NumericalPrivacyMetric, PrivacyAttackerModel
+from sdmetrics.single_table.privacy.loss import InverseCDFDistance
+
+
+class NumericalRadiusNearestNeighborAttacker(PrivacyAttackerModel):
+    """The Radius Nearest Neighbor Attacker.
+
+    It will predict the sensitive value to be a weighted mean of the entries in the
+    synthetic table, where the weights are given by a separate function that typically
+    describes the closeness between the given key and the corresponding entry in the table.
+    """
+
+    def __init__(self, weight_func=None, weight_func_kwargs=None):
+        """
+        Args:
+            weight_func (Class):
+                The weight function to use.
+            weight_func_kwargs (dict):
+                Parameters of the weight function.
+        """
+        if weight_func_kwargs is None:
+            weight_func_kwargs = {}
+
+        self.weight_func = weight_func(**weight_func_kwargs)
+        self.synthetic_data = None
+        self.key_fields = None
+        self.sensitive_fields = None
+
+    def fit(self, synthetic_data, key_fields, sensitive_fields):
+        """Fit the NumericalRadiusNearestNeighborAttacker on the synthetic data.
+
+        Args:
+            synthetic_data (pandas.DataFrame):
+                The synthetic data table used for adversarial learning.
+            key_fields (list[str]):
+                The names of the key columns.
+            sensitive_fields (list[str]):
+                The names of the sensitive columns.
+        """
+        self.weight_func.fit(synthetic_data, key_fields)
+        self.synthetic_data = synthetic_data
+        self.key_fields = key_fields
+        self.sensitive_fields = sensitive_fields
+
+    def predict(self, key_data):
+        """Make a prediction of the sensitive data given keys.
+
+        Args:
+            key_data (tuple):
+                The key data.
+
+        Returns:
+            tuple:
+                The predicted sensitive data.
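+
+        Note:
+            The prediction is the weighted mean of the sensitive entries in the
+            synthetic table, ``sum_i(w_i * s_i) / sum_i(w_i)``, where the weight
+            ``w_i`` is produced by the fitted ``weight_func`` for the i-th row.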
+        """
+        weights = 0
+        summ = None
+        for idx in range(len(self.synthetic_data)):
+            ref_key = tuple(self.synthetic_data[self.key_fields].iloc[idx])
+            sensitive_data = np.array(self.synthetic_data[self.sensitive_fields].iloc[idx])
+            weight = self.weight_func.measure(key_data, ref_key)
+            weights += weight
+            # scale each sensitive entry by its weight so that the result is a
+            # weighted mean, rather than a plain sum over all rows
+            if summ is None:
+                summ = weight * sensitive_data
+            else:
+                summ += weight * sensitive_data
+
+        if weights == 0:
+            return (0,) * len(self.sensitive_fields)
+
+        return tuple(summ / weights)
+
+
+class InverseCDFCutoff(InverseCDFDistance):
+    """Gives weight = 1 if the Lp averaged distance between the entries is below a given cutoff.
+
+    Formally, suppose the given key is (k1,...,kn), while the reference key is (k1',...,kn'),
+    and suppose the cdfs of the entries are c1,...,cn, respectively.
+    Then weight = 1 if and only if (sum |c_i(ki) - c_i(ki')|**p) / n < cutoff**p.
+    """
+
+    def __init__(self, p=2, cutoff=0.1):
+        self.p = p
+        self.cdfs = []
+        self.cutoff = cutoff**p
+
+    def fit(self, data, cols):
+        """Fit automatically-selected univariate distributions to the given columns.
+
+        Args:
+            data (DataFrame):
+                Data, where each column in `cols` is a continuous column.
+            cols (list[str]):
+                Column names.
+        """
+        InverseCDFDistance.fit(self, data, cols)
+        self.cutoff *= len(cols)
+
+    def measure(self, pred, real):
+        """Compute the weight between the pred and real values.
+
+        This uses the probability integral transform to map the pred/real values
+        to a CDF value (between 0.0 and 1.0), and then compares the L_p distance,
+        raised to the p-th power, between CDF(pred) and CDF(real) against the cutoff.
+
+        Args:
+            pred (tuple):
+                Predicted value(s) corresponding to the columns specified in fit.
+            real (tuple):
+                Real value(s) corresponding to the columns specified in fit.
+
+        Returns:
+            int:
+                1 if the distance is below the (scaled) cutoff, 0 otherwise.
+        """
+        dist = InverseCDFDistance.measure(self, pred, real)
+        return 1 if dist < self.cutoff else 0
+
+
+class NumericalRadiusNearestNeighbor(NumericalPrivacyMetric):
+    """The Radius Nearest Neighbor privacy metric.
+
+    Scored based on the NumericalRadiusNearestNeighborAttacker.
+    """
+
+    name = 'Numerical Radius Nearest Neighbor'
+    MODEL = NumericalRadiusNearestNeighborAttacker
+    MODEL_KWARGS = {
+        'weight_func': InverseCDFCutoff, 'weight_func_kwargs': {'p': 2, 'cutoff': 0.3}
+    }
diff --git a/sdmetrics/single_table/privacy/util.py b/sdmetrics/single_table/privacy/util.py
new file mode 100644
index 00000000..2c664821
--- /dev/null
+++ b/sdmetrics/single_table/privacy/util.py
@@ -0,0 +1,145 @@
+import numpy as np
+
+
+def majority(samples, ignore_none=True):
+    """Find the most frequent element in a list.
+
+    Arguments:
+        samples (list):
+            Input list. Its elements must be hashable.
+        ignore_none (bool):
+            Whether to ignore ``None`` values when counting.
+
+    Returns:
+        object:
+            The most frequent element in samples. Returns ``None`` if the input
+            list is empty.
+    """
+    freq_dict = {}
+    most_freq_ele = None
+    highest_freq = 0
+    for element in samples:
+        if ignore_none and element is None:
+            continue
+        if element not in freq_dict:
+            freq_dict[element] = 0
+
+        freq = freq_dict[element] + 1
+        freq_dict[element] = freq
+        if freq > highest_freq:
+            highest_freq = freq
+            most_freq_ele = element
+
+    return most_freq_ele
+
+
+def count_frequency(samples, target):
+    """Calculate how frequently a target attribute appears in a list.
+
+    Arguments:
+        samples (list):
+            Input list. Its elements must be hashable.
+        target (object):
+            The target element.
+
+    Returns:
+        float:
+            The frequency with which target appears in samples, between 0 and 1.
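+
+    Example:
+        >>> count_frequency(['a', 'b', 'a', 'c'], 'a')
+        0.5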
+    """
+    count = 0
+    for ele in samples:
+        if ele == target:
+            count += 1
+
+    return count / len(samples)
+
+
+def hamming_distance(target, test):
+    """Calculate the Hamming distance between two tuples.
+
+    Arguments:
+        target (tuple):
+            The target tuple.
+        test (tuple):
+            The test tuple. Must have the same length as target.
+
+    Returns:
+        int:
+            The Hamming distance.
+    """
+    dist = 0
+    assert len(target) == len(test), ('Tuples must have the same length in the '
+                                      'calculation of the Hamming distance!')
+
+    for target_entry, test_entry in zip(target, test):
+        if target_entry != test_entry:
+            dist += 1
+
+    return dist
+
+
+def closest_neighbors(samples, target):
+    """Find elements in a given list that are closest to a given element in Hamming distance.
+
+    Arguments:
+        samples (iterable[tuple]):
+            The candidate tuples to search.
+        target (tuple):
+            The target tuple.
+
+    Returns:
+        list[tuple]:
+            Elements in samples that are closest to target.
+    """
+    dist = float('inf')
+    ret = []
+    for element in samples:
+        hamming_dist = hamming_distance(target, element)
+        if hamming_dist < dist:
+            dist = hamming_dist
+            ret = [element]
+        elif hamming_dist == dist:
+            ret.append(element)
+
+    return ret
+
+
+def allow_nan(df):
+    """Replace all invalid (`nan` and `None`) entries in a dataframe with valid placeholders.
+
+    Arguments:
+        df (pandas.DataFrame):
+            The target dataframe.
+
+    Returns:
+        pandas.DataFrame:
+            A modified dataframe.
+    """
+    df_copy = df.copy()
+    for col in df_copy:
+        for idx in range(len(df_copy[col])):
+            entry = df_copy[col].iloc[idx]
+            if (isinstance(entry, float) and np.isnan(entry)) or entry is None:
+                # assign through the frame itself to avoid chained assignment
+                df_copy.iat[idx, df_copy.columns.get_loc(col)] = 'place_holder_for_nan'
+
+    return df_copy
+
+
+def allow_nan_array(attributes):
+    """Replace all invalid (`nan` and `None`) entries in an array with valid placeholders.
+
+    Arguments:
+        attributes (tuple):
+            The target array.
+
+    Returns:
+        list:
+            The modified array.
+ """ + ret = [] + for entry in attributes: + if (isinstance(entry, float) and np.isnan(entry)) or entry is None: + ret.append('place_holder_for_nan') + else: + ret.append(entry) + + return ret diff --git a/setup.cfg b/setup.cfg index 9c7dbcd0..09cdb55c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,7 +34,7 @@ universal = 1 [flake8] max-line-length = 99 exclude = docs, .tox, .git, __pycache__, .ipynb_checkpoints -ignore = SFS3 +ignore = SFS3, W503 [isort] include_trailing_comment = True diff --git a/setup.py b/setup.py index 3176076f..e9df0b6c 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ 'scipy>=1.4.1,<2', 'sktime>=0.4,<0.6', 'torch>=1.4,<2', + 'copulas>=0.5.0,<0.6', 'rdt>=0.4.0,<0.5', ] diff --git a/tests/integration/single_table/privacy/test_privacy.py b/tests/integration/single_table/privacy/test_privacy.py new file mode 100644 index 00000000..5c474f5b --- /dev/null +++ b/tests/integration/single_table/privacy/test_privacy.py @@ -0,0 +1,173 @@ +import numpy as np +import pandas as pd +import pytest + +from sdmetrics.single_table.privacy import ( + CategoricalEnsemble, CategoricalPrivacyMetric, NumericalPrivacyMetric) +from sdmetrics.single_table.privacy.categorical_sklearn import ( + CategoricalKNNAttacker, CategoricalNBAttacker, CategoricalRFAttacker) + +categorical_metrics = CategoricalPrivacyMetric.get_subclasses() +numerical_metrics = NumericalPrivacyMetric.get_subclasses() + + +def cat_real_data(): + return pd.DataFrame({ + 'key1': ['a', 'b', 'c', 'd', 'e'] * 20, + 'key2': [0, 1, 2, 3, 4] * 20, + 'sensitive1': ['a', 'b', 'c', 'd', 'e'] * 20, + 'sensitive2': [0, 1, 2, 3, 4] * 20 + }) + + +def cat_perfect_synthetic_data(): + return pd.DataFrame({ + 'key1': np.random.choice(['a', 'b', 'c', 'd', 'e'], 20), + 'key2': np.random.randint(0, 5, size=20), + 'sensitive1': np.random.choice(['f', 'g', 'h', 'i', 'j'], 20), + 'sensitive2': np.random.randint(5, 10, size=20) + }) + + +def cat_good_synthetic_data(): + return pd.DataFrame({ + 'key1': np.random.choice(['a', 'b', 'c', 'd', 'e'], 20), + 'key2': np.random.randint(0, 5, size=20), + 'sensitive1': np.random.choice(['a', 'b', 'c', 'd', 'e'], 20), + 'sensitive2': np.random.randint(0, 5, size=20) + }) + + +def cat_bad_synthetic_data(): + return pd.DataFrame({ + 'key1': ['a', 'b', 'c', 'd', 'e'] * 20, + 'key2': [0, 1, 2, 3, 4] * 20, + 'sensitive1': ['a', 'b', 'c', 'e', 'd'] * 20, + 'sensitive2': [0, 1, 2, 3, 4] * 20 + }) + + +@pytest.mark.parametrize('metric', categorical_metrics.values()) +def test_categoricals_non_ens(metric): + if metric != CategoricalEnsemble: # Ensemble needs additional args to work + perfect = metric.compute( + cat_real_data(), cat_perfect_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + good = metric.compute( + cat_real_data(), cat_good_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + bad = metric.compute( + cat_real_data(), cat_bad_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + horrible = metric.compute( + cat_real_data(), cat_real_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + assert metric.min_value <= horrible <= bad <= good <= perfect <= metric.max_value + + +def test_categorical_ens(): + model_kwargs = { + 'attackers': [CategoricalNBAttacker, CategoricalRFAttacker, CategoricalKNNAttacker] + } + perfect = CategoricalEnsemble.compute( + cat_real_data(), cat_perfect_synthetic_data(), + 
key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'], + model_kwargs=model_kwargs + ) + + good = CategoricalEnsemble.compute( + cat_real_data(), cat_good_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'], + model_kwargs=model_kwargs + ) + + bad = CategoricalEnsemble.compute( + cat_real_data(), cat_bad_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'], + model_kwargs=model_kwargs + ) + + horrible = CategoricalEnsemble.compute( + cat_real_data(), cat_real_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'], + model_kwargs=model_kwargs + ) + + assert (CategoricalEnsemble.min_value <= horrible <= bad + <= good <= perfect <= CategoricalEnsemble.max_value) + + +def numerical_real_data(): + return pd.DataFrame({ + 'key1': [0.0, 0.1, 0.2, 0.3, 0.4] * 4, + 'key2': [-0.0, -0.1, -0.2, -0.3, -0.4] * 4, + 'sensitive1': [0.0, 0.1, 0.2, 0.3, 0.4] * 4, + 'sensitive2': [-0.0, -0.1, -0.2, -0.3, -0.4] * 4 + }) + + +def numerical_good_synthetic_data(): + return pd.DataFrame({ + 'key1': np.random.normal(loc=0.2, scale=0.1, size=20), + 'key2': np.random.normal(loc=-0.2, scale=0.1, size=20), + 'sensitive1': np.random.normal(loc=10.0, size=20), + 'sensitive2': np.random.normal(loc=-10.0, size=20) + }) + + +def numerical_bad_synthetic_data(): + return pd.DataFrame({ + 'key1': np.random.normal(loc=0.2, scale=0.1, size=20), + 'key2': np.random.normal(loc=-0.2, scale=0.1, size=20), + 'sensitive1': np.random.normal(size=20), + 'sensitive2': np.random.normal(size=20) + }) + + +@pytest.mark.parametrize('metric', numerical_metrics.values()) +def test_num(metric): + good = metric.compute( + numerical_real_data(), numerical_good_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + bad = metric.compute( + numerical_real_data(), numerical_bad_synthetic_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + horrible = metric.compute( + numerical_real_data(), numerical_real_data(), + key_fields=['key1', 'key2'], sensitive_fields=['sensitive1', 'sensitive2'] + ) + + assert metric.min_value <= horrible <= bad <= good <= metric.max_value + + +@pytest.mark.parametrize('metric', categorical_metrics.values()) +def test_categorical_empty_keys(metric): + if metric != CategoricalEnsemble: + with pytest.raises(TypeError): + metric.compute(cat_real_data(), cat_real_data(), sensitive_fields=['sensitive1']) + + +@pytest.mark.parametrize('metric', categorical_metrics.values()) +def test_categorical_empty_sensitive(metric): + if metric != CategoricalEnsemble: + with pytest.raises(TypeError): + metric.compute(cat_real_data(), cat_real_data(), key_fields=['key1']) + + +@pytest.mark.parametrize('metric', categorical_metrics.values()) +def test_categorical_empty_keys_sensitive(metric): + if metric != CategoricalEnsemble: + with pytest.raises(TypeError): + metric.compute(cat_real_data(), cat_real_data()) diff --git a/tests/unit/test_util.py b/tests/unit/test_util.py new file mode 100644 index 00000000..36c22374 --- /dev/null +++ b/tests/unit/test_util.py @@ -0,0 +1,32 @@ +from sdmetrics.single_table.privacy.util import closest_neighbors + + +def test_closest_neighbors_exact(): + samples = [ + ('a', '1'), + ('a', '2'), + ('a', '3'), + ('b', '1'), + ('b', '2'), + ('b', '3'), + ] + target = ('a', '2') + results = closest_neighbors(samples, target) + assert len(results) == 1 + assert results[0] == ('a', '2') + + +def 
test_closest_neighbors_non_exact(): + samples = [ + ('a', '1'), + ('a', '3'), + ('b', '1'), + ('b', '2'), + ('b', '3'), + ] + target = ('a', '2') + results = closest_neighbors(samples, target) + assert len(results) == 3 + assert ('a', '1') in results + assert ('a', '3') in results + assert ('b', '2') in results
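+
+
+# The tests below are a small additional sketch covering the remaining helpers in
+# sdmetrics.single_table.privacy.util; the imports are kept local to each test and
+# the expected values follow directly from the helpers' docstrings.
+def test_majority():
+    # the most frequent element wins; None values are ignored by default
+    from sdmetrics.single_table.privacy.util import majority
+
+    assert majority(['a', 'b', 'a']) == 'a'
+    assert majority([None, None, 'b']) == 'b'
+
+
+def test_count_frequency():
+    # count_frequency returns the fraction of entries equal to the target
+    from sdmetrics.single_table.privacy.util import count_frequency
+
+    assert count_frequency(['a', 'b', 'a', 'c'], 'a') == 0.5
+
+
+def test_hamming_distance():
+    # hamming_distance counts the positions where the two tuples differ
+    from sdmetrics.single_table.privacy.util import hamming_distance
+
+    assert hamming_distance(('a', '1'), ('a', '2')) == 1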