From @dongsup kim on Slack:
"""
Hi team, there have been occasional reports from internal users that the number of records retrieved when loading data with PyIceberg sometimes differs between runs of the same query. Since it is difficult to determine the specific circumstances under which this occurs, reproducing the issue has been challenging. What kind of logging should we implement to help identify the root cause when the issue arises?
"""
Git commit: a051584a3684392d2db6556449eb299145d47d15 (pyiceberg-0.8.1 tag). The snippet below is `ArrowScan.to_table` from that commit, with `logger.info` calls added while debugging:
```python
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
    """Scan the Iceberg table and return a pa.Table.

    Returns a pa.Table with data from the Iceberg table by resolving the
    right columns that match the current table schema. Only data that
    matches the provided row_filter expression is returned.

    Args:
        tasks: FileScanTasks representing the data files and delete files to read from.

    Returns:
        A PyArrow table. Total number of rows will be capped if specified.

    Raises:
        ResolveError: When a required field cannot be found in the file
        ValueError: When a field type in the file cannot be projected to the schema type
    """
    deletes_per_file = _read_all_delete_files(self._fs, tasks)
    executor = ExecutorFactory.get_or_create()

    def _table_from_scan_task(task: FileScanTask) -> pa.Table:
        batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
        if len(batches) > 0:
            return pa.Table.from_batches(batches)
        else:
            return None

    futures = [
        executor.submit(
            _table_from_scan_task,
            task,
        )
        for task in tasks
    ]
    # debug logging added for this investigation; len(tasks) assumes
    # `tasks` is a sized collection rather than a one-shot generator
    logger.info(f"Number of tasks: {len(tasks)} Number of Futures: {len(futures)}")
    total_row_count = 0
    # for consistent ordering, we need to maintain future order
    futures_index = {f: i for i, f in enumerate(futures)}
    completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
    for future in concurrent.futures.as_completed(futures):
        completed_futures.add(future)
        if table_result := future.result():
            total_row_count += len(table_result)
        # stop early if limit is satisfied
        if self._limit is not None and total_row_count >= self._limit:
            break

    # by now, we've either completed all tasks or satisfied the limit
    if self._limit is not None:
        _ = [f.cancel() for f in futures if not f.done()]

    tables = [f.result() for f in completed_futures if f.result()]

    if len(tables) < 1:
        return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

    result = pa.concat_tables(tables, promote_options="permissive")

    if self._limit is not None:
        return result.slice(0, self._limit)

    # debug logging added for this investigation; note it is skipped
    # entirely when a limit is set, because of the early return above
    logger.info(f"total_row_count: {total_row_count}, len(tables): {len(tables)} len(completed_futures): {len(completed_futures)}")
    logger.info([(i, t.num_rows) for i, t in enumerate(tables)])
    logger.info([(i, t.file.file_path) for i, t in enumerate(tasks)])
    return result
```
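One detail worth noting in the snippet: `tasks` is typed as an `Iterable`, and the method walks it several times (in `_read_all_delete_files`, in the `futures` list comprehension, and again in the added logging). A list survives that fine, but if a one-shot generator were ever passed in, every pass after the first would see zero items and rows would silently disappear. A self-contained illustration of that failure mode (all names here are made up for the demo, not from PyIceberg):

```python
from typing import Iterable


def read_deletes(tasks: Iterable[int]) -> list[int]:
    # First pass over `tasks` (stands in for _read_all_delete_files).
    return list(tasks)


def plan(tasks: Iterable[int]) -> list[int]:
    read_deletes(tasks)             # consumes the iterable once
    return [t * 10 for t in tasks]  # second pass (stands in for building futures)


print(plan([1, 2, 3]))              # list is re-iterable  -> [10, 20, 30]
print(plan(t for t in [1, 2, 3]))   # generator exhausted  -> []
```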
"""
Is the `tasks` variable in the `to_table()` function of the `ArrowScan` class non-deterministic?
While debugging, I observed that applying the same `row_filter` to the same table sometimes results in a different number of tasks. In the cases where data loss occurs, the number of `table_result` objects retrieved via multiprocessing also varies.
"""
Apache Iceberg version: None