47
47
from typing_extensions import Annotated
48
48
49
49
import pyiceberg .expressions .parser as parser
50
- import pyiceberg .expressions .visitors as visitors
51
50
from pyiceberg .exceptions import CommitFailedException , ResolveError , ValidationError
52
51
from pyiceberg .expressions import (
53
52
AlwaysTrue ,
56
55
EqualTo ,
57
56
Reference ,
58
57
)
58
+ from pyiceberg .expressions .visitors import (
59
+ _InclusiveMetricsEvaluator ,
60
+ expression_evaluator ,
61
+ inclusive_projection ,
62
+ manifest_evaluator ,
63
+ )
59
64
from pyiceberg .io import FileIO , load_file_io
60
65
from pyiceberg .manifest import (
61
66
POSITIONAL_DELETE_SCHEMA ,
@@ -1445,9 +1450,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
1445
1450
relevant_entries = positional_delete_entries [positional_delete_entries .bisect_right (data_entry ) :]
1446
1451
1447
1452
if len (relevant_entries ) > 0 :
1448
- evaluator = visitors ._InclusiveMetricsEvaluator (
1449
- POSITIONAL_DELETE_SCHEMA , EqualTo ("file_path" , data_entry .data_file .file_path )
1450
- )
1453
+ evaluator = _InclusiveMetricsEvaluator (POSITIONAL_DELETE_SCHEMA , EqualTo ("file_path" , data_entry .data_file .file_path ))
1451
1454
return {
1452
1455
positional_delete_entry .data_file
1453
1456
for positional_delete_entry in relevant_entries
@@ -1471,7 +1474,7 @@ def __init__(
1471
1474
super ().__init__ (table , row_filter , selected_fields , case_sensitive , snapshot_id , options , limit )
1472
1475
1473
1476
def _build_partition_projection (self , spec_id : int ) -> BooleanExpression :
1474
- project = visitors . inclusive_projection (self .table .schema (), self .table .specs ()[spec_id ])
1477
+ project = inclusive_projection (self .table .schema (), self .table .specs ()[spec_id ])
1475
1478
return project (self .row_filter )
1476
1479
1477
1480
@cached_property
@@ -1480,7 +1483,7 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
1480
1483
1481
1484
def _build_manifest_evaluator (self , spec_id : int ) -> Callable [[ManifestFile ], bool ]:
1482
1485
spec = self .table .specs ()[spec_id ]
1483
- return visitors . manifest_evaluator (spec , self .table .schema (), self .partition_filters [spec_id ], self .case_sensitive )
1486
+ return manifest_evaluator (spec , self .table .schema (), self .partition_filters [spec_id ], self .case_sensitive )
1484
1487
1485
1488
def _build_partition_evaluator (self , spec_id : int ) -> Callable [[DataFile ], bool ]:
1486
1489
spec = self .table .specs ()[spec_id ]
@@ -1491,9 +1494,7 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
1491
1494
# The lambda created here is run in multiple threads.
1492
1495
# So we avoid creating _EvaluatorExpression methods bound to a single
1493
1496
# shared instance across multiple threads.
1494
- return lambda data_file : visitors .expression_evaluator (partition_schema , partition_expr , self .case_sensitive )(
1495
- data_file .partition
1496
- )
1497
+ return lambda data_file : expression_evaluator (partition_schema , partition_expr , self .case_sensitive )(data_file .partition )
1497
1498
1498
1499
def _check_sequence_number (self , min_data_sequence_number : int , manifest : ManifestFile ) -> bool :
1499
1500
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
@@ -1538,7 +1539,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
1538
1539
# this filter depends on the partition spec used to write the manifest file
1539
1540
1540
1541
partition_evaluators : Dict [int , Callable [[DataFile ], bool ]] = KeyDefaultDict (self ._build_partition_evaluator )
1541
- metrics_evaluator = visitors . _InclusiveMetricsEvaluator (
1542
+ metrics_evaluator = _InclusiveMetricsEvaluator (
1542
1543
self .table .schema (), self .row_filter , self .case_sensitive , self .options .get ("include_empty_files" ) == "true"
1543
1544
).eval
1544
1545
0 commit comments