nemo_curator.utils.distributed_utils.read_data doesn't work for my own parquet dataset unless cleaning text by myself #482

Open
RickyShi46 opened this issue Jan 16, 2025 · 0 comments
Labels: bug

RickyShi46 commented Jan 16, 2025

Describe the bug

I encountered the following bug when using our own Parquet dataset with nemo_curator.utils.distributed_utils.read_data and nemo_curator.AddId operations, following the approach outlined in this tutorial.
However, if I first clean the data manually with ftfy and then run read_data and AddId, the process completes successfully. I am aware that NeMo Curator also provides cleaning methods such as nemo_curator.Modify(UnicodeReformatter(), text_field=input_text_field), but these likewise require running read_data on the dataset first.
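For reference, the manual ftfy workaround described above could look roughly like the sketch below. It is an illustration only, not the exact script used here: it assumes pandas and ftfy are installed, that the text lives in a column named `text`, and that each Parquet file fits in memory. `clean_texts` and `clean_parquet` are hypothetical helper names, not NeMo Curator APIs.

```python
from typing import Callable, List, Optional


def clean_texts(
    texts: List[Optional[str]], fix: Callable[[str], str]
) -> List[Optional[str]]:
    """Apply `fix` to every string and leave non-string values (e.g. None) untouched."""
    return [fix(t) if isinstance(t, str) else t for t in texts]


def clean_parquet(src: str, dst: str, text_field: str = "text") -> None:
    """Rewrite one Parquet file with its text column repaired by ftfy.

    `src`, `dst`, and the default column name are assumptions for illustration.
    """
    # Imported here so the helper above can be used without these packages.
    import ftfy
    import pandas as pd

    df = pd.read_parquet(src)
    df[text_field] = clean_texts(df[text_field].tolist(), ftfy.fix_text)
    df.to_parquet(dst, index=False)
```

The cleaned files written by `clean_parquet` would then be passed to read_data and AddId in place of the original files.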

root@dask-nemo-pure-text-0:/lpai/volumes/lmp-guan/sy/11-26-dedup-test# python3 dedup-en-dataset.py
Num Workers = 2
Allowed failures: 100
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Traceback (most recent call last):
File "/mnt/volumes/lmp-guan/sy/11-26-dedup-test/dedup-en-dataset.py", line 96, in <module>
main()
File "/mnt/volumes/lmp-guan/sy/11-26-dedup-test/dedup-en-dataset.py", line 66, in main
id_dataset.to_json(id_data_dir, write_to_filename=True)
File "/opt/NeMo-Curator/nemo_curator/datasets/doc_dataset.py", line 103, in to_json
write_to_disk(
File "/opt/NeMo-Curator/nemo_curator/utils/distributed_utils.py", line 514, in write_to_disk
output = output.compute()
File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 376, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 662, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 2423, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: Attempted to run task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:37323. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

The terminal that starts the dask worker shows the following error:

root@dask-nemo-pure-text-0:/lpai# dask worker localhost:8786 --local-directory /lpai/volumes/lmp-guan/dask-tmp --nworkers=2
2024-12-12 07:33:01,905 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:44623'
2024-12-12 07:33:01,909 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34143'
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:44687
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40197
2024-12-12 07:33:02,704 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:44687
2024-12-12 07:33:02,704 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40197
2024-12-12 07:33:02,704 - distributed.worker - INFO -          dashboard at:            127.0.0.1:41285
2024-12-12 07:33:02,704 - distributed.worker - INFO -          dashboard at:            127.0.0.1:35369
2024-12-12 07:33:02,704 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 07:33:02,704 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO -               Threads:                         35
2024-12-12 07:33:02,704 - distributed.worker - INFO -               Threads:                         35
2024-12-12 07:33:02,704 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-mwqve110
2024-12-12 07:33:02,704 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-75nl0lei
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,280 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 07:33:03,281 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 07:33:03,281 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,281 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 07:33:03,290 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 07:33:03,291 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 07:33:03,291 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,292 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 07:33:33,166 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.63s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:33:47,127 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:33:53,803 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:18,915 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.16s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:22,204 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.29s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:25,273 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:32,641 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.44s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:40,474 - distributed.core - INFO - Event loop was unresponsive in Worker for 15.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:35:10,018 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.67s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:35:12,616 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.62s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:37:13,644 - distributed.core - INFO - Event loop was unresponsive in Worker for 123.63s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:22,558 - distributed.core - INFO - Event loop was unresponsive in Worker for 189.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:34,465 - distributed.core - INFO - Event loop was unresponsive in Worker for 80.82s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:47,169 - distributed.core - INFO - Event loop was unresponsive in Worker for 24.30s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:40,485 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.57s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:47,336 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.85s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:49,675 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:50,510 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.17s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:42:44,484 - distributed.core - INFO - Event loop was unresponsive in Worker for 144.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:42:55,764 - distributed.core - INFO - Event loop was unresponsive in Worker for 11.28s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:11,145 - distributed.core - INFO - Event loop was unresponsive in Worker for 171.46s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:13,438 - distributed.core - INFO - Event loop was unresponsive in Worker for 17.67s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:20,823 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.68s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:37,805 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.64s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:41,748 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:45,435 - distributed.core - INFO - Event loop was unresponsive in Worker for 32.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:09,619 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:39,284 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:45,625 - distributed.core - INFO - Event loop was unresponsive in Worker for 7.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:49,009 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.72s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:26,668 - distributed.core - INFO - Event loop was unresponsive in Worker for 11.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:28,544 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:52,904 - distributed.core - INFO - Event loop was unresponsive in Worker for 24.36s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:43,041 - distributed.core - INFO - Event loop was unresponsive in Worker for 196.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:45,861 - distributed.core - INFO - Event loop was unresponsive in Worker for 172.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:46,152 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.11s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:08,403 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:36,473 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:41,890 - distributed.core - INFO - Event loop was unresponsive in Worker for 7.83s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:46,064 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.59s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:50:48,730 - distributed.core - INFO - Event loop was unresponsive in Worker for 34.86s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:52:18,330 - distributed.core - INFO - Event loop was unresponsive in Worker for 123.92s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:12,134 - distributed.core - INFO - Event loop was unresponsive in Worker for 143.40s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:18,679 - distributed.core - INFO - Event loop was unresponsive in Worker for 60.35s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:29,038 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:32,511 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.47s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:35,430 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.47s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:41,302 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.87s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:30,260 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.14s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:39,575 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.32s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:40,868 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:56:27,921 - distributed.core - INFO - Event loop was unresponsive in Worker for 80.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:56:42,723 - distributed.core - INFO - Event loop was unresponsive in Worker for 94.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:18,566 - distributed.core - INFO - Event loop was unresponsive in Worker for 95.84s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:28,999 - distributed.core - INFO - Event loop was unresponsive in Worker for 121.08s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:50,769 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.22s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:59:26,358 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:59:26,472 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.78s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:00:27,865 - distributed.core - INFO - Event loop was unresponsive in Worker for 34.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:14,697 - distributed.core - INFO - Event loop was unresponsive in Worker for 142.05s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:39,516 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:44687 (pid=37600) exceeded 95% memory budget. Restarting...
2024-12-12 08:02:47,441 - distributed.nanny - INFO - Worker process 37600 was killed by signal 15
2024-12-12 08:02:47,444 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:02:48,237 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:38627
2024-12-12 08:02:48,237 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:38627
2024-12-12 08:02:48,237 - distributed.worker - INFO -          dashboard at:            127.0.0.1:36669
2024-12-12 08:02:48,237 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:02:48,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,237 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:02:48,237 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:02:48,237 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-bge05ma9
2024-12-12 08:02:48,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,764 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:02:48,765 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:02:48,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,765 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:03:07,511 - distributed.core - INFO - Event loop was unresponsive in Worker for 52.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:03:37,503 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.01s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:04:13,292 - distributed.core - INFO - Event loop was unresponsive in Worker for 35.79s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:06:27,215 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:40197 (pid=37596) exceeded 95% memory budget. Restarting...
2024-12-12 08:06:35,250 - distributed.nanny - INFO - Worker process 37596 was killed by signal 15
2024-12-12 08:06:35,253 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:06:36,041 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:37323
2024-12-12 08:06:36,042 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:37323
2024-12-12 08:06:36,042 - distributed.worker - INFO -          dashboard at:            127.0.0.1:42437
2024-12-12 08:06:36,042 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:06:36,042 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,042 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:06:36,042 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:06:36,042 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-csw22aqa
2024-12-12 08:06:36,042 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,594 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:06:36,594 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:06:36,594 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,595 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:06:40,979 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:07:37,205 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.76s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:12:36,214 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:38627 (pid=41421) exceeded 95% memory budget. Restarting...
2024-12-12 08:12:43,946 - distributed.nanny - INFO - Worker process 41421 was killed by signal 15
2024-12-12 08:12:43,949 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:12:44,764 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39201
2024-12-12 08:12:44,764 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39201
2024-12-12 08:12:44,764 - distributed.worker - INFO -          dashboard at:            127.0.0.1:39927
2024-12-12 08:12:44,764 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:12:44,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:44,765 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:12:44,765 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:12:44,765 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-ujfq8kpc
2024-12-12 08:12:44,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:45,237 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:12:45,237 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:12:45,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:45,238 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:12:49,594 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:14:14,003 - distributed.core - INFO - Event loop was unresponsive in Worker for 33.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:19:52,014 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:37323 (pid=41866) exceeded 95% memory budget. Restarting...
2024-12-12 08:20:00,211 - distributed.nanny - INFO - Worker process 41866 was killed by signal 15
2024-12-12 08:20:00,213 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:20:01,044 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:41803
2024-12-12 08:20:01,044 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:41803
2024-12-12 08:20:01,044 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37033
2024-12-12 08:20:01,044 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:20:01,044 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,044 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:20:01,044 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:20:01,044 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-_4xuy8xz
2024-12-12 08:20:01,044 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,507 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:20:01,508 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:20:01,508 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,508 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:20:05,951 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:20:36,095 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:22:03,164 - distributed.core - INFO - Event loop was unresponsive in Worker for 87.07s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
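The worker log above is long, so a small script can help pull out the relevant events. The following is a minimal sketch that assumes the log format shown above; `summarize` and the regular expressions are illustrative helpers, not part of Dask or NeMo Curator. It collects the nanny's memory-budget restarts and the lengths of the "Event loop was unresponsive" stalls.

```python
import re
from typing import Dict, Iterable, List

# Matches the nanny warning emitted when a worker exceeds its memory budget.
MEMORY_RESTART = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+ - distributed\.nanny\.memory"
    r" - WARNING - Worker (?P<addr>tcp://\S+) \(pid=(?P<pid>\d+)\)"
    r" exceeded (?P<pct>\d+)% memory budget"
)
# Matches the event-loop stall messages and captures the stall length in seconds.
UNRESPONSIVE = re.compile(r"Event loop was unresponsive in \w+ for (?P<secs>[\d.]+)s")


def summarize(lines: Iterable[str]) -> Dict[str, object]:
    """Collect memory-budget restarts and event-loop stall durations from log lines."""
    restarts: List[Dict[str, object]] = []
    stalls: List[float] = []
    for line in lines:
        m = MEMORY_RESTART.match(line)
        if m:
            restarts.append({"time": m["ts"], "worker": m["addr"], "pid": int(m["pid"])})
            continue
        m = UNRESPONSIVE.search(line)
        if m:
            stalls.append(float(m["secs"]))
    return {
        "restarts": restarts,
        "stall_count": len(stalls),
        "longest_stall_s": max(stalls, default=0.0),
    }


# Three lines taken from the log above (the stall messages are truncated after the duration).
sample = [
    "2024-12-12 08:00:27,865 - distributed.core - INFO - Event loop was unresponsive in Worker for 34.24s.",
    "2024-12-12 08:02:14,697 - distributed.core - INFO - Event loop was unresponsive in Worker for 142.05s.",
    "2024-12-12 08:02:39,516 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:44687 (pid=37600) exceeded 95% memory budget. Restarting...",
]
summary = summarize(sample)
print(summary)
```

In the full worker log above, the nanny restarts four workers for exceeding the memory budget (tcp://127.0.0.1:44687, 40197, 38627, and 37323). That matches the "4 different workers" in the KilledWorker message, whose last worker is tcp://127.0.0.1:37323.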

The terminal that starts the dask scheduler will show the following error:

root@dask-nemo-pure-text-0:/lpai# dask scheduler
2024-12-12 07:32:56,541 - distributed.scheduler - INFO - -----------------------------------------------
2024-12-12 07:32:56,859 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-12-12 07:32:56,911 - distributed.scheduler - INFO - State start
2024-12-12 07:32:56,926 - distributed.scheduler - INFO - -----------------------------------------------
2024-12-12 07:32:56,927 - distributed.scheduler - INFO -   Scheduler at: tcp://172.28.129.176:8786
2024-12-12 07:32:56,928 - distributed.scheduler - INFO -   dashboard at:  http://172.28.129.176:8787/status
2024-12-12 07:32:56,928 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2024-12-12 07:33:03,276 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:44687', status: init, memory: 0, processing: 0>
2024-12-12 07:33:03,280 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:44687
2024-12-12 07:33:03,280 - distributed.core - INFO - Starting established connection to tcp://[::1]:53290
2024-12-12 07:33:03,290 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40197', status: init, memory: 0, processing: 0>
2024-12-12 07:33:03,290 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40197
2024-12-12 07:33:03,290 - distributed.core - INFO - Starting established connection to tcp://[::1]:53298
2024-12-12 07:33:12,007 - distributed.scheduler - INFO - Receive client connection: Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 07:33:12,007 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:51980
2024-12-12 07:33:27,532 - distributed.core - INFO - Event loop was unresponsive in Scheduler for 4.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:47,432 - distributed.core - INFO - Connection to tcp://[::1]:53290 has been closed.
2024-12-12 08:02:47,434 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:44687', status: running, memory: 0, processing: 4> (stimulus_id='handle-worker-cleanup-1733990567.434068')
2024-12-12 08:02:48,762 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:38627', status: init, memory: 0, processing: 0>
2024-12-12 08:02:48,764 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38627
2024-12-12 08:02:48,764 - distributed.core - INFO - Starting established connection to tcp://[::1]:55716
2024-12-12 08:06:35,239 - distributed.core - INFO - Connection to tcp://[::1]:53298 has been closed.
2024-12-12 08:06:35,240 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:40197', status: running, memory: 4, processing: 4> (stimulus_id='handle-worker-cleanup-1733990795.240042')
2024-12-12 08:06:35,240 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:40197' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 7), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 3), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 5), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 1)} (stimulus_id='handle-worker-cleanup-1733990795.240042')
2024-12-12 08:06:36,593 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:37323', status: init, memory: 0, processing: 0>
2024-12-12 08:06:36,593 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:37323
2024-12-12 08:06:36,593 - distributed.core - INFO - Starting established connection to tcp://[::1]:44604
2024-12-12 08:12:43,936 - distributed.core - INFO - Connection to tcp://[::1]:55716 has been closed.
2024-12-12 08:12:43,937 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:38627', status: running, memory: 0, processing: 8> (stimulus_id='handle-worker-cleanup-1733991163.9372256')
2024-12-12 08:12:45,236 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39201', status: init, memory: 0, processing: 0>
2024-12-12 08:12:45,236 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39201
2024-12-12 08:12:45,236 - distributed.core - INFO - Starting established connection to tcp://[::1]:36226
2024-12-12 08:17:21,249 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2024-12-12 08:17:23,682 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:32,228 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:33,664 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:35,155 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:42,729 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:46,158 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:49,170 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:19:38,212 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:20:00,204 - distributed.core - INFO - Connection to tcp://[::1]:44604 has been closed.
2024-12-12 08:20:00,205 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:37323', status: running, memory: 0, processing: 8> (stimulus_id='handle-worker-cleanup-1733991600.2049491')
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 0) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 6) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 2) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 4) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,221 - distributed.scheduler - INFO - Remove client Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:00,221 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:51980; closing.
2024-12-12 08:20:00,221 - distributed.scheduler - INFO - Remove client Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:00,222 - distributed.scheduler - INFO - Close client connection: Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:01,506 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:41803', status: init, memory: 0, processing: 0>
2024-12-12 08:20:01,507 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:41803
2024-12-12 08:20:01,507 - distributed.core - INFO - Starting established connection to tcp://[::1]:57212
2024-12-12 08:22:27,221 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
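
To make the scheduler output above easier to scan, here is a minimal sketch that pulls out the removed workers and the failed tasks. The function name `summarize_scheduler_log` and both regular expressions are assumptions written against the log format shown here; they are not part of Dask or NeMo Curator.

```python
import re
from collections import Counter

# Both patterns assume the distributed.scheduler log format shown above.
REMOVE_WORKER = re.compile(r"Remove worker <WorkerState '([^']+)'")
FAILED_TASK = re.compile(r"ERROR - Task \('([^']+)', (\d+)\) marked as failed")


def summarize_scheduler_log(lines):
    """Return (addresses of removed workers, failed partition count per task name)."""
    removed = []
    failed = Counter()
    for line in lines:
        match = REMOVE_WORKER.search(line)
        if match:
            removed.append(match.group(1))
            continue
        match = FAILED_TASK.search(line)
        if match:
            failed[match.group(1)] += 1
    return removed, failed


# Two lines copied from the log above, used as sample input.
sample = [
    "2024-12-12 08:02:47,434 - distributed.scheduler - INFO - Remove worker "
    "<WorkerState 'tcp://127.0.0.1:44687', status: running, memory: 0, processing: 4> "
    "(stimulus_id='handle-worker-cleanup-1733990567.434068')",
    "2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task "
    "('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 0) "
    "marked as failed because 4 workers died while trying to run it",
]
removed, failed = summarize_scheduler_log(sample)
print(removed)
print(dict(failed))
```

Run over the scheduler log starting at 07:32:56, this should report four removed workers and four failed partitions of the `single_partition_write_with_filename` task.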

Steps/Code to reproduce bug

import os
import time
import warnings

from dask import config

from nemo_curator import AddId, get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_num_workers, read_data
from nemo_curator.utils.file_utils import get_batched_files


def pre_imports():
    import cudf
    
def main():
    warnings.filterwarnings('ignore')
    base_dir = "/lpai"
    cpu_client = get_client(cluster_type='cpu',scheduler_address='127.0.0.1:8786',)
    print(f"Num Workers = {get_num_workers(cpu_client)}", flush=True)
    config.set({'distributed.scheduler.allowed-failures': 100})
    allowed_failures = config.get('distributed.scheduler.allowed-failures')
    print(f"Allowed failures: {allowed_failures}")
    decompress_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/cc-main-lan-en/24-11-11-1112-parquet")
    #decompress_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/cc-main-lan-en/24-12-10-parquet-clean")
    id_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/11-26-dedup-test/add-id-en-dataset")
    # Start the timer before the loop so it is defined even if no batch is processed
    t0 = time.time()
    for files in get_batched_files(decompress_data_dir, id_data_dir, "parquet", batch_size=8):
        raw_data = read_data(files, file_type="parquet", backend="pandas", add_filename=True)
        input_dataset = DocumentDataset(raw_data)
        input_dataset.df.head()
        len(input_dataset.df)
        # Assign a unique ID to every document in the batch
        add_id = AddId(
            id_field="id",
            id_prefix="add_id",
        )
        id_dataset = add_id(input_dataset)
        id_dataset.to_json(id_data_dir, write_to_filename=True)
    print(f"Adding ID took: {time.time() - t0}")


if __name__ == '__main__':
    main()
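
A note on the `allowed-failures` setting in the script above: `config.set` changes the configuration of the process that calls it (here, the client script), so a scheduler that is already running at 127.0.0.1:8786 presumably keeps its own value. The "4 workers died" errors in the log are consistent with Dask's default of 3 allowed failures rather than 100. One option is to set the value through an environment variable before the scheduler is started. The sketch below only derives the variable name from the config key, following Dask's convention of a `DASK_` prefix, double underscores between nesting levels, and upper case; the helper `dask_env_var` is hypothetical and not part of Dask.

```python
def dask_env_var(key: str) -> str:
    """Map a dotted Dask config key to the matching environment variable name.

    Hypothetical helper: Dask treats hyphens and underscores in keys the same
    way, so hyphens are written as underscores here.
    """
    parts = key.split(".")
    return "DASK_" + "__".join(part.replace("-", "_").upper() for part in parts)


name = dask_env_var("distributed.scheduler.allowed-failures")
print(name)  # DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES
```

Exporting that variable with the value 100 in the shell that launches the scheduler should make the scheduler itself pick up the higher limit. Whether that avoids the failure depends on why the workers die in the first place.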

Here is the script I used to manually perform Unicode cleaning on the dataset using ftfy:

import os
import ftfy
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray

def fix_text(text):
    return ftfy.fix_text(text)

@ray.remote
def process_file(file_path, output_dir, text_field='text'):
    filename = os.path.basename(file_path)
    output_file_path = os.path.join(output_dir, filename)

    parquet_writer = None

    try:
        # Open the parquet file
        reader = pq.ParquetFile(file_path)
        
        for batch_index, batch in enumerate(reader.iter_batches(batch_size=1000)):
            print(f"start Processing batch {batch_index} of file {filename}")

            df = batch.to_pandas()
            if text_field in df.columns:
                df[text_field] = df[text_field].apply(fix_text)

            table = pa.Table.from_pandas(df)
            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(output_file_path, table.schema)
            
            parquet_writer.write_table(table)
            print(f"finished Processing batch {batch_index} of file {filename}")
    finally:
        if parquet_writer is not None:
            parquet_writer.close()

def clean_data_files_parallel(input_dir, output_dir, text_field='text'):
    os.makedirs(output_dir, exist_ok=True)

    futures = [
        process_file.remote(os.path.join(input_dir, filename), output_dir, text_field)
        for filename in os.listdir(input_dir)
        if filename.endswith('.parquet')
    ]
    
    ray.get(futures)

if __name__ == "__main__":
    ray.init()

    input_dir = "/lpai/volumes/lmp-guan/sy/cc-main-lan-en/24-11-11-1112-parquet"
    output_dir = "/lpai/volumes/lmp-guan/sy/cc-main-lan-en/24-12-10-parquet-clean"
    clean_data_files_parallel(input_dir, output_dir, text_field='text')

    ray.shutdown()

Expected behavior

read_data and AddId should work on the original Parquet dataset, without my having to clean the text with ftfy before the read_data operation.

Environment overview

Two CPU workers

@RickyShi46 RickyShi46 added the bug Something isn't working label Jan 16, 2025
@RickyShi46 RickyShi46 changed the title **read_data** doesn't work for my own parquet dataset unless cleaning text by myself nemo_curator.utils.distributed_utils.read_data doesn't work for my own parquet dataset unless cleaning text by myself Jan 16, 2025
@sithape2025 sithape2025 assigned ayushdg and ryantwolf and unassigned ayushdg Jan 29, 2025