-

Feature Selection transforms

-

Each header below represents a feature selection transform. These transforms are used in the context of feature_selections.

-
[[feature_selections]]
input_column = "clean_birthyr"
output_column = "replaced_birthyr"
condition = "case when clean_birthyr is null or clean_birthyr == '' then year - age else clean_birthyr end"
transform = "sql_condition"
-
-
-

There are some additional attributes available for all transforms: checkpoint, override_column_a, override_column_b, set_value_column_a, set_value_column_b.

+

Feature Selection Transforms

+

Each feature selection in the [[feature_selections]] list must have a transform attribute which tells hlink which transform it uses. The available feature selection transforms are listed below. The attributes of the feature selection often vary with the feature selection transform. However, there are a few utility attributes which are available for all transforms:

+
  • override_column_a - Type: string. Optional. Given the name of a column in dataset A, copy that column into the output column instead of computing the feature selection for dataset A. This does not affect dataset B.

  • override_column_b - Type: string. Optional. Given the name of a column in dataset B, copy that column into the output column instead of computing the feature selection for dataset B. This does not affect dataset A.

  • set_value_column_a - Type: any. Optional. Instead of computing the feature selection for dataset A, use the given value for every row in the output column. This does not affect dataset B.

  • set_value_column_b - Type: any. Optional. Instead of computing the feature selection for dataset B, use the given value for every row in the output column. This does not affect dataset A.

  • checkpoint - Type: boolean. Optional. If set to true, checkpoint the dataset in Spark before computing the feature selection. This can reduce some resource usage for very complex workflows, but should not be necessary.

bigrams

Split the given string column into bigrams.

