Skip to content

Commit

Permalink
Support distinct_counts
Browse files — browse the repository at this point in the history
  • Loading branch information
jpugliesi committed Feb 5, 2025
1 parent a051584 commit a4995fd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pyiceberg/expressions/visitors.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1096,6 +1096,7 @@ class _MetricsEvaluator(BoundBooleanExpressionVisitor[bool], ABC):
value_counts: Dict[int, int]
null_counts: Dict[int, int]
nan_counts: Dict[int, int]
distinct_counts: Dict[int, int]
lower_bounds: Dict[int, bytes]
upper_bounds: Dict[int, bytes]

Expand Down Expand Up @@ -1159,6 +1160,7 @@ def eval(self, file: DataFile) -> bool:
self.value_counts = file.value_counts or EMPTY_DICT
self.null_counts = file.null_value_counts or EMPTY_DICT
self.nan_counts = file.nan_value_counts or EMPTY_DICT
self.distinct_counts = file.distinct_counts or EMPTY_DICT
self.lower_bounds = file.lower_bounds or EMPTY_DICT
self.upper_bounds = file.upper_bounds or EMPTY_DICT

Expand Down Expand Up @@ -1486,6 +1488,7 @@ def eval(self, file: DataFile) -> bool:
self.value_counts = file.value_counts or EMPTY_DICT
self.null_counts = file.null_value_counts or EMPTY_DICT
self.nan_counts = file.nan_value_counts or EMPTY_DICT
self.distinct_counts = file.distinct_counts or EMPTY_DICT
self.lower_bounds = file.lower_bounds or EMPTY_DICT
self.upper_bounds = file.upper_bounds or EMPTY_DICT

Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2254,6 +2254,7 @@ class DataFileStatistics:
value_counts: Dict[int, int]
null_value_counts: Dict[int, int]
nan_value_counts: Dict[int, int]
distinct_counts: Dict[int, int]
column_aggregates: Dict[int, StatsAggregator]
split_offsets: List[int]

Expand Down Expand Up @@ -2302,6 +2303,7 @@ def to_serialized_dict(self) -> Dict[str, Any]:
"value_counts": self.value_counts,
"null_value_counts": self.null_value_counts,
"nan_value_counts": self.nan_value_counts,
"distinct_counts": self.distinct_counts,
"lower_bounds": lower_bounds,
"upper_bounds": upper_bounds,
"split_offsets": self.split_offsets,
Expand All @@ -2321,6 +2323,7 @@ def data_file_statistics_from_parquet_metadata(
- value_counts
- null_value_counts
- nan_value_counts
- distinct_counts
- column_aggregates
- split_offsets
Expand All @@ -2336,7 +2339,7 @@ def data_file_statistics_from_parquet_metadata(

null_value_counts: Dict[int, int] = {}
nan_value_counts: Dict[int, int] = {}

distinct_counts: Dict[int, int] = {}
col_aggs = {}

invalidate_col: Set[int] = set()
Expand Down Expand Up @@ -2406,6 +2409,7 @@ def data_file_statistics_from_parquet_metadata(
value_counts=value_counts,
null_value_counts=null_value_counts,
nan_value_counts=nan_value_counts,
distinct_counts=distinct_counts,
column_aggregates=col_aggs,
split_offsets=split_offsets,
)
Expand Down
16 changes: 16 additions & 0 deletions pyiceberg/manifest.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -159,6 +159,13 @@ def __repr__(self) -> str:
required=False,
doc="Map of column id to number of NaN values in the column",
),
NestedField(
field_id=111,
name="distinct_counts",
field_type=MapType(key_id=123, key_type=IntegerType(), value_id=124, value_type=LongType()),
required=False,
doc="Map of column id to distinct count",
),
NestedField(
field_id=125,
name="lower_bounds",
Expand Down Expand Up @@ -241,6 +248,13 @@ def __repr__(self) -> str:
required=False,
doc="Map of column id to number of NaN values in the column",
),
NestedField(
field_id=111,
name="distinct_counts",
field_type=MapType(key_id=123, key_type=IntegerType(), value_id=124, value_type=LongType()),
required=False,
doc="Map of column id to distinct count",
),
NestedField(
field_id=125,
name="lower_bounds",
Expand Down Expand Up @@ -320,6 +334,7 @@ class DataFile(Record):
"value_counts",
"null_value_counts",
"nan_value_counts",
"distinct_counts",
"lower_bounds",
"upper_bounds",
"key_metadata",
Expand All @@ -338,6 +353,7 @@ class DataFile(Record):
value_counts: Dict[int, int]
null_value_counts: Dict[int, int]
nan_value_counts: Dict[int, int]
distinct_counts: Dict[int, int]
lower_bounds: Dict[int, bytes]
upper_bounds: Dict[int, bytes]
key_metadata: Optional[bytes]
Expand Down
10 changes: 10 additions & 0 deletions pyiceberg/table/inspect.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -105,6 +105,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("value_count", pa.int64(), nullable=True),
pa.field("null_value_count", pa.int64(), nullable=True),
pa.field("nan_value_count", pa.int64(), nullable=True),
pa.field("distinct_count", pa.int64(), nullable=True),
pa.field("lower_bound", pa_bound_type, nullable=True),
pa.field("upper_bound", pa_bound_type, nullable=True),
])
Expand Down Expand Up @@ -135,6 +136,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("distinct_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field("key_metadata", pa.binary(), nullable=True),
Expand All @@ -155,6 +157,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
value_counts = entry.data_file.value_counts or {}
null_value_counts = entry.data_file.null_value_counts or {}
nan_value_counts = entry.data_file.nan_value_counts or {}
distinct_counts = entry.data_file.distinct_counts or {}
lower_bounds = entry.data_file.lower_bounds or {}
upper_bounds = entry.data_file.upper_bounds or {}
readable_metrics = {
Expand All @@ -163,6 +166,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
"distinct_count": distinct_counts.get(field.field_id),
# Makes them readable
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
Expand Down Expand Up @@ -196,6 +200,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"value_counts": dict(entry.data_file.value_counts),
"null_value_counts": dict(entry.data_file.null_value_counts),
"nan_value_counts": entry.data_file.nan_value_counts,
"distinct_counts": entry.data_file.distinct_counts,
"lower_bounds": entry.data_file.lower_bounds,
"upper_bounds": entry.data_file.upper_bounds,
"key_metadata": entry.data_file.key_metadata,
Expand Down Expand Up @@ -488,6 +493,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("value_count", pa.int64(), nullable=True),
pa.field("null_value_count", pa.int64(), nullable=True),
pa.field("nan_value_count", pa.int64(), nullable=True),
pa.field("distinct_count", pa.int64(), nullable=True),
pa.field("lower_bound", pa_bound_type, nullable=True),
pa.field("upper_bound", pa_bound_type, nullable=True),
])
Expand All @@ -508,6 +514,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("distinct_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field("key_metadata", pa.binary(), nullable=True),
Expand Down Expand Up @@ -536,6 +543,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
nan_value_counts = data_file.nan_value_counts or {}
distinct_counts = data_file.distinct_counts or {}
lower_bounds = data_file.lower_bounds or {}
upper_bounds = data_file.upper_bounds or {}
readable_metrics = {
Expand All @@ -544,6 +552,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
"distinct_count": distinct_counts.get(field.field_id),
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
Expand All @@ -564,6 +573,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
"value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
"null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
"nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
"distinct_counts": dict(data_file.distinct_counts) if data_file.distinct_counts is not None else None,
"lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
"upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
Expand Down

0 comments on commit a4995fd

Please sign in to comment.