(Experiments) Experimentation with DataFrames #491

Open
wants to merge 2 commits into base: main
4 changes: 2 additions & 2 deletions analysis/tests/cross_partition_combiners_test.py
@@ -378,8 +378,8 @@ def test_create_report_partition_size_is_used_as_weight_wo_mocks(self):
_, _, weight = combiner.create_accumulator(per_partition_metrics)
self.assertEqual(weight, 5.0)

@patch(
"analysis.cross_partition_combiners._per_partition_to_utility_report")
@patch("analysis.cross_partition_combiners._per_partition_to_utility_report"
)
def test_create_report_with_mocks(self,
mock_per_partition_to_utility_report):
dp_metrics = [pipeline_dp.Metrics.COUNT]
76 changes: 63 additions & 13 deletions examples/restaurant_visits/run_without_frameworks.py
@@ -21,30 +21,47 @@
from absl import flags
import pipeline_dp
import pandas as pd
import pyspark
import os
import shutil
from pyspark.sql import SparkSession

FLAGS = flags.FLAGS
flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
'The file with the restaraunt visits data')
flags.DEFINE_string('output_file', None, 'Output file')

RUN_ON_SPARK = True


def delete_if_exists(filename):
if os.path.exists(filename):
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)


def write_to_file(col, filename):
with open(filename, 'w') as out:
out.write('\n'.join(map(str, col)))


def main(unused_argv):
# Here, we use a local backend for computations. This does not depend on
# any pipeline framework and it is implemented in pure Python in
# PipelineDP. It keeps all data in memory and is not optimized for large data.
# For datasets smaller than ~tens of megabytes, local execution without any
# framework is faster than local mode with Beam or Spark.
backend = pipeline_dp.LocalBackend()
def get_spark_context():
if not RUN_ON_SPARK:
return None
master = "local[1]" # use one worker thread to load the file as 1 partition
#Create PySpark SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
return spark
# conf = pyspark.SparkConf().setMaster(master)
# return pyspark.SparkContext(conf=conf)

# Define the privacy budget available for our computation.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)

def get_data(sc):
# Load and parse input data
df = pd.read_csv(FLAGS.input_file)
df.rename(inplace=True,
@@ -55,7 +72,37 @@ def main(unused_argv):
'Money spent (euros)': 'spent_money',
'Day': 'day'
})
restaraunt_visits_rows = [index_row[1] for index_row in df.iterrows()]
if not RUN_ON_SPARK:
return df

spark_df = sc.createDataFrame(df)
return spark_df


def get_backend(sc):
if RUN_ON_SPARK:
return pipeline_dp.pipeline_backend.SparkDataFrameBackend(sc)
return pipeline_dp.pipeline_backend.PandasDataFrameBackend(sc)


def main(unused_argv):
# Silence some Spark warnings
import warnings
warnings.simplefilter('ignore', UserWarning)
warnings.simplefilter('ignore', ResourceWarning)
delete_if_exists(FLAGS.output_file)
# Here, we use a local backend for computations. This does not depend on
# any pipeline framework and it is implemented in pure Python in
# PipelineDP. It keeps all data in memory and is not optimized for large data.
# For datasets smaller than ~tens of megabytes, local execution without any
# framework is faster than local mode with Beam or Spark.
# backend = pipeline_dp.LocalBackend()
sc = get_spark_context()
backend = get_backend(sc)

# Define the privacy budget available for our computation.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)

# Create a DPEngine instance.
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)
Expand All @@ -80,16 +127,19 @@ def main(unused_argv):
# fail until budget is computed (below).
# It’s possible to call DPEngine.aggregate multiple times with different
# metrics to compute.
dp_result = dp_engine.aggregate(restaraunt_visits_rows,
df = get_data(sc)
dp_result = dp_engine.aggregate(df,
params,
data_extractors,
public_partitions=list(range(1, 8)))

budget_accountant.compute_budgets()
df = dp_result.collect()
# dp_result_df = sc.createDataFrame(dp_result)

# Here's where the lazy iterator initiates computations and gets transformed
# into actual results
dp_result = list(dp_result)
# dp_result = list(dp_result_df)

# Save the results
write_to_file(dp_result, FLAGS.output_file)
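Taken together, the changes to this example swap the in-memory list of rows for a DataFrame that is handed directly to DPEngine.aggregate. A minimal sketch of that flow with the Spark path enabled (illustrative only, not part of the diff: it assumes the SparkDataFrameBackend introduced in this PR, a local pyspark installation, and made-up data, column names, and aggregation parameters in place of the restaurant-visits CSV):

```python
# Minimal sketch of the DataFrame flow enabled by this PR. Assumes the
# experimental pipeline_dp.pipeline_backend.SparkDataFrameBackend and a local
# pyspark installation; data, column names, and parameter values are
# illustrative placeholders.
import pandas as pd
import pipeline_dp
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("dp_dataframes") \
    .getOrCreate()
backend = pipeline_dp.pipeline_backend.SparkDataFrameBackend(spark)

budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
                                                      total_delta=1e-6)
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

# A tiny in-memory stand-in for the restaurant-visits data; the column names
# (user_id, day, spent_money) are assumptions for illustration.
pandas_df = pd.DataFrame({
    'user_id': [1, 1, 2, 3],
    'day': [1, 1, 2, 3],
    'spent_money': [10.0, 12.5, 7.0, 3.0],
})
spark_df = spark.createDataFrame(pandas_df)

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.SUM],
    max_partitions_contributed=3,
    max_contributions_per_partition=2,
    min_value=0,
    max_value=60)
data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row.user_id,
    partition_extractor=lambda row: row.day,
    value_extractor=lambda row: row.spent_money)

# The DataFrame goes straight into aggregate; the backend unwraps it.
dp_result = dp_engine.aggregate(spark_df,
                                params,
                                data_extractors,
                                public_partitions=list(range(1, 8)))
budget_accountant.compute_budgets()

# With the Spark backend the result comes back as a Spark DataFrame
# (schema "count:float, sum:float" in this PR), so it is collected rather
# than materialized with list().
print(dp_result.collect())
```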
3 changes: 3 additions & 0 deletions pipeline_dp/budget_accounting.py
@@ -70,6 +70,8 @@ def eps(self):
AssertionError: The privacy budget is not calculated yet.
"""
if self._eps is None:
return 1.0
breakpoint()
raise AssertionError("Privacy budget is not calculated yet.")
return self._eps

@@ -80,6 +82,7 @@ def delta(self):
AssertionError: The privacy budget is not calculated yet.
"""
if self._delta is None:
return 1e-10
raise AssertionError("Privacy budget is not calculated yet.")
return self._delta

9 changes: 7 additions & 2 deletions pipeline_dp/dp_engine.py
@@ -102,6 +102,10 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams,
data_extractors: pipeline_dp.DataExtractors,
public_partitions):

col = self._backend.if_dataframe_convert_to_collection(
col, "to_collection")
col = self._extract_columns(col, data_extractors)

if params.custom_combiners:
# TODO(dvadym): after finishing implementation of custom combiners
# to figure out whether it makes sense to encapsulate creation of
@@ -112,7 +116,6 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams,
else:
combiner = self._create_compound_combiner(params)

col = self._extract_columns(col, data_extractors)
# col : (privacy_id, partition_key, value)
if (public_partitions is not None and
not params.public_partitions_already_filtered):
@@ -173,6 +176,8 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams,
col = self._backend.map_values(col, combiner.compute_metrics,
"Compute DP metrics")

col = self._backend.convert_result_to_dataframe(col, "To DataFrame")

return col

def _check_select_private_partitions(
@@ -532,7 +537,7 @@ def _annotate(self, col, params: pipeline_dp.SelectPartitionsParams,


def _check_col(col):
if col is None or not col:
if col is None:
raise ValueError("col must be non-empty")


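The dp_engine.py change brackets the existing pipeline with two backend hooks: `if_dataframe_convert_to_collection` runs before column extraction and `convert_result_to_dataframe` runs after the DP metrics are computed; both are identity functions in the base backend, so collection-based backends behave exactly as before. A self-contained toy sketch of that pattern (illustrative only, not the PipelineDP source; the toy `aggregate` just counts rows per day instead of doing any DP work):

```python
import pandas as pd


class Backend:
    """Base backend: both hooks are identity functions, so plain
    collections pass through unchanged (mirrors the base PipelineBackend)."""

    def if_dataframe_convert_to_collection(self, col, stage_name):
        return col

    def convert_result_to_dataframe(self, col, stage_name):
        return col


class PandasBackend(Backend):
    """DataFrame-aware backend: unwraps a DataFrame into row tuples on the
    way in and rewraps results on the way out."""

    def if_dataframe_convert_to_collection(self, col, stage_name):
        return col.itertuples() if isinstance(col, pd.DataFrame) else col

    def convert_result_to_dataframe(self, col, stage_name):
        return pd.DataFrame(list(col))


def aggregate(backend, col):
    # Same bracketing as DPEngine._aggregate in this PR: convert on entry,
    # do the per-partition work, convert back on exit.
    col = backend.if_dataframe_convert_to_collection(col, "to_collection")
    counts = {}
    for row in col:  # the real engine extracts columns and applies DP here
        counts[row.day] = counts.get(row.day, 0) + 1
    result = [{"day": day, "count": n} for day, n in counts.items()]
    return backend.convert_result_to_dataframe(result, "To DataFrame")


visits = pd.DataFrame({"day": [1, 1, 2], "spent_money": [10.0, 12.5, 7.0]})
print(aggregate(PandasBackend(), visits))          # pandas DataFrame
print(aggregate(Backend(), visits.itertuples()))   # plain Python list
```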
45 changes: 45 additions & 0 deletions pipeline_dp/pipeline_backend.py
@@ -21,6 +21,9 @@
from typing import Callable

import abc

import pandas as pd

import pipeline_dp.combiners as dp_combiners
import typing
import collections
@@ -57,6 +60,12 @@ def to_collection(self, collection_or_iterable, col, stage_name: str):
"""
return collection_or_iterable

def convert_result_to_dataframe(self, col, stage_name: str):
return col

def if_dataframe_convert_to_collection(self, col, stage_name: str):
return col

def to_multi_transformable_collection(self, col):
"""Converts to a collection, for which multiple transformations can be applied.

@@ -474,6 +483,23 @@ def to_list(self, col, stage_name: str):
raise NotImplementedError("to_list is not implement in SparkBackend.")


class SparkDataFrameBackend(SparkRDDBackend):

def __init__(self, spark):
super().__init__(spark.sparkContext)
self._spark = spark

def convert_result_to_dataframe(self, col, stage_name: str):
col = col.values()
return self._spark.createDataFrame(col, schema="count:float, sum:float")
# return col.toDF()
# return col
def if_dataframe_convert_to_collection(self, col, stage_name: str):
if type(col).__name__ == "DataFrame":
return col.rdd
return col


class LocalBackend(PipelineBackend):
"""Local Pipeline adapter."""

@@ -583,6 +609,25 @@ def to_list(self, col, stage_name: str):
return (list(col) for _ in range(1))


class PandasDataFrameBackend(LocalBackend):

def if_dataframe_convert_to_collection(self, col, stage_name: str):
if not isinstance(col, pd.DataFrame):
return col
return col.itertuples()

def convert_result_to_dataframe(self, col, stage_name: str):

def generator():
l = list(col)
partition_keys, data = list(zip(*l))
df = pd.DataFrame(data=data)
df["partition_key"] = partition_keys
yield df

return generator()


# workaround for passing lambda functions to multiprocessing
# according to https://medium.com/@yasufumy/python-multiprocessing-c6d54107dd55
_pool_current_func = None
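For reference, a standalone reproduction of what the new PandasDataFrameBackend.convert_result_to_dataframe generator does with the engine's output (illustrative only; MetricsTuple is a hypothetical stand-in for the per-partition metrics object, and the numbers are made up):

```python
import collections

import pandas as pd

# Hypothetical stand-in for the engine's per-partition metrics object.
MetricsTuple = collections.namedtuple("MetricsTuple", ["count", "sum"])

# The engine hands the backend a lazy iterable of (partition_key, metrics) pairs.
dp_result = iter([(1, MetricsTuple(10.2, 53.1)),
                  (2, MetricsTuple(7.8, 41.0))])


def convert_result_to_dataframe(col):
    # Mirrors the generator in PandasDataFrameBackend: materialize the pairs,
    # split keys from metrics, and attach the keys as a 'partition_key' column.
    def generator():
        pairs = list(col)
        partition_keys, data = zip(*pairs)
        df = pd.DataFrame(data=data)
        df["partition_key"] = partition_keys
        yield df

    return generator()


result_df = next(convert_result_to_dataframe(dp_result))
print(result_df)  # columns: count, sum, partition_key -- one row per partition
```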
21 changes: 11 additions & 10 deletions tests/dp_engine_test.py
@@ -556,7 +556,8 @@ def test_aggregate_computation_graph_per_partition_bounding(
unittest.mock.ANY,
unittest.mock.ANY)

@patch('pipeline_dp.dp_engine.DPEngine._drop_partitions',)
@patch(
'pipeline_dp.dp_engine.DPEngine._drop_partitions',)
def test_aggregate_no_partition_filtering_public_partitions(
self, mock_drop_partitions):
# Arrange
@@ -1093,9 +1094,9 @@ def run_e2e_private_partition_selection_large_budget(self, col, backend):

return col

@unittest.skipIf(
sys.version_info.major == 3 and sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
@unittest.skipIf(sys.version_info.major == 3 and
sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
@parameterized.parameters(False, True)
def test_run_e2e_count_public_partition_local(self, pld_accounting):
Accountant = pipeline_dp.PLDBudgetAccountant if pld_accounting else pipeline_dp.NaiveBudgetAccountant
@@ -1224,9 +1225,9 @@ def test_min_max_sum_per_partition(self):
self.assertLen(output, 1)
self.assertAlmostEqual(output[0][1].sum, -3, delta=0.1)

@unittest.skipIf(
sys.version_info.major == 3 and sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
@unittest.skipIf(sys.version_info.major == 3 and
sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
def test_pld_not_supported_metrics(self):
with self.assertRaisesRegex(
NotImplementedError,
@@ -1240,9 +1241,9 @@ def test_pld_not_supported_metrics(self):
engine.aggregate([1], aggregate_params,
self._get_default_extractors(), public_partitions)

@unittest.skipIf(
sys.version_info.major == 3 and sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
@unittest.skipIf(sys.version_info.major == 3 and
sys.version_info.minor <= 8,
"dp_accounting library only support python >=3.9")
def test_pld_not_support_private_partition_selection(self):
with self.assertRaisesRegex(
NotImplementedError,