From 8f951fb7a3900753160861c2e3f8c2b6da4e1c69 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:16:19 +0000 Subject: [PATCH 1/9] Bump pyspark from 3.5.3 to 3.5.4 Bumps [pyspark](https://github.com/apache/spark) from 3.5.3 to 3.5.4. - [Commits](https://github.com/apache/spark/compare/v3.5.3...v3.5.4) --- updated-dependencies: - dependency-name: pyspark dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- poetry.lock | 6 +++--- pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4b1c85b68e..e312498c83 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3797,12 +3797,12 @@ files = [ [[package]] name = "pyspark" -version = "3.5.3" +version = "3.5.4" description = "Apache Spark Python API" optional = false python-versions = ">=3.8" files = [ - {file = "pyspark-3.5.3.tar.gz", hash = "sha256:68b7cc0c0c570a7d8644f49f40d2da8709b01d30c9126cc8cf93b4f84f3d9747"}, + {file = "pyspark-3.5.4.tar.gz", hash = "sha256:1c2926d63020902163f58222466adf6f8016f6c43c1f319b8e7a71dbaa05fc51"}, ] [package.dependencies] @@ -5373,4 +5373,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.9, !=3.9.7" -content-hash = "589420084d166312bbd226bde6624cbfe8632fe8fd758c6b0af759ed10ae0120" +content-hash = "7c24f0058f31f863321733ae667976a39b800cdc943f1c0e92400a471258ffbe" diff --git a/pyproject.toml b/pyproject.toml index c71818e7ff..2ce5533206 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,7 +92,7 @@ requests-mock = "1.12.1" moto = { version = "^5.0.2", extras = ["server"] } typing-extensions = "4.12.2" pytest-mock = "3.14.0" -pyspark = "3.5.3" +pyspark = "3.5.4" cython = "3.0.11" deptry = ">=0.14,<0.23" docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520 From 90aa78803b740281e6eec0558b549f394fbac171 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 16:19:29 +0100 Subject: [PATCH 2/9] Test out Iceberg 1.7.2 RC0 --- dev/Dockerfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index b55be39e9d..dab8dc85f8 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -37,7 +37,7 @@ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/ WORKDIR ${SPARK_HOME} # Remember to also update `tests/conftest`'s spark setting -ENV SPARK_VERSION=3.5.3 +ENV SPARK_VERSION=3.5.4 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12 ENV ICEBERG_VERSION=1.6.0 ENV PYICEBERG_VERSION=0.8.1 @@ -47,11 +47,11 @@ RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_V && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz # Download iceberg spark runtime -RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \ +RUN curl --retry 5 -s https://repository.apache.org/content/repositories/orgapacheiceberg-1180/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.7.2/iceberg-spark-runtime-3.5_2.12-1.7.2.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \ && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars # Download AWS bundle -RUN curl --retry 5 -s 
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar +RUN curl --retry 5 -s https://repository.apache.org/content/repositories/orgapacheiceberg-1180/org/apache/iceberg/iceberg-aws-bundle/1.7.2/iceberg-aws-bundle-1.7.2.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar COPY spark-defaults.conf /opt/spark/conf ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}" From 2fc9e78ff050ac85dcd1bedc2dc98977686d4099 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 17:20:07 +0100 Subject: [PATCH 3/9] See if we can reduce memory --- tests/integration/test_deletes.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index ae03beea53..8778c6d673 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -313,6 +313,7 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None: identifier = "default.test_read_multiple_batches_in_task_with_position_deletes" + multiplier = 10 run_spark_commands( spark, @@ -337,7 +338,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession arrow_table = pa.Table.from_arrays( [ - pa.array(list(range(1, 1001)) * 100), + pa.array(list(range(1, 1001)) * multiplier), ], schema=pa.schema([pa.field("number", pa.int32())]), ) @@ -358,7 +359,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() assert isinstance(reader, pa.RecordBatchReader) pyiceberg_count = len(reader.read_all()) - expected_count = 46 * 100 + expected_count = 46 * multiplier assert pyiceberg_count == expected_count, f"Failing check. 
{pyiceberg_count} != {expected_count}" From fed83e86f0b023126660dee2f237758befedca50 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 17:47:13 +0100 Subject: [PATCH 4/9] Test --- tests/integration/test_deletes.py | 104 +++++++++++++++--------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 8778c6d673..76b3a6bb61 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -309,58 +309,58 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes assert len(reader.read_all()) == 0 -@pytest.mark.integration -@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") -def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None: - identifier = "default.test_read_multiple_batches_in_task_with_position_deletes" - multiplier = 10 - - run_spark_commands( - spark, - [ - f"DROP TABLE IF EXISTS {identifier}", - f""" - CREATE TABLE {identifier} ( - number int - ) - USING iceberg - TBLPROPERTIES( - 'format-version' = 2, - 'write.delete.mode'='merge-on-read', - 'write.update.mode'='merge-on-read', - 'write.merge.mode'='merge-on-read' - ) - """, - ], - ) - - tbl = session_catalog.load_table(identifier) - - arrow_table = pa.Table.from_arrays( - [ - pa.array(list(range(1, 1001)) * multiplier), - ], - schema=pa.schema([pa.field("number", pa.int32())]), - ) - - tbl.append(arrow_table) - - run_spark_commands( - spark, - [ - f""" - DELETE FROM {identifier} WHERE number in (1, 2, 3, 4) - """, - ], - ) - - tbl.refresh() - - reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() - assert isinstance(reader, pa.RecordBatchReader) - pyiceberg_count = len(reader.read_all()) - expected_count = 46 * multiplier - assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}" +# @pytest.mark.integration +# @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") +# def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None: +# identifier = "default.test_read_multiple_batches_in_task_with_position_deletes" +# multiplier = 10 +# +# run_spark_commands( +# spark, +# [ +# f"DROP TABLE IF EXISTS {identifier}", +# f""" +# CREATE TABLE {identifier} ( +# number int +# ) +# USING iceberg +# TBLPROPERTIES( +# 'format-version' = 2, +# 'write.delete.mode'='merge-on-read', +# 'write.update.mode'='merge-on-read', +# 'write.merge.mode'='merge-on-read' +# ) +# """, +# ], +# ) +# +# tbl = session_catalog.load_table(identifier) +# +# arrow_table = pa.Table.from_arrays( +# [ +# pa.array(list(range(1, 1001)) * multiplier), +# ], +# schema=pa.schema([pa.field("number", pa.int32())]), +# ) +# +# tbl.append(arrow_table) +# +# run_spark_commands( +# spark, +# [ +# f""" +# DELETE FROM {identifier} WHERE number in (1, 2, 3, 4) +# """, +# ], +# ) +# +# tbl.refresh() +# +# reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() +# assert isinstance(reader, pa.RecordBatchReader) +# pyiceberg_count = len(reader.read_all()) +# expected_count = 46 * multiplier +# assert pyiceberg_count == expected_count, f"Failing check. 
{pyiceberg_count} != {expected_count}"
 
 
 @pytest.mark.integration

