From 1711161bb84f49d500fd5f3c2ed50e23174d6e03 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 10 Jan 2025 10:27:46 -0800 Subject: [PATCH 1/4] Enable ADD ID to work with CPU/GPU both Signed-off-by: Vibhu Jawa --- nemo_curator/modules/add_id.py | 7 ++++-- nemo_curator/scripts/add_id.py | 3 ++- tests/test_add_id.py | 40 ++++++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/nemo_curator/modules/add_id.py b/nemo_curator/modules/add_id.py index 244163912..4e3a37b32 100644 --- a/nemo_curator/modules/add_id.py +++ b/nemo_curator/modules/add_id.py @@ -37,8 +37,9 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: return self._add_id_ordered(dataset) def _add_id_fast(self, dataset: DocumentDataset) -> DocumentDataset: - meta = dataset.df.dtypes.to_dict() + meta = dataset.df._meta.copy() meta[self.id_field] = "string" + meta[self.id_field] = meta[self.id_field].astype("string") partition_zero_padding = count_digits(dataset.df.npartitions) id_df = dataset.df.map_partitions( @@ -59,12 +60,14 @@ def _add_id_fast_partition(self, partition, global_padding, partition_info=None) for local_id in range(len(partition)) ] partition[self.id_field] = id_column + partition[self.id_field] = partition[self.id_field].astype("string") return partition def _add_id_ordered(self, dataset: DocumentDataset) -> DocumentDataset: - original_meta = dataset.df.dtypes.to_dict() + original_meta = dataset.df._meta.copy() original_meta[self.id_field] = "string" + original_meta[self.id_field] = original_meta[self.id_field].astype("string") delayed_dataset = dataset.df.to_delayed() parition_lengths = [0] diff --git a/nemo_curator/scripts/add_id.py b/nemo_curator/scripts/add_id.py index c926e36dd..2a856af07 100644 --- a/nemo_curator/scripts/add_id.py +++ b/nemo_curator/scripts/add_id.py @@ -28,6 +28,7 @@ def main(args): client = get_client(**ArgumentHelper.parse_client_args(args)) + backend = "cudf" if args.device == "gpu" else "pandas" output_dir = expand_outdir_and_mkdir(args.output_data_dir) files = get_all_files_paths_under(args.input_data_dir) if args.shuffle: @@ -36,7 +37,7 @@ def main(args): dataset = DocumentDataset( read_data( - files, file_type=args.input_file_type, backend="pandas", add_filename=True + files, file_type=args.input_file_type, backend=backend, add_filename=True ) ) add_id = nemo_curator.AddId( diff --git a/tests/test_add_id.py b/tests/test_add_id.py index 42a8575e5..ccac825d6 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -18,26 +18,36 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.import_utils import gpu_only_import +cudf = gpu_only_import("cudf") -def list_to_dataset(documents, col_name="text", npartitions=2): + +def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas"): data = {col_name: documents} pdf = pd.DataFrame(data) - - return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions)) + ddf = dd.from_pandas(pdf, npartitions=npartitions) + if backend == "cudf" and cudf is None: + pytest.skip("cuDF is not installed or importable.") + ddf = ddf.to_backend(backend) + return DocumentDataset(ddf) -@pytest.fixture -def single_partition_dataset(): +@pytest.fixture(params=["pandas", "cudf"]) +def single_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=1 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=1, + backend=request.param, ) -@pytest.fixture -def two_partition_dataset(): +@pytest.fixture(params=["pandas", "cudf"]) +def two_partition_dataset(request): return list_to_dataset( - ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=2 + ["First", "Second", "Third", "Fourth", "Fifth"], + npartitions=2, + backend=request.param, ) @@ -56,6 +66,8 @@ def test_basic_id(self, single_partition_dataset): "doc_id-0000000004", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -75,6 +87,8 @@ def test_two_partitions(self, two_partition_dataset): "doc_id-0000000004", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -95,6 +109,8 @@ def test_id_prefix(self, two_partition_dataset): f"{id_prefix}-0000000004", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -115,6 +131,8 @@ def test_start_index(self, two_partition_dataset): "doc_id-0000000017", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -134,6 +152,8 @@ def test_fast_id_single_partition(self, single_partition_dataset): "doc_id-40", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids @@ -153,6 +173,8 @@ def test_fast_id_two_partitions(self, two_partition_dataset): "doc_id-11", ] ) + if isinstance(actual_ids, cudf.Series): + actual_ids = actual_ids.to_pandas() assert all( expected_ids == actual_ids From 8f843481303a3ad2a2ac1193eaa617d4b0ba4760 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Fri, 10 Jan 2025 12:24:22 -0800 Subject: [PATCH 2/4] Make Test runable in a CPU only environment Signed-off-by: Vibhu Jawa --- tests/test_add_id.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_add_id.py b/tests/test_add_id.py index ccac825d6..fd8a89a51 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -18,7 +18,7 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset -from nemo_curator.utils.import_utils import gpu_only_import +from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable cudf = gpu_only_import("cudf") @@ -27,13 +27,13 @@ def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas") data = {col_name: documents} pdf = pd.DataFrame(data) ddf = dd.from_pandas(pdf, npartitions=npartitions) - if backend == "cudf" and cudf is None: + if backend == "cudf" and is_unavailable("cudf"): pytest.skip("cuDF is not installed or importable.") ddf = ddf.to_backend(backend) return DocumentDataset(ddf) -@pytest.fixture(params=["pandas", "cudf"]) +@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]) def single_partition_dataset(request): return list_to_dataset( ["First", "Second", "Third", "Fourth", "Fifth"], @@ -42,7 +42,7 @@ def single_partition_dataset(request): ) -@pytest.fixture(params=["pandas", "cudf"]) +@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]) def two_partition_dataset(request): return list_to_dataset( ["First", "Second", "Third", "Fourth", "Fifth"], @@ -66,7 +66,7 @@ def test_basic_id(self, single_partition_dataset): "doc_id-0000000004", ] ) - if isinstance(actual_ids, cudf.Series): + if cudf is not None and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -87,7 +87,7 @@ def test_two_partitions(self, two_partition_dataset): "doc_id-0000000004", ] ) - if isinstance(actual_ids, cudf.Series): + if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -109,7 +109,7 @@ def test_id_prefix(self, two_partition_dataset): f"{id_prefix}-0000000004", ] ) - if isinstance(actual_ids, cudf.Series): + if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -131,7 +131,7 @@ def test_start_index(self, two_partition_dataset): "doc_id-0000000017", ] ) - if isinstance(actual_ids, cudf.Series): + if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -152,7 +152,7 @@ def test_fast_id_single_partition(self, single_partition_dataset): "doc_id-40", ] ) - if isinstance(actual_ids, cudf.Series): + if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -173,7 +173,7 @@ def test_fast_id_two_partitions(self, two_partition_dataset): "doc_id-11", ] ) - if isinstance(actual_ids, cudf.Series): + if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( From d9e9f6b9e4625373a4cd93d96ee156d5a9c76d46 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Mon, 13 Jan 2025 07:10:41 -0800 Subject: [PATCH 3/4] Fix pytest skipping behavior in CPU/GPU environment Signed-off-by: Vibhu Jawa --- tests/test_add_id.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/test_add_id.py b/tests/test_add_id.py index fd8a89a51..e4019d214 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -21,13 +21,14 @@ from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable cudf = gpu_only_import("cudf") +is_cudf_available = not is_unavailable(cudf) def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas"): data = {col_name: documents} pdf = pd.DataFrame(data) ddf = dd.from_pandas(pdf, npartitions=npartitions) - if backend == "cudf" and is_unavailable("cudf"): + if backend == "cudf" and is_unavailable(cudf): pytest.skip("cuDF is not installed or importable.") ddf = ddf.to_backend(backend) return DocumentDataset(ddf) @@ -66,7 +67,7 @@ def test_basic_id(self, single_partition_dataset): "doc_id-0000000004", ] ) - if cudf is not None and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -87,7 +88,7 @@ def test_two_partitions(self, two_partition_dataset): "doc_id-0000000004", ] ) - if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -109,7 +110,7 @@ def test_id_prefix(self, two_partition_dataset): f"{id_prefix}-0000000004", ] ) - if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -131,7 +132,7 @@ def test_start_index(self, two_partition_dataset): "doc_id-0000000017", ] ) - if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -152,7 +153,7 @@ def test_fast_id_single_partition(self, single_partition_dataset): "doc_id-40", ] ) - if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( @@ -173,7 +174,7 @@ def test_fast_id_two_partitions(self, two_partition_dataset): "doc_id-11", ] ) - if not is_unavailable("cudf") and isinstance(actual_ids, cudf.Series): + if is_cudf_available and isinstance(actual_ids, cudf.Series): actual_ids = actual_ids.to_pandas() assert all( From dc3a13836d9fbfc72d4ec614df9dc19b192c2ff9 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 6 Feb 2025 11:03:34 -0800 Subject: [PATCH 4/4] Raise error instead of skipping test Signed-off-by: Vibhu Jawa --- tests/test_add_id.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_add_id.py b/tests/test_add_id.py index e4019d214..c33c5e4a8 100644 --- a/tests/test_add_id.py +++ b/tests/test_add_id.py @@ -29,7 +29,7 @@ def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas") pdf = pd.DataFrame(data) ddf = dd.from_pandas(pdf, npartitions=npartitions) if backend == "cudf" and is_unavailable(cudf): - pytest.skip("cuDF is not installed or importable.") + raise ImportError("cuDF is not installed or importable.") ddf = ddf.to_backend(backend) return DocumentDataset(ddf)