WIP
Fokko committed Jan 27, 2025
1 parent 0418008 commit 026e020
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions tests/integration/test_deletes.py
@@ -42,7 +42,8 @@ def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@pytest.fixture()
def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]:
identifier = "default.__test_table"
-arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])], names=["idx", "value"])
+arrow_table = pa.Table.from_arrays([pa.array([1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e"])],
+                                   names=["idx", "value"])
test_table = session_catalog.create_table(
identifier,
schema=Schema(
@@ -59,7 +60,8 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]:

@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
-def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog,
+                                            format_version: int) -> None:
identifier = "default.table_partitioned_delete"

run_spark_commands(
@@ -129,7 +131,8 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat

@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
-def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+def test_rewrite_partitioned_table_with_null(spark: SparkSession, session_catalog: RestCatalog,
+                                             format_version: int) -> None:
identifier = "default.table_partitioned_delete"

run_spark_commands(
@@ -243,7 +246,8 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio

@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
-def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None:
+def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession,
+                                                                 session_catalog: RestCatalog) -> None:
identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch"

run_spark_commands(
@@ -337,7 +341,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession

arrow_table = pa.Table.from_arrays(
[
-pa.array(list(range(1, 100)) * 10),
+pa.array(list(range(1, 1001)) * 100),
],
schema=pa.schema([pa.field("number", pa.int32())]),
)
@@ -347,7 +351,9 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession
run_spark_commands(
spark,
[
f"DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)",
f"""
DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
""",
],
)

@@ -356,7 +362,7 @@ def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession
reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
assert isinstance(reader, pa.RecordBatchReader)
pyiceberg_count = len(reader.read_all())
-expected_count = 46 * 10
+expected_count = 46 * 100
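# Editorial annotation (not part of the commit): after deleting 1-4, the values 5..50 pass the
# `number <= 50` filter, i.e. 46 distinct values, each repeated 100 times -> 4,600 rows.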
assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"


@@ -412,7 +418,8 @@ def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestC

@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
-def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None:
+def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession,
+                                                               session_catalog: RestCatalog) -> None:
identifier = "default.table_partitioned_delete_sequence_number"

# This test case is a bit more complex. Here we run a MoR delete on a file, we make sure that
@@ -765,7 +772,8 @@ def test_delete_after_partition_evolution_from_partitioned(session_catalog: Rest
tbl = session_catalog.create_table(
identifier,
schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
-partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+partition_spec=PartitionSpec(
+    PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
)

tbl.append(arrow_table)
