Skip to content

Commit

Permalink
Return per partition utility analysis (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Nov 2, 2022
1 parent 05482d8 commit 1142fef
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 23 deletions.
17 changes: 14 additions & 3 deletions examples/restaurant_visits/run_without_frameworks_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
'The file with the restaurant visits data')
flags.DEFINE_string('output_file', None, 'Output file')
flags.DEFINE_string(
'output_file_per_partition_analysis', None,
'If set, partition utility analysis is output to this file')
flags.DEFINE_boolean('public_partitions', False,
'Whether public partitions are used')

Expand Down Expand Up @@ -95,9 +98,17 @@ def tune_parameters():
aggregate_params=aggregate_params,
function_to_minimize=minimizing_function,
parameters_to_tune=parameters_to_tune)
result = parameter_tuning.tune(restaurant_visits_rows, backend, hist,
tune_options, data_extractors,
public_partitions)
if FLAGS.output_file_per_partition_analysis:
result, per_partition = parameter_tuning.tune(restaurant_visits_rows,
backend, hist,
tune_options,
data_extractors,
public_partitions, True)
write_to_file(per_partition, FLAGS.output_file_per_partition_analysis)
else:
result = parameter_tuning.tune(restaurant_visits_rows, backend, hist,
tune_options, data_extractors,
public_partitions, False)

