From 93a5c4ea2786c7becf31930cd235e1aefbf6da8c Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Fri, 6 Dec 2024 17:31:11 -0600 Subject: [PATCH 1/5] WIP: refactor to combine threshold test results from all outer folds. Doesn't work yet. --- .../link_step_train_test_models.py | 299 +++++++++++------- 1 file changed, 177 insertions(+), 122 deletions(-) diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py index a05c3ed..58c92c6 100644 --- a/hlink/linking/model_exploration/link_step_train_test_models.py +++ b/hlink/linking/model_exploration/link_step_train_test_models.py @@ -137,6 +137,18 @@ def make_threshold_matrix(self) -> list[list[float]]: return _calc_threshold_matrix(self.threshold, self.threshold_ratio) +# Both training and test results can be captured in this type +@dataclass(kw_only=True) +class ThresholdTestResult: + precision: float + recall: float + pr_auc: float + mcc: float + model_id: str + alpha_threshold: float + threshold_ratio: float + + class LinkStepTrainTestModels(LinkStep): def __init__(self, task) -> None: super().__init__( @@ -329,7 +341,7 @@ def _choose_best_training_results(self, evals: list[ModelEval]) -> ModelEval: def _evaluate_threshold_combinations( self, - hyperparam_evaluation_results: list[ModelEval], + best_model: ModelEval, suspicious_data: Any, split: dict[str : pyspark.sql.DataFrame], dep_var: str, @@ -342,8 +354,6 @@ def _evaluate_threshold_combinations( id_column = config["id_column"] training_settings = config[training_config_name] - thresholded_metrics_df = _create_thresholded_metrics_df() - thresholding_training_data = split.get("training") thresholding_test_data = split.get("test") if thresholding_training_data is None: @@ -351,29 +361,25 @@ def _evaluate_threshold_combinations( if thresholding_test_data is None: raise RuntimeError("Must give some data with the 'test' key.") - # Note: We may change this to contain a list of best per model or something else - # but for now it's a single ModelEval instance -- the one with the highest score. 
- best_results = self._choose_best_training_results(hyperparam_evaluation_results) - print(f"\n======== Best Model and Parameters ========\n") - print(f"\t{best_results}\n") + print(f"\t{best_model}\n") print("=============================================\n\n") - logger.debug(f"Best model results: {best_results}") + logger.debug(f"Best model results: {best_model}") - threshold_matrix = best_results.make_threshold_matrix() + threshold_matrix = best_model.make_threshold_matrix() logger.debug(f"The threshold matrix has {len(threshold_matrix)} entries") info = f"\nTesting the best model + parameters against all {len(threshold_matrix)} threshold combinations.\n" logger.debug(info) - results_dfs: dict[int, pd.DataFrame] = {} - for i in range(len(threshold_matrix)): - results_dfs[i] = _create_results_df() + + prediction_results = dict[int, ThresholdTestResult] = {} + training_results: dict[int, ThresholdTestResult] = {} cached_training_data = thresholding_training_data.cache() cached_test_data = thresholding_test_data.cache() thresholding_classifier, thresholding_post_transformer = ( classifier_core.choose_classifier( - best_results.model_type, best_results.hyperparams, dep_var + best_model.model_type, best_model.hyperparams, dep_var ) ) start_time = perf_counter() @@ -400,14 +406,13 @@ def _evaluate_threshold_combinations( dep_var, ) - i = 0 for threshold_index, ( this_alpha_threshold, this_threshold_ratio, - ) in enumerate(threshold_matrix, 1): + ) in enumerate(threshold_matrix, 0): diag = ( - f"Predicting with threshold matrix entry {threshold_index} of {len(threshold_matrix)}: " + f"Predicting with threshold matrix entry {threshold_index+1} of {len(threshold_matrix)}: " f"{this_alpha_threshold=} and {this_threshold_ratio=}" ) logger.debug(diag) @@ -432,32 +437,30 @@ def _evaluate_threshold_combinations( info = f"Predictions for test-train data on threshold took {end_predict_time - start_predict_time:.2f}s" logger.debug(info) - results_dfs[i] = self._capture_results( + prediction_results[threshold_index] = self._capture_prediction_results( predictions, - predict_train, dep_var, thresholding_model, - results_dfs[i], suspicious_data, this_alpha_threshold, this_threshold_ratio, - best_results.score, + best_model.score, ) - i += 1 - - for i in range(len(threshold_matrix)): - thresholded_metrics_df = _append_results( - thresholded_metrics_df, - results_dfs[i], - best_results.model_type, - best_results.hyperparams, + training_results[threshold_index] = self._capture_training_results( + predict_train, + dep_var, + thresholding_model, + suspicious_data, + this_alpha_threshold, + this_threshold_ratio, + best_model.score, ) thresholding_test_data.unpersist() thresholding_training_data.unpersist() - return thresholded_metrics_df, suspicious_data + return prediction_results, training_results, suspicious_data def _run(self) -> None: training_section_name = str(self.task.training_conf) @@ -487,6 +490,12 @@ def _run(self) -> None: if outer_fold_count < 3: raise RuntimeError("You must use at least three outer folds.") + # At the end we combine this information collected from every outer fold + threshold_test_results: list[ThresholdTestResult] = [] + threshold_training_results: list[ThresholdTestResult] + all_suspicious_data: list[Any] = [] + best_models: list[ModelEval] = [] + seed = training_settings.get("seed", 2133) outer_folds = self._get_outer_folds(prepped_data, id_a, outer_fold_count, seed) @@ -523,9 +532,15 @@ def _run(self) -> None: f"Take the best hyper-parameter set from 
{len(hyperparam_evaluation_results)} results and test every threshold combination against it..." ) - thresholded_metrics_df, suspicious_data = ( + # Note: We may change this to contain a list of best per model or something else + # but for now it's a single ModelEval instance -- the one with the highest score. + best_model = self._choose_best_training_results( + hyperparam_evaluation_results + ) + + prediction_results, training_results, suspicious_data_for_threshold = ( self._evaluate_threshold_combinations( - hyperparam_evaluation_results, + best_model, suspicious_data, {"test": outer_test_data, "training": outer_training_data}, dep_var, @@ -534,16 +549,33 @@ def _run(self) -> None: ) ) - # thresholded_metrics_df has one row per threshold combination. and each outer fold - thresholded_metrics_df = _load_thresholded_metrics_df_params( - thresholded_metrics_df - ) - _print_thresholded_metrics_df( - thresholded_metrics_df.sort_values(by="mcc_test_mean", ascending=False) + # Collect the outputs for each fold + threshold_test_results.append(prediction_results) + threshold_training_results.append(training_results) + all_suspicious_data.append(suspicious_data_for_threshold) + best_models.append(best_model) + + combined_test = (_combine_by_threshold_matrix_entry(prediction_results),) + combined_train = (_combine_by_threshold_matrix_entry(training_results),) + + threshold_matrix_size = len(threshold_test_results[0]) + + thresholded_metrics_df = _create_thresholded_metrics_df() + for i in range(threshold_matrix_size): + thresholded_metrics_df = _aggregate_per_threshold_results( + thresholded_metrics_df, combined_test[i], combined_train[i], best_models ) print("*** Final thresholded metrics ***") + # thresholded_metrics_df has one row per threshold combination. 
and each outer fold + thresholded_metrics_df = _load_thresholded_metrics_df_params( + thresholded_metrics_df + ) + _print_thresholded_metrics_df( + thresholded_metrics_df.sort_values(by="mcc_test_mean", ascending=False) + ) + self._save_training_results(thresholded_metrics_df, self.task.spark) self._save_suspicious_data(suspicious_data, self.task.spark) self.task.spark.sql("set spark.sql.shuffle.partitions=200") @@ -637,29 +669,51 @@ def _get_splits( ) return splits - def _capture_results( + def _capture_training_results( self, - predictions: pyspark.sql.DataFrame, predict_train: pyspark.sql.DataFrame, dep_var: str, model: Model, - results_df: pd.DataFrame, suspicious_data: dict[str, Any] | None, alpha_threshold: float, threshold_ratio: float | None, pr_auc: float, - ) -> pd.DataFrame: + ) -> ThresholdTestResult: table_prefix = self.task.table_prefix + predict_train.createOrReplaceTempView(f"{table_prefix}predict_train") + ( + train_TP_count, + train_FP_count, + train_FN_count, + train_TN_count, + ) = _get_confusion_matrix(predict_train, dep_var, suspicious_data) + train_precision, train_recall, train_mcc = _get_aggregate_metrics( + train_TP_count, train_FP_count, train_FN_count, train_TN_count + ) + result = ThresholdTestResult( + precision=train_precision, + recall=train_recall, + mcc=train_mcc, + pr_auc=pr_auc, + model_id=model, + alpha_threshold=alpha_threshold, + threshold_ratio=threshold_ratio, + ) + return result + def _capture_prediction_results( + self, + predictions: pyspark.sql.DataFrame, + dep_var: str, + model: Model, + suspicious_data: dict[str, Any] | None, + alpha_threshold: float, + threshold_ratio: float | None, + pr_auc: float, + ) -> pd.DataFrame: + table_prefix = self.task.table_prefix # write to sql tables for testing predictions.createOrReplaceTempView(f"{table_prefix}predictions") - predict_train.createOrReplaceTempView(f"{table_prefix}predict_train") - # print("------------------------------------------------------------") - # print(f"Capturing predictions:") - # predictions.show() - # print(f"Capturing predict_train:") - # predict_train.show() - # print("------------------------------------------------------------") ( test_TP_count, @@ -671,31 +725,17 @@ def _capture_results( test_TP_count, test_FP_count, test_FN_count, test_TN_count ) - ( - train_TP_count, - train_FP_count, - train_FN_count, - train_TN_count, - ) = _get_confusion_matrix(predict_train, dep_var, suspicious_data) - train_precision, train_recall, train_mcc = _get_aggregate_metrics( - train_TP_count, train_FP_count, train_FN_count, train_TN_count + result = ThresholdTestResult( + precision=test_precision, + recall=test_recall, + mcc=test_mcc, + pr_auc=pr_auc, + model_id=model, + alpha_threshold=alpha_threshold, + threshold_ratio=threshold_ratio, ) - new_results = pd.DataFrame( - { - "precision_test": [test_precision], - "recall_test": [test_recall], - "precision_train": [train_precision], - "recall_train": [train_recall], - "pr_auc": [pr_auc], - "test_mcc": [test_mcc], - "train_mcc": [train_mcc], - "model_id": [model], - "alpha_threshold": [alpha_threshold], - "threshold_ratio": [threshold_ratio], - }, - ) - return pd.concat([results_df, new_results], ignore_index=True) + return result def _save_training_results( self, desc_df: pd.DataFrame, spark: pyspark.sql.SparkSession @@ -950,52 +990,78 @@ def _get_aggregate_metrics( return precision, recall, mcc -def _create_results_df() -> pd.DataFrame: - return pd.DataFrame( - columns=[ - "precision_test", - "recall_test", - "precision_train", - 
"recall_train", - "pr_auc", - "test_mcc", - "train_mcc", - "model_id", - "alpha_threshold", - "threshold_ratio", - ] - ) +# The outer list entries hold results from each outer fold, the inner list has a ThresholdTestResult per threshold +# matrix entry. We need to get data for each threshold entry together. Basically we need to invert the data. +def _combine_by_threshold_matrix_entry( + threshold_results: list[list[ThresholdTestResult]], +) -> list[ThresholdTestResult]: + # This list will have a size of the number of threshold matrix entries + results: list[ThresholdTestResult] = [] + + if len(threshold_results) < 2: + raise RuntimeError( + "Can't combine threshold results from less than two outer folds." + ) + + if len(threshold_results[0]) == 0: + raise RuntimeError( + "No entries in the first set of threshold results; can't determine threshold matrix size." + ) + + inferred_threshold_matrix_size = len(threshold_results[0]) + for t in range(inferred_threshold_matrix_size): + results[t] = None -def _append_results( + for fold_results in threshold_results: + for t in range(inferred_threshold_matrix_size): + results[t].append(fold_results[t]) + + return results + + +def _aggregate_per_threshold_results( thresholded_metrics_df: pd.DataFrame, - results_df: pd.DataFrame, - model_type: str, - params: dict[str, Any], + prediction_results: list[ThresholdTestResult], + training_results: list[ThresholdTestResult], + best_models: list[ModelEval], ) -> pd.DataFrame: - # run.pop("type") - # print(f"appending results_df : {results_df}") + + # The threshold is the same for all entries in the lists + alpha_threshold = prediction_results[0].alpha_threshold + threshold_ratio = prediction_results[0].threshold_ratio + + # Pull out columns to be aggregated + precision_test = [r.precision for r in prediction_results] + recall_test = [r.recall for r in prediction_results] + pr_auc_test = [r.pr_auc for r in prediction_results] + mcc_test = [r.mcc for r in prediction_results] + + precision_train = [r.precision for r in training_results] + recall_train = [r.recall for r in training_results] + pr_auc_train = [r.pr_auc for r in training_results] + mcc_train = [r.mcc for r in training_results] new_desc = pd.DataFrame( { - "model": [model_type], - "parameters": [params], - "alpha_threshold": [results_df["alpha_threshold"][0]], - "threshold_ratio": [results_df["threshold_ratio"][0]], - "precision_test_mean": [results_df["precision_test"].mean()], - "precision_test_sd": [results_df["precision_test"].std()], - "recall_test_mean": [results_df["recall_test"].mean()], - "recall_test_sd": [results_df["recall_test"].std()], - "pr_auc_mean": [results_df["pr_auc"].mean()], - "pr_auc_sd": [results_df["pr_auc"].std()], - "mcc_test_mean": [results_df["test_mcc"].mean()], - "mcc_test_sd": [results_df["test_mcc"].std()], - "precision_train_mean": [results_df["precision_train"].mean()], - "precision_train_sd": [results_df["precision_train"].std()], - "recall_train_mean": [results_df["recall_train"].mean()], - "recall_train_sd": [results_df["recall_train"].std()], - "mcc_train_mean": [results_df["train_mcc"].mean()], - "mcc_train_sd": [results_df["train_mcc"].std()], + "model": [best_models[0].model_type], + "parameters": [best_models[0].hyperparams], + "alpha_threshold": [alpha_threshold], + "threshold_ratio": [threshold_ratio], + "precision_test_mean": [statistics.mean(precision_test)], + "precision_test_sd": [statistics.stdev(precision_test)], + "recall_test_mean": [statistics.mean(recall_test)], + "recall_test_sd": 
[statistics.stdev(recall_test)], + "pr_auc_test_mean": [statistics.mean(pr_auc_test)], + "pr_auc_test_sd": [statistics.stdev(pr_auc_test)], + "mcc_test_mean": [statistics.mean(mcc_test)], + "mcc_test_sd": [statistics.stdev(mcc_test)], + "precision_train_mean": [statistics.mean(precision_train)], + "precision_train_sd": [statistics.stdev(precision_train)], + "recall_train_mean": [statistics.mean(recall_train)], + "recall_train_sd": [statistics.stdev(recall_train)], + "mcc_train_mean": [statistics.mean(mcc_train)], + "mcc_train_sd": [statistics.stdev(mcc_train)], }, ) @@ -1049,17 +1115,6 @@ def _load_thresholded_metrics_df_params(desc_df: pd.DataFrame) -> pd.DataFrame: return desc_df -def _create_probability_metrics_df() -> pd.DataFrame: - return pd.DataFrame( - columns=[ - "model", - "parameters", - "pr_auc_mean", - "pr_auc_standard_deviation", - ] - ) - - def _create_thresholded_metrics_df() -> pd.DataFrame: return pd.DataFrame( columns=[ From dd49937691fab3fccd9124d62d20fd1dbf8a7b8e Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Mon, 9 Dec 2024 12:28:21 -0600 Subject: [PATCH 2/5] WIP on correct metrics output; some tests break because of not enough threshold matrix entries --- .../link_step_train_test_models.py | 110 +++++++----------- hlink/tests/model_exploration_test.py | 2 +- 2 files changed, 46 insertions(+), 66 deletions(-) diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py index 58c92c6..e5f4769 100644 --- a/hlink/linking/model_exploration/link_step_train_test_models.py +++ b/hlink/linking/model_exploration/link_step_train_test_models.py @@ -347,7 +347,7 @@ def _evaluate_threshold_combinations( dep_var: str, id_a: str, id_b: str, - ) -> tuple[pd.DataFrame, Any]: + ) -> tuple[dict[int, pd.DataFrame], Any]: training_config_name = str(self.task.training_conf) config = self.task.link_run.config @@ -371,8 +371,8 @@ def _evaluate_threshold_combinations( info = f"\nTesting the best model + parameters against all {len(threshold_matrix)} threshold combinations.\n" logger.debug(info) - prediction_results = dict[int, ThresholdTestResult] = {} - training_results: dict[int, ThresholdTestResult] = {} + prediction_results: dict[int, ThresholdTestResult] = {} + # training_results: dict[int, ThresholdTestResult] = {} cached_training_data = thresholding_training_data.cache() cached_test_data = thresholding_test_data.cache() @@ -397,6 +397,7 @@ def _evaluate_threshold_combinations( id_b, dep_var, ) + """ thresholding_predict_train = _get_probability_and_select_pred_columns( cached_training_data, thresholding_model, @@ -405,6 +406,7 @@ def _evaluate_threshold_combinations( id_b, dep_var, ) + """ for threshold_index, ( this_alpha_threshold, @@ -418,6 +420,7 @@ def _evaluate_threshold_combinations( logger.debug(diag) decision = training_settings.get("decision") start_predict_time = perf_counter() + predictions = threshold_core.predict_using_thresholds( thresholding_predictions, this_alpha_threshold, @@ -425,6 +428,7 @@ def _evaluate_threshold_combinations( id_column, decision, ) + """ predict_train = threshold_core.predict_using_thresholds( thresholding_predict_train, this_alpha_threshold, @@ -432,6 +436,7 @@ def _evaluate_threshold_combinations( id_column, decision, ) + """ end_predict_time = perf_counter() info = f"Predictions for test-train data on threshold took {end_predict_time - start_predict_time:.2f}s" @@ -446,7 +451,7 @@ def _evaluate_threshold_combinations( this_threshold_ratio, best_model.score, ) - + 
""" training_results[threshold_index] = self._capture_training_results( predict_train, dep_var, @@ -456,11 +461,12 @@ def _evaluate_threshold_combinations( this_threshold_ratio, best_model.score, ) + """ thresholding_test_data.unpersist() thresholding_training_data.unpersist() - return prediction_results, training_results, suspicious_data + return prediction_results, suspicious_data def _run(self) -> None: training_section_name = str(self.task.training_conf) @@ -482,7 +488,8 @@ def _run(self) -> None: ) # Stores suspicious data - suspicious_data = self._create_suspicious_data(id_a, id_b) + # suspicious_data = self._create_suspicious_data(id_a, id_b) + suspicious_data = None outer_fold_count = training_settings.get("n_training_iterations", 10) inner_fold_count = 3 @@ -492,7 +499,7 @@ def _run(self) -> None: # At the end we combine this information collected from every outer fold threshold_test_results: list[ThresholdTestResult] = [] - threshold_training_results: list[ThresholdTestResult] + # threshold_training_results: list[ThresholdTestResult] all_suspicious_data: list[Any] = [] best_models: list[ModelEval] = [] @@ -538,7 +545,7 @@ def _run(self) -> None: hyperparam_evaluation_results ) - prediction_results, training_results, suspicious_data_for_threshold = ( + prediction_results, suspicious_data_for_threshold = ( self._evaluate_threshold_combinations( best_model, suspicious_data, @@ -551,19 +558,24 @@ def _run(self) -> None: # Collect the outputs for each fold threshold_test_results.append(prediction_results) - threshold_training_results.append(training_results) - all_suspicious_data.append(suspicious_data_for_threshold) + # threshold_training_results.append(training_results) + # all_suspicious_data.append(suspicious_data_for_threshold) best_models.append(best_model) - combined_test = (_combine_by_threshold_matrix_entry(prediction_results),) - combined_train = (_combine_by_threshold_matrix_entry(training_results),) + combined_test = _combine_by_threshold_matrix_entry(threshold_test_results) + # combined_train = (_combine_by_threshold_matrix_entry(training_results),) + # there are 'm' threshold_test_results items matching the number of + # inner folds. Each entry has 'n' items matching the number of + # threshold matrix entries. 
threshold_matrix_size = len(threshold_test_results[0]) thresholded_metrics_df = _create_thresholded_metrics_df() for i in range(threshold_matrix_size): + print(type(combined_test[i])) + print(combined_test[i]) thresholded_metrics_df = _aggregate_per_threshold_results( - thresholded_metrics_df, combined_test[i], combined_train[i], best_models + thresholded_metrics_df, combined_test[i], best_models ) print("*** Final thresholded metrics ***") @@ -577,7 +589,7 @@ def _run(self) -> None: ) self._save_training_results(thresholded_metrics_df, self.task.spark) - self._save_suspicious_data(suspicious_data, self.task.spark) + # self._save_suspicious_data(suspicious_data, self.task.spark) self.task.spark.sql("set spark.sql.shuffle.partitions=200") def _split_into_folds( @@ -669,38 +681,6 @@ def _get_splits( ) return splits - def _capture_training_results( - self, - predict_train: pyspark.sql.DataFrame, - dep_var: str, - model: Model, - suspicious_data: dict[str, Any] | None, - alpha_threshold: float, - threshold_ratio: float | None, - pr_auc: float, - ) -> ThresholdTestResult: - table_prefix = self.task.table_prefix - predict_train.createOrReplaceTempView(f"{table_prefix}predict_train") - ( - train_TP_count, - train_FP_count, - train_FN_count, - train_TN_count, - ) = _get_confusion_matrix(predict_train, dep_var, suspicious_data) - train_precision, train_recall, train_mcc = _get_aggregate_metrics( - train_TP_count, train_FP_count, train_FN_count, train_TN_count - ) - result = ThresholdTestResult( - precision=train_precision, - recall=train_recall, - mcc=train_mcc, - pr_auc=pr_auc, - model_id=model, - alpha_threshold=alpha_threshold, - threshold_ratio=threshold_ratio, - ) - return result - def _capture_prediction_results( self, predictions: pyspark.sql.DataFrame, @@ -710,7 +690,7 @@ def _capture_prediction_results( alpha_threshold: float, threshold_ratio: float | None, pr_auc: float, - ) -> pd.DataFrame: + ) -> ThresholdTestResult: table_prefix = self.task.table_prefix # write to sql tables for testing predictions.createOrReplaceTempView(f"{table_prefix}predictions") @@ -993,16 +973,16 @@ def _get_aggregate_metrics( # The outer list entries hold results from each outer fold, the inner list has a ThresholdTestResult per threshold # matrix entry. We need to get data for each threshold entry together. Basically we need to invert the data. def _combine_by_threshold_matrix_entry( - threshold_results: list[list[ThresholdTestResult]], + threshold_results: list[dict[int, ThresholdTestResult]], ) -> list[ThresholdTestResult]: # This list will have a size of the number of threshold matrix entries results: list[ThresholdTestResult] = [] + # Check number of folds if len(threshold_results) < 2: - raise RuntimeError( - "Can't combine threshold results from less than two outer folds." - ) + raise RuntimeError("Must have at least two outer folds.") + # Check if there are more than 0 threshold matrix entries if len(threshold_results[0]) == 0: raise RuntimeError( "No entries in the first set of threshold results; can't determine threshold matrix size." 
@@ -1011,36 +991,40 @@ def _combine_by_threshold_matrix_entry( inferred_threshold_matrix_size = len(threshold_results[0]) for t in range(inferred_threshold_matrix_size): - results[t] = None + # One list per threshold matrix entry + results.append([]) for fold_results in threshold_results: for t in range(inferred_threshold_matrix_size): - results[t].append(fold_results[t]) - + threshold_results_for_this_fold = fold_results[t] + results[t].append(threshold_results_for_this_fold) return results def _aggregate_per_threshold_results( thresholded_metrics_df: pd.DataFrame, prediction_results: list[ThresholdTestResult], - training_results: list[ThresholdTestResult], + # training_results: list[ThresholdTestResult], best_models: list[ModelEval], ) -> pd.DataFrame: - # The threshold is the same for all entries in the lists alpha_threshold = prediction_results[0].alpha_threshold threshold_ratio = prediction_results[0].threshold_ratio # Pull out columns to be aggregated - precision_test = [r.precision for r in prediction_results] - recall_test = [r.recall for r in prediction_results] + precision_test = [ + r.precision for r in prediction_results if r.precision is not np.nan + ] + recall_test = [r.recall for r in prediction_results if r.recall is not np.NaN] pr_auc_test = [r.pr_auc for r in prediction_results] mcc_test = [r.mcc for r in prediction_results] + """ precision_train = [r.precision for r in training_results] recall_train = [r.recall for r in training_results] pr_auc_train = [r.pr_auc for r in training_results] mcc_train = [r.mcc for r in training_results] + """ new_desc = pd.DataFrame( { @@ -1056,12 +1040,6 @@ def _aggregate_per_threshold_results( "pr_auc_test_sd": [statistics.stdev(pr_auc_test)], "mcc_test_mean": [statistics.mean(mcc_test)], "mcc_test_sd": [statistics.stdev(mcc_test)], - "precision_train_mean": [statistics.mean(precision_train)], - "precision_train_sd": [statistics.stdev(precision_train)], - "recall_train_mean": [statistics.mean(recall_train)], - "recall_train_sd": [statistics.stdev(recall_train)], - "mcc_train_mean": [statistics.mean(mcc_train)], - "mcc_train_sd": [statistics.stdev(mcc_train)], }, ) @@ -1127,7 +1105,8 @@ def _create_thresholded_metrics_df() -> pd.DataFrame: "recall_test_mean", "recall_test_sd", "mcc_test_mean", - "mcc_test_sd", + "mcc_test_sd" + """ "precision_train_mean", "precision_train_sd", "recall_train_mean", @@ -1136,6 +1115,7 @@ def _create_thresholded_metrics_df() -> pd.DataFrame: "pr_auc_sd", "mcc_train_mean", "mcc_train_sd", + """, ] ) diff --git a/hlink/tests/model_exploration_test.py b/hlink/tests/model_exploration_test.py index f9b8a73..cc2e9c1 100644 --- a/hlink/tests/model_exploration_test.py +++ b/hlink/tests/model_exploration_test.py @@ -584,7 +584,7 @@ def feature_conf(training_conf): training_conf["training"]["independent_vars"] = ["namelast_jw", "regionf"] training_conf["training"]["model_parameters"] = [] - training_conf["training"]["n_training_iterations"] = 2 + training_conf["training"]["n_training_iterations"] = 3 return training_conf From a041274285cf1eb2c7db197e5338a0d374e5d519 Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Mon, 9 Dec 2024 15:57:52 -0600 Subject: [PATCH 3/5] Cleaning up metrics --- .../link_step_train_test_models.py | 56 +++++++------------ hlink/tests/model_exploration_test.py | 4 +- 2 files changed, 21 insertions(+), 39 deletions(-) diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py index e5f4769..a2e65c5 100644 --- 
a/hlink/linking/model_exploration/link_step_train_test_models.py +++ b/hlink/linking/model_exploration/link_step_train_test_models.py @@ -572,8 +572,7 @@ def _run(self) -> None: thresholded_metrics_df = _create_thresholded_metrics_df() for i in range(threshold_matrix_size): - print(type(combined_test[i])) - print(combined_test[i]) + print(f"Aggregate threshold matrix entry {i}") thresholded_metrics_df = _aggregate_per_threshold_results( thresholded_metrics_df, combined_test[i], best_models ) @@ -1007,6 +1006,7 @@ def _aggregate_per_threshold_results( # training_results: list[ThresholdTestResult], best_models: list[ModelEval], ) -> pd.DataFrame: + # The threshold is the same for all entries in the lists alpha_threshold = prediction_results[0].alpha_threshold threshold_ratio = prediction_results[0].threshold_ratio @@ -1015,16 +1015,17 @@ def _aggregate_per_threshold_results( precision_test = [ r.precision for r in prediction_results if r.precision is not np.nan ] - recall_test = [r.recall for r in prediction_results if r.recall is not np.NaN] - pr_auc_test = [r.pr_auc for r in prediction_results] - mcc_test = [r.mcc for r in prediction_results] + recall_test = [r.recall for r in prediction_results if r.recall is not np.nan] + pr_auc_test = [r.pr_auc for r in prediction_results if r.pr_auc is not np.nan] + mcc_test = [r.mcc for r in prediction_results if r.mcc is not np.nan] - """ - precision_train = [r.precision for r in training_results] - recall_train = [r.recall for r in training_results] - pr_auc_train = [r.pr_auc for r in training_results] - mcc_train = [r.mcc for r in training_results] - """ + # # variance requires at least two values + precision_test_sd = ( + statistics.stdev(precision_test) if len(precision_test) > 1 else np.nan + ) + recall_test_sd = statistics.stdev(recall_test) if len(recall_test) > 1 else np.nan + pr_auc_test_sd = statistics.stdev(pr_auc_test) if len(pr_auc_test) > 1 else np.nan + mcc_test_sd = statistics.stdev(mcc_test) if len(mcc_test) > 1 else np.nan new_desc = pd.DataFrame( { @@ -1033,13 +1034,13 @@ def _aggregate_per_threshold_results( "alpha_threshold": [alpha_threshold], "threshold_ratio": [threshold_ratio], "precision_test_mean": [statistics.mean(precision_test)], - "precision_test_sd": [statistics.stdev(precision_test)], + "precision_test_sd": [precision_test_sd], "recall_test_mean": [statistics.mean(recall_test)], - "recall_test_sd": [statistics.stdev(recall_test)], + "recall_test_sd": [recall_test_sd], "pr_auc_test_mean": [statistics.mean(pr_auc_test)], - "pr_auc_test_sd": [statistics.stdev(pr_auc_test)], + "pr_auc_test_sd": [pr_auc_test_sd], "mcc_test_mean": [statistics.mean(mcc_test)], - "mcc_test_sd": [statistics.stdev(mcc_test)], + "mcc_test_sd": [mcc_test_sd], }, ) @@ -1052,17 +1053,8 @@ def _aggregate_per_threshold_results( def _print_thresholded_metrics_df(desc_df: pd.DataFrame) -> None: pd.set_option("display.max_colwidth", None) - print( - desc_df.drop( - [ - "recall_test_sd", - "recall_train_sd", - "precision_test_sd", - "precision_train_sd", - ], - axis=1, - ).iloc[-1] - ) + print(desc_df.iloc[-1]) + print("\n") @@ -1105,17 +1097,7 @@ def _create_thresholded_metrics_df() -> pd.DataFrame: "recall_test_mean", "recall_test_sd", "mcc_test_mean", - "mcc_test_sd" - """ - "precision_train_mean", - "precision_train_sd", - "recall_train_mean", - "recall_train_sd", - "pr_auc_mean", - "pr_auc_sd", - "mcc_train_mean", - "mcc_train_sd", - """, + "mcc_test_sd", ] ) diff --git a/hlink/tests/model_exploration_test.py b/hlink/tests/model_exploration_test.py 
index cc2e9c1..30bca92 100644 --- a/hlink/tests/model_exploration_test.py +++ b/hlink/tests/model_exploration_test.py @@ -725,7 +725,7 @@ def test_step_2_train_logistic_regression_spark( tr = spark.table("model_eval_training_results").toPandas() - assert tr.shape == (1, 9) + assert tr.shape == (1, 11) # This is now 0.83333333333.... I'm not sure it's worth testing against # assert tr.query("model == 'logistic_regression'")["pr_auc_mean"].iloc[0] == 0.75 assert tr.query("model == 'logistic_regression'")["pr_auc_mean"].iloc[0] > 0.74 @@ -754,7 +754,7 @@ def test_step_2_train_decision_tree_spark( print(f"Decision tree results: {tr}") # TODO This is 1,12 instead of 1,13, because the precision_test_mean column is dropped as it is NaN - assert tr.shape == (1, 12) + assert tr.shape == (1, 13) # assert tr.query("model == 'decision_tree'")["precision_test_mean"].iloc[0] > 0 assert tr.query("model == 'decision_tree'")["maxDepth"].iloc[0] == 3 assert tr.query("model == 'decision_tree'")["minInstancesPerNode"].iloc[0] == 1 From f0833781d0205f4989005b5ef19ada3ac24caf8f Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Tue, 10 Dec 2024 11:25:45 -0600 Subject: [PATCH 4/5] Tests pass --- .../link_step_train_test_models.py | 26 ++++++++++++++++--- hlink/tests/model_exploration_test.py | 12 ++++++--- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py index a2e65c5..070c1da 100644 --- a/hlink/linking/model_exploration/link_step_train_test_models.py +++ b/hlink/linking/model_exploration/link_step_train_test_models.py @@ -975,7 +975,7 @@ def _combine_by_threshold_matrix_entry( threshold_results: list[dict[int, ThresholdTestResult]], ) -> list[ThresholdTestResult]: # This list will have a size of the number of threshold matrix entries - results: list[ThresholdTestResult] = [] + results: list[list[ThresholdTestResult]] = [] # Check number of folds if len(threshold_results) < 2: @@ -1027,15 +1027,35 @@ def _aggregate_per_threshold_results( pr_auc_test_sd = statistics.stdev(pr_auc_test) if len(pr_auc_test) > 1 else np.nan mcc_test_sd = statistics.stdev(mcc_test) if len(mcc_test) > 1 else np.nan + # Deal with tiny test data. This should never arise in practice but if it did we ought + # to issue a warning. 
+ if len(precision_test) < 1: + # raise RuntimeError("Not enough training data to get any valid precision values.") + precision_test_mean = np.nan + else: + precision_test_mean = ( + statistics.mean(precision_test) + if len(precision_test) > 1 + else precision_test[0] + ) + + if len(recall_test) < 1: + # raise RuntimeError("Not enough training data to get any valid recall values.") + recall_test_mean = np.nan + else: + recall_test_mean = ( + statistics.mean(recall_test) if len(recall_test) > 1 else recall_test[0] + ) + new_desc = pd.DataFrame( { "model": [best_models[0].model_type], "parameters": [best_models[0].hyperparams], "alpha_threshold": [alpha_threshold], "threshold_ratio": [threshold_ratio], - "precision_test_mean": [statistics.mean(precision_test)], + "precision_test_mean": [precision_test_mean], "precision_test_sd": [precision_test_sd], - "recall_test_mean": [statistics.mean(recall_test)], + "recall_test_mean": [recall_test_mean], "recall_test_sd": [recall_test_sd], "pr_auc_test_mean": [statistics.mean(pr_auc_test)], "pr_auc_test_sd": [pr_auc_test_sd], diff --git a/hlink/tests/model_exploration_test.py b/hlink/tests/model_exploration_test.py index 30bca92..46166c5 100644 --- a/hlink/tests/model_exploration_test.py +++ b/hlink/tests/model_exploration_test.py @@ -684,7 +684,6 @@ def test_step_2_train_random_forest_spark( "featureSubsetStrategy": "sqrt", } ] - feature_conf["training"]["output_suspicious_TD"] = True feature_conf["training"]["n_training_iterations"] = 3 model_exploration.run_step(0) @@ -694,9 +693,12 @@ def test_step_2_train_random_forest_spark( tr = spark.table("model_eval_training_results").toPandas() print(f"training results {tr}") # assert tr.shape == (1, 18) - assert tr.query("model == 'random_forest'")["pr_auc_mean"].iloc[0] > 2.0 / 3.0 + assert tr.query("model == 'random_forest'")["pr_auc_test_mean"].iloc[0] > 2.0 / 3.0 assert tr.query("model == 'random_forest'")["maxDepth"].iloc[0] == 3 + # TODO probably remove these since we're not planning to test suspicious data anymore. + # I disabled the saving of suspicious in this test config so these are invalid currently. + """ FNs = spark.table("model_eval_repeat_fns").toPandas() assert FNs.shape == (3, 4) assert FNs.query("id_a == 30")["count"].iloc[0] == 3 @@ -706,6 +708,7 @@ def test_step_2_train_random_forest_spark( TNs = spark.table("model_eval_repeat_tns").toPandas() assert TNs.shape == (6, 4) + """ main.do_drop_all("") @@ -717,18 +720,19 @@ def test_step_2_train_logistic_regression_spark( feature_conf["training"]["model_parameters"] = [ {"type": "logistic_regression", "threshold": 0.7} ] - feature_conf["training"]["n_training_iterations"] = 4 + feature_conf["training"]["n_training_iterations"] = 3 model_exploration.run_step(0) model_exploration.run_step(1) model_exploration.run_step(2) tr = spark.table("model_eval_training_results").toPandas() + # assert tr.count == 3 assert tr.shape == (1, 11) # This is now 0.83333333333.... 
I'm not sure it's worth testing against # assert tr.query("model == 'logistic_regression'")["pr_auc_mean"].iloc[0] == 0.75 - assert tr.query("model == 'logistic_regression'")["pr_auc_mean"].iloc[0] > 0.74 + assert tr.query("model == 'logistic_regression'")["pr_auc_test_mean"].iloc[0] > 0.74 assert ( round(tr.query("model == 'logistic_regression'")["alpha_threshold"].iloc[0], 1) == 0.7 From 1f162dc0926e69e745b051143eca7d1285915d9c Mon Sep 17 00:00:00 2001 From: Colin Davis Date: Tue, 10 Dec 2024 12:41:37 -0600 Subject: [PATCH 5/5] Adjust hh model exploration test for new column names, no training columns and nnot saving suspicious data. --- hlink/tests/hh_model_exploration_test.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/hlink/tests/hh_model_exploration_test.py b/hlink/tests/hh_model_exploration_test.py index edda799..baa4d33 100644 --- a/hlink/tests/hh_model_exploration_test.py +++ b/hlink/tests/hh_model_exploration_test.py @@ -57,10 +57,7 @@ def test_all_hh_mod_ev( "precision_test_mean", "recall_test_mean", "mcc_test_mean", - "precision_train_mean", - "recall_train_mean", - "pr_auc_mean", - "mcc_train_mean", + "pr_auc_test_mean", ] # TODO we should expect to get most of these columns once the results reporting is finished. @@ -75,14 +72,8 @@ def test_all_hh_mod_ev( "recall_test_sd", "mcc_test_sd", "mcc_test_mean", - "precision_train_mean", - "precision_train_sd", - "recall_train_mean", - "recall_train_sd", - "pr_auc_mean", - "pr_auc_sd", - "mcc_train_mean", - "mcc_train_sd", + "pr_auc_test_mean", + "pr_auc_test_sd", "maxDepth", "numTrees", ] @@ -97,7 +88,9 @@ def test_all_hh_mod_ev( ) assert tr.query("model == 'logistic_regression'")["alpha_threshold"].iloc[0] == 0.5 assert ( - 0.7 < tr.query("model == 'logistic_regression'")["pr_auc_mean"].iloc[0] <= 1.0 + 0.7 + < tr.query("model == 'logistic_regression'")["pr_auc_test_mean"].iloc[0] + <= 1.0 ) assert ( 0.9 @@ -131,6 +124,8 @@ def test_all_hh_mod_ev( assert 0.0 < pm0["second_best_prob"].iloc[0] < 0.5 """ + # Not saving predict-train test results anymore + """ pred_train = spark.table("hh_model_eval_predict_train").toPandas() assert all( elem in list(pred_train.columns) @@ -145,6 +140,7 @@ def test_all_hh_mod_ev( "match", ] ) + """ # TODO the exact links are different. """
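Taken together, the series above replaces the per-fold pandas result DataFrames with ThresholdTestResult records, inverts the per-fold results so that each threshold-matrix entry ends up with one list of results across all outer folds, and then aggregates mean/stdev per entry, guarding the stdev calls when fewer than two valid values survive. The following is a minimal, self-contained sketch of that flow for reference only — it is not the hlink implementation. The Result record, combine_by_threshold_entry, and aggregate names are simplified stand-ins, and the Spark prediction step, MCC, and pr_auc handling are omitted.

# Sketch only: invert per-fold threshold results into per-threshold lists,
# then aggregate mean/stdev for each threshold-matrix entry.
import statistics
from dataclasses import dataclass


@dataclass(kw_only=True)
class Result:  # simplified stand-in for ThresholdTestResult
    precision: float
    recall: float
    alpha_threshold: float
    threshold_ratio: float


def combine_by_threshold_entry(
    per_fold: list[dict[int, Result]],
) -> list[list[Result]]:
    # per_fold[f][t] is the result for outer fold f at threshold-matrix entry t.
    # Return one list per threshold entry, each holding a result from every fold.
    if len(per_fold) < 2:
        raise RuntimeError("Need at least two outer folds to combine.")
    size = len(per_fold[0])
    combined: list[list[Result]] = [[] for _ in range(size)]
    for fold_results in per_fold:
        for t in range(size):
            combined[t].append(fold_results[t])
    return combined


def aggregate(entry: list[Result]) -> dict[str, float]:
    # All results in `entry` share the same threshold settings, so take them
    # from the first element; stdev needs at least two values, mirroring the
    # guard used in the patch above.
    precisions = [r.precision for r in entry]
    recalls = [r.recall for r in entry]
    return {
        "alpha_threshold": entry[0].alpha_threshold,
        "threshold_ratio": entry[0].threshold_ratio,
        "precision_test_mean": statistics.mean(precisions),
        "precision_test_sd": (
            statistics.stdev(precisions) if len(precisions) > 1 else float("nan")
        ),
        "recall_test_mean": statistics.mean(recalls),
        "recall_test_sd": (
            statistics.stdev(recalls) if len(recalls) > 1 else float("nan")
        ),
    }


if __name__ == "__main__":
    fold_a = {0: Result(precision=0.90, recall=0.80, alpha_threshold=0.5, threshold_ratio=1.3)}
    fold_b = {0: Result(precision=0.86, recall=0.84, alpha_threshold=0.5, threshold_ratio=1.3)}
    for row in map(aggregate, combine_by_threshold_entry([fold_a, fold_b])):
        print(row)

Keeping the per-entry aggregation independent of the outer-fold loop is what lets the final thresholded metrics table carry one row per threshold combination (aggregated across folds) rather than one row per fold.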