diff --git a/pyiceberg/avro/writer.py b/pyiceberg/avro/writer.py index fba17e8971..ad6a755614 100644 --- a/pyiceberg/avro/writer.py +++ b/pyiceberg/avro/writer.py @@ -34,7 +34,7 @@ from uuid import UUID from pyiceberg.avro.encoder import BinaryEncoder -from pyiceberg.types import StructType +from pyiceberg.typedef import Record from pyiceberg.utils.decimal import decimal_required_bytes, decimal_to_bytes from pyiceberg.utils.singleton import Singleton @@ -160,7 +160,7 @@ def write(self, encoder: BinaryEncoder, val: Any) -> None: class StructWriter(Writer): field_writers: Tuple[Writer, ...] = dataclassfield() - def write(self, encoder: BinaryEncoder, val: StructType) -> None: + def write(self, encoder: BinaryEncoder, val: Record) -> None: for writer, value in zip(self.field_writers, val.record_fields()): writer.write(encoder, value) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 57ec11db77..8bdbfd3524 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -14,31 +14,51 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from __future__ import annotations + +import math +from abc import ABC, abstractmethod from enum import Enum +from functools import singledispatch +from types import TracebackType from typing import ( Any, Dict, Iterator, List, + Literal, Optional, + Type, ) -from pyiceberg.avro.file import AvroFile -from pyiceberg.io import FileIO, InputFile +from pyiceberg.avro.file import AvroFile, AvroOutputFile +from pyiceberg.conversions import to_bytes +from pyiceberg.exceptions import ValidationError +from pyiceberg.io import FileIO, InputFile, OutputFile +from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import Record from pyiceberg.types import ( BinaryType, BooleanType, + DateType, + IcebergType, IntegerType, ListType, LongType, MapType, NestedField, + PrimitiveType, StringType, StructType, + TimestampType, + TimestamptzType, + TimeType, ) +UNASSIGNED_SEQ = -1 +DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 + class DataFileContent(int, Enum): DATA = 0 @@ -79,7 +99,7 @@ def __repr__(self) -> str: return f"FileFormat.{self.name}" -DATA_FILE_TYPE = StructType( +DATA_FILE_TYPE_V1 = StructType( NestedField( field_id=134, name="content", @@ -90,7 +110,11 @@ def __repr__(self) -> str: ), NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( - field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet" + field_id=101, + name="file_format", + field_type=StringType(), + required=True, + doc="File format name: avro, orc, or parquet", ), NestedField( field_id=102, @@ -101,6 +125,13 @@ def __repr__(self) -> str: ), NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField(field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"), + NestedField( + field_id=105, + name="block_size_in_bytes", + field_type=LongType(), + required=False, + doc="Deprecated. Always write a default in v1. 
Do not write in v2.", + ), NestedField( field_id=108, name="column_sizes", @@ -162,6 +193,55 @@ def __repr__(self) -> str: NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"), ) +DATA_FILE_TYPE_V2 = StructType(*[field for field in DATA_FILE_TYPE_V1.fields if field.field_id != 105]) + + +@singledispatch +def partition_field_to_data_file_partition_field(partition_field_type: IcebergType) -> PrimitiveType: + raise TypeError(f"Unsupported partition field type: {partition_field_type}") + + +@partition_field_to_data_file_partition_field.register(LongType) +@partition_field_to_data_file_partition_field.register(DateType) +@partition_field_to_data_file_partition_field.register(TimeType) +@partition_field_to_data_file_partition_field.register(TimestampType) +@partition_field_to_data_file_partition_field.register(TimestamptzType) +def _(partition_field_type: PrimitiveType) -> IntegerType: + return IntegerType() + + +@partition_field_to_data_file_partition_field.register(PrimitiveType) +def _(partition_field_type: PrimitiveType) -> PrimitiveType: + return partition_field_type + + +def data_file_with_partition(partition_type: StructType, format_version: Literal[1, 2]) -> StructType: + data_file_partition_type = StructType( + *[ + NestedField( + field_id=field.field_id, + name=field.name, + field_type=partition_field_to_data_file_partition_field(field.field_type), + ) + for field in partition_type.fields + ] + ) + + return StructType( + *[ + NestedField( + field_id=102, + name="partition", + field_type=data_file_partition_type, + required=True, + doc="Partition data tuple, schema based on the partition spec", + ) + if field.field_id == 102 + else field + for field in (DATA_FILE_TYPE_V1.fields if format_version == 1 else DATA_FILE_TYPE_V2.fields) + ] + ) + class DataFile(Record): __slots__ = ( @@ -171,6 +251,7 @@ class DataFile(Record): "partition", "record_count", "file_size_in_bytes", + "block_size_in_bytes", "column_sizes", "value_counts", "null_value_counts", @@ -189,6 +270,7 @@ class DataFile(Record): partition: Record record_count: int file_size_in_bytes: int + block_size_in_bytes: Optional[int] column_sizes: Dict[int, int] value_counts: Dict[int, int] null_value_counts: Dict[int, int] @@ -208,8 +290,11 @@ def __setattr__(self, name: str, value: Any) -> None: value = FileFormat[value] super().__setattr__(name, value) - def __init__(self, *data: Any, **named_data: Any) -> None: - super().__init__(*data, **{"struct": DATA_FILE_TYPE, **named_data}) + def __init__(self, format_version: Literal[1, 2] = 1, *data: Any, **named_data: Any) -> None: + super().__init__( + *data, + **{"struct": DATA_FILE_TYPE_V1 if format_version == 1 else DATA_FILE_TYPE_V2, **named_data}, + ) def __hash__(self) -> int: """Return the hash of the file path.""" @@ -228,12 +313,21 @@ def __eq__(self, other: Any) -> bool: NestedField(1, "snapshot_id", LongType(), required=False), NestedField(3, "data_sequence_number", LongType(), required=False), NestedField(4, "file_sequence_number", LongType(), required=False), - NestedField(2, "data_file", DATA_FILE_TYPE, required=True), + NestedField(2, "data_file", DATA_FILE_TYPE_V1, required=True), ) MANIFEST_ENTRY_SCHEMA_STRUCT = MANIFEST_ENTRY_SCHEMA.as_struct() +def manifest_entry_schema_with_data_file(data_file: StructType) -> Schema: + return Schema( + *[ + NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field + for field in MANIFEST_ENTRY_SCHEMA.fields + ] + ) + + class ManifestEntry(Record): 
__slots__ = ("status", "snapshot_id", "data_sequence_number", "file_sequence_number", "data_file") status: ManifestEntryStatus @@ -265,6 +359,54 @@ def __init__(self, *data: Any, **named_data: Any) -> None: super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data}) +class PartitionFieldStats: + _type: PrimitiveType + _contains_null: bool + _contains_nan: bool + _min: Optional[Any] + _max: Optional[Any] + + def __init__(self, iceberg_type: PrimitiveType) -> None: + self._type = iceberg_type + self._contains_null = False + self._contains_nan = False + self._min = None + self._max = None + + def to_summary(self) -> PartitionFieldSummary: + return PartitionFieldSummary( + contains_null=self._contains_null, + contains_nan=self._contains_nan, + lower_bound=to_bytes(self._type, self._min) if self._min is not None else None, + upper_bound=to_bytes(self._type, self._max) if self._max is not None else None, + ) + + def update(self, value: Any) -> None: + if value is None: + self._contains_null = True + elif isinstance(value, float) and math.isnan(value): + self._contains_nan = True + else: + if self._min is None: + self._min = value + self._max = value + else: + self._max = max(self._max, value) + self._min = min(self._min, value) + + +def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]: + types = [field.field_type for field in spec.partition_type(schema).fields] + field_stats = [PartitionFieldStats(field_type) for field_type in types] + for partition_keys in partitions: + for i, field_type in enumerate(types): + if not isinstance(field_type, PrimitiveType): + raise ValueError(f"Expected a primitive type for the partition field, got {field_type}") + partition_key = partition_keys[i] + field_stats[i].update(partition_key) + return [field.to_summary() for field in field_stats] + + MANIFEST_FILE_SCHEMA: Schema = Schema( NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"), NestedField(501, "manifest_length", LongType(), required=True), @@ -405,3 +547,315 @@ def _inherit_sequence_number(entry: ManifestEntry, manifest: ManifestFile) -> Ma entry.file_sequence_number = manifest.sequence_number return entry + + +class ManifestWriter(ABC): + closed: bool + _spec: PartitionSpec + _schema: Schema + _output_file: OutputFile + _writer: AvroOutputFile[ManifestEntry] + _snapshot_id: int + _meta: Dict[str, str] + _added_files: int + _added_rows: int + _existing_files: int + _existing_rows: int + _deleted_files: int + _deleted_rows: int + _min_data_sequence_number: Optional[int] + _partitions: List[Record] + + def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str]): + self.closed = False + self._spec = spec + self._schema = schema + self._output_file = output_file + self._snapshot_id = snapshot_id + self._meta = meta + + self._added_files = 0 + self._added_rows = 0 + self._existing_files = 0 + self._existing_rows = 0 + self._deleted_files = 0 + self._deleted_rows = 0 + self._min_data_sequence_number = None + self._partitions = [] + + def __enter__(self) -> ManifestWriter: + """Open the writer.""" + self._writer = self.new_writer() + self._writer.__enter__() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """Close the writer.""" + self.closed = True + self._writer.__exit__(exc_type, 
exc_value, traceback) + + @abstractmethod + def content(self) -> ManifestContent: + ... + + @abstractmethod + def new_writer(self) -> AvroOutputFile[ManifestEntry]: + ... + + @abstractmethod + def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: + ... + + def to_manifest_file(self) -> ManifestFile: + """Return the manifest file.""" + # once the manifest file is generated, no more entries can be added + self.closed = True + min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ + return ManifestFile( + manifest_path=self._output_file.location, + manifest_length=len(self._writer.output_file), + partition_spec_id=self._spec.spec_id, + content=self.content(), + sequence_number=UNASSIGNED_SEQ, + min_sequence_number=min_sequence_number, + added_snapshot_id=self._snapshot_id, + added_files_count=self._added_files, + existing_files_count=self._existing_files, + deleted_files_count=self._deleted_files, + added_rows_count=self._added_rows, + existing_rows_count=self._existing_rows, + deleted_rows_count=self._deleted_rows, + partitions=construct_partition_summaries(self._spec, self._schema, self._partitions), + key_metadatas=None, + ) + + def add_entry(self, entry: ManifestEntry) -> ManifestWriter: + if self.closed: + raise RuntimeError("Cannot add entry to closed manifest writer") + if entry.status == ManifestEntryStatus.ADDED: + self._added_files += 1 + self._added_rows += entry.data_file.record_count + elif entry.status == ManifestEntryStatus.EXISTING: + self._existing_files += 1 + self._existing_rows += entry.data_file.record_count + elif entry.status == ManifestEntryStatus.DELETED: + self._deleted_files += 1 + self._deleted_rows += entry.data_file.record_count + + self._partitions.append(entry.data_file.partition) + + if ( + (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING) + and entry.data_sequence_number is not None + and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number) + ): + self._min_data_sequence_number = entry.data_sequence_number + + self._writer.write_block([self.prepare_entry(entry)]) + return self + + +class ManifestWriterV1(ManifestWriter): + def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): + super().__init__( + spec, + schema, + output_file, + snapshot_id, + { + "schema": schema.json(), + "partition-spec": spec.json(), + "partition-spec-id": str(spec.spec_id), + "format-version": "1", + }, + ) + + def content(self) -> ManifestContent: + return ManifestContent.DATA + + def new_writer(self) -> AvroOutputFile[ManifestEntry]: + v1_data_file_type = data_file_with_partition(self._spec.partition_type(self._schema), format_version=1) + v1_manifest_entry_schema = manifest_entry_schema_with_data_file(v1_data_file_type) + return AvroOutputFile[ManifestEntry](self._output_file, v1_manifest_entry_schema, "manifest_entry", self._meta) + + def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: + wrapped_entry = ManifestEntry(*entry.record_fields()) + wrapped_entry.data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE + return wrapped_entry + + +class ManifestWriterV2(ManifestWriter): + def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): + super().__init__( + spec, + schema, + output_file, + snapshot_id, + { + "schema": schema.json(), + "partition-spec": spec.json(), + "partition-spec-id": str(spec.spec_id), + "format-version": "2", + "content": "data", + }, + ) + + def 
content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
+        v2_data_file_type = data_file_with_partition(self._spec.partition_type(self._schema), format_version=2)
+        v2_manifest_entry_schema = manifest_entry_schema_with_data_file(v2_data_file_type)
+        return AvroOutputFile[ManifestEntry](self._output_file, v2_manifest_entry_schema, "manifest_entry", self._meta)
+
+    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
+        if entry.data_sequence_number is None:
+            if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id:
+                raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}")
+            if entry.status != ManifestEntryStatus.ADDED:
+                raise ValueError("Only entries with status ADDED can have null sequence number")
+        # In v2, the block_size_in_bytes field must not be written
+        wrapped_data_file_v2 = DataFile(
+            format_version=2,
+            content=entry.data_file.content,
+            file_path=entry.data_file.file_path,
+            file_format=entry.data_file.file_format,
+            partition=entry.data_file.partition,
+            record_count=entry.data_file.record_count,
+            file_size_in_bytes=entry.data_file.file_size_in_bytes,
+            column_sizes=entry.data_file.column_sizes,
+            value_counts=entry.data_file.value_counts,
+            null_value_counts=entry.data_file.null_value_counts,
+            nan_value_counts=entry.data_file.nan_value_counts,
+            lower_bounds=entry.data_file.lower_bounds,
+            upper_bounds=entry.data_file.upper_bounds,
+            key_metadata=entry.data_file.key_metadata,
+            split_offsets=entry.data_file.split_offsets,
+            equality_ids=entry.data_file.equality_ids,
+            sort_order_id=entry.data_file.sort_order_id,
+            spec_id=entry.data_file.spec_id,
+        )
+        wrapped_entry = ManifestEntry(
+            status=entry.status,
+            snapshot_id=entry.snapshot_id,
+            data_sequence_number=entry.data_sequence_number,
+            file_sequence_number=entry.file_sequence_number,
+            data_file=wrapped_data_file_v2,
+        )
+        return wrapped_entry
+
+
+def write_manifest(
+    format_version: Literal[1, 2], spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
+) -> ManifestWriter:
+    if format_version == 1:
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+    elif format_version == 2:
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+    else:
+        raise ValueError(f"Cannot write manifest for table version: {format_version}")
+
+
+class ManifestListWriter(ABC):
+    _output_file: OutputFile
+    _meta: Dict[str, str]
+    _manifest_files: List[ManifestFile]
+    _writer: AvroOutputFile[ManifestFile]
+
+    def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
+        self._output_file = output_file
+        self._meta = meta
+        self._manifest_files = []
+
+    def __enter__(self) -> ManifestListWriter:
+        """Open the writer for writing."""
+        self._writer = AvroOutputFile[ManifestFile](self._output_file, MANIFEST_FILE_SCHEMA, "manifest_file", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Close the writer."""
+        self._writer.__exit__(exc_type, exc_value, traceback)
+        return
+
+    @abstractmethod
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        ...
+ + def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter: + self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) + return self + + +class ManifestListWriterV1(ManifestListWriter): + def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int): + super().__init__( + output_file, {"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"} + ) + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + if manifest_file.content != ManifestContent.DATA: + raise ValidationError("Cannot store delete manifests in a v1 table") + return manifest_file + + +class ManifestListWriterV2(ManifestListWriter): + _commit_snapshot_id: int + _sequence_number: int + + def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int): + super().__init__( + output_file, + { + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id), + "sequence-number": str(sequence_number), + "format-version": "2", + }, + ) + self._commit_snapshot_id = snapshot_id + self._sequence_number = sequence_number + + def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: + wrapped_manifest_file = ManifestFile(*manifest_file.record_fields()) + + if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: + # if the sequence number is being assigned here, then the manifest must be created by the current operation. + # To validate this, check that the snapshot id matches the current commit + if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id: + raise ValueError( + f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}" + ) + wrapped_manifest_file.sequence_number = self._sequence_number + + if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ: + if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id: + raise ValueError( + f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}" + ) + # if the min sequence number is not determined, then there was no assigned sequence number for any file + # written to the wrapped manifest. 
Replace the unassigned sequence number with the one for this commit
+            wrapped_manifest_file.min_sequence_number = self._sequence_number
+        return wrapped_manifest_file
+
+
+def write_manifest_list(
+    format_version: Literal[1, 2], output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int
+) -> ManifestListWriter:
+    if format_version == 1:
+        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
+    elif format_version == 2:
+        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
+    else:
+        raise ValueError(f"Cannot write manifest list for table version: {format_version}")
diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py
index bb9a438a03..ff2a6d1cb0 100644
--- a/pyiceberg/typedef.py
+++ b/pyiceberg/typedef.py
@@ -195,4 +195,5 @@ def __repr__(self) -> str:
         return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]"
 
     def record_fields(self) -> List[str]:
+        """Return the values of all the fields of the Record class, in positional order (None for unset fields)."""
         return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name]
diff --git a/tests/avro/test_file.py b/tests/avro/test_file.py
index 2738770492..e9dcc7eca1 100644
--- a/tests/avro/test_file.py
+++ b/tests/avro/test_file.py
@@ -124,6 +124,7 @@ def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:
         partition=Record(),
         record_count=131327,
         file_size_in_bytes=220669226,
+        block_size_in_bytes=67108864,
         column_sizes={1: 220661854},
         value_counts={1: 131327},
         null_value_counts={1: 0},
diff --git a/tests/test_integration_manifest.py b/tests/test_integration_manifest.py
new file mode 100644
index 0000000000..34b20f271d
--- /dev/null
+++ b/tests/test_integration_manifest.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
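+
+# Integration test for the new manifest writer: round-trips a manifest entry
+# from an existing table through write_manifest() for format version 2 and
+# compares the written Avro with what fastavro reads back.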
+# pylint:disable=redefined-outer-name
+
+import inspect
+from enum import Enum
+from tempfile import TemporaryDirectory
+from typing import Any
+
+import pytest
+from fastavro import reader
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.manifest import (
+    DataFile,
+    ManifestEntry,
+    write_manifest,
+)
+from pyiceberg.table import Table
+from pyiceberg.utils.lazydict import LazyDict
+
+
+# Helper function to serialize our objects to dicts, to enable
+# direct comparison with the dicts returned by fastavro.
+def todict(obj: Any) -> Any:
+    if isinstance(obj, dict) or isinstance(obj, LazyDict):
+        data = []
+        for k, v in obj.items():
+            data.append({"key": k, "value": v})
+        return data
+    elif isinstance(obj, Enum):
+        return obj.value
+    elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes):
+        return [todict(v) for v in obj]
+    elif hasattr(obj, "__dict__"):
+        return {key: todict(value) for key, value in inspect.getmembers(obj) if not callable(value) and not key.startswith("_")}
+    else:
+        return obj
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
+@pytest.fixture()
+def table_test_all_types(catalog: Catalog) -> Table:
+    return catalog.load_table("default.test_all_types")
+
+
+@pytest.mark.integration
+def test_write_sample_manifest(table_test_all_types: Table) -> None:
+    test_snapshot = table_test_all_types.current_snapshot()
+    if test_snapshot is None:
+        raise ValueError("Table has no current snapshot, check the docker environment")
+    io = table_test_all_types.io
+    test_manifest_file = test_snapshot.manifests(io)[0]
+    test_manifest_entries = test_manifest_file.fetch_manifest_entry(io)
+    entry = test_manifest_entries[0]
+    test_schema = table_test_all_types.schema()
+    test_spec = table_test_all_types.spec()
+    wrapped_data_file_v2 = DataFile(
+        format_version=2,
+        content=entry.data_file.content,
+        file_path=entry.data_file.file_path,
+        file_format=entry.data_file.file_format,
+        partition=entry.data_file.partition,
+        record_count=entry.data_file.record_count,
+        file_size_in_bytes=entry.data_file.file_size_in_bytes,
+        column_sizes=entry.data_file.column_sizes,
+        value_counts=entry.data_file.value_counts,
+        null_value_counts=entry.data_file.null_value_counts,
+        nan_value_counts=entry.data_file.nan_value_counts,
+        lower_bounds=entry.data_file.lower_bounds,
+        upper_bounds=entry.data_file.upper_bounds,
+        key_metadata=entry.data_file.key_metadata,
+        split_offsets=entry.data_file.split_offsets,
+        equality_ids=entry.data_file.equality_ids,
+        sort_order_id=entry.data_file.sort_order_id,
+        spec_id=entry.data_file.spec_id,
+    )
+    wrapped_entry_v2 = ManifestEntry(*entry.record_fields())
+    wrapped_entry_v2.data_file = wrapped_data_file_v2
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
+        output = PyArrowFileIO().new_output(tmp_avro_file)
+        with write_manifest(
+            format_version=2,
+            spec=test_spec,
+            schema=test_schema,
+            output_file=output,
+            snapshot_id=test_snapshot.snapshot_id,
+        ) as manifest_writer:
+            # For simplicity, write only a single entry
+            manifest_writer.add_entry(test_manifest_entries[0])
+
+        with open(tmp_avro_file, "rb") as fo:
+            r = reader(fo=fo)
+            it = iter(r)
+            fa_entry = next(it)
+
+            assert fa_entry ==
todict(wrapped_entry_v2) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 76a4a8a2b4..41af844bba 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -14,6 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# pylint: disable=redefined-outer-name,arguments-renamed,fixme +from tempfile import TemporaryDirectory +from typing import Dict + +import fastavro +import pytest from pyiceberg.io import load_file_io from pyiceberg.io.pyarrow import PyArrowFileIO @@ -26,9 +32,25 @@ ManifestFile, PartitionFieldSummary, read_manifest_list, + write_manifest, + write_manifest_list, ) +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table import Snapshot from pyiceberg.table.snapshots import Operation, Summary +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Record +from pyiceberg.types import IntegerType, NestedField + + +def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None: + with open(avro_file, "rb") as f: + reader = fastavro.reader(f) + metadata = reader.metadata + for k, v in expected_metadata.items(): + assert k in metadata + assert metadata[k] == v def test_read_manifest_entry(generated_manifest_entry_file: str) -> None: @@ -278,3 +300,245 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None: assert entry.file_sequence_number == 3 assert entry.snapshot_id == 8744736658442914487 assert entry.status == ManifestEntryStatus.ADDED + + +@pytest.mark.parametrize("format_version", [1, 2]) +def test_write_manifest(generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int) -> None: + io = load_file_io() + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + timestamp_ms=1602638573590, + manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2, + summary=Summary(Operation.APPEND), + schema_id=3, + ) + demo_manifest_file = snapshot.manifests(io)[0] + manifest_entries = demo_manifest_file.fetch_manifest_entry(io) + test_schema = Schema( + NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False) + ) + test_spec = PartitionSpec( + PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"), + PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"), + spec_id=demo_manifest_file.partition_spec_id, + ) + with TemporaryDirectory() as tmpdir: + tmp_avro_file = tmpdir + "/test_write_manifest.avro" + output = io.new_output(tmp_avro_file) + with write_manifest( + format_version=format_version, # type: ignore + spec=test_spec, + schema=test_schema, + output_file=output, + snapshot_id=8744736658442914487, + ) as writer: + for entry in manifest_entries: + writer.add_entry(entry) + new_manifest = writer.to_manifest_file() + with pytest.raises(RuntimeError): + writer.add_entry(manifest_entries[0]) + + expected_metadata = { + "schema": test_schema.json(), + "partition-spec": test_spec.json(), + "partition-spec-id": str(test_spec.spec_id), + "format-version": str(format_version), + } + if format_version == 2: + expected_metadata["content"] = "data" + _verify_metadata_with_fastavro( + tmp_avro_file, + expected_metadata, + ) + new_manifest_entries = new_manifest.fetch_manifest_entry(io) + + 
manifest_entry = new_manifest_entries[0]
+
+        assert manifest_entry.status == ManifestEntryStatus.ADDED
+        assert manifest_entry.snapshot_id == 8744736658442914487
+        assert manifest_entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert isinstance(manifest_entry.data_file, DataFile)
+
+        data_file = manifest_entry.data_file
+
+        assert data_file.content is DataFileContent.DATA
+        assert (
+            data_file.file_path
+            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
+        )
+        assert data_file.file_format == FileFormat.PARQUET
+        assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
+        assert data_file.record_count == 19513
+        assert data_file.file_size_in_bytes == 388872
+        if format_version == 1:
+            assert data_file.block_size_in_bytes == 67108864
+        else:
+            assert data_file.block_size_in_bytes is None
+        assert data_file.column_sizes == {
+            1: 53,
+            2: 98153,
+            3: 98693,
+            4: 53,
+            5: 53,
+            6: 53,
+            7: 17425,
+            8: 18528,
+            9: 53,
+            10: 44788,
+            11: 35571,
+            12: 53,
+            13: 1243,
+            14: 2355,
+            15: 12750,
+            16: 4029,
+            17: 110,
+            18: 47194,
+            19: 2948,
+        }
+        assert data_file.value_counts == {
+            1: 19513,
+            2: 19513,
+            3: 19513,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 19513,
+            8: 19513,
+            9: 19513,
+            10: 19513,
+            11: 19513,
+            12: 19513,
+            13: 19513,
+            14: 19513,
+            15: 19513,
+            16: 19513,
+            17: 19513,
+            18: 19513,
+            19: 19513,
+        }
+        assert data_file.null_value_counts == {
+            1: 19513,
+            2: 0,
+            3: 0,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 0,
+            8: 0,
+            9: 19513,
+            10: 0,
+            11: 0,
+            12: 19513,
+            13: 0,
+            14: 0,
+            15: 0,
+            16: 0,
+            17: 0,
+            18: 0,
+            19: 0,
+        }
+        assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+        assert data_file.lower_bounds == {
+            2: b"2020-04-01 00:00",
+            3: b"2020-04-01 00:12",
+            7: b"\x03\x00\x00\x00",
+            8: b"\x01\x00\x00\x00",
+            10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+            11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+            15: b")\\\x8f\xc2\xf5(\x08\xc0",
+            16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+            19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+        }
+        assert data_file.upper_bounds == {
+            2: b"2020-04-30 23:5:",
+            3: b"2020-05-01 00:41",
+            7: b"\t\x01\x00\x00",
+            8: b"\t\x01\x00\x00",
+            10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+            11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+            13: b"\x00\x00\x00\x00\x00\x00\x12@",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+            15: b"q=\n\xd7\xa3\xf01@",
+            16: b"\x00\x00\x00\x00\x00`B@",
+            17: b"333333\xd3?",
+            18: b"\x00\x00\x00\x00\x00\x18b@",
+            19: b"\x00\x00\x00\x00\x00\x00\x04@",
+        }
+        assert data_file.key_metadata is None
+        assert data_file.split_offsets == [4]
+        assert data_file.equality_ids is None
+        assert data_file.sort_order_id == 0
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_manifest_list(
+    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int
+) -> None:
+    io = load_file_io()
+
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+
+    demo_manifest_list = snapshot.manifests(io)
+    with TemporaryDirectory() as tmp_dir:
+        path = tmp_dir + "/manifest-list.avro"
+        output = io.new_output(path)
+        with write_manifest_list(
+            format_version=format_version, output_file=output, snapshot_id=25, parent_snapshot_id=19, sequence_number=0  # type: ignore
+        ) as writer:
+            writer.add_manifests(demo_manifest_list)
+        new_manifest_list = list(read_manifest_list(io.new_input(path)))
+
+        expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19", "format-version": str(format_version)}
+        if format_version == 2:
+            expected_metadata["sequence-number"] = "0"
+        _verify_metadata_with_fastavro(path, expected_metadata)
+
+        manifest_file = new_manifest_list[0]
+
+        assert manifest_file.manifest_length == 7989
+        assert manifest_file.partition_spec_id == 0
+        assert manifest_file.content == (ManifestContent.DATA if format_version == 1 else ManifestContent.DELETES)
+        assert manifest_file.sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.min_sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.added_snapshot_id == 9182715666859759686
+        assert manifest_file.added_files_count == 3
+        assert manifest_file.existing_files_count == 0
+        assert manifest_file.deleted_files_count == 0
+        assert manifest_file.added_rows_count == 237993
+        assert manifest_file.existing_rows_count == 0
+        assert manifest_file.deleted_rows_count == 0
+        assert manifest_file.key_metadata is None
+
+        assert isinstance(manifest_file.partitions, list)
+
+        partition = manifest_file.partitions[0]
+
+        assert isinstance(partition, PartitionFieldSummary)
+
+        assert partition.contains_null is True
+        assert partition.contains_nan is False
+        assert partition.lower_bound == b"\x01\x00\x00\x00"
+        assert partition.upper_bound == b"\x02\x00\x00\x00"
+
+        entries = manifest_file.fetch_manifest_entry(io)
+
+        assert isinstance(entries, list)
+
+        entry = entries[0]
+
+        assert entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.file_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.snapshot_id == 8744736658442914487
+        assert entry.status == ManifestEntryStatus.ADDED
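
A minimal sketch of how the two new writers compose (not part of the diff; `spec`, `schema`, `entry`, `snapshot_id`, and `parent_snapshot_id` stand in for real objects, and the paths are illustrative):

    from pyiceberg.io import load_file_io
    from pyiceberg.manifest import write_manifest, write_manifest_list

    io = load_file_io()

    # Write a v2 manifest with a single entry, then reference it from a manifest list.
    with write_manifest(
        format_version=2,
        spec=spec,
        schema=schema,
        output_file=io.new_output("/tmp/manifest.avro"),
        snapshot_id=snapshot_id,
    ) as writer:
        writer.add_entry(entry)
        manifest_file = writer.to_manifest_file()

    with write_manifest_list(
        format_version=2,
        output_file=io.new_output("/tmp/manifest-list.avro"),
        snapshot_id=snapshot_id,
        parent_snapshot_id=parent_snapshot_id,
        sequence_number=1,
    ) as list_writer:
        list_writer.add_manifests([manifest_file])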