# Here's where the lazy iterator initiates computations and gets transformed
# into actual results
Expand Down
18 changes: 14 additions & 4 deletions utility_analysis_new/parameter_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ def tune(col,
contribution_histograms: histograms.ContributionHistograms,
options: TuneOptions,
data_extractors: pipeline_dp.DataExtractors,
public_partitions=None) -> TuneResult:
public_partitions=None,
return_utility_analysis_per_partition: bool = False) -> TuneResult:
"""Tunes parameters.
It works in the following way:
Expand All @@ -196,6 +197,8 @@ def tune(col,
public_partitions: A collection of partition keys that will be present
in the result. If not provided, tuning will be performed assuming
private partition selection is used.
    return_utility_analysis_per_partition: if true, a tuple is returned
      whose 2nd element is the utility analysis per partition.
Returns:
1 element collection which contains TuneResult.
"""
Expand All @@ -209,15 +212,22 @@ def tune(col,
options.delta,
options.aggregate_params,
multi_param_configuration=candidates)
utility_analysis_result = utility_analysis.perform_utility_analysis(
result = utility_analysis.perform_utility_analysis(
col, backend, utility_analysis_options, data_extractors,
public_partitions)
public_partitions, return_utility_analysis_per_partition)
if return_utility_analysis_per_partition:
utility_analysis_result, utility_analysis_result_per_partition = result
else:
utility_analysis_result = result
use_public_partitions = public_partitions is not None
return backend.map(
utility_analysis_result = backend.map(
utility_analysis_result,
lambda result: _convert_utility_analysis_to_tune_result(
result, options, candidates, use_public_partitions,
contribution_histograms), "To Tune result")
if return_utility_analysis_per_partition:
return utility_analysis_result, utility_analysis_result_per_partition
return utility_analysis_result


def _check_tune_args(options: TuneOptions):
Expand Down
17 changes: 12 additions & 5 deletions utility_analysis_new/tests/parameter_tuning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def test_find_candidate_parameters(
self.assertEqual(expected_max_contributions_per_partition,
candidates.max_contributions_per_partition)

def test_tune(self):
@parameterized.parameters(False, True)
def test_tune(self, return_utility_analysis_per_partition: bool):
input = [(i % 10, f"pk{i/10}") for i in range(10)]
public_partitions = [f"pk{i}" for i in range(10)]
data_extractors = pipeline_dp.DataExtractors(
Expand All @@ -84,10 +85,16 @@ def test_tune(self):
function_to_minimize=parameter_tuning.MinimizingFunction.
ABSOLUTE_ERROR,
parameters_to_tune=parameter_tuning.ParametersToTune(True, True))
tune_result = list(
parameter_tuning.tune(input, pipeline_dp.LocalBackend(),
contribution_histograms, tune_options,
data_extractors, public_partitions))[0]
result = parameter_tuning.tune(input, pipeline_dp.LocalBackend(),
contribution_histograms, tune_options,
data_extractors, public_partitions,
return_utility_analysis_per_partition)
if return_utility_analysis_per_partition:
            tune_result, per_partition_utility_analysis = result
            self.assertLen(per_partition_utility_analysis, 10)
else:
tune_result = result
tune_result = list(tune_result)[0]

self.assertEqual(tune_options, tune_result.options)
self.assertEqual(contribution_histograms,
Expand Down
31 changes: 20 additions & 11 deletions utility_analysis_new/utility_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,45 @@ def perform_utility_analysis(col,
backend: pipeline_backend.PipelineBackend,
options: UtilityAnalysisOptions,
data_extractors: pipeline_dp.DataExtractors,
public_partitions=None):
public_partitions=None,
return_per_partition: bool = False):
"""Performs utility analysis for DP aggregations.
Args:
col: collection where all elements are of the same type.
backend: PipelineBackend with which the utility analysis will be run.
options: options for utility analysis.
data_extractors: functions that extract needed pieces of information
from elements of 'col'.
from elements of 'col'.
public_partitions: A collection of partition keys that will be present
in the result. If not provided, the utility analysis with private
partition selection will be performed.
in the result. If not provided, the utility analysis with private
partition selection will be performed.
    return_per_partition: if true, a tuple is returned whose 2nd element
      is the utility analysis per partition.
Returns:
1 element collection which contains utility analysis metrics.
"""
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=options.epsilon, total_delta=options.delta)
engine = dp_engine.UtilityAnalysisEngine(
budget_accountant=budget_accountant, backend=backend)
result = engine.aggregate(
per_partition_analysis_result = engine.aggregate(
col,
params=options.aggregate_params,
data_extractors=data_extractors,
public_partitions=public_partitions,
multi_param_configuration=options.multi_param_configuration)
budget_accountant.compute_budgets()
# result : (partition_key, per_partition_metrics)
# per_partition_analysis_result : (partition_key, per_partition_metrics)
per_partition_analysis_result = backend.to_multi_transformable_collection(
per_partition_analysis_result)

aggregate_error_combiners = _create_aggregate_error_compound_combiner(
options.aggregate_params, [0.1, 0.5, 0.9, 0.99], public_partitions,
options.n_parameters)
# TODO: Implement combine_accumulators (w/o per_key)
keyed_by_same_key = backend.map(result, lambda v: (None, v[1]),
keyed_by_same_key = backend.map(per_partition_analysis_result, lambda v:
(None, v[1]),
"Rekey partitions by the same key")
# keyed_by_same_key : (None, per_partition_metrics)
accumulators = backend.map_values(
Expand All @@ -99,7 +105,7 @@ def perform_utility_analysis(col,
aggregate_error_combiners.compute_metrics,
"Compute aggregate metrics")

# accumulators : (aggregate_metrics)
# aggregates : (aggregate_metrics)

def pack_metrics(aggregate_metrics) -> List[metrics.AggregateMetrics]:
if public_partitions is None:
Expand All @@ -118,9 +124,12 @@ def pack_metrics(aggregate_metrics) -> List[metrics.AggregateMetrics]:
for aggregate_error_metrics in aggregate_metrics
]

return backend.map(aggregates, pack_metrics,
"Pack metrics from the same run")
# (aggregate_metrics)
result = backend.map(aggregates, pack_metrics,
"Pack metrics from the same run")
# result: (aggregate_metrics)
if return_per_partition:
return result, per_partition_analysis_result
return result


def _create_aggregate_error_compound_combiner(
Expand Down

0 comments on commit 1142fef

Please sign in to comment.