
[#161, #162] Rename some variables and add logging in training step 3
riley-harper committed Nov 21, 2024
1 parent 010f46a commit 7864432
Showing 1 changed file with 33 additions and 15 deletions.
hlink/linking/training/link_step_save_model_metadata.py
@@ -3,6 +3,8 @@
 # in this project's top-level directory, and also on-line at:
 # https://github.com/ipums/hlink

+import logging
+
 from pyspark.sql.types import (
     FloatType,
     IntegerType,
@@ -13,6 +15,8 @@

 from hlink.linking.link_step import LinkStep

+logger = logging.getLogger(__name__)
+

 class LinkStepSaveModelMetadata(LinkStep):
     """Save metadata about the trained machine learning model.
@@ -44,6 +48,9 @@ def _run(self):
         do_get_feature_importances = config[training_conf].get("feature_importances")

         if do_get_feature_importances is None or not do_get_feature_importances:
+            logger.info(
+                "Skipping training step 3 - save model metadata since training.feature_importances is not set"
+            )
             print(
                 "Skipping the save model metadata training step. "
                 "To run this step and save model metadata like feature importances, "
@@ -70,39 +77,45 @@ def _run(self):
         vector_assembler = pipeline_model.stages[0]
         model = pipeline_model.stages[1]

-        column_names = vector_assembler.getInputCols()
+        feature_names = vector_assembler.getInputCols()
+        logger.debug(f"Feature names are {feature_names}")
         tf_prepped = self.task.spark.table(f"{table_prefix}training_features_prepped")
         tf_prepped_schema = dict(tf_prepped.dtypes)
         tf_prepped_row = tf_prepped.head()

-        # Expand categorical features into multiple columns for display with their
-        # respective coefficients / feature importances.
-        true_cols = []
-        for col in column_names:
+        # Expand categorical features into multiple rows for display with their
+        # respective coefficients / feature importances per category. Non-categorical
+        # features just get one entry that looks like ("feature_name", None).
+        expanded_features: list[tuple[str, int | None]] = []
+        for feature_name in feature_names:
             # Columns with type "vector" are categorical and may have more than one coefficient.
             # Many of these columns end with "_onehotencoded", and we remove that
             # suffix to clean up the column names. Categorical columns created through
             # feature interaction will probably not have the "_onehotencoded" suffix,
             # so we can't just check for that to find the categorical features.
-            data_type = tf_prepped_schema[col]
+            data_type = tf_prepped_schema[feature_name]
             if data_type == "vector":
-                base_col = col.removesuffix("_onehotencoded")
-                num_categories = len(tf_prepped_row[col])
-                true_cols.extend((base_col, i) for i in range(num_categories))
+                base_name = feature_name.removesuffix("_onehotencoded")
+                num_categories = len(tf_prepped_row[feature_name])
+                # Categories are numeric, starting at 0.
+                expanded_features.extend(
+                    (base_name, category) for category in range(num_categories)
+                )
             else:
-                base_col = col.removesuffix("_imp")
-                true_cols.append((base_col, None))
+                base_name = feature_name.removesuffix("_imp")
+                expanded_features.append((base_name, None))

-        true_column_names = [column_name for (column_name, _) in true_cols]
-        true_categories = [category for (_, category) in true_cols]
         model_type = config[training_conf]["chosen_model"]["type"]

+        logger.debug(f"Expanded features with categories are {expanded_features}")
+        logger.debug(f"The model type is '{model_type}'")
+
         print("Retrieving model feature importances or coefficients...")

         if model_type == "xgboost":
             raw_weights = model.get_feature_importances("weight")
             raw_gains = model.get_feature_importances("total_gain")
-            keys = [f"f{index}" for index in range(len(true_cols))]
+            keys = [f"f{index}" for index in range(len(expanded_features))]

             weights = [raw_weights.get(key, 0.0) for key in keys]
             gains = [raw_gains.get(key, 0.0) for key in keys]
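The rename from `true_cols` to `expanded_features` matches what the loop builds: one `(base_name, category)` pair per one-hot category and a `(name, None)` pair per scalar feature, so that list indices line up with xgboost's positional `f0`, `f1`, ... keys. A minimal standalone sketch with plain dicts standing in for the Spark schema and head row (the feature names and values are toy assumptions):

```python
# Toy stand-ins for tf_prepped_schema and tf_prepped_row; not real hlink data.
tf_prepped_schema = {"namelast_jw_imp": "double", "region_onehotencoded": "vector"}
tf_prepped_row = {"namelast_jw_imp": 0.87, "region_onehotencoded": [0.0, 1.0, 0.0]}
feature_names = ["namelast_jw_imp", "region_onehotencoded"]

expanded_features: list[tuple[str, int | None]] = []
for feature_name in feature_names:
    if tf_prepped_schema[feature_name] == "vector":
        # One entry per category in the one-hot encoded vector.
        base_name = feature_name.removesuffix("_onehotencoded")
        num_categories = len(tf_prepped_row[feature_name])
        expanded_features.extend((base_name, c) for c in range(num_categories))
    else:
        expanded_features.append((feature_name.removesuffix("_imp"), None))

print(expanded_features)
# [('namelast_jw', None), ('region', 0), ('region', 1), ('region', 2)]
keys = [f"f{index}" for index in range(len(expanded_features))]
print(keys)  # ['f0', 'f1', 'f2', 'f3']
```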
@@ -129,6 +142,9 @@ def _run(self):
             try:
                 feature_imp = model.featureImportances
             except:
+                logger.warning(
+                    f"Cannot compute feature importances for model of type '{model_type}'"
+                )
                 print(
                     "This model doesn't contain a coefficient or feature importances parameter -- check chosen model type."
                 )
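The `try`/`except` works because Spark ML exposes importances differently per model family: tree ensembles such as `RandomForestClassificationModel` have a `featureImportances` attribute, while linear models expose `coefficients` instead, so the lookup raises `AttributeError` on models that lack it (which the bare `except:` catches, along with anything else). A small sketch of that failure mode using a stand-in class rather than a real Spark model:

```python
# Stand-in for a linear model that has coefficients but no featureImportances.
class FakeLinearModel:
    coefficients = [0.3, -1.2]

model = FakeLinearModel()
try:
    feature_imp = model.featureImportances
except AttributeError:
    print("no featureImportances on this model type; try coefficients instead")
```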
@@ -153,9 +169,11 @@
             ),
         ]

+        logger.debug("Creating the DataFrame and saving it as a table")
+        feature_names, categories = zip(*expanded_features)
         importance_schema, importance_data = zip(*importance_columns)
         features_df = self.task.spark.createDataFrame(
-            zip(true_column_names, true_categories, *importance_data, strict=True),
+            zip(feature_names, categories, *importance_data, strict=True),
             StructType(
                 [
                     StructField("feature_name", StringType(), nullable=False),