diff --git a/docs/objects.inv b/docs/objects.inv index 1fe6cd1df376b9375c621a27eca302534c7e09ce..cdb55e660a0ded999710629d753fcc1e8df1b482 100644 GIT binary patch delta 420 zcmV;V0bBl%1e64jdVfo993c>f_ne}<&qVSzyC{mKNU=1sEqRH`O%2@|UgD8>cD{zM zmy;NpX1a+-_71-PuLlJTRwZq?M&~9Y1B58P2|K{<8iro_I!3MF#|+_ifo!nap`U^^ zJf0l=m!Q{LhDKqgUm$uk;b6T66k=yhMku8xiILzy7?UqHG=Bm{n*xnii3^V0i#bfP z@1T%Yx2%)LI9bD%ARb83PzXPF(VgKVU(I_{9dO?q@Njl8NoF#Fp68SbpGcLYjn)Le z%95?NrSit{NGi{gAX*sJWRCw{UIS!X(q!U%o~s-mA}TfS`ZOzV^;VL@qPHRq@55H! z!dAjc7d&D(c7MZKX$|?E_wZ2mOj|*!GgnfLxAVG~E=rc^rv5Z>2}Q?dO7AbKO;;UM~xT zei-}ml~wnB)%Pfgh4d8S6jQmD?(Chg#BCAFKZ(;bY8|Kgp>27M`!4>V8S<1Ktv?== O{R=-AoxK5p^^@o%+s>i@ delta 416 zcmV;R0bl-<1ds%fdVkAK10fWK_dEset%+NA#%N-qNn=gAB*)>DnUq_&XuI<@d_A8; z7zSt1MtAW2|G990&d8`VQ(*0&g{MC7D{eZvx};qzUGzaodNV@$I6=}GWx);JXcmqZ zeNQZITph_-`F zYTdk!4#HqGnSz){iCre_yNl)wfB8z@yW*gm>X^^ti9kJQj)yU&gcp=KX{9B>eO{8; zHdS6YwkXpqkp@G1Iq3c8`-i8=78U9+p64vZ`v6joyEe_tYrUDIu;2`j-S4oO*RYw8 z+yx8Z_sy_oT7NV&KH5{K_H(W(W2TvZLE)>PeW_EaZ#tM6)g0~tTV!aTK027F1enHP_CDF1lRR# z`NE2OQ}i_mBtE|QFoc+|r8#>eOmRy@(obS|0A*!6v>7EYao7A6beE>IHEzF?=8fJ? 
KI{5 Date: Mon, 26 Aug 2024 19:29:30 +0000 Subject: [PATCH 06/13] Add a test for the error when there's an unrecognized feature selection transform --- hlink/tests/core/transforms_test.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/hlink/tests/core/transforms_test.py b/hlink/tests/core/transforms_test.py index 7b6041f..7c83775 100644 --- a/hlink/tests/core/transforms_test.py +++ b/hlink/tests/core/transforms_test.py @@ -192,3 +192,16 @@ def test_generate_transforms_override_column_b( Row(id=2, mother_nativity=0, test_override_column=-1, mbpl_range=-1), Row(id=3, mother_nativity=6, test_override_column=-1, mbpl_range=-1), ] + + +@pytest.mark.parametrize("is_a", [True, False]) +def test_generate_transforms_error_when_unrecognized_transform( + spark: SparkSession, preprocessing: LinkTask, is_a: bool +) -> None: + feature_selections = [ + {"input_column": "age", "output_column": "age2", "transform": "not_supported"} + ] + df = spark.createDataFrame([], "id:integer, age:integer") + + with pytest.raises(ValueError, match="Invalid transform type"): + generate_transforms(spark, df, feature_selections, preprocessing, is_a, "id") From d7b6c4b896fa4f74fec19e09d754b64d93d9c2f2 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 26 Aug 2024 19:53:45 +0000 Subject: [PATCH 07/13] Add type hints to core.transforms.apply_transform() --- hlink/linking/core/transforms.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hlink/linking/core/transforms.py b/hlink/linking/core/transforms.py index e7073b5..c0eac7b 100755 --- a/hlink/linking/core/transforms.py +++ b/hlink/linking/core/transforms.py @@ -26,7 +26,7 @@ ) from pyspark.sql.types import ArrayType, LongType, StringType from pyspark.ml import Pipeline -from pyspark.sql import DataFrame, SparkSession, Window +from pyspark.sql import Column, DataFrame, SparkSession, Window from pyspark.ml.feature import NGram, RegexTokenizer, CountVectorizer, MinHashLSH @@ -402,7 +402,9 @@ def 
get_transforms(name: str, is_a: bool) -> list[dict[str, Any]]: # These apply to the column mappings in the current config -def apply_transform(column_select, transform, is_a): +def apply_transform( + column_select: Column, transform: dict[str, Any], is_a: bool +) -> Column: """Given a dataframe select string return a new string having applied the given transform. column_select: A PySpark column type transform: The transform info from the current config From ccb665215b92a95d468c16ac8fb1d696e7831cc4 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 26 Aug 2024 20:02:48 +0000 Subject: [PATCH 08/13] Update the documentation for core.transforms.apply_transform() --- hlink/linking/core/transforms.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/hlink/linking/core/transforms.py b/hlink/linking/core/transforms.py index c0eac7b..0c771f0 100755 --- a/hlink/linking/core/transforms.py +++ b/hlink/linking/core/transforms.py @@ -405,12 +405,15 @@ def get_transforms(name: str, is_a: bool) -> list[dict[str, Any]]: def apply_transform( column_select: Column, transform: dict[str, Any], is_a: bool ) -> Column: - """Given a dataframe select string return a new string having applied the given transform. - column_select: A PySpark column type - transform: The transform info from the current config - is_a: Is running on dataset 'a' or 'b ? - - See the json_schema config file in config_schemas/config.json for definitions on each transform type. + """Return a new column that is the result of applying the given transform + to the given input column (column_select). The is_a parameter controls the + behavior of the transforms like "add_to_a" which act differently on + datasets A and B. 
+ + Args: + column_select: a PySpark Column + transform: the transform to apply + is_a: whether this is dataset A (True) or B (False) """ transform_type = transform["type"] if transform_type == "add_to_a": From 8e3ddb82efeb8ceb0305d7102ee02c7d4f0d1c82 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 26 Aug 2024 20:31:49 +0000 Subject: [PATCH 09/13] Add a test for the when_value column mapping transform --- hlink/tests/core/transforms_test.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/hlink/tests/core/transforms_test.py b/hlink/tests/core/transforms_test.py index 7c83775..e56a662 100644 --- a/hlink/tests/core/transforms_test.py +++ b/hlink/tests/core/transforms_test.py @@ -1,7 +1,8 @@ from pyspark.sql import Row, SparkSession +from pyspark.sql.functions import col import pytest -from hlink.linking.core.transforms import generate_transforms +from hlink.linking.core.transforms import apply_transform, generate_transforms from hlink.linking.link_task import LinkTask @@ -205,3 +206,22 @@ def test_generate_transforms_error_when_unrecognized_transform( with pytest.raises(ValueError, match="Invalid transform type"): generate_transforms(spark, df, feature_selections, preprocessing, is_a, "id") + + +@pytest.mark.parametrize("is_a", [True, False]) +def test_apply_transform_when_value(spark: SparkSession, is_a: bool) -> None: + transform = {"type": "when_value", "value": 6, "if_value": 0, "else_value": 1} + column_select = col("marst") + output_col = apply_transform(column_select, transform, is_a) + + df = spark.createDataFrame([[3], [6], [2], [6], [1]], "marst:integer") + transformed = df.select("marst", output_col.alias("output")) + result = transformed.collect() + + assert result == [ + Row(marst=3, output=1), + Row(marst=6, output=0), + Row(marst=2, output=1), + Row(marst=6, output=0), + Row(marst=1, output=1), + ] From 1b768a0b672a159a0103c8def5c6278fea380392 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 26 Aug 2024 20:49:03 
+0000 Subject: [PATCH 10/13] Add a test to check the error when you pass an unrecognized transform type to apply_transform() --- hlink/tests/core/transforms_test.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/hlink/tests/core/transforms_test.py b/hlink/tests/core/transforms_test.py index e56a662..54220e1 100644 --- a/hlink/tests/core/transforms_test.py +++ b/hlink/tests/core/transforms_test.py @@ -210,6 +210,13 @@ def test_generate_transforms_error_when_unrecognized_transform( @pytest.mark.parametrize("is_a", [True, False]) def test_apply_transform_when_value(spark: SparkSession, is_a: bool) -> None: + """The when_value transform supports simple if-then-otherwise logic on + columns: + + if the column is equal to "when_value" + then return "if_value" + otherwise return "else_value" + """ transform = {"type": "when_value", "value": 6, "if_value": 0, "else_value": 1} column_select = col("marst") output_col = apply_transform(column_select, transform, is_a) @@ -225,3 +232,11 @@ def test_apply_transform_when_value(spark: SparkSession, is_a: bool) -> None: Row(marst=6, output=0), Row(marst=1, output=1), ] + + +@pytest.mark.parametrize("is_a", [True, False]) +def test_apply_transform_error_when_unrecognized_transform_type(is_a: bool) -> None: + column_select = col("test") + transform = {"type": "not_supported"} + with pytest.raises(ValueError, match="Invalid transform type"): + apply_transform(column_select, transform, is_a) From 903a2009ec5775b7a9e584b232e8db39db7e34a9 Mon Sep 17 00:00:00 2001 From: rileyh Date: Tue, 27 Aug 2024 14:35:19 +0000 Subject: [PATCH 11/13] Add a test for the SparkFactory class --- hlink/tests/spark_factory_test.py | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 hlink/tests/spark_factory_test.py diff --git a/hlink/tests/spark_factory_test.py b/hlink/tests/spark_factory_test.py new file mode 100644 index 0000000..895131c --- /dev/null +++ b/hlink/tests/spark_factory_test.py @@ -0,0 
+1,35 @@ +from pathlib import Path + +from pyspark.sql import Row + +from hlink.spark.factory import SparkFactory + + +def test_spark_factory_can_create_spark_session(tmp_path: Path) -> None: + derby_dir = tmp_path / "derby" + spark_tmp_dir = tmp_path / "tmp" + warehouse_dir = tmp_path / "warehouse" + + factory = ( + SparkFactory() + .set_local() + .set_derby_dir(derby_dir) + .set_warehouse_dir(warehouse_dir) + .set_tmp_dir(spark_tmp_dir) + .set_num_cores(1) + .set_executor_cores(1) + .set_executor_memory("1G") + ) + spark = factory.create() + + # Make sure we can do some basic operations with the SparkSession we get back + df = spark.createDataFrame( + [[0, "a"], [1, "b"], [2, "c"]], "id:integer,letter:string" + ) + expr = (df.letter == "b").alias("equals_b") + result = df.select(expr).collect() + assert result == [ + Row(equals_b=False), + Row(equals_b=True), + Row(equals_b=False), + ] From 41588413a56370c743a1824a3a4a26b44116566b Mon Sep 17 00:00:00 2001 From: rileyh Date: Tue, 27 Aug 2024 15:29:46 +0000 Subject: [PATCH 12/13] Add a test for the remove_punctuation column mapping transform --- hlink/tests/core/transforms_test.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/hlink/tests/core/transforms_test.py b/hlink/tests/core/transforms_test.py index 54220e1..e71650e 100644 --- a/hlink/tests/core/transforms_test.py +++ b/hlink/tests/core/transforms_test.py @@ -234,6 +234,32 @@ def test_apply_transform_when_value(spark: SparkSession, is_a: bool) -> None: ] +@pytest.mark.parametrize("is_a", [True, False]) +def test_apply_transform_remove_punctuation(spark: SparkSession, is_a: bool) -> None: + transform = {"type": "remove_punctuation"} + input_col = col("input") + output_col = apply_transform(input_col, transform, is_a) + + df = spark.createDataFrame( + [ + # All of these characters are considered punctuation and should be removed + ["?-\\/\"':,.[]{}"], + ["abcdefghijklmnop"], + # The address of the Minnesota state capitol + ["75 
Rev. Dr. Martin Luther King, Jr. Blvd. Saint Paul, MN 55155"], + ], + "input:string", + ) + transformed = df.select(output_col.alias("output")) + result = transformed.collect() + + assert result == [ + Row(output=""), + Row(output="abcdefghijklmnop"), + Row(output="75 Rev Dr Martin Luther King Jr Blvd Saint Paul MN 55155"), + ] + + @pytest.mark.parametrize("is_a", [True, False]) def test_apply_transform_error_when_unrecognized_transform_type(is_a: bool) -> None: column_select = col("test") From e4c9941141153f31aded705d95ff7ceb770d150b Mon Sep 17 00:00:00 2001 From: rileyh Date: Tue, 27 Aug 2024 18:20:54 +0000 Subject: [PATCH 13/13] Add a test for the substring column mapping transform This confirms that the transform handles the case where the values list doesn't have length 2 by raising an error. This prompted me to make issue #146, which I think should really simplify this transform. --- hlink/tests/core/transforms_test.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/hlink/tests/core/transforms_test.py b/hlink/tests/core/transforms_test.py index e71650e..f072333 100644 --- a/hlink/tests/core/transforms_test.py +++ b/hlink/tests/core/transforms_test.py @@ -260,6 +260,35 @@ def test_apply_transform_remove_punctuation(spark: SparkSession, is_a: bool) -> ] +@pytest.mark.parametrize("values", [[1], [1, 2, 3]]) +@pytest.mark.parametrize("is_a", [True, False]) +def test_apply_transform_substring_error_when_not_exactly_2_values( + values: list[int], is_a: bool +) -> None: + """ + The substring transform takes a list of exactly two values, which are the + start position of the substring and its length. If the list has the wrong + number of values, then apply_transform() raises an error. + + TODO: It would be simpler to have two separate attributes for the substring + start and length, like this: + + { + "type": "substring", + "start_index": 0, + "length": 4, + } + + See issue #146. 
Making these changes would eliminate the need for this + test. + """ + input_col = col("input") + transform = {"type": "substring", "values": values} + + with pytest.raises(ValueError, match="Length of substr transform should be 2"): + apply_transform(input_col, transform, is_a) + + @pytest.mark.parametrize("is_a", [True, False]) def test_apply_transform_error_when_unrecognized_transform_type(is_a: bool) -> None: column_select = col("test")