Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResidualVisitor to compute residuals #1388

Open
wants to merge 27 commits into
base: main
Choose a base branch
from

Conversation

tusharchou
Copy link

closes issue: Count rows as a metadata-only operation #1223

@jayceslesar
Copy link
Contributor

Question: Does it make sense to expose this as the __len__ dunder method because python? It would just return the self.count()

* added residual evaluator in plan files

* tested counts with positional deletes

* merged main

* implemented batch reader in count

* breaking integration test

* fixed integration test

* git pull main

* revert

* revert

* revert test_partitioning_key.py

* revert test_parser.py

* added residual evaluator in visitor

* deleted residual_evaluator.py

* removed test count from test_sql.py

* ignored lint type

* fixed lint

* working on plan_files

* type ignored

* make lint
@tusharchou
Copy link
Author

Hi @Fokko @kevinjqliu @gli-chris-hao ,

I have implemented these suggestions with my best understanding.

  • residual evaluator
  • positional deletes
  • batch processing of files larger than 512mb

It would be helpful to get fresh review

@Fokko Fokko changed the title Count rows as a metadata only operation Add ResidualVisitor to compute residuals Jan 13, 2025
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great @tusharchou Thanks for working on this. I left some comments, but this is a great start 🚀

pyiceberg/expressions/visitors.py Show resolved Hide resolved
pyiceberg/expressions/visitors.py Show resolved Hide resolved
pyiceberg/expressions/visitors.py Outdated Show resolved Hide resolved
pyiceberg/expressions/visitors.py Show resolved Hide resolved
pyiceberg/expressions/visitors.py Outdated Show resolved Hide resolved
pyiceberg/table/__init__.py Outdated Show resolved Hide resolved
pyiceberg/table/__init__.py Outdated Show resolved Hide resolved
pyiceberg/table/__init__.py Show resolved Hide resolved
pyiceberg/table/__init__.py Outdated Show resolved Hide resolved
pyiceberg/table/__init__.py Show resolved Hide resolved
* added residual evaluator in plan files

* tested counts with positional deletes

* merged main

* implemented batch reader in count

* breaking integration test

* fixed integration test

* git pull main

* revert

* revert

* revert test_partitioning_key.py

* revert test_parser.py

* added residual evaluator in visitor

* deleted residual_evaluator.py

* removed test count from test_sql.py

* ignored lint type

* fixed lint

* working on plan_files

* type ignored

* make lint

* explicit delete files len is zero

* residual eval only if manifest is true

* default residual is always true

* used projection schema

* refactored residual in plan files
* added residual evaluator in plan files

* tested counts with positional deletes

* merged main

* implemented batch reader in count

* breaking integration test

* fixed integration test

* git pull main

* revert

* revert

* revert test_partitioning_key.py

* revert test_parser.py

* added residual evaluator in visitor

* deleted residual_evaluator.py

* removed test count from test_sql.py

* ignored lint type

* fixed lint

* working on plan_files

* type ignored

* make lint

* explicit delete files len is zero

* residual eval only if manifest is true

* default residual is always true

* used projection schema

* refactored residual in plan files

* fixed lint issue with isnan
Comment on lines +1746 to +1753
1. If d > day(a) and d < day(b), the residual is always true
2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a
3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b
4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b

Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike).

This class is thread-safe.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
1. If d > day(a) and d < day(b), the residual is always true
2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a
3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b
4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike).
This class is thread-safe.
1. If d > day(a) and d < day(b), the residual is always true
2. If d == day(a) and d != day(b), the residual is utc_timestamp > a
3. if d == day(b) and d != day(a), the residual is utc_timestamp < b
4. If d == day(a) == day(b), the residual is utc_timestamp > a and utc_timestamp < b
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike).


A residual expression is made by partially evaluating an expression using partition values.
For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression
utc_timestamp &gt;= a and utc_timestamp &lt;= b, then there are 4 possible residuals expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
utc_timestamp &gt;= a and utc_timestamp &lt;= b, then there are 4 possible residuals expressions
utc_timestamp > a and utc_timestamp < b, then there are 4 possible residuals expressions

# if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
strict_result = bound

if strict_result is not None and isinstance(strict_result, AlwaysTrue):
Copy link
Contributor

@Fokko Fokko Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python we can simplify this a bit:

Suggested change
if strict_result is not None and isinstance(strict_result, AlwaysTrue):
if isinstance(strict_result, AlwaysTrue):
>>> from pyiceberg.expressions import AlwaysTrue
>>> isinstance(None, AlwaysTrue)
False

strict_result = None

if strict_projection is not None:
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive)

inclusive_projection = part.transform.project(part.name, predicate)
inclusive_result = None
if inclusive_projection is not None:
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)))
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive)

return predicate

def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
bound = predicate.bind(self.schema, case_sensitive=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bound = predicate.bind(self.schema, case_sensitive=True)
bound = predicate.bind(self.schema, case_sensitive=self.case_sensitive)

# if the result is not a predicate, then it must be a constant like alwaysTrue or
# alwaysFalse
inclusive_result = bound_inclusive
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse):
if isinstance(inclusive_result, AlwaysFalse):

if isinstance(bound, BoundPredicate):
bound_residual = self.visit_bound_predicate(predicate=bound)
# if isinstance(bound_residual, BooleanExpression):
if bound_residual not in (AlwaysFalse(), AlwaysTrue()):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if bound_residual not in (AlwaysFalse(), AlwaysTrue()):
if not isinstance(bound_residual, (AlwaysFalse, AlwaysTrue)):

Comment on lines +1958 to +1961
if len(spec.fields) != 0:
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
else:
return UnpartitionedResidualEvaluator(schema=schema, expr=expr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as style thing :)

Suggested change
if len(spec.fields) != 0:
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
else:
return UnpartitionedResidualEvaluator(schema=schema, expr=expr)
return UnpartitionedResidualEvaluator(schema=schema, expr=expr) if spec.is_unpartitioned() else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)

Comment on lines +1683 to +1693
if task.file.file_size_in_bytes > 512 * 1024 * 1024:
target_schema = schema_to_pyarrow(self.projection())
batches = arrow_scan.to_record_batches([task])
from pyarrow import RecordBatchReader

reader = RecordBatchReader.from_batches(target_schema, batches)
for batch in reader:
res += batch.num_rows
else:
tbl = arrow_scan.to_table([task])
res += len(tbl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it simple for now, I don't think we cover the other case in a test

Suggested change
if task.file.file_size_in_bytes > 512 * 1024 * 1024:
target_schema = schema_to_pyarrow(self.projection())
batches = arrow_scan.to_record_batches([task])
from pyarrow import RecordBatchReader
reader = RecordBatchReader.from_batches(target_schema, batches)
for batch in reader:
res += batch.num_rows
else:
tbl = arrow_scan.to_table([task])
res += len(tbl)
tbl = arrow_scan.to_table([task])
res += len(tbl)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants