support all_entries in pyiceberg
amitgilad3 committed Feb 4, 2025
1 parent 5018efc commit 815408a
Showing 2 changed files with 78 additions and 5 deletions.
30 changes: 25 additions & 5 deletions pyiceberg/table/inspect.py
@@ -94,7 +94,7 @@ def snapshots(self) -> "pa.Table":
             schema=snapshots_schema,
         )

-    def entries(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+    def _get_entries_schema(self) -> "pa.Schema":
         import pyarrow as pa

         from pyiceberg.io.pyarrow import schema_to_pyarrow
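Note: the signature change above turns the first half of entries() into a schema-only helper, _get_entries_schema(). A minimal sketch of why all_entries() (added further down) needs the schema on its own, using a hypothetical stand-in schema rather than the real one:

import pyarrow as pa

# Hypothetical stand-in for the schema built by _get_entries_schema().
entries_schema = pa.schema([pa.field("status", pa.int8(), nullable=False)])

# A table with no snapshots yields no rows, but the result should still be
# typed; this mirrors the empty-table branch of all_entries() below.
empty = pa.Table.from_pylist([], schema=entries_schema)
assert empty.num_rows == 0 and empty.schema == entries_schema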
@@ -157,11 +157,18 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
             ]
         )
+        return entries_schema
+
+    def entries(self, snapshot_id: Optional[int] = None, discard_deleted: bool = True) -> "pa.Table":
+        import pyarrow as pa
+        schema = self.tbl.metadata.schema()
+
+        entries_schema = self._get_entries_schema()

         entries = []
         snapshot = self._get_snapshot(snapshot_id)
         for manifest in snapshot.manifests(self.tbl.io):
-            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+            for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=discard_deleted):
                 column_sizes = entry.data_file.column_sizes or {}
                 value_counts = entry.data_file.value_counts or {}
                 null_value_counts = entry.data_file.null_value_counts or {}
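For context, a minimal usage sketch of the new discard_deleted flag (the catalog and table names are hypothetical; entries() and the flag itself come from this diff):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")             # hypothetical catalog name
tbl = catalog.load_table("default.my_table")  # hypothetical table

# Default behaviour is unchanged: DELETED manifest entries are dropped.
live = tbl.inspect.entries()

# discard_deleted=False keeps DELETED entries as well, which is what
# all_entries() below relies on to reconstruct the full table history.
everything = tbl.inspect.entries(discard_deleted=False)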
@@ -205,9 +212,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                             "record_count": entry.data_file.record_count,
                             "file_size_in_bytes": entry.data_file.file_size_in_bytes,
                             "column_sizes": dict(entry.data_file.column_sizes),
-                            "value_counts": dict(entry.data_file.value_counts),
-                            "null_value_counts": dict(entry.data_file.null_value_counts),
-                            "nan_value_counts": dict(entry.data_file.nan_value_counts),
+                            "value_counts": dict(entry.data_file.value_counts or {}),
+                            "null_value_counts": dict(entry.data_file.null_value_counts or {}),
+                            "nan_value_counts": dict(entry.data_file.nan_value_counts or {}),
                             "lower_bounds": entry.data_file.lower_bounds,
                             "upper_bounds": entry.data_file.upper_bounds,
                             "key_metadata": entry.data_file.key_metadata,
@@ -657,3 +664,16 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def all_entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], self._get_entries_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        all_entries: Iterator["pa.Table"] = executor.map(
+            lambda snapshot_id: self.entries(snapshot_id, discard_deleted=False), [snapshot.snapshot_id for snapshot in snapshots]
+        )
+        return pa.concat_tables(all_entries)
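A short usage sketch for the new method (table loading as in the entries() sketch above). Assuming ExecutorFactory hands back a concurrent.futures executor, whose map() preserves input order, rows come back grouped by snapshot in the order given by tbl.snapshots():

df = tbl.inspect.all_entries()
print(df.column_names)
# ['status', 'snapshot_id', 'sequence_number', 'file_sequence_number',
#  'data_file', 'readable_metrics']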
53 changes: 53 additions & 0 deletions tests/integration/test_inspect_table.py
@@ -938,3 +938,56 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     lhs = spark.table(f"{identifier}.all_manifests").toPandas()
     rhs = df.to_pandas()
     assert_frame_equal(lhs, rhs, check_dtype=False)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_entries(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    from pandas.testing import assert_frame_equal
+
+    identifier = "default.table_metadata_all_entries"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('write.update.mode'='merge-on-read',
+            'write.delete.mode'='merge-on-read', 'format-version'= {format_version})
+        """
+    )
+    tbl = session_catalog.load_table(identifier)
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
+
+    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")
+
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")
+
+    tbl.refresh()
+
+    df = tbl.inspect.all_entries()
+
+    assert df.column_names == [
+        "status",
+        "snapshot_id",
+        "sequence_number",
+        "file_sequence_number",
+        "data_file",
+        "readable_metrics",
+    ]
+
+    lhs = spark.table(f"{identifier}.all_entries").toPandas()
+    rhs = df.to_pandas()
+    assert_frame_equal(lhs, rhs, check_dtype=False)
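As a follow-on to the assertions above, a sketch of slicing the combined table by entry status (Iceberg manifest-entry statuses: 0 = EXISTING, 1 = ADDED, 2 = DELETED); tbl is the table created in the test:

import pyarrow.compute as pc

df = tbl.inspect.all_entries()
deleted_only = df.filter(pc.equal(df["status"], 2))  # entries removed over time
added_only = df.filter(pc.equal(df["status"], 1))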
