From a14ccdf8df4f77f47beab3abea69b00b5ee37277 Mon Sep 17 00:00:00 2001
From: Colin Davis
Date: Thu, 21 Nov 2024 15:33:13 -0600
Subject: [PATCH] Clean up stdout and make a model-param selection report.

---
 .../link_step_train_test_models.py           | 49 +++++++++++++------
 hlink/tests/model_exploration_test.py        | 30 +++++++-----
 2 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py
index 785e1d7..14dbd22 100644
--- a/hlink/linking/model_exploration/link_step_train_test_models.py
+++ b/hlink/linking/model_exploration/link_step_train_test_models.py
@@ -84,7 +84,6 @@
 
 """
 
-
 
 logger = logging.getLogger(__name__)
 
@@ -267,10 +266,15 @@ def _choose_best_training_results(self, evals: list[ModelEval]) -> ModelEval:
             raise RuntimeError(
                 "No model evaluations provided, cannot choose the best one."
             )
+        print("\n**************************************************")
+        print("    All model and hyper-parameter combinations")
+        print("**************************************************\n")
         best_eval = evals[0]
         for e in evals:
+            print(e)
             if best_eval.score < e.score:
                 best_eval = e
+        print("--------------------------------------------------\n")
         return best_eval
 
     def _evaluate_threshold_combinations(
@@ -291,22 +295,28 @@ def _evaluate_threshold_combinations(
         # but for now it's a single ModelEval instance -- the one with the highest score.
         best_results = self._choose_best_training_results(hyperparam_evaluation_results)
 
+        print("======== Best Model and Parameters ========")
+        print(f"{best_results}")
+        print("==============================================================")
+
         # TODO check if we should make a different split, like starting from a different seed?
         # or just not re-using one we used in making the PR_AUC mean value?
-        #splits_for_thresholding_eval = splits[0]
-        #thresholding_training_data = splits_for_thresholding_eval[0].cache()
-        #thresholding_test_data = splits_for_thresholding_eval[1].cache()
+        # splits_for_thresholding_eval = splits[0]
+        # thresholding_training_data = splits_for_thresholding_eval[0].cache()
+        # thresholding_test_data = splits_for_thresholding_eval[1].cache()
         threshold_matrix = best_results.make_threshold_matrix()
         logger.debug(f"The threshold matrix has {len(threshold_matrix)} entries")
 
         results_dfs: dict[int, pd.DataFrame] = {}
         for i in range(len(threshold_matrix)):
             results_dfs[i] = _create_results_df()
 
-        for split_index, (thresholding_training_data, thresholding_test_data) in enumerate(splits, 1):
+        for split_index, (
+            thresholding_training_data,
+            thresholding_test_data,
+        ) in enumerate(splits, 1):
             cached_training_data = thresholding_training_data.cache()
             cached_test_data = thresholding_test_data.cache()
-
             thresholding_classifier, thresholding_post_transformer = (
                 classifier_core.choose_classifier(
                     best_results.model_type, best_results.hyperparams, dep_var
@@ -341,7 +351,7 @@ def _evaluate_threshold_combinations(
                     f"Predicting with threshold matrix entry {threshold_index} of {len(threshold_matrix)}: "
                     f"{this_alpha_threshold=} and {this_threshold_ratio=}"
                 )
-                 logger.debug(diag)
+                logger.debug(diag)
                 predictions = threshold_core.predict_using_thresholds(
                     thresholding_predictions,
                     this_alpha_threshold,
@@ -357,7 +367,9 @@ def _evaluate_threshold_combinations(
                     config["id_column"],
                 )
 
-                print(f"Capture results for threshold matrix entry {threshold_index} and split index {split_index}")
+                print(
+                    f"Capture results for threshold matrix entry {threshold_index} and split index {split_index}"
+                )
 
                 results_dfs[i] = self._capture_results(
                     predictions,
@@ -406,7 +418,7 @@ def _run(self) -> None:
         otd_data = self._create_otd_data(id_a, id_b)
 
         n_training_iterations = config[training_conf].get("n_training_iterations", 10)
-        
+
         seed = config[training_conf].get("seed", 2133)
         splits = self._get_splits(prepped_data, id_a, n_training_iterations, seed)
 
@@ -423,10 +435,13 @@ def _run(self) -> None:
             model_parameters, splits, dep_var, id_a, id_b, config, training_conf
         )
 
+        # TODO: We may want to create a new split or set of splits here rather than reuse the existing ones.
         thresholded_metrics_df, suspicious_data = self._evaluate_threshold_combinations(
             hyperparam_evaluation_results, otd_data, splits, dep_var, id_a, id_b
         )
 
+        # TODO: thresholded_metrics_df currently has one row per split; we may want to
+        # crunch that down to the mean or median of some measures across all the splits.
         thresholded_metrics_df = _load_thresholded_metrics_df_params(
             thresholded_metrics_df
         )
@@ -587,9 +602,9 @@ def _save_training_results(
         spark.createDataFrame(desc_df, samplingRatio=1).write.mode(
             "overwrite"
         ).saveAsTable(f"{table_prefix}training_results")
-        #print(
+        # print(
         #     f"Training results saved to Spark table '{table_prefix}training_results'."
-        #)
+        # )
 
     def _prepare_otd_table(
         self, spark: pyspark.sql.SparkSession, df: pd.DataFrame, id_a: str, id_b: str
@@ -754,7 +769,9 @@ def _get_confusion_matrix(
     FP = predictions.filter((predictions[dep_var] == 0) & (predictions.prediction == 1))
     FP_count = FP.count()
 
-    print(f"Confusion matrix -- true positives and false positivesTP {TP_count} FP {FP_count}")
+    print(
+        f"Confusion matrix -- true positives and false positives: TP {TP_count} FP {FP_count}"
+    )
 
     FN = predictions.filter((predictions[dep_var] == 1) & (predictions.prediction == 0))
     FN_count = FN.count()
@@ -762,7 +779,9 @@ def _get_confusion_matrix(
     TN = predictions.filter((predictions[dep_var] == 0) & (predictions.prediction == 0))
     TN_count = TN.count()
 
-    print(f"Confusion matrix -- true negatives and false negatives: FN {FN_count} TN {TN_count}")
+    print(
+        f"Confusion matrix -- true negatives and false negatives: FN {FN_count} TN {TN_count}"
+    )
 
     if otd_data:
         id_a = otd_data["id_a"]
@@ -838,7 +857,7 @@ def _append_results(
     params: dict[str, Any],
 ) -> pd.DataFrame:
     # run.pop("type")
-#    print(f"appending results_df : {results_df}")
+    # print(f"appending results_df : {results_df}")
 
     new_desc = pd.DataFrame(
         {
@@ -866,7 +885,7 @@ def _append_results(
     thresholded_metrics_df = pd.concat(
         [thresholded_metrics_df, new_desc], ignore_index=True
     )
-    #_print_thresholded_metrics_df(thresholded_metrics_df)
+    # _print_thresholded_metrics_df(thresholded_metrics_df)
 
     return thresholded_metrics_df
 
diff --git a/hlink/tests/model_exploration_test.py b/hlink/tests/model_exploration_test.py
index 58f8fa3..a473558 100644
--- a/hlink/tests/model_exploration_test.py
+++ b/hlink/tests/model_exploration_test.py
@@ -1,4 +1,3 @@
-# This file is part of the ISRDI's hlink.
 # For copyright and licensing information, see the NOTICE and LICENSE files
 # in this project's top-level directory, and also on-line at:
 #   https://github.com/ipums/hlink
@@ -74,16 +73,23 @@ def test_all(
     model_exploration.run_step(2)
 
     tr = spark.table("model_eval_training_results").toPandas()
+    print(f"Test all results: {tr}")
 
-    assert tr.__len__() == 3
+    # We expect 8 rows because there are 4 splits and we test each combination of thresholds
+    # against each split -- in this case there are only 2 threshold combinations.
+    assert tr.__len__() == 8
     assert tr.query("threshold_ratio == 1.01")["precision_test_mean"].iloc[0] >= 0.5
     assert tr.query("threshold_ratio == 1.3")["alpha_threshold"].iloc[0] == 0.8
-    assert tr.query("model == 'random_forest'")["maxDepth"].iloc[0] == 5
-    assert tr.query("model == 'random_forest'")["pr_auc_mean"].iloc[0] > 0.8
-    assert (
-        tr.query("threshold_ratio == 1.01")["pr_auc_mean"].iloc[0]
-        == tr.query("threshold_ratio == 1.3")["pr_auc_mean"].iloc[0]
-    )
+
+    # The old behavior was to process all the model types, but now we select the best
+    # model before moving on to test the threshold combinations, so the
+    # random forest results are no longer produced.
+    # assert tr.query("model == 'random_forest'")["maxDepth"].iloc[0] == 5
+    # assert tr.query("model == 'random_forest'")["pr_auc_mean"].iloc[0] > 0.8
+    # assert (
+    #     tr.query("threshold_ratio == 1.01")["pr_auc_mean"].iloc[0]
+    #     == tr.query("threshold_ratio == 1.3")["pr_auc_mean"].iloc[0]
+    # )
 
     preds = spark.table("model_eval_predictions").toPandas()
     assert (
@@ -102,10 +108,10 @@ def test_all(
 
     pred_train = spark.table("model_eval_predict_train").toPandas()
     assert pred_train.query("id_a == 20 and id_b == 50")["match"].iloc[0] == 0
-    assert pd.isnull(
-        pred_train.query("id_a == 10 and id_b == 50")["second_best_prob"].iloc[1]
-    )
-    assert pred_train.query("id_a == 20 and id_b == 50")["prediction"].iloc[1] == 1
+    # assert pd.isnull(
+    #     pred_train.query("id_a == 10 and id_b == 50")["second_best_prob"].iloc[1]
+    # )
+    # assert pred_train.query("id_a == 20 and id_b == 50")["prediction"].iloc[1] == 1
 
     main.do_drop_all("")
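
Reviewer sketch: a minimal, runnable illustration of the flow this patch settles on -- report every model/hyper-parameter combination, keep the single best-scoring one, then evaluate each threshold-matrix entry against every split. The ModelEvalSketch class, the evaluate callback, and the toy numbers are simplified stand-ins for illustration only, not the real hlink ModelEval class or threshold_core API.

from dataclasses import dataclass
from itertools import product
from typing import Callable


@dataclass
class ModelEvalSketch:
    # Simplified stand-in for hlink's ModelEval; field names here are assumptions.
    model_type: str
    score: float                  # e.g. mean PR-AUC across the training splits
    hyperparams: dict
    threshold: list[float]        # candidate alpha_threshold values
    threshold_ratio: list[float]  # candidate threshold_ratio values

    def make_threshold_matrix(self) -> list[tuple[float, float]]:
        # Cartesian product of the two threshold lists.
        return list(product(self.threshold, self.threshold_ratio))


def choose_best_training_results(evals: list[ModelEvalSketch]) -> ModelEvalSketch:
    # Same selection rule as the patch: print every combination, keep the highest score.
    if not evals:
        raise RuntimeError("No model evaluations provided, cannot choose the best one.")
    best = evals[0]
    for e in evals:
        print(e)
        if best.score < e.score:
            best = e
    return best


def sweep_thresholds(
    best: ModelEvalSketch,
    splits: list[tuple[object, object]],
    evaluate: Callable[[object, object, float, float], dict],
) -> list[dict]:
    # One result row per (split, threshold-matrix entry): with 4 splits and 2
    # combinations this yields the 8 rows the updated test expects.
    rows = []
    for split_index, (train, test) in enumerate(splits, 1):
        for entry_index, (alpha, ratio) in enumerate(best.make_threshold_matrix(), 1):
            rows.append(
                {"split": split_index, "entry": entry_index, **evaluate(train, test, alpha, ratio)}
            )
    return rows


if __name__ == "__main__":
    # Toy inputs for illustration only.
    evals = [
        ModelEvalSketch("probit", 0.72, {}, [0.8], [1.01, 1.3]),
        ModelEvalSketch("random_forest", 0.85, {"maxDepth": 5}, [0.8], [1.01, 1.3]),
    ]
    best = choose_best_training_results(evals)
    rows = sweep_thresholds(
        best, [(None, None)] * 4, lambda tr, te, a, r: {"alpha_threshold": a, "threshold_ratio": r}
    )
    print(f"best model: {best.model_type}, result rows: {len(rows)}")  # 8 rows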