-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ResidualVisitor to compute residuals #1388
base: main
Are you sure you want to change the base?
Conversation
…ta-only-op add count in data scan and test in catalog sql
Question: Does it make sense to expose this as the |
…-count Residual Evaluator with test
* added residual evaluator in plan files * tested counts with positional deletes * merged main
* added residual evaluator in plan files * tested counts with positional deletes * merged main * implemented batch reader in count * breaking integration test * fixed integration test * git pull main * revert * revert * revert test_partitioning_key.py * revert test_parser.py * added residual evaluator in visitor * deleted residual_evaluator.py * removed test count from test_sql.py * ignored lint type * fixed lint * working on plan_files * type ignored * make lint
Hi @Fokko @kevinjqliu @gli-chris-hao , I have implemented these suggestions with my best understanding.
It would be helpful to get fresh review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great @tusharchou Thanks for working on this. I left some comments, but this is a great start 🚀
* added residual evaluator in plan files * tested counts with positional deletes * merged main * implemented batch reader in count * breaking integration test * fixed integration test * git pull main * revert * revert * revert test_partitioning_key.py * revert test_parser.py * added residual evaluator in visitor * deleted residual_evaluator.py * removed test count from test_sql.py * ignored lint type * fixed lint * working on plan_files * type ignored * make lint * explicit delete files len is zero * residual eval only if manifest is true * default residual is always true * used projection schema * refactored residual in plan files
* added residual evaluator in plan files * tested counts with positional deletes * merged main * implemented batch reader in count * breaking integration test * fixed integration test * git pull main * revert * revert * revert test_partitioning_key.py * revert test_parser.py * added residual evaluator in visitor * deleted residual_evaluator.py * removed test count from test_sql.py * ignored lint type * fixed lint * working on plan_files * type ignored * make lint * explicit delete files len is zero * residual eval only if manifest is true * default residual is always true * used projection schema * refactored residual in plan files * fixed lint issue with isnan
1. If d > day(a) and d < day(b), the residual is always true | ||
2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a | ||
3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b | ||
4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b | ||
|
||
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike). | ||
|
||
This class is thread-safe. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. If d > day(a) and d < day(b), the residual is always true | |
2. If d == day(a) and d != day(b), the residual is utc_timestamp >= a | |
3. if d == day(b) and d != day(a), the residual is utc_timestamp <= b | |
4. If d == day(a) == day(b), the residual is utc_timestamp >= a and utc_timestamp <= b | |
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike). | |
This class is thread-safe. | |
1. If d > day(a) and d < day(b), the residual is always true | |
2. If d == day(a) and d != day(b), the residual is utc_timestamp > a | |
3. if d == day(b) and d != day(a), the residual is utc_timestamp < b | |
4. If d == day(a) == day(b), the residual is utc_timestamp > a and utc_timestamp < b | |
Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike). |
|
||
A residual expression is made by partially evaluating an expression using partition values. | ||
For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression | ||
utc_timestamp >= a and utc_timestamp <= b, then there are 4 possible residuals expressions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
utc_timestamp >= a and utc_timestamp <= b, then there are 4 possible residuals expressions | |
utc_timestamp > a and utc_timestamp < b, then there are 4 possible residuals expressions |
# if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse | ||
strict_result = bound | ||
|
||
if strict_result is not None and isinstance(strict_result, AlwaysTrue): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Python we can simplify this a bit:
if strict_result is not None and isinstance(strict_result, AlwaysTrue): | |
if isinstance(strict_result, AlwaysTrue): |
>>> from pyiceberg.expressions import AlwaysTrue
>>> isinstance(None, AlwaysTrue)
False
strict_result = None | ||
|
||
if strict_projection is not None: | ||
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) | |
bound = strict_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive) |
inclusive_projection = part.transform.project(part.name, predicate) | ||
inclusive_result = None | ||
if inclusive_projection is not None: | ||
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema))) | |
bound_inclusive = inclusive_projection.bind(struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive) |
return predicate | ||
|
||
def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression: | ||
bound = predicate.bind(self.schema, case_sensitive=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bound = predicate.bind(self.schema, case_sensitive=True) | |
bound = predicate.bind(self.schema, case_sensitive=self.case_sensitive) |
# if the result is not a predicate, then it must be a constant like alwaysTrue or | ||
# alwaysFalse | ||
inclusive_result = bound_inclusive | ||
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if inclusive_result is not None and isinstance(inclusive_result, AlwaysFalse): | |
if isinstance(inclusive_result, AlwaysFalse): |
if isinstance(bound, BoundPredicate): | ||
bound_residual = self.visit_bound_predicate(predicate=bound) | ||
# if isinstance(bound_residual, BooleanExpression): | ||
if bound_residual not in (AlwaysFalse(), AlwaysTrue()): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if bound_residual not in (AlwaysFalse(), AlwaysTrue()): | |
if not isinstance(bound_residual, (AlwaysFalse, AlwaysTrue)): |
if len(spec.fields) != 0: | ||
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) | ||
else: | ||
return UnpartitionedResidualEvaluator(schema=schema, expr=expr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just as style thing :)
if len(spec.fields) != 0: | |
return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) | |
else: | |
return UnpartitionedResidualEvaluator(schema=schema, expr=expr) | |
return UnpartitionedResidualEvaluator(schema=schema, expr=expr) if spec.is_unpartitioned() else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive) |
if task.file.file_size_in_bytes > 512 * 1024 * 1024: | ||
target_schema = schema_to_pyarrow(self.projection()) | ||
batches = arrow_scan.to_record_batches([task]) | ||
from pyarrow import RecordBatchReader | ||
|
||
reader = RecordBatchReader.from_batches(target_schema, batches) | ||
for batch in reader: | ||
res += batch.num_rows | ||
else: | ||
tbl = arrow_scan.to_table([task]) | ||
res += len(tbl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep it simple for now, I don't think we cover the other case in a test
if task.file.file_size_in_bytes > 512 * 1024 * 1024: | |
target_schema = schema_to_pyarrow(self.projection()) | |
batches = arrow_scan.to_record_batches([task]) | |
from pyarrow import RecordBatchReader | |
reader = RecordBatchReader.from_batches(target_schema, batches) | |
for batch in reader: | |
res += batch.num_rows | |
else: | |
tbl = arrow_scan.to_table([task]) | |
res += len(tbl) | |
tbl = arrow_scan.to_table([task]) | |
res += len(tbl) |
closes issue: Count rows as a metadata-only operation #1223