diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py
index 4cba9cb..e599dcd 100644
--- a/hlink/linking/model_exploration/link_step_train_test_models.py
+++ b/hlink/linking/model_exploration/link_step_train_test_models.py
@@ -3,6 +3,7 @@
 # in this project's top-level directory, and also on-line at:
 # https://github.com/ipums/hlink
 
+import statistics
 import itertools
 import logging
 import math
@@ -52,7 +53,7 @@ def _run(self) -> None:
         dep_var = config[training_conf]["dependent_var"]
         id_a = config["id_column"] + "_a"
         id_b = config["id_column"] + "_b"
-        desc_df = _create_desc_df()
+        thresholded_metrics_df = _create_thresholded_metrics_df()
         columns_to_keep = [id_a, id_b, "features_vector", dep_var]
         prepped_data = (
             self.task.spark.table(f"{table_prefix}training_vectorized")
@@ -74,6 +75,8 @@ def _run(self) -> None:
             f"There are {len(model_parameters)} sets of model parameters to explore; "
             f"each of these has {n_training_iterations} train-test splits to test on"
         )
+
+        probability_metrics_df = _create_probability_metrics_df()
         for run_index, run in enumerate(model_parameters, 1):
             run_start_info = f"Starting run {run_index} of {len(model_parameters)} with these parameters: {run}"
             print(run_start_info)
@@ -144,13 +147,7 @@ def _run(self) -> None:
                 pr_auc = auc(recall, precision)
                 print(f"The area under the precision-recall curve is {pr_auc}")
 
-                splits_results.append(
-                    {
-                        "auc": pr_auc,
-                        "predictions_tmp": predictions_tmp,
-                        "predict_train_tmp": predict_train_tmp,
-                    }
-                )
+                splits_results.append(pr_auc)
 
                 if first:
                     prc = pd.DataFrame(
@@ -173,16 +170,50 @@ def _run(self) -> None:
                 training_data.unpersist()
                 test_data.unpersist()
 
-            print(f"split_results: {len(splits_results)}")
-            # pluck out predictions_tmp, predict_train_tmp associated with highest pr_auc
-            best_pr_auc = 0.0
-            best_predictions_tmp = None
-            best_predict_train_tmp = None
-            for a in splits_results:
-                if a["auc"] > best_pr_auc:
-                    best_pr_auc = a["auc"]
-                    best_predictions_tmp = a["predictions_tmp"]
-                    best_predict_train_tmp = a["predict_train_tmp"]
+            # Aggregate the PR AUC mean and standard deviation across splits
+            auc_mean = statistics.mean(splits_results)
+            auc_std = statistics.stdev(splits_results)
+            pr_auc_dict = {
+                "pr_auc_mean": auc_mean,
+                "pr_auc_standard_deviation": auc_std,
+                "model": model_type,
+                "parameters": params,
+            }
+            print(f"PR AUC for splits on current model and params: {pr_auc_dict}")
+            this_model_results = pd.DataFrame([pr_auc_dict])
+            probability_metrics_df = pd.concat(
+                [probability_metrics_df, this_model_results]
+            )
+
+            # TODO: consider making a different split here, e.g. from a different
+            # seed, or at least one not already used to compute the PR AUC mean.
+            splits_for_thresholding_eval = splits[0]
+            thresholding_training_data = splits_for_thresholding_eval[0]
+            thresholding_test_data = splits_for_thresholding_eval[1]
+
+            thresholding_classifier, thresholding_post_transformer = (
+                classifier_core.choose_classifier(
+                    pr_auc_dict["model"], pr_auc_dict["parameters"], dep_var
+                )
+            )
+            thresholding_model = thresholding_classifier.fit(thresholding_training_data)
+
+            thresholding_predictions = _get_probability_and_select_pred_columns(
+                thresholding_test_data,
+                thresholding_model,
+                thresholding_post_transformer,
+                id_a,
+                id_b,
+                dep_var,
+            ).cache()
+            thresholding_predict_train = _get_probability_and_select_pred_columns(
+                thresholding_training_data,
+                thresholding_model,
+                thresholding_post_transformer,
+                id_a,
+                id_b,
+                dep_var,
+            ).cache()
 
             i = 0
             for threshold_index, (alpha_threshold, threshold_ratio) in enumerate(
@@ -193,14 +224,14 @@ def _run(self) -> None:
                     f"{alpha_threshold=} and {threshold_ratio=}"
                 )
                 predictions = threshold_core.predict_using_thresholds(
-                    best_predictions_tmp,
+                    thresholding_predictions,
                     alpha_threshold,
                     threshold_ratio,
                     config[training_conf],
                     config["id_column"],
                 )
                 predict_train = threshold_core.predict_using_thresholds(
-                    best_predict_train_tmp,
+                    thresholding_predict_train,
                     alpha_threshold,
                     threshold_ratio,
                     config[training_conf],
@@ -211,21 +242,25 @@ def _run(self) -> None:
                     predictions,
                     predict_train,
                     dep_var,
-                    model,
+                    thresholding_model,
                     results_dfs[i],
                     otd_data,
                     alpha_threshold,
                     threshold_ratio,
-                    best_pr_auc,
+                    pr_auc_dict["pr_auc_mean"],
                 )
                 i += 1
 
             for i in range(len(threshold_matrix)):
-                desc_df = _append_results(desc_df, results_dfs[i], model_type, params)
+                thresholded_metrics_df = _append_results(
+                    thresholded_metrics_df, results_dfs[i], model_type, params
+                )
 
-        _print_desc_df(desc_df)
-        desc_df = _load_desc_df_params(desc_df)
-        self._save_training_results(desc_df, self.task.spark)
+        _print_thresholded_metrics_df(thresholded_metrics_df)
+        thresholded_metrics_df = _load_thresholded_metrics_df_params(
+            thresholded_metrics_df
+        )
+        self._save_training_results(thresholded_metrics_df, self.task.spark)
         self._save_otd_data(otd_data, self.task.spark)
 
         self.task.spark.sql("set spark.sql.shuffle.partitions=200")
@@ -611,7 +646,7 @@ def _create_results_df() -> pd.DataFrame:
 
 
 def _append_results(
-    desc_df: pd.DataFrame,
+    thresholded_metrics_df: pd.DataFrame,
     results_df: pd.DataFrame,
     model_type: str,
     params: dict[str, Any],
@@ -642,12 +677,14 @@ def _append_results(
         },
     )
 
-    desc_df = pd.concat([desc_df, new_desc], ignore_index=True)
-    _print_desc_df(desc_df)
-    return desc_df
+    thresholded_metrics_df = pd.concat(
+        [thresholded_metrics_df, new_desc], ignore_index=True
+    )
+    _print_thresholded_metrics_df(thresholded_metrics_df)
+    return thresholded_metrics_df
 
 
-def _print_desc_df(desc_df: pd.DataFrame) -> None:
+def _print_thresholded_metrics_df(desc_df: pd.DataFrame) -> None:
     pd.set_option("display.max_colwidth", None)
     print(
         desc_df.drop(
@@ -663,7 +700,7 @@ def _print_desc_df(desc_df: pd.DataFrame) -> None:
     print("\n")
 
 
-def _load_desc_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
+def _load_thresholded_metrics_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
     params = [
         "maxDepth",
         "numTrees",
@@ -690,11 +727,22 @@ def _load_desc_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
     return desc_df
 
 
-def _create_desc_df() -> pd.DataFrame:
+def _create_probability_metrics_df() -> pd.DataFrame:
     return pd.DataFrame(
         columns=[
             "model",
             "parameters",
+            "pr_auc_mean",
+            "pr_auc_standard_deviation",
+        ]
+    )
+
+
+def _create_thresholded_metrics_df() -> pd.DataFrame:
+    return pd.DataFrame(
+        columns=[
+            "model",
+            "parameters",
             "alpha_threshold",
             "threshold_ratio",
             "precision_test_mean",
diff --git a/hlink/tests/model_exploration_test.py b/hlink/tests/model_exploration_test.py
index e0cf593..7ef1f92 100644
--- a/hlink/tests/model_exploration_test.py
+++ b/hlink/tests/model_exploration_test.py
@@ -280,6 +280,7 @@ def test_step_2_train_random_forest_spark(
     model_exploration.run_step(2)
 
     tr = spark.table("model_eval_training_results").toPandas()
+    print(f"training results {tr}")
     # assert tr.shape == (1, 18)
     assert tr.query("model == 'random_forest'")["pr_auc_mean"].iloc[0] > 0.7
    assert tr.query("model == 'random_forest'")["maxDepth"].iloc[0] == 3
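
For reviewers, a minimal sketch of the split-aggregation pattern this change adopts: per-split PR AUC values are summarized with statistics.mean and statistics.stdev, and the summary dict is wrapped in a list before building the DataFrame so pandas produces a single row instead of aligning on the keys of the nested parameters dict. The AUC values and model parameters below are hypothetical, not taken from hlink.

import statistics

import pandas as pd

# Hypothetical per-split PR AUC values standing in for the real train-test loop.
splits_results = [0.82, 0.79, 0.85]

pr_auc_dict = {
    "model": "random_forest",
    "parameters": {"maxDepth": 3, "numTrees": 50},
    "pr_auc_mean": statistics.mean(splits_results),
    # statistics.stdev is the sample standard deviation and needs >= 2 splits.
    "pr_auc_standard_deviation": statistics.stdev(splits_results),
}

# Wrapping the dict in a list yields one row per model/parameter combination;
# pd.DataFrame(pr_auc_dict) would instead align rows on the nested dict's keys.
probability_metrics_df = pd.DataFrame([pr_auc_dict])
print(probability_metrics_df)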
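And a control-flow sketch of the threshold sweep that now runs against a single held-out split rather than the best-AUC split. predict_with_threshold is a hypothetical stand-in for hlink's threshold_core.predict_using_thresholds, which additionally takes the training config and id column; the probabilities and threshold matrix are made up for illustration.

from itertools import product

# Hypothetical match probabilities for candidate pairs on the held-out split.
thresholding_probabilities = [0.91, 0.55, 0.30, 0.72]


def predict_with_threshold(probabilities: list[float], alpha_threshold: float) -> list[int]:
    # Stand-in for threshold_core.predict_using_thresholds: classify a pair as
    # a match when its probability clears alpha_threshold. The real function
    # also uses threshold_ratio to arbitrate between competing candidate links.
    return [int(p >= alpha_threshold) for p in probabilities]


# Sweep the full threshold matrix, mirroring the enumerate() loop in the diff.
threshold_matrix = list(product([0.5, 0.7], [1.2, 1.3]))
for alpha_threshold, threshold_ratio in threshold_matrix:
    predictions = predict_with_threshold(thresholding_probabilities, alpha_threshold)
    print(f"{alpha_threshold=} {threshold_ratio=} -> {predictions}")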