Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explore potential issue with scan returning the incorrect results #1506

Open
3 tasks
kevinjqliu opened this issue Jan 10, 2025 · 4 comments
Open
3 tasks

Explore potential issue with scan returning the incorrect results #1506

kevinjqliu opened this issue Jan 10, 2025 · 4 comments

Comments

@kevinjqliu
Copy link
Contributor

kevinjqliu commented Jan 10, 2025

Apache Iceberg version

None

Please describe the bug 🐞

From @dongsup kim on slack,
"""
Hi team, There have been occasional reports from internal users that the number of records retrieved when loading data with PyIceberg sometimes differs. Since it is difficult to determine the specific circumstances under which this occurs, reproducing the issue has been challenging. What kind of logging should be implemented to help identify the root cause when the issue arises?
"""

Git commit: a051584a3684392d2db6556449eb299145d47d15 (pyiceberg-0.8.1 tag)
    def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
        """Scan the Iceberg table and return a pa.Table.

        Returns a pa.Table with data from the Iceberg table by resolving the
        right columns that match the current table schema. Only data that
        matches the provided row_filter expression is returned.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.

        Returns:
            A PyArrow table. Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
            ValueError: When a field type in the file cannot be projected to the schema type
        """
        deletes_per_file = _read_all_delete_files(self._fs, tasks)
        executor = ExecutorFactory.get_or_create()

        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
            batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
            if len(batches) > 0:
                return pa.Table.from_batches(batches)
            else:
                return None

        futures = [
            executor.submit(
                _table_from_scan_task,
                task,
            )
            for task in tasks
        ]
        logger.info(f"Number of tasks: {len(tasks)} Number of Futures: {len(futures)}")
        total_row_count = 0
        # for consistent ordering, we need to maintain future order
        futures_index = {f: i for i, f in enumerate(futures)}
        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
        for future in concurrent.futures.as_completed(futures):
            completed_futures.add(future)
            if table_result := future.result():
                total_row_count += len(table_result)
            # stop early if limit is satisfied
            if self._limit is not None and total_row_count >= self._limit:
                break
        # by now, we've either completed all tasks or satisfied the limit
        if self._limit is not None:
            _ = [f.cancel() for f in futures if not f.done()]

        tables = [f.result() for f in completed_futures if f.result()]

        if len(tables) < 1:
            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))

        result = pa.concat_tables(tables, promote_options="permissive")

        if self._limit is not None:
            return result.slice(0, self._limit)

        logger.info(f"total_row_count: {total_row_count}, len(tables): {len(tables)} len(completed_futures): {len(completed_futures)}")
        logger.info([(i, t.num_rows) for i, t in enumerate(tables)])
        logger.info([(i, t.file.file_path) for i, t in enumerate(tasks)])
        return result

"""
Is the tasks variable in the to_table() function of the ArrowScan class non-deterministic?
While debugging, I observed that applying the same row_filter to the same table sometimes results in a different number of tasks. In cases where data loss occurs, I noticed that the number of table_result objects retrieved via multiprocessing varies.
"""

2025-01-10 18:34:26 - pyiceberg.io.pyarrow - INFO - log init
2025-01-10 18:34:26 - pyiceberg.io - INFO - Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO
2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000, len(completed_futures): 114
2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 871), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93, 859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100, 914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862), (107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113, 872)] # tables row index and count

2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - total_row_count: 99126, len(completed_futures): 116
2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 906), (82, 930), (83, 874), (84, 872), (85, 871), (86, 877), (87, 891), (88, 820), (89, 877), (90, 876), (91, 928), (92, 859), (93, 878), (94, 884), (95, 954), (96, 856), (97, 924), (98, 859), (99, 914), (100, 892), (101, 889), (102, 886), (103, 882), (104, 915), (105, 862), (106, 907), (107, 886), (108, 837), (109, 910), (110, 963), (111, 926), (112, 872)] # tables row index and count

2025-01-10 19:12:18 - pyiceberg.io.pyarrow - INFO - log init
2025-01-10 19:12:18 - pyiceberg.io - INFO - Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO
2025-01-10 19:14:36 - pyiceberg.io.pyarrow - INFO - Number of tasks: 115 Number of Futures: 115
2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000, len(completed_futures): 115
2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 871), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93, 859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100, 914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862), (107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113, 872)] # tables row index and count

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@kevinjqliu
Copy link
Contributor Author

It looks like the results are sample data, it would be great if you can provide the source table so i can try to reproduce it

@kevinjqliu
Copy link
Contributor Author

From the log above, presumably ran with the same table, snapshot, and filter.

total_row_count: 100000, len(completed_futures): 114
total_row_count: 99126, len(completed_futures): 116
total_row_count: 100000, len(completed_futures): 115

@kevinjqliu
Copy link
Contributor Author

In the second example, total_row_count is 99126, which means we didnt hit the 10_000 limit and exit early..

@dongsupkim-onepredict
Copy link

It looks like the results are sample data, it would be great if you can provide the source table so i can try to reproduce it

The source table has a very large size. How can it be provided effectively?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants