Consecutive execution of fuzzy deduplication on different columns fails with errors #501

Open
sarahyurick opened this issue Jan 29, 2025 · 3 comments

sarahyurick commented Jan 29, 2025

Python script to reproduce:

from functools import partial

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig, Sequential, get_client
from nemo_curator.datasets import DocumentDataset


def fuzzy_dedupe(dataset, cache_dir, id_field, text_field):
    # dataset.df.reset_index(drop=True)

    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir=cache_dir,
        id_field=id_field,
        text_field=text_field,
        false_positive_check=True,
    )

    fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config)

    duplicates = fuzzy_dup(dataset=dataset)

    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )

    result = dataset.df[~dataset.df[id_field].isin(docs_to_remove[id_field].compute())]

    print("Quick look at the DataFrame here...")
    print(result.head())
    print(result.columns)
    print(len(result))
    print("---")

    return DocumentDataset(result)


def main():
    client = get_client(cluster_type="gpu")

    # JSONL dataset with "text" and "text2" fields
    # ID field is "adlr_id"
    dataset = DocumentDataset.read_json(
        "/path/to/jsonl/data",
        backend="cudf",
    )

    df = dataset.df
    df["text2"] = df["text2"].sample(frac=1, random_state=42).reset_index(drop=True)
    dataset = DocumentDataset(dataset_df=df)

    print("Original DataFrame:")
    print(dataset.df.head())
    print(dataset.df.columns)
    print(len(dataset.df))
    print("---")

    # Use different cache directories to avoid existing cache_dir errors
    cache_input = "/path/to/cache_input"
    dedupe_input = partial(
        fuzzy_dedupe, id_field="adlr_id", text_field="text", cache_dir=cache_input
    )

    cache_output = "/path/to/cache_output"
    dedupe_output = partial(
        fuzzy_dedupe, id_field="adlr_id", text_field="text2", cache_dir=cache_output
    )

    curation_steps = Sequential(
        [
            dedupe_input,
            dedupe_output,
        ]
    )
    dataset = curation_steps(dataset)
    dataset = dataset.persist()

    print("Result:")
    print(dataset.df.head())
    print(dataset.df.columns)
    print(len(dataset.df))
    print("---")

    client.close()


if __name__ == "__main__":
    main()
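
For readers unfamiliar with the calling pattern in the script, here is a minimal, library-free sketch of how functools.partial pre-binds keyword arguments so that each step takes only the dataset, and how the steps are then applied in order. The names drop_repeats and run_sequential and the toy records are hypothetical stand-ins. This is not NeMo Curator's Sequential implementation, only an illustration of the convention the script relies on.

```python
from functools import partial


def drop_repeats(records, text_field):
    """Toy stand-in for a dedup step: keep the first record per exact text value."""
    seen = set()
    kept = []
    for record in records:
        if record[text_field] not in seen:
            seen.add(record[text_field])
            kept.append(record)
    return kept


def run_sequential(steps, records):
    """Hypothetical stand-in for Sequential: feed each step's output to the next."""
    for step in steps:
        records = step(records)
    return records


records = [
    {"adlr_id": "a", "text": "x", "text2": "p"},
    {"adlr_id": "b", "text": "x", "text2": "q"},
    {"adlr_id": "c", "text": "y", "text2": "p"},
    {"adlr_id": "d", "text": "z", "text2": "r"},
]

# Bind everything except the dataset, as the script does with fuzzy_dedupe.
dedupe_input = partial(drop_repeats, text_field="text")
dedupe_output = partial(drop_repeats, text_field="text2")

result = run_sequential([dedupe_input, dedupe_output], records)
print([r["adlr_id"] for r in result])
```

The second step only ever sees the rows that survived the first one. In the console output below, this is why the second run starts from 23168 rows rather than the original 23415.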

Console output:

(nemo_curator) syurick@exp02:~$ python /home/nfs/syurick/NaeMo-Curator-scratch/fuzzy_dedup_api/test1.py
cuDF Spilling is enabled
Reading 2 files with blocksize='1gb' / files_per_partition=None
/home/nfs/syurick/NeMo-Curator/nemo_curator/utils/distributed_utils.py:412: UserWarning: If underlying JSONL data does not have a consistent schema, reading with blocksize will fail. Please use files_per_partition approach instead.
  warnings.warn(
Original DataFrame:
                                  adlr_id  ...                                                url
0  ad21470b-7b97-4fb7-95db-1f51e654771e-0  ...  http://128922.homepagemodules.de/u1797_AutumnD...
1  6d91b7a3-2444-4edc-afe8-479fa95f83a3-1  ...  http://1actaday.blogspot.com/2016/11/before-fl...
2  cb27d499-7eee-4710-b68c-c01f6c874496-2  ...  http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3  beeb1a42-030c-4942-8d46-0ce5e5baae96-3  ...  http://22508.dynamicboard.de/t2384f22-Were-fig...
4  650600a6-9a51-4900-8d9a-e2a09cc2db4b-4  ...  http://2ndgoorkhas.com/this-day-in-history/191...

[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
       'text2', 'url'],
      dtype='object')
23415
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
  warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
  0%|                                                                                              | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
Text-df partition  1/1 completed in 1.2325401306152344
Bucket partition  1/1 completed in 1.2382714748382568
100%|██████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.24s/it]
Stage 3 (False Postive Check): Shuffle docs complete!
Stage 4 (False Postive Check): Jaccard Similarity in Buckets
Stage 4 (False Postive Check): Jaccard Similarity in Buckets Complete!
Stage 5: Connected Components across buckets
/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/reshape.py:383: FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.
  warnings.warn(
Stage 5: Connected Components across buckets complete!
Quick look at the DataFrame here...
                                  adlr_id  ...                                                url
0  ad21470b-7b97-4fb7-95db-1f51e654771e-0  ...  http://128922.homepagemodules.de/u1797_AutumnD...
1  6d91b7a3-2444-4edc-afe8-479fa95f83a3-1  ...  http://1actaday.blogspot.com/2016/11/before-fl...
2  cb27d499-7eee-4710-b68c-c01f6c874496-2  ...  http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3  beeb1a42-030c-4942-8d46-0ce5e5baae96-3  ...  http://22508.dynamicboard.de/t2384f22-Were-fig...
4  650600a6-9a51-4900-8d9a-e2a09cc2db4b-4  ...  http://2ndgoorkhas.com/this-day-in-history/191...

[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
       'text2', 'url'],
      dtype='object')
23168
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
  warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
  0%|                                                                                              | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
2025-01-29 14:16:05,639 - distributed.worker - ERROR - Compute Failed
Key:       _run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a
State:     executing
Task:  <Task '_run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a' _run_coroutine_on_worker(...)>
Exception: "RuntimeError('CUDF failure at: /__w/cudf/cudf/cpp/src/partitioning/partitioning.cu:793: Unexpected null values in partition_map.')"
Traceback: '  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 101, in _run_coroutine_on_worker\n    return executor.submit(_run).result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n    return self.__get_result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n    raise self._exception\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/thread.py", line 58, in run\n    result = self.fn(*self.args, **self.kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 98, in _run\n    return future.result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n    return self.__get_result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n    raise self._exception\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 381, in shuffle_task\n    partitions = create_partitions(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 245, in create_partitions\n    partition_dataframe(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 203, in partition_dataframe\n    return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask/utils.py", line 772, in __call__\n    return meth(arg, *args, **kwargs)\n  File 
"/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n    return func(*args, **kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cudf/backends.py", line 443, in group_split_cudf\n    df.scatter_by_map(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n    return func(*args, **kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2487, in scatter_by_map\n    partitioned_columns, output_offsets = libcudf.partitioning.partition(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/contextlib.py", line 79, in inner\n    return func(*args, **kwds)\n  File "partitioning.pyx", line 48, in cudf._lib.partitioning.partition\n  File "partitioning.pyx", line 57, in pylibcudf.partitioning.partition\n  File "partitioning.pyx", line 82, in pylibcudf.partitioning.partition\n'

Please note that this error occurs only when false_positive_check=True. With False (the default), the script works as expected.
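
For context on what the extra stages do: the log labels Stage 4 of the false positive check as "Jaccard Similarity in Buckets". The following is a small CPU-only sketch of that similarity measure over character n-grams. The function names and the n-gram width of 5 are assumptions for illustration, and this is not NeMo Curator's GPU implementation.

```python
def char_ngrams(text, n=5):
    """Return the set of character n-grams in text (n=5 is an arbitrary choice)."""
    if len(text) < n:
        return {text}
    return {text[i : i + n] for i in range(len(text) - n + 1)}


def jaccard(a, b, n=5):
    """Size of the n-gram intersection divided by the size of the n-gram union."""
    set_a, set_b = char_ngrams(a, n), char_ngrams(b, n)
    union = set_a | set_b
    return len(set_a & set_b) / len(union) if union else 1.0


identical = jaccard("the quick brown fox", "the quick brown fox")
near = jaccard("the quick brown fox", "the quick brown fax")
unrelated = jaccard("abcdefgh", "stuvwxyz")
print(identical, near, unrelated)
```

Candidate pairs that share an LSH bucket but score low on a measure like this are the false positives the check is meant to filter out.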

@sarahyurick sarahyurick added the bug Something isn't working label Jan 29, 2025
ayushdg commented Jan 29, 2025

As a workaround, could you try adding dataset.df.reset_index(drop=True) in the fuzzy_dedupe function before calling fuzzy_dup? My best guess is that it's related to #48, since the indices are no longer consistent after the first removal.
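
To illustrate the index concern, here is a minimal sketch that uses pandas as a stand-in for the cuDF-backed Dask DataFrame in the script. The toy data and the renumbered name are assumptions. It repeats the script's removal step, shows the gaps left in the index, and shows that reset_index returns a new object rather than modifying in place.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "adlr_id": ["a", "b", "c", "d", "e"],
        "text": ["x", "x", "y", "z", "z"],
    }
)

# Toy stand-in for the duplicates output: one "group" label per document.
duplicates = pd.DataFrame(
    {"adlr_id": ["a", "b", "d", "e"], "group": [0, 0, 1, 1]}
)

# Same selection as the script: every document except the first in each group.
docs_to_remove = duplicates[duplicates.group.duplicated(keep="first")]
result = df[~df["adlr_id"].isin(docs_to_remove["adlr_id"])]

print(list(result.index))  # gaps remain where rows were dropped

# reset_index returns a new DataFrame, so the result must be assigned.
result.reset_index(drop=True)  # return value discarded: `result` is unchanged
renumbered = result.reset_index(drop=True)
print(list(renumbered.index))
```

Two caveats apply when carrying this over to the script. Written as a bare statement, like the commented-out line in fuzzy_dedupe, the call has no effect unless its return value is assigned. Also, Dask's reset_index restarts numbering within each partition, so on its own it does not give a globally unique index.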

@sarahyurick

@ayushdg Hmm, that did not work for me. I still get the same error during the false positive check.

ayushdg commented Jan 29, 2025

Thanks for checking. I'll investigate further.

@ayushdg ayushdg self-assigned this Jan 29, 2025