position_deletes metadata table #1615

Open · wants to merge 2 commits into base: main
6 changes: 6 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -889,6 +889,12 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs:
return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)


def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table:
delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE})
table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table()
return table


def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
delete_fragment = _construct_fragment(
fs, data_file, file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}
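For orientation, a position-delete file is itself an ordinary Parquet file whose rows name a data file and the ordinal of the deleted row, which is what the new _read_delete_file helper scans with a caller-supplied schema. A minimal sketch of that file shape using plain pyarrow; the paths below are made up and not part of this change:

import pyarrow as pa
import pyarrow.parquet as pq

# Illustration only: the minimal shape of a position-delete file
# ("file_path" of the target data file, "pos" of the deleted row).
deletes = pa.table(
    {
        "file_path": ["s3://warehouse/db/tbl/data/data=a/00000-0-abc.parquet"],
        "pos": pa.array([0], type=pa.int64()),
    }
)
pq.write_table(deletes, "/tmp/position-delete.parquet")  # made-up local path
assert pq.read_table("/tmp/position-delete.parquet").num_rows == 1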
78 changes: 77 additions & 1 deletion pyiceberg/table/inspect.py
@@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -384,6 +384,41 @@ def _get_all_manifests_schema(self) -> "pa.Schema":
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
return all_manifests_schema

def _get_positional_file_schema(self) -> "pa.Schema":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
positional_delete_schema = pa.schema(
[
pa.field("file_path", pa.string(), nullable=False),
pa.field("pos", pa.int64(), nullable=False),
pa.field("row", pa_row_struct, nullable=True),
]
)
return positional_delete_schema

def _get_positional_deletes_schema(self) -> "pa.Schema":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

partition_record = self.tbl.metadata.specs_struct()
pa_partition_struct = schema_to_pyarrow(partition_record)
pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct())
positional_delete_schema = pa.schema(
[
pa.field("file_path", pa.string(), nullable=False),
pa.field("pos", pa.int64(), nullable=False),
pa.field("row", pa_row_struct, nullable=True),
pa.field("partition", pa_partition_struct, nullable=False),
pa.field("spec_id", pa.int64(), nullable=True),
pa.field("delete_file_path", pa.string(), nullable=False),
]
)
return positional_delete_schema

def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
import pyarrow as pa

@@ -453,6 +488,31 @@ def _partition_summaries_to_rows(
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
)

def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table":
import pyarrow as pa

positional_deletes: List["pa.Table"] = []
if manifest.content == ManifestContent.DELETES:
for entry in manifest.fetch_manifest_entry(self.tbl.io):
if entry.data_file.content == DataFileContent.POSITION_DELETES:
from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file

positional_delete_file = _read_delete_file(
_fs_from_file_path(self.tbl.io, entry.data_file.file_path),
entry.data_file,
self._get_positional_file_schema(),
).to_pylist()
for record in positional_delete_file:
record["partition"] = entry.data_file.partition.__dict__
record["spec_id"] = manifest.partition_spec_id
record["delete_file_path"] = entry.data_file.file_path

positional_deletes.append(pa.Table.from_pylist(positional_delete_file, position_deletes_schema))

if not positional_deletes:
return pa.Table.from_pylist([], position_deletes_schema)
return pa.concat_tables(positional_deletes)

def manifests(self) -> "pa.Table":
return self._generate_manifests_table(self.tbl.current_snapshot())

@@ -657,3 +717,19 @@ def all_manifests(self) -> "pa.Table":
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
)
return pa.concat_tables(manifests_by_snapshots)

def position_deletes(self) -> "pa.Table":
import pyarrow as pa

position_deletes_schema = self._get_positional_deletes_schema()
current_snapshot = self.tbl.current_snapshot()

if not current_snapshot:
return pa.Table.from_pylist([], schema=position_deletes_schema)

executor = ExecutorFactory.get_or_create()
positional_deletes: Iterator["pa.Table"] = executor.map(
lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema),
current_snapshot.manifests(self.tbl.io),
)
return pa.concat_tables(positional_deletes)
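
Once merged, the new metadata table would be reached through the inspect API. A minimal usage sketch; the catalog name and table identifier are assumptions borrowed from the integration test below, not part of this change:

from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names; any table with merge-on-read position deletes works.
catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_position_deletes")

df = tbl.inspect.position_deletes()
print(df.column_names)
# ['file_path', 'pos', 'row', 'partition', 'spec_id', 'delete_file_path']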
53 changes: 53 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -938,3 +938,56 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo
lhs = spark.table(f"{identifier}.all_manifests").toPandas()
rhs = df.to_pandas()
assert_frame_equal(lhs, rhs, check_dtype=False)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
from pandas.testing import assert_frame_equal

identifier = "default.table_metadata_position_deletes"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

spark.sql(
f"""
CREATE TABLE {identifier} (
id int,
data string
)
PARTITIONED BY (data)
TBLPROPERTIES ('write.update.mode'='merge-on-read',
'write.delete.mode'='merge-on-read')
"""
)
tbl = session_catalog.load_table(identifier)

spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")

spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")

spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")

spark.sql(f"DELETE FROM {identifier} WHERE id = 2")

tbl.refresh()
df = tbl.inspect.position_deletes()

assert df.column_names == ["file_path", "pos", "row", "partition", "spec_id", "delete_file_path"]

int_cols = ["pos"]
string_cols = ["file_path", "delete_file_path"]

for column in int_cols:
for value in df[column]:
assert isinstance(value.as_py(), int)

for column in string_cols:
for value in df[column]:
assert isinstance(value.as_py(), str)

lhs = spark.table(f"{identifier}.position_deletes").toPandas()
rhs = df.to_pandas()
assert_frame_equal(lhs, rhs, check_dtype=False)