[#162] Get LightGBM to work with bucketized features
The Spark Bucketizer adds commas to vector slot names, which cause
problems with LightGBM later in the pipeline. This is similar to the
issue with colons for Interaction, but the metadata for bucketized
vectors is a little bit different. So RenameVectorAttributes needed to
change a bit to handle the two different forms of metadata.
riley-harper committed Dec 3, 2024
1 parent 96b0e0f commit ac3bb71
Showing 4 changed files with 116 additions and 10 deletions.
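
For context, here is a minimal sketch (illustrative column names, assuming a local SparkSession) of where the commas come from: Spark's Bucketizer attaches nominal metadata to its output column, and each bucket label joins its two split boundaries with a comma, which LightGBM then rejects as special characters in feature names.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[0.1], [0.5], [0.9]], schema=["score"])

bucketized = Bucketizer(
    inputCol="score", outputCol="score_buckets", splits=[0.0, 0.33, 0.67, 1.0]
).transform(df)

# The bucket labels contain commas, e.g. "0.0, 0.33"
print(bucketized.schema["score_buckets"].metadata["ml_attr"]["vals"])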
6 changes: 6 additions & 0 deletions hlink/linking/core/pipeline.py
@@ -131,7 +131,13 @@ def generate_pipeline_stages(conf, ind_vars, tf, tconf):
                 inputCol=input_col,
                 outputCol=pipeline_feature["output_column"],
             )
+            remove_commas_from_bucketizer_vector = RenameVectorAttributes(
+                inputCol=bucketizer.getOutputCol(),
+                strsToReplace=[","],
+                replaceWith="",
+            )
             pipeline_stages.append(bucketizer)
+            pipeline_stages.append(remove_commas_from_bucketizer_vector)
 
         elif pipeline_feature["transformer_type"] == "interaction":
             input_cols = []
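
Roughly, the wiring above is equivalent to this standalone sketch (illustrative column names, taken from the tests further down): the rename stage runs immediately after the Bucketizer, so every downstream stage sees comma-free metadata.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer

from hlink.linking.transformers.rename_vector_attributes import RenameVectorAttributes

bucketizer = Bucketizer(
    inputCol="namelast_jw",
    outputCol="namelast_jw_buckets",
    splits=[0.0, 0.33, 0.67, 1.0],
)
remove_commas = RenameVectorAttributes(
    inputCol=bucketizer.getOutputCol(),
    strsToReplace=[","],
    replaceWith="",
)
pipeline = Pipeline(stages=[bucketizer, remove_commas])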
26 changes: 17 additions & 9 deletions hlink/linking/transformers/rename_vector_attributes.py
@@ -68,21 +68,29 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
         to_replace = self.getOrDefault("strsToReplace")
         replacement_str = self.getOrDefault("replaceWith")
         metadata = dataset.schema[input_col].metadata
-        attributes_by_type = metadata["ml_attr"]["attrs"]
 
         logger.debug(
             f"Renaming the attributes of vector column '{input_col}': "
             f"replacing {to_replace} with '{replacement_str}'"
         )
 
-        # The attributes are grouped by type, which may be numeric, binary, or
-        # nominal. We don't care about the type here; we'll just rename all of
-        # the attributes.
-        for _attribute_type, attributes in attributes_by_type.items():
-            for attribute in attributes:
-                for substring in to_replace:
-                    attribute["name"] = attribute["name"].replace(
-                        substring, replacement_str
-                    )
+        if "attrs" in metadata["ml_attr"]:
+            attributes_by_type = metadata["ml_attr"]["attrs"]
+
+            # The attributes are grouped by type, which may be numeric, binary, or
+            # nominal. We don't care about the type here; we'll just rename all of
+            # the attributes.
+            for _attribute_type, attributes in attributes_by_type.items():
+                for attribute in attributes:
+                    for substring in to_replace:
+                        attribute["name"] = attribute["name"].replace(
+                            substring, replacement_str
+                        )
+        elif "vals" in metadata["ml_attr"]:
+            values = metadata["ml_attr"]["vals"]
+
+            for index in range(len(values)):
+                for substring in to_replace:
+                    values[index] = values[index].replace(substring, replacement_str)
 
         return dataset.withMetadata(input_col, metadata)
68 changes: 68 additions & 0 deletions hlink/tests/training_test.py
@@ -190,6 +190,14 @@ def test_step_2_bucketizer(spark, main, conf):
     prep_pipeline = Pipeline(stages=pipeline_stages)
     prep_model = prep_pipeline.fit(tf)
     prepped_data = prep_model.transform(tf)
+
+    prepped_data.show()
+    metadata = prepped_data.schema["features_vector"].metadata
+    attributes_by_type = metadata["ml_attr"]["attrs"]
+    for _attribute_type, attributes in attributes_by_type.items():
+        for attribute in attributes:
+            print(f"""'{attribute["name"]}'""")
+
     prepped_data = prepped_data.toPandas()
 
     assert prepped_data.shape == (8, 7)
@@ -499,6 +507,11 @@ def test_step_3_with_lightgbm_model(
 def test_lightgbm_with_interacted_features(
     spark, training, training_conf, datasource_training_input
 ):
+    """
+    Interacted features add colons to vector attribute names, which cause
+    problems for LightGBM. Hlink handles this automatically by renaming the
+    vector attributes to remove the colons before invoking LightGBM.
+    """
     training_data_path, prepped_df_a_path, prepped_df_b_path = datasource_training_input
     training_conf["comparison_features"] = [
         {
@@ -544,6 +557,61 @@ def test_lightgbm_with_interacted_features(
 
     training.run_all_steps()
 
+    importances_df = spark.table("training_feature_importances")
+    importances_df.show()
+    assert importances_df.columns == [
+        "feature_name",
+        "category",
+        "weight",
+        "gain",
+    ]
+
+
+def test_lightgbm_with_bucketized_features(
+    spark, training, training_conf, datasource_training_input
+):
+    """
+    Bucketized features add commas to vector attribute names, which cause
+    problems for LightGBM. Hlink handles this automatically by renaming the
+    vector attributes to remove the commas before invoking LightGBM.
+    """
+    training_data_path, prepped_df_a_path, prepped_df_b_path = datasource_training_input
+    training_conf["comparison_features"] = [
+        {
+            "alias": "namelast_jw",
+            "column_name": "namelast",
+            "comparison_type": "jaro_winkler",
+        },
+    ]
+    training_conf["pipeline_features"] = [
+        {
+            "input_column": "namelast_jw",
+            "output_column": "namelast_jw_buckets",
+            "transformer_type": "bucketizer",
+            "categorical": True,
+            "splits": [0.0, 0.33, 0.67, 1.0],
+        }
+    ]
+    training_conf["training"]["dataset"] = training_data_path
+    training_conf["training"]["dependent_var"] = "match"
+    training_conf["training"]["independent_vars"] = [
+        "namelast_jw_buckets",
+    ]
+    training_conf["training"]["chosen_model"] = {
+        "type": "lightgbm",
+        "threshold": 0.5,
+    }
+    training_conf["training"]["score_with_model"] = True
+    training_conf["training"]["feature_importances"] = True
+
+    prepped_df_a = spark.read.csv(prepped_df_a_path, header=True, inferSchema=True)
+    prepped_df_b = spark.read.csv(prepped_df_b_path, header=True, inferSchema=True)
+
+    prepped_df_a.write.mode("overwrite").saveAsTable("prepped_df_a")
+    prepped_df_b.write.mode("overwrite").saveAsTable("prepped_df_b")
+
+    training.run_all_steps()
+
     importances_df = spark.table("training_feature_importances")
     assert importances_df.columns == [
         "feature_name",
26 changes: 25 additions & 1 deletion hlink/tests/transformers_test.py
@@ -4,7 +4,7 @@
 # https://github.com/ipums/hlink
 
 from pyspark.sql import SparkSession
-from pyspark.ml.feature import VectorAssembler
+from pyspark.ml.feature import Bucketizer, VectorAssembler
 
 from hlink.linking.transformers.rename_vector_attributes import RenameVectorAttributes
 
@@ -51,3 +51,27 @@ def test_rename_vector_attributes_multiple_replacements(spark: SparkSession) ->
     attrs = df.schema["vector"].metadata["ml_attr"]["attrs"]["numeric"]
     attr_names = [attr["name"] for attr in attrs]
     assert attr_names == ["column1hasstars", "column2multiplesymbols"]
+
+
+def test_rename_vector_attributes_on_bucketized_feature(spark: SparkSession) -> None:
+    df = spark.createDataFrame(
+        [[0.1], [0.7], [0.2], [0.5], [0.8], [0.2], [0.3]], schema=["namelast_jw"]
+    )
+
+    bucketizer = Bucketizer(
+        inputCol="namelast_jw",
+        outputCol="namelast_jw_buckets",
+        splits=[0.0, 0.33, 0.67, 1.0],
+    )
+    rename_attrs = RenameVectorAttributes(
+        inputCol="namelast_jw_buckets", strsToReplace=[","], replaceWith=""
+    )
+    transformed = rename_attrs.transform(bucketizer.transform(df))
+
+    # Save the table and reload it to confirm that the metadata changes persist
+    transformed.write.mode("overwrite").saveAsTable("transformed")
+    output_df = spark.table("transformed")
+
+    # Bucketized columns store their labels under "vals" rather than "attrs"
+    values = output_df.schema["namelast_jw_buckets"].metadata["ml_attr"]["vals"]
+    assert values == ["0.0 0.33", "0.33 0.67", "0.67 1.0"]