From f814ee162de27eed2c3bc9ef6d5d3624cf73afe8 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 18 Dec 2024 12:01:10 -0500 Subject: [PATCH 01/10] Initial commit for fix --- pyiceberg/io/pyarrow.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7956a83242..98f3faa6fd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -138,7 +138,7 @@ ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping -from pyiceberg.transforms import TruncateTransform +from pyiceberg.transforms import IdentityTransform, TruncateTransform from pyiceberg.typedef import EMPTY_DICT, Properties, Record from pyiceberg.types import ( BinaryType, @@ -1226,6 +1226,7 @@ def _task_to_record_batches( case_sensitive: bool, name_mapping: Optional[NameMapping] = None, use_large_types: bool = True, + partition_spec: PartitionSpec = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1248,6 +1249,17 @@ def _task_to_record_batches( if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + # Apply column projection rules for missing partitions and default values (ref: https://iceberg.apache.org/spec/#column-projection) + projected_missing_fields = None + for field_id in projected_field_ids.difference(file_project_schema.field_ids): + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: + projected_missing_fields = (partition_field.name, task.file.partition[0]) + + if nested_field := projected_schema.find_field(field_id) and projected_missing_fields is None and task.file.partition is None: + if nested_field.initial_default is not None: + projected_missing_fields(nested_field.name, nested_field) + fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, # With PyArrow 16.0.0 there is an issue with casting record-batches: @@ -1286,7 +1298,7 @@ def _task_to_record_batches( continue output_batches = arrow_table.to_batches() for output_batch in output_batches: - yield _to_requested_schema( + result_batch = _to_requested_schema( projected_schema, file_project_schema, output_batch, @@ -1294,6 +1306,12 @@ def _task_to_record_batches( use_large_types=use_large_types, ) + if projected_missing_fields is not None: + name, value = projected_missing_fields + result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) + + yield result_batch + def _task_to_table( fs: FileSystem, @@ -1517,6 +1535,7 @@ def _record_batches_from_scan_tasks_and_deletes( self._case_sensitive, self._table_metadata.name_mapping(), self._use_large_types, + self._table_metadata.spec(), ) for batch in batches: if self._limit is not None: From cf36660a0edd388b1aa924eb567858b8d3d6805a Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 18 Dec 2024 15:19:59 -0500 Subject: [PATCH 02/10] Add test and commit lint changes --- pyiceberg/io/pyarrow.py | 6 ++++- tests/io/test_pyarrow.py | 53 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 98f3faa6fd..dcdfee18b3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1256,7 +1256,11 @@ def 
_task_to_record_batches( if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: projected_missing_fields = (partition_field.name, task.file.partition[0]) - if nested_field := projected_schema.find_field(field_id) and projected_missing_fields is None and task.file.partition is None: + if ( + nested_field := projected_schema.find_field(field_id) + and projected_missing_fields is None + and task.file.partition is None + ): if nested_field.initial_default is not None: projected_missing_fields(nested_field.name, nested_field) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e4017e1df5..533141628c 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -77,6 +77,7 @@ from pyiceberg.schema import Schema, make_compatible_name, visit from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 +from pyiceberg.table.name_mapping import create_mapping_from_schema from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import UTF8, Properties, Record from pyiceberg.types import ( @@ -1122,6 +1123,58 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" +def test_projection_partition_inference(tmp_path: str, example_task: FileScanTask): + schema = Schema( + NestedField(1, "partition_field", IntegerType(), required=False), + NestedField(2, "other_field", StringType(), required=False), + ) + + partition_spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "partition_field")) + + table = TableMetadataV2( + location="file://a/b/c.json", + last_column_id=2, + format_version=2, + current_schema_id=0, + schemas=[schema], + partition_specs=[partition_spec], + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + pa_schema = pa.schema([pa.field("other_field", pa.string())]) + pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) + pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=f"{tmp_path}/datafile.parquet", + file_format=FileFormat.PARQUET, + partition=Record(partition_id=123456), + file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + sort_order_id=None, + spec_id=0, + equality_ids=None, + key_metadata=None, + ) + + table_result_scan = ArrowScan( + table_metadata=table, + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), + ).to_table(tasks=[FileScanTask(data_file=data_file)]) + + assert ( + str(table_result_scan) + == """pyarrow.Table +partition_field: int64 +other_field: large_string +---- +partition_field: [[123456]] +other_field: [["x"]]""" + ) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From 79824651a126801ef5bef420a66d5fc0331b39ac Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 00:18:46 -0500 Subject: [PATCH 03/10] default-value bug fixes and adding more tests --- pyiceberg/io/pyarrow.py | 19 ++++++-------- tests/io/test_pyarrow.py | 54 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index dcdfee18b3..acff6e4624 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1249,20 +1249,17 @@ def 
_task_to_record_batches( if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - # Apply column projection rules for missing partitions and default values (ref: https://iceberg.apache.org/spec/#column-projection) - projected_missing_fields = None + # Apply column projection rules for missing partitions and default values + # https://iceberg.apache.org/spec/#column-projection + projected_missing_fields = {} for field_id in projected_field_ids.difference(file_project_schema.field_ids): for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields = (partition_field.name, task.file.partition[0]) + projected_missing_fields[partition_field.name] = task.file.partition[0] - if ( - nested_field := projected_schema.find_field(field_id) - and projected_missing_fields is None - and task.file.partition is None - ): + if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: - projected_missing_fields(nested_field.name, nested_field) + projected_missing_fields[nested_field.name] = nested_field.initial_default fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, @@ -1310,8 +1307,8 @@ def _task_to_record_batches( use_large_types=use_large_types, ) - if projected_missing_fields is not None: - name, value = projected_missing_fields + # Inject projected column values if available + for name, value in projected_missing_fields.items(): result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) yield result_batch diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 533141628c..822e188f4a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1123,7 +1123,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str, example_task: FileScanTask): +def test_projection_partition_inference(tmp_path: str): schema = Schema( NestedField(1, "partition_field", IntegerType(), required=False), NestedField(2, "other_field", StringType(), required=False), @@ -1175,6 +1175,58 @@ def test_projection_partition_inference(tmp_path: str, example_task: FileScanTas ) +def test_projection_initial_default_inference(tmp_path: str) -> None: + schema = Schema( + NestedField(1, "other_field", StringType(), required=False), + NestedField(2, "other_field_1", StringType(), required=False, initial_default="foo"), + ) + + table = TableMetadataV2( + location="file://a/b/c.json", + last_column_id=2, + format_version=2, + current_schema_id=0, + schemas=[schema], + partition_specs=[PartitionSpec()], + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + pa_schema = pa.schema([pa.field("other_field", pa.string())]) + pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) + pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=f"{tmp_path}/datafile.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + sort_order_id=None, + spec_id=0, + equality_ids=None, + key_metadata=None, + ) + + table_result_scan = ArrowScan( + table_metadata=table, + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), 
+ ).to_table(tasks=[FileScanTask(data_file=data_file)]) + + print(str(table_result_scan)) + + assert ( + str(table_result_scan) + == """pyarrow.Table +other_field: large_string +other_field_1: string +---- +other_field: [["x"]] +other_field_1: [["foo"]]""" + ) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From e4d58826f4c42a82c0dba380d1c3e771c14ad900 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 00:41:21 -0500 Subject: [PATCH 04/10] Add continue, check file_schema before using it, group steps of projection together --- pyiceberg/io/pyarrow.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index acff6e4624..c052403d55 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1238,24 +1238,27 @@ def _task_to_record_batches( # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on # the table format version. file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) + + if file_schema is None: + raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - - if file_schema is None: - raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - # Apply column projection rules for missing partitions and default values # https://iceberg.apache.org/spec/#column-projection + + file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) projected_missing_fields = {} + for field_id in projected_field_ids.difference(file_project_schema.field_ids): for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: projected_missing_fields[partition_field.name] = task.file.partition[0] + continue if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: From 694a52d6f3b7cd5353c50a8cc0ed5fb6ab25ffb5 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 10:10:10 -0500 Subject: [PATCH 05/10] Fix lint issues, reorder partition spec to be of higher importance than initial-default --- pyiceberg/io/pyarrow.py | 12 +++++------- tests/io/test_pyarrow.py | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index c052403d55..0bd90ec216 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1226,7 +1226,7 @@ def _task_to_record_batches( case_sensitive: bool, name_mapping: Optional[NameMapping] = None, use_large_types: bool = True, - partition_spec: PartitionSpec = None, + partition_spec: Optional[PartitionSpec] = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1250,19 +1250,17 @@ def _task_to_record_batches( # Apply column projection rules for 
missing partitions and default values # https://iceberg.apache.org/spec/#column-projection - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) projected_missing_fields = {} for field_id in projected_field_ids.difference(file_project_schema.field_ids): - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields[partition_field.name] = task.file.partition[0] - continue - if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: projected_missing_fields[nested_field.name] = nested_field.initial_default + if partition_spec is not None: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: + projected_missing_fields[partition_field.name] = task.file.partition[0] fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 822e188f4a..b0455280c2 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1123,7 +1123,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str): +def test_projection_partition_inference(tmp_path: str) -> None: schema = Schema( NestedField(1, "partition_field", IntegerType(), required=False), NestedField(2, "other_field", StringType(), required=False), From fee24ab4be7de1671bfead19a5fe9cd4202f496c Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 26 Dec 2024 12:31:37 -0500 Subject: [PATCH 06/10] Removed file_schema check and initial-default logic, separated projection logic to helper method, changed test to use high-level table scan --- pyiceberg/io/pyarrow.py | 36 +++++++----- tests/io/test_pyarrow.py | 115 ++++++++++++--------------------------- 2 files changed, 59 insertions(+), 92 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0bd90ec216..a3a821d197 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1216,6 +1216,25 @@ def _field_id(self, field: pa.Field) -> int: return -1 +def _get_column_projection_values( + file: DataFile, + projected_schema: Schema, + projected_field_ids: Set[int], + file_project_schema: Schema, + partition_spec: Optional[PartitionSpec] = None, +) -> Dict[str, object]: + """Apply Column Projection rules to File Schema.""" + projected_missing_fields = {} + + for field_id in projected_field_ids.difference(file_project_schema.field_ids): + if partition_spec is not None: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and partition_field.name in file.partition.__dict__: + projected_missing_fields[partition_field.name] = file.partition.__dict__[partition_field.name] + + return projected_missing_fields + + def _task_to_record_batches( fs: FileSystem, task: FileScanTask, @@ -1239,9 +1258,6 @@ def _task_to_record_batches( # the table format version. 
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) - if file_schema is None: - raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) @@ -1251,16 +1267,10 @@ def _task_to_record_batches( # Apply column projection rules for missing partitions and default values # https://iceberg.apache.org/spec/#column-projection file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - projected_missing_fields = {} - - for field_id in projected_field_ids.difference(file_project_schema.field_ids): - if nested_field := projected_schema.find_field(field_id): - if nested_field.initial_default is not None: - projected_missing_fields[nested_field.name] = nested_field.initial_default - if partition_spec is not None: - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields[partition_field.name] = task.file.partition[0] + + projected_missing_fields = _get_column_projection_values( + task.file, projected_schema, projected_field_ids, file_project_schema, partition_spec + ) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index b0455280c2..277fa050cb 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -69,7 +69,10 @@ _read_deletes, _to_requested_schema, bin_pack_arrow_table, + compute_statistics_plan, + data_file_statistics_from_parquet_metadata, expression_to_pyarrow, + parquet_path_to_id_mapping, schema_to_pyarrow, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat @@ -100,6 +103,7 @@ TimestamptzType, TimeType, ) +from tests.catalog.test_base import InMemoryCatalog from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES @@ -1123,110 +1127,63 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str) -> None: +def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None: schema = Schema( - NestedField(1, "partition_field", IntegerType(), required=False), - NestedField(2, "other_field", StringType(), required=False), + NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) ) - partition_spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "partition_field")) + partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id")) - table = TableMetadataV2( - location="file://a/b/c.json", - last_column_id=2, - format_version=2, - current_schema_id=0, - schemas=[schema], - partition_specs=[partition_spec], + table = catalog.create_table( + "default.test_projection_partition_inference", + schema=schema, + partition_spec=partition_spec, properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, ) - pa_schema = pa.schema([pa.field("other_field", pa.string())]) - pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) - pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + file_data = pa.array(["foo"], type=pa.string()) + file_loc = f"{tmp_path}/test.parquet" + 
pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) - data_file = DataFile( - content=DataFileContent.DATA, - file_path=f"{tmp_path}/datafile.parquet", - file_format=FileFormat.PARQUET, - partition=Record(partition_id=123456), - file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), - sort_order_id=None, - spec_id=0, - equality_ids=None, - key_metadata=None, - ) - - table_result_scan = ArrowScan( - table_metadata=table, - io=load_file_io(), - projected_schema=schema, - row_filter=AlwaysTrue(), - ).to_table(tasks=[FileScanTask(data_file=data_file)]) - - assert ( - str(table_result_scan) - == """pyarrow.Table -partition_field: int64 -other_field: large_string ----- -partition_field: [[123456]] -other_field: [["x"]]""" + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=pq.read_metadata(file_loc), + stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - -def test_projection_initial_default_inference(tmp_path: str) -> None: - schema = Schema( - NestedField(1, "other_field", StringType(), required=False), - NestedField(2, "other_field_1", StringType(), required=False, initial_default="foo"), - ) - - table = TableMetadataV2( - location="file://a/b/c.json", - last_column_id=2, - format_version=2, - current_schema_id=0, - schemas=[schema], - partition_specs=[PartitionSpec()], - properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, - ) - - pa_schema = pa.schema([pa.field("other_field", pa.string())]) - pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) - pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") - - data_file = DataFile( + unpartitioned_file = DataFile( content=DataFileContent.DATA, - file_path=f"{tmp_path}/datafile.parquet", + file_path=file_loc, file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + partition=Record(partition_id=1), + file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, - spec_id=0, + spec_id=table.metadata.default_spec_id, equality_ids=None, key_metadata=None, + **statistics.to_serialized_dict(), ) - table_result_scan = ArrowScan( - table_metadata=table, - io=load_file_io(), - projected_schema=schema, - row_filter=AlwaysTrue(), - ).to_table(tasks=[FileScanTask(data_file=data_file)]) - - print(str(table_result_scan)) + with table.transaction() as transaction: + with transaction.update_snapshot().overwrite() as update: + update.append_data_file(unpartitioned_file) assert ( - str(table_result_scan) + str(table.scan().to_arrow()) == """pyarrow.Table other_field: large_string -other_field_1: string +partition_id: int64 ---- -other_field: [["x"]] -other_field_1: [["foo"]]""" +other_field: [["foo"]] +partition_id: [[1]]""" ) +@pytest.fixture +def catalog() -> InMemoryCatalog: + return InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From 8362803322a031605a019fee83ef15b5d619c67b Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Fri, 17 Jan 2025 14:58:25 -0800 Subject: [PATCH 07/10] Add should_project check, add lookup by accessor, multiple-partition test --- pyiceberg/io/pyarrow.py | 43 ++++++++++++++++-------- tests/io/test_pyarrow.py | 72 
+++++++++++++++++++++++++++++++++++++--- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a3a821d197..117b40cfcc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -129,6 +129,7 @@ SchemaVisitorPerPrimitiveType, SchemaWithPartnerVisitor, _check_schema_compatible, + build_position_accessors, pre_order_visit, promote, prune_columns, @@ -1219,18 +1220,25 @@ def _field_id(self, field: pa.Field) -> int: def _get_column_projection_values( file: DataFile, projected_schema: Schema, - projected_field_ids: Set[int], - file_project_schema: Schema, - partition_spec: Optional[PartitionSpec] = None, + project_schema_diff: Set[int], + partition_spec: PartitionSpec, ) -> Dict[str, object]: """Apply Column Projection rules to File Schema.""" - projected_missing_fields = {} + projected_missing_fields: Dict[str, Any] = {} + + partition_schema = partition_spec.partition_type(projected_schema) + accessors = build_position_accessors(partition_schema) + + for field_id in project_schema_diff: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform): + accesor = accessors.get(partition_field.field_id) - for field_id in projected_field_ids.difference(file_project_schema.field_ids): - if partition_spec is not None: - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and partition_field.name in file.partition.__dict__: - projected_missing_fields[partition_field.name] = file.partition.__dict__[partition_field.name] + if accesor is None: + continue + + if partition_value := accesor.get(file.partition): + projected_missing_fields[partition_field.name] = partition_value return projected_missing_fields @@ -1268,9 +1276,15 @@ def _task_to_record_batches( # https://iceberg.apache.org/spec/#column-projection file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - projected_missing_fields = _get_column_projection_values( - task.file, projected_schema, projected_field_ids, file_project_schema, partition_spec - ) + project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids) + should_project_columns = len(project_schema_diff) > 0 + + projected_missing_fields = {} + + if should_project_columns and partition_spec is not None: + projected_missing_fields = _get_column_projection_values( + task.file, projected_schema, project_schema_diff, partition_spec + ) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, @@ -1319,8 +1333,9 @@ def _task_to_record_batches( ) # Inject projected column values if available - for name, value in projected_missing_fields.items(): - result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) + if should_project_columns: + for name, value in projected_missing_fields.items(): + result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) yield result_batch diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 277fa050cb..00c5607c67 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -81,7 +81,7 @@ from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.name_mapping import create_mapping_from_schema -from pyiceberg.transforms import IdentityTransform +from pyiceberg.transforms import IdentityTransform, 
VoidTransform from pyiceberg.typedef import UTF8, Properties, Record from pyiceberg.types import ( BinaryType, @@ -1127,15 +1127,21 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None: +def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None: + # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata. + # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. + # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) + schema = Schema( NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) ) - partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id")) + partition_spec = PartitionSpec( + PartitionField(2, 1000, IdentityTransform(), "partition_id"), + ) table = catalog.create_table( - "default.test_projection_partition_inference", + "default.test_projection_partition", schema=schema, partition_spec=partition_spec, properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, @@ -1179,6 +1185,64 @@ def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) ) +def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCatalog) -> None: + # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value projection from manifest metadata. + # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. 
+ # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) + schema = Schema( + NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) + ) + + partition_spec = PartitionSpec( + PartitionField(2, 1000, VoidTransform(), "void_partition_id"), + PartitionField(2, 1001, IdentityTransform(), "partition_id"), + ) + + table = catalog.create_table( + "default.test_projection_partitions", + schema=schema, + partition_spec=partition_spec, + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + file_data = pa.array(["foo"], type=pa.string()) + file_loc = f"{tmp_path}/test.parquet" + pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=pq.read_metadata(file_loc), + stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + + unpartitioned_file = DataFile( + content=DataFileContent.DATA, + file_path=file_loc, + file_format=FileFormat.PARQUET, + partition=Record(void_partition_id=None, partition_id=1), + file_size_in_bytes=os.path.getsize(file_loc), + sort_order_id=None, + spec_id=table.metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + with table.transaction() as transaction: + with transaction.update_snapshot().overwrite() as update: + update.append_data_file(unpartitioned_file) + + assert ( + str(table.scan().to_arrow()) + == """pyarrow.Table +other_field: large_string +partition_id: int64 +---- +other_field: [["foo"]] +partition_id: [[1]]""" + ) + + @pytest.fixture def catalog() -> InMemoryCatalog: return InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) From 4333dc0e89f74b3ea459832012210e7da9ee483b Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Sat, 25 Jan 2025 16:45:02 -0700 Subject: [PATCH 08/10] Check for name before injecting, fix lint issues --- pyiceberg/io/pyarrow.py | 44 ++++++++++++++++++++++------------------ tests/io/test_pyarrow.py | 12 ++++++----- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 117b40cfcc..930aeba678 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -123,6 +123,7 @@ ) from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value from pyiceberg.schema import ( + Accessor, PartnerAccessor, PreOrderSchemaVisitor, Schema, @@ -1218,16 +1219,24 @@ def _field_id(self, field: pa.Field) -> int: def _get_column_projection_values( - file: DataFile, - projected_schema: Schema, - project_schema_diff: Set[int], - partition_spec: PartitionSpec, -) -> Dict[str, object]: + file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int] +) -> Tuple[bool, Dict[str, Any]]: """Apply Column Projection rules to File Schema.""" + project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids) + should_project_columns = len(project_schema_diff) > 0 projected_missing_fields: Dict[str, Any] = {} - partition_schema = partition_spec.partition_type(projected_schema) - accessors = build_position_accessors(partition_schema) + if not should_project_columns: + return False, {} + + partition_schema: StructType + accessors: Dict[int, Accessor] + + 
if partition_spec is not None: + partition_schema = partition_spec.partition_type(projected_schema) + accessors = build_position_accessors(partition_schema) + else: + return False, {} for field_id in project_schema_diff: for partition_field in partition_spec.fields_by_source_id(field_id): @@ -1240,7 +1249,7 @@ def _get_column_projection_values( if partition_value := accesor.get(file.partition): projected_missing_fields[partition_field.name] = partition_value - return projected_missing_fields + return True, projected_missing_fields def _task_to_record_batches( @@ -1272,19 +1281,12 @@ def _task_to_record_batches( bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) - # Apply column projection rules for missing partitions and default values + # Apply column projection rules # https://iceberg.apache.org/spec/#column-projection file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - - project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids) - should_project_columns = len(project_schema_diff) > 0 - - projected_missing_fields = {} - - if should_project_columns and partition_spec is not None: - projected_missing_fields = _get_column_projection_values( - task.file, projected_schema, project_schema_diff, partition_spec - ) + should_project_columns, projected_missing_fields = _get_column_projection_values( + task.file, projected_schema, partition_spec, file_project_schema.field_ids + ) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, @@ -1335,7 +1337,9 @@ def _task_to_record_batches( # Inject projected column values if available if should_project_columns: for name, value in projected_missing_fields.items(): - result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) + index = result_batch.schema.get_field_index(name) + if index != -1: + result_batch = result_batch.set_column(index, name, [value]) yield result_batch diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 00c5607c67..ec149c0d6b 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -81,7 +81,7 @@ from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.name_mapping import create_mapping_from_schema -from pyiceberg.transforms import IdentityTransform, VoidTransform +from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import UTF8, Properties, Record from pyiceberg.types import ( BinaryType, @@ -1127,7 +1127,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None: +def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata. # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. 
# (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) @@ -1161,6 +1161,7 @@ def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatal content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, + # projected value partition=Record(partition_id=1), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, @@ -1185,7 +1186,7 @@ def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatal ) -def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCatalog) -> None: +def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryCatalog) -> None: # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value projection from manifest metadata. # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) @@ -1194,7 +1195,7 @@ def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCa ) partition_spec = PartitionSpec( - PartitionField(2, 1000, VoidTransform(), "void_partition_id"), + PartitionField(2, 1000, IdentityTransform(), "void_partition_id"), PartitionField(2, 1001, IdentityTransform(), "partition_id"), ) @@ -1219,7 +1220,8 @@ def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCa content=DataFileContent.DATA, file_path=file_loc, file_format=FileFormat.PARQUET, - partition=Record(void_partition_id=None, partition_id=1), + # projected value + partition=Record(void_partition_id=12, partition_id=1), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, From 92b5f7e82a3242a3b4b2e7e3e6006e6833a9dd45 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Mon, 3 Feb 2025 20:22:25 -0800 Subject: [PATCH 09/10] Address CI runtime error, add more partitions to tests --- pyiceberg/io/pyarrow.py | 14 ++++++++++---- tests/io/test_pyarrow.py | 22 +++++++++++++--------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 930aeba678..b5cca12d68 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1246,8 +1246,13 @@ def _get_column_projection_values( if accesor is None: continue - if partition_value := accesor.get(file.partition): - projected_missing_fields[partition_field.name] = partition_value + # The partition field may not exist in the partition record of the data file. + # This can happen when new partition fields are introduced after the file was written. 
+ try: + if partition_value := accesor.get(file.partition): + projected_missing_fields[partition_field.name] = partition_value + except IndexError: + continue return True, projected_missing_fields @@ -1283,11 +1288,12 @@ def _task_to_record_batches( # Apply column projection rules # https://iceberg.apache.org/spec/#column-projection - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) should_project_columns, projected_missing_fields = _get_column_projection_values( - task.file, projected_schema, partition_spec, file_project_schema.field_ids + task.file, projected_schema, partition_spec, file_schema.field_ids ) + file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) + fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, # With PyArrow 16.0.0 there is an issue with casting record-batches: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index ec149c0d6b..082fe0128f 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1191,12 +1191,14 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) schema = Schema( - NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) + NestedField(1, "field_1", StringType(), required=False), + NestedField(2, "field_2", IntegerType(), required=False), + NestedField(3, "field_3", IntegerType(), required=False), ) partition_spec = PartitionSpec( - PartitionField(2, 1000, IdentityTransform(), "void_partition_id"), - PartitionField(2, 1001, IdentityTransform(), "partition_id"), + PartitionField(2, 1000, IdentityTransform(), "field_2"), + PartitionField(3, 1001, IdentityTransform(), "field_3"), ) table = catalog.create_table( @@ -1208,7 +1210,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC file_data = pa.array(["foo"], type=pa.string()) file_loc = f"{tmp_path}/test.parquet" - pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) + pq.write_table(pa.table([file_data], names=["field_1"]), file_loc) statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=pq.read_metadata(file_loc), @@ -1221,7 +1223,7 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC file_path=file_loc, file_format=FileFormat.PARQUET, # projected value - partition=Record(void_partition_id=12, partition_id=1), + partition=Record(field_2=2, field_3=3), file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, spec_id=table.metadata.default_spec_id, @@ -1237,11 +1239,13 @@ def test_identity_transform_columns_projection(tmp_path: str, catalog: InMemoryC assert ( str(table.scan().to_arrow()) == """pyarrow.Table -other_field: large_string -partition_id: int64 +field_1: large_string +field_2: int64 +field_3: int64 ---- -other_field: [["foo"]] -partition_id: [[1]]""" +field_1: [["foo"]] +field_2: [[2]] +field_3: [[3]]""" ) From 9884f4172aa71fa63e34d53d5604a04d6d5e985b Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Tue, 4 Feb 2025 10:52:54 -0800 Subject: [PATCH 10/10] Fix linter issues --- pyiceberg/io/pyarrow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index b5cca12d68..1eb919e190 100644 
--- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1241,15 +1241,15 @@ def _get_column_projection_values( for field_id in project_schema_diff: for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform): - accesor = accessors.get(partition_field.field_id) + accessor = accessors.get(partition_field.field_id) - if accesor is None: + if accessor is None: continue # The partition field may not exist in the partition record of the data file. # This can happen when new partition fields are introduced after the file was written. try: - if partition_value := accesor.get(file.partition): + if partition_value := accessor.get(file.partition): projected_missing_fields[partition_field.name] = partition_value except IndexError: continue
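
For readers following the final state of the series: the mechanism the fix ultimately relies on is overwriting a column in each scanned Arrow batch with a constant value taken from the data file's partition record in the manifest. Below is a minimal standalone sketch of that mechanism, assuming a PyArrow version in which RecordBatch.set_column is available (the same assumption the patch makes); the field names "other_field" and "partition_id" and the value 1 are illustrative only and not taken from the patch.

    import pyarrow as pa

    # A batch whose projected partition column is present only as a placeholder of nulls,
    # standing in for a data file that does not physically contain that column.
    batch = pa.RecordBatch.from_pydict({"other_field": ["foo", "bar"], "partition_id": [None, None]})

    # Hypothetical projected value, as would be read from the manifest's partition Record
    # for an identity-transformed partition field.
    name, value = "partition_id", 1

    index = batch.schema.get_field_index(name)  # returns -1 if the column is absent
    if index != -1:
        # Replace the placeholder column with a constant array of the projected value.
        batch = batch.set_column(index, name, pa.array([value] * batch.num_rows, type=pa.int64()))

    print(batch.to_pydict())
    # {'other_field': ['foo', 'bar'], 'partition_id': [1, 1]}
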