No errors, use model exploration approach that should get pr_auc mean and test all threshold matrix members against that set of params. Still has a failure.
ccdavis committed Nov 15, 2024
1 parent 3b84f26 commit 62ff6e6
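In outline, the strategy this commit moves toward: for each set of model parameters, score it by the mean (and standard deviation) of PR AUC across all train-test splits, then refit once on a single split and test every (alpha_threshold, threshold_ratio) pair in the threshold matrix against that fit. A minimal sketch of that shape — `train_and_score`, `fit_model`, and `evaluate_thresholds` are illustrative stand-ins, not hlink functions:

import statistics

def explore(model_parameters, splits, threshold_matrix):
    for params in model_parameters:
        # Phase 1: mean/stdev of PR AUC over every train-test split.
        pr_aucs = [train_and_score(params, train, test) for train, test in splits]
        pr_auc_mean = statistics.mean(pr_aucs)
        pr_auc_std = statistics.stdev(pr_aucs) if len(pr_aucs) > 1 else 0.0

        # Phase 2: refit once on a single split, then sweep the whole
        # threshold matrix against that one fitted model.
        train, test = splits[0]
        model = fit_model(params, train)
        for alpha_threshold, threshold_ratio in threshold_matrix:
            evaluate_thresholds(model, test, alpha_threshold, threshold_ratio)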
Showing 2 changed files with 82 additions and 33 deletions.
114 changes: 81 additions & 33 deletions hlink/linking/model_exploration/link_step_train_test_models.py
@@ -3,6 +3,7 @@
# in this project's top-level directory, and also on-line at:
# https://github.com/ipums/hlink

import statistics
import itertools
import logging
import math
@@ -52,7 +53,7 @@ def _run(self) -> None:
dep_var = config[training_conf]["dependent_var"]
id_a = config["id_column"] + "_a"
id_b = config["id_column"] + "_b"
desc_df = _create_desc_df()
thresholded_metrics_df = _create_thresholded_metrics_df()
columns_to_keep = [id_a, id_b, "features_vector", dep_var]
prepped_data = (
self.task.spark.table(f"{table_prefix}training_vectorized")
@@ -74,6 +75,8 @@ def _run(self) -> None:
f"There are {len(model_parameters)} sets of model parameters to explore; "
f"each of these has {n_training_iterations} train-test splits to test on"
)

probability_metrics_df = _create_probability_metrics_df()
for run_index, run in enumerate(model_parameters, 1):
run_start_info = f"Starting run {run_index} of {len(model_parameters)} with these parameters: {run}"
print(run_start_info)
@@ -144,13 +147,7 @@ def _run(self) -> None:

pr_auc = auc(recall, precision)
print(f"The area under the precision-recall curve is {pr_auc}")
splits_results.append(
{
"auc": pr_auc,
"predictions_tmp": predictions_tmp,
"predict_train_tmp": predict_train_tmp,
}
)
splits_results.append(pr_auc)
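# splits_results now collects one plain PR AUC float per train-test split.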

if first:
prc = pd.DataFrame(
@@ -173,16 +170,50 @@ def _run(self) -> None:
training_data.unpersist()
test_data.unpersist()

print(f"split_results: {len(splits_results)}")
# pluck out predictions_tmp, predict_train_tmp associated with highest pr_auc
best_pr_auc = 0.0
best_predictions_tmp = None
best_predict_train_tmp = None
for a in splits_results:
if a["auc"] > best_pr_auc:
best_pr_auc = a["auc"]
best_predictions_tmp = a["predictions_tmp"]
best_predict_train_tmp = a["predict_train_tmp"]
# Aggregate PR AUC mean and standard deviation across the splits
auc_mean = statistics.mean(splits_results)
auc_std = statistics.stdev(splits_results) if len(splits_results) > 1 else 0.0
pr_auc_dict = {
"model": model_type,
"parameters": params,
"pr_auc_mean": auc_mean,
"pr_auc_standard_deviation": auc_std,
}
print(f"PR AUC for splits on current model and params: {pr_auc_dict}")
# Wrap the dict in a list so pandas builds a single-row frame instead of
# raising "If using all scalar values, you must pass an index".
this_model_results = pd.DataFrame([pr_auc_dict])
probability_metrics_df = pd.concat(
[probability_metrics_df, this_model_results]
)

# TODO: should we make a different split here, e.g. starting from a different
# seed, or at least avoid re-using a split that contributed to the PR AUC mean?
splits_for_thresholding_eval = splits[0]
thresholding_training_data = splits_for_thresholding_eval[0]
thresholding_test_data = splits_for_thresholding_eval[1]
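# One possible answer to the TODO above (hypothetical, not what this code
# does): draw a fresh split with a different seed so the thresholding
# evaluation never re-uses a split that fed the PR AUC mean, e.g.
#   thresholding_training_data, thresholding_test_data = prepped_data.randomSplit(
#       [0.5, 0.5], seed=some_new_seed
#   )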

thresholding_classifier, thresholding_post_transformer = (
classifier_core.choose_classifier(
pr_auc_dict["model"], pr_auc_dict["params"], dep_var
)
)
thresholding_model = thresholding_classifier.fit(thresholding_training_data)

thresholding_predictions = _get_probability_and_select_pred_columns(
thresholding_test_data,
thresholding_model,
thresholding_post_transformer,
id_a,
id_b,
dep_var,
).cache()
thresholding_predict_train = _get_probability_and_select_pred_columns(
thresholding_training_data,
thresholding_model,
thresholding_post_transformer,
id_a,
id_b,
dep_var,
).cache()
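# Both prediction frames are cached because the threshold loop below reads
# them once per (alpha_threshold, threshold_ratio) pair.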

i = 0
for threshold_index, (alpha_threshold, threshold_ratio) in enumerate(
@@ -193,14 +224,14 @@ def _run(self) -> None:
f"{alpha_threshold=} and {threshold_ratio=}"
)
predictions = threshold_core.predict_using_thresholds(
best_predictions_tmp,
thresholding_predictions,
alpha_threshold,
threshold_ratio,
config[training_conf],
config["id_column"],
)
predict_train = threshold_core.predict_using_thresholds(
best_predict_train_tmp,
thresholding_predict_train,
alpha_threshold,
threshold_ratio,
config[training_conf],
@@ -211,21 +242,25 @@ def _run(self) -> None:
predictions,
predict_train,
dep_var,
model,
thresholding_model,
results_dfs[i],
otd_data,
alpha_threshold,
threshold_ratio,
best_pr_auc,
pr_auc_dict["auc_mean"],
)
i += 1

for i in range(len(threshold_matrix)):
desc_df = _append_results(desc_df, results_dfs[i], model_type, params)
thresholded_metrics_df = _append_results(
thresholded_metrics_df, results_dfs[i], model_type, params
)

_print_desc_df(desc_df)
desc_df = _load_desc_df_params(desc_df)
self._save_training_results(desc_df, self.task.spark)
_print_thresholded_metrics_df(thresholded_metrics_df)
thresholded_metrics_df = _load_thresholded_metrics_df_params(
thresholded_metrics_df
)
self._save_training_results(thresholded_metrics_df, self.task.spark)
self._save_otd_data(otd_data, self.task.spark)
self.task.spark.sql("set spark.sql.shuffle.partitions=200")

@@ -611,7 +646,7 @@ def _create_results_df() -> pd.DataFrame:


def _append_results(
desc_df: pd.DataFrame,
thresholded_metrics_df: pd.DataFrame,
results_df: pd.DataFrame,
model_type: str,
params: dict[str, Any],
@@ -642,12 +677,14 @@ def _append_results(
},
)

desc_df = pd.concat([desc_df, new_desc], ignore_index=True)
_print_desc_df(desc_df)
return desc_df
thresholded_metrics_df = pd.concat(
[thresholded_metrics_df, new_desc], ignore_index=True
)
_print_thresholded_metrics_df(thresholded_metrics_df)
return thresholded_metrics_df


def _print_desc_df(desc_df: pd.DataFrame) -> None:
def _print_thresholded_metrics_df(desc_df: pd.DataFrame) -> None:
pd.set_option("display.max_colwidth", None)
print(
desc_df.drop(
@@ -663,7 +700,7 @@ def _print_desc_df(desc_df: pd.DataFrame) -> None:
print("\n")


def _load_desc_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
def _load_thresholded_metrics_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
params = [
"maxDepth",
"numTrees",
@@ -690,11 +727,22 @@ def _load_desc_df_params(desc_df: pd.DataFrame) -> pd.DataFrame:
return desc_df
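Most of the body is elided above; the gist is that each name in `params` gets lifted out of the per-row parameters dict into its own column so it can be queried directly (the test below relies on a `maxDepth` column, for instance). A rough sketch of that idea, assuming the dicts live in a `parameters` column — the real implementation may differ:

for param in params:
    desc_df[param] = desc_df["parameters"].apply(
        lambda p, key=param: p.get(key) if isinstance(p, dict) else None
    )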


def _create_desc_df() -> pd.DataFrame:
def _create_probability_metrics_df() -> pd.DataFrame:
return pd.DataFrame(
columns=[
"model",
"parameters",
"pr_auc_mean",
"pr_auc_standard_deviation",
]
)
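Each model/parameter combination is meant to contribute one row here; a minimal usage sketch with made-up values:

row = {
    "model": "random_forest",
    "parameters": {"maxDepth": 3, "numTrees": 50},
    "pr_auc_mean": 0.81,
    "pr_auc_standard_deviation": 0.02,
}
metrics_df = pd.concat(
    [_create_probability_metrics_df(), pd.DataFrame([row])], ignore_index=True
)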


def _create_thresholded_metrics_df() -> pd.DataFrame:
return pd.DataFrame(
columns=[
"model",
"pa rameters",
"alpha_threshold",
"threshold_ratio",
"precision_test_mean",
1 change: 1 addition & 0 deletions hlink/tests/model_exploration_test.py
@@ -280,6 +280,7 @@ def test_step_2_train_random_forest_spark(
model_exploration.run_step(2)

tr = spark.table("model_eval_training_results").toPandas()
print(f"training results {tr}")
# assert tr.shape == (1, 18)
assert tr.query("model == 'random_forest'")["pr_auc_mean"].iloc[0] > 0.7
assert tr.query("model == 'random_forest'")["maxDepth"].iloc[0] == 3