From bca56a9af93b78dffb1c0bd404c92fae626c5cd5 Mon Sep 17 00:00:00 2001
From: Fokko 
Date: Mon, 27 Jan 2025 20:25:38 +0100
Subject: [PATCH 5/9] Limit parallelism to reduce memory pressure

---
 tests/conftest.py                 |   2 +
 tests/integration/test_deletes.py | 103 +++++++++++++++---------------
 2 files changed, 53 insertions(+), 52 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index cfd9796312..7f784d5be2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2433,6 +2433,8 @@ def spark() -> "SparkSession":
     spark = (
         SparkSession.builder.appName("PyIceberg integration test")
        .config("spark.sql.session.timeZone", "UTC")
+        .config("spark.sql.shuffle.partitions", "1")
+        .config("spark.default.parallelism", "1")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index 76b3a6bb61..ae03beea53 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -309,58 +309,57 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0
 
 
-# @pytest.mark.integration
-# @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
-# def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
-#     identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
-#     multiplier = 10
-#
-#     run_spark_commands(
-#         spark,
-#         [
-#             f"DROP TABLE IF EXISTS {identifier}",
-#             f"""
-#             CREATE TABLE {identifier} (
-#                 number int
-#             )
-#             USING iceberg
-#             TBLPROPERTIES(
-#                 'format-version' = 2,
-#                 'write.delete.mode'='merge-on-read',
-#                 'write.update.mode'='merge-on-read',
-#                 'write.merge.mode'='merge-on-read'
-#             )
-#             """,
-#         ],
-#     )
-#
-#     tbl = session_catalog.load_table(identifier)
-#
-#     arrow_table = pa.Table.from_arrays(
-#         [
-#             pa.array(list(range(1, 1001)) * multiplier),
-#         ],
-#         schema=pa.schema([pa.field("number", pa.int32())]),
-#     )
-#
-#     tbl.append(arrow_table)
-#
-#     run_spark_commands(
-#         spark,
-#         [
-#             f"""
-#             DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
-#             """,
-#         ],
-#     )
-#
-#     tbl.refresh()
-#
-#     reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
-#     assert isinstance(reader, pa.RecordBatchReader)
-#     pyiceberg_count = len(reader.read_all())
-#     expected_count = 46 * multiplier
-#     assert pyiceberg_count == expected_count, f"Failing check. 
{pyiceberg_count} != {expected_count}" +@pytest.mark.integration +@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") +def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None: + identifier = "default.test_read_multiple_batches_in_task_with_position_deletes" + + run_spark_commands( + spark, + [ + f"DROP TABLE IF EXISTS {identifier}", + f""" + CREATE TABLE {identifier} ( + number int + ) + USING iceberg + TBLPROPERTIES( + 'format-version' = 2, + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read' + ) + """, + ], + ) + + tbl = session_catalog.load_table(identifier) + + arrow_table = pa.Table.from_arrays( + [ + pa.array(list(range(1, 1001)) * 100), + ], + schema=pa.schema([pa.field("number", pa.int32())]), + ) + + tbl.append(arrow_table) + + run_spark_commands( + spark, + [ + f""" + DELETE FROM {identifier} WHERE number in (1, 2, 3, 4) + """, + ], + ) + + tbl.refresh() + + reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() + assert isinstance(reader, pa.RecordBatchReader) + pyiceberg_count = len(reader.read_all()) + expected_count = 46 * 100 + assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}" @pytest.mark.integration From f3f6295a907a1acf4b6e2ed655b85ab61408f55c Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 20:45:59 +0100 Subject: [PATCH 6/9] Test --- tests/integration/test_deletes.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index ae03beea53..fd892f3d68 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -337,7 +337,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession arrow_table = pa.Table.from_arrays( [ - pa.array(list(range(1, 1001)) * 100), + pa.array(list(range(1, 100)) * 10), ], schema=pa.schema([pa.field("number", pa.int32())]), ) @@ -347,9 +347,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession run_spark_commands( spark, [ - f""" - DELETE FROM {identifier} WHERE number in (1, 2, 3, 4) - """, + f"DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)", ], ) @@ -358,7 +356,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() assert isinstance(reader, pa.RecordBatchReader) pyiceberg_count = len(reader.read_all()) - expected_count = 46 * 100 + expected_count = 46 * 10 assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}" From 0418008e87fc90872748ea5407227fc88eee5f34 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 21:35:21 +0100 Subject: [PATCH 7/9] Revert "Bump pyspark from 3.5.3 to 3.5.4" This reverts commit 8f951fb7a3900753160861c2e3f8c2b6da4e1c69. 
--- poetry.lock | 6 +++--- pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index e312498c83..4b1c85b68e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3797,12 +3797,12 @@ files = [ [[package]] name = "pyspark" -version = "3.5.4" +version = "3.5.3" description = "Apache Spark Python API" optional = false python-versions = ">=3.8" files = [ - {file = "pyspark-3.5.4.tar.gz", hash = "sha256:1c2926d63020902163f58222466adf6f8016f6c43c1f319b8e7a71dbaa05fc51"}, + {file = "pyspark-3.5.3.tar.gz", hash = "sha256:68b7cc0c0c570a7d8644f49f40d2da8709b01d30c9126cc8cf93b4f84f3d9747"}, ] [package.dependencies] @@ -5373,4 +5373,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.9, !=3.9.7" -content-hash = "7c24f0058f31f863321733ae667976a39b800cdc943f1c0e92400a471258ffbe" +content-hash = "589420084d166312bbd226bde6624cbfe8632fe8fd758c6b0af759ed10ae0120" diff --git a/pyproject.toml b/pyproject.toml index 2ce5533206..c71818e7ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,7 +92,7 @@ requests-mock = "1.12.1" moto = { version = "^5.0.2", extras = ["server"] } typing-extensions = "4.12.2" pytest-mock = "3.14.0" -pyspark = "3.5.4" +pyspark = "3.5.3" cython = "3.0.11" deptry = ">=0.14,<0.23" docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520 From 026e020dd4ff9f6d9f97816d60fc4b5060b6ea2c Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 21:36:41 +0100 Subject: [PATCH 8/9] WIP --- tests/integration/test_deletes.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index fd892f3d68..7dac15cd93 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -42,7 +42,8 @@ def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None: @pytest.fixture() def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: identifier = "default.__test_table" - arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], names=["idx", "value"]) + arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], + names=["idx", "value"]) test_table = session_catalog.create_table( identifier, schema=Schema( @@ -59,7 +60,8 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: +def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, + format_version: int) -> None: identifier = "default.table_partitioned_delete" run_spark_commands( @@ -129,7 +131,8 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: +def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, + format_version: int) -> None: identifier = "default.table_partitioned_delete" run_spark_commands( @@ -243,7 +246,8 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio @pytest.mark.integration 
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") -def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None: +def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, + session_catalog: RestCatalog) -> None: identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch" run_spark_commands( @@ -337,7 +341,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession arrow_table = pa.Table.from_arrays( [ - pa.array(list(range(1, 100)) * 10), + pa.array(list(range(1, 1001)) * 100), ], schema=pa.schema([pa.field("number", pa.int32())]), ) @@ -347,7 +351,9 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession run_spark_commands( spark, [ - f"DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)", + f""" + DELETE FROM {identifier} WHERE number in (1, 2, 3, 4) + """, ], ) @@ -356,7 +362,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader() assert isinstance(reader, pa.RecordBatchReader) pyiceberg_count = len(reader.read_all()) - expected_count = 46 * 10 + expected_count = 46 * 100 assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}" @@ -412,7 +418,8 @@ def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestC @pytest.mark.integration @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") -def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None: +def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, + session_catalog: RestCatalog) -> None: identifier = "default.table_partitioned_delete_sequence_number" # This test case is a bit more complex. 
Here we run a MoR delete on a file, we make sure that @@ -765,7 +772,8 @@ def test_delete_after_partition_evolution_from_partitioned(session_catalog: Rest tbl = session_catalog.create_table( identifier, schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())), - partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")), + partition_spec=PartitionSpec( + PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")), ) tbl.append(arrow_table) From de7a491e7797284c105f2a59a4b61666af337e6a Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 27 Jan 2025 21:38:05 +0100 Subject: [PATCH 9/9] lint --- tests/integration/test_deletes.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 7dac15cd93..ae03beea53 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -42,8 +42,7 @@ def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None: @pytest.fixture() def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: identifier = "default.__test_table" - arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], - names=["idx", "value"]) + arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], names=["idx", "value"]) test_table = session_catalog.create_table( identifier, schema=Schema( @@ -60,8 +59,7 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, - format_version: int) -> None: +def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: identifier = "default.table_partitioned_delete" run_spark_commands( @@ -131,8 +129,7 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, - format_version: int) -> None: +def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: identifier = "default.table_partitioned_delete" run_spark_commands( @@ -246,8 +243,7 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio @pytest.mark.integration @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") -def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, - session_catalog: RestCatalog) -> None: +def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None: identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch" run_spark_commands( @@ -418,8 +414,7 @@ def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestC @pytest.mark.integration @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write") -def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, - session_catalog: RestCatalog) -> None: +def 
test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None: identifier = "default.table_partitioned_delete_sequence_number" # This test case is a bit more complex. Here we run a MoR delete on a file, we make sure that @@ -772,8 +767,7 @@ def test_delete_after_partition_evolution_from_partitioned(session_catalog: Rest tbl = session_catalog.create_table( identifier, schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())), - partition_spec=PartitionSpec( - PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")), + partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")), ) tbl.append(arrow_table)