Enable AddId to work on both CPU and GPU (#479)
* Enable AddId to work on both CPU and GPU

Signed-off-by: Vibhu Jawa <[email protected]>

* Make tests runnable in a CPU-only environment

Signed-off-by: Vibhu Jawa <[email protected]>

* Fix pytest skipping behavior in CPU/GPU environments

Signed-off-by: Vibhu Jawa <[email protected]>

* Raise error instead of skipping test

Signed-off-by: Vibhu Jawa <[email protected]>

---------

Signed-off-by: Vibhu Jawa <[email protected]>
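
What this commit enables, as a minimal sketch: the same AddId call now works whether the underlying Dask collection is pandas- or cudf-backed. The snippet assumes NeMo Curator is installed, and the commented line additionally assumes a GPU host with dask-cudf available; the "id" field name is illustrative.

    import dask.dataframe as dd
    import pandas as pd

    import nemo_curator
    from nemo_curator.datasets import DocumentDataset

    ddf = dd.from_pandas(
        pd.DataFrame({"text": ["First", "Second", "Third"]}), npartitions=2
    )
    # ddf = ddf.to_backend("cudf")  # uncomment on a GPU host; nothing below changes

    add_id = nemo_curator.AddId(id_field="id")  # "id" is an illustrative field name
    with_ids = add_id(DocumentDataset(ddf))
    print(with_ids.df.head())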
VibhuJawa authored Feb 6, 2025
1 parent 0a7136e commit 1dab545
Showing 3 changed files with 39 additions and 12 deletions.
7 changes: 5 additions & 2 deletions nemo_curator/modules/add_id.py

@@ -39,8 +39,9 @@ def call(self, dataset: DocumentDataset) -> DocumentDataset:
         return self._add_id_ordered(dataset)
 
     def _add_id_fast(self, dataset: DocumentDataset) -> DocumentDataset:
-        meta = dataset.df.dtypes.to_dict()
+        meta = dataset.df._meta.copy()
         meta[self.id_field] = "string"
+        meta[self.id_field] = meta[self.id_field].astype("string")
 
         partition_zero_padding = count_digits(dataset.df.npartitions)
         id_df = dataset.df.map_partitions(
@@ -61,12 +62,14 @@ def _add_id_fast_partition(self, partition, global_padding, partition_info=None):
             for local_id in range(len(partition))
         ]
         partition[self.id_field] = id_column
+        partition[self.id_field] = partition[self.id_field].astype("string")
 
         return partition
 
     def _add_id_ordered(self, dataset: DocumentDataset) -> DocumentDataset:
-        original_meta = dataset.df.dtypes.to_dict()
+        original_meta = dataset.df._meta.copy()
         original_meta[self.id_field] = "string"
+        original_meta[self.id_field] = original_meta[self.id_field].astype("string")
         delayed_dataset = dataset.df.to_delayed()
 
         parition_lengths = [0]
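
Why the meta change matters: Dask's map_partitions needs an empty "meta" frame whose container type matches the real partitions. A plain dtype dict (the old dtypes.to_dict()) can make Dask fall back to a pandas meta, while copying _meta keeps the pandas or cudf backend of the input. A small sketch of the pattern, using a hypothetical "id" column:

    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"text": ["a", "b"]}), npartitions=1)

    meta = ddf._meta.copy()                   # empty frame, same backend as ddf
    meta["id"] = "string"                     # placeholder value creates the column
    meta["id"] = meta["id"].astype("string")  # then pin the dtype explicitly
    print(type(meta), dict(meta.dtypes))      # pandas here; cudf on a GPU-backed ddf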
3 changes: 2 additions & 1 deletion nemo_curator/scripts/add_id.py

@@ -28,6 +28,7 @@
 def main(args):
     client = get_client(**ArgumentHelper.parse_client_args(args))
 
+    backend = "cudf" if args.device == "gpu" else "pandas"
     output_dir = expand_outdir_and_mkdir(args.output_data_dir)
     files = get_all_files_paths_under(args.input_data_dir)
     if args.shuffle:
@@ -36,7 +37,7 @@ def main(args):
 
     dataset = DocumentDataset(
         read_data(
-            files, file_type=args.input_file_type, backend="pandas", add_filename=True
+            files, file_type=args.input_file_type, backend=backend, add_filename=True
         )
     )
     add_id = nemo_curator.AddId(
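
The script-side dispatch in isolation, as a runnable sketch (import paths follow the usual NeMo Curator utility modules; the device value and input path are illustrative stand-ins for the parsed CLI flags):

    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.utils.distributed_utils import read_data
    from nemo_curator.utils.file_utils import get_all_files_paths_under

    device = "cpu"  # stand-in for args.device; "gpu" selects the cudf backend
    backend = "cudf" if device == "gpu" else "pandas"

    files = get_all_files_paths_under("/path/to/input")  # hypothetical directory
    dataset = DocumentDataset(
        read_data(files, file_type="jsonl", backend=backend, add_filename=True)
    )

Everything downstream of read_data is backend-agnostic, so this one ternary is the only CPU/GPU switch in the script.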
41 changes: 32 additions & 9 deletions tests/test_add_id.py

@@ -18,26 +18,37 @@
 
 import nemo_curator as nc
 from nemo_curator.datasets import DocumentDataset
+from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable
 
+cudf = gpu_only_import("cudf")
+is_cudf_available = not is_unavailable(cudf)
 
 
-def list_to_dataset(documents, col_name="text", npartitions=2):
+def list_to_dataset(documents, col_name="text", npartitions=2, backend="pandas"):
     data = {col_name: documents}
     pdf = pd.DataFrame(data)
 
-    return DocumentDataset(dd.from_pandas(pdf, npartitions=npartitions))
+    ddf = dd.from_pandas(pdf, npartitions=npartitions)
+    if backend == "cudf" and is_unavailable(cudf):
+        raise ImportError("cuDF is not installed or importable.")
+    ddf = ddf.to_backend(backend)
+    return DocumentDataset(ddf)
 
 
-@pytest.fixture
-def single_partition_dataset():
+@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)])
+def single_partition_dataset(request):
     return list_to_dataset(
-        ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=1
+        ["First", "Second", "Third", "Fourth", "Fifth"],
+        npartitions=1,
+        backend=request.param,
     )
 
 
-@pytest.fixture
-def two_partition_dataset():
+@pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)])
+def two_partition_dataset(request):
     return list_to_dataset(
-        ["First", "Second", "Third", "Fourth", "Fifth"], npartitions=2
+        ["First", "Second", "Third", "Fourth", "Fifth"],
+        npartitions=2,
+        backend=request.param,
     )
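
The parametrized fixtures above run every test twice, once per backend, with the cudf case carrying the gpu mark. A minimal standalone illustration of the same idea, assuming a "gpu" marker is registered in the project's pytest configuration:

    import pytest

    @pytest.fixture(params=["pandas", pytest.param("cudf", marks=pytest.mark.gpu)])
    def backend(request):
        return request.param

    def test_backend_is_selected(backend):
        assert backend in ("pandas", "cudf")

Running pytest -m "not gpu" keeps only the pandas case, while a plain pytest run on a GPU host exercises both.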


@@ -56,6 +67,8 @@ def test_basic_id(self, single_partition_dataset):
                 "doc_id-0000000004",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
@@ -75,6 +88,8 @@ def test_two_partitions(self, two_partition_dataset):
                 "doc_id-0000000004",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
@@ -95,6 +110,8 @@ def test_id_prefix(self, two_partition_dataset):
                 f"{id_prefix}-0000000004",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
@@ -115,6 +132,8 @@ def test_start_index(self, two_partition_dataset):
                 "doc_id-0000000017",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
@@ -134,6 +153,8 @@ def test_fast_id_single_partition(self, single_partition_dataset):
                 "doc_id-40",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
@@ -153,6 +174,8 @@ def test_fast_id_two_partitions(self, two_partition_dataset):
                 "doc_id-11",
             ]
         )
+        if is_cudf_available and isinstance(actual_ids, cudf.Series):
+            actual_ids = actual_ids.to_pandas()
 
         assert all(
             expected_ids == actual_ids
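
The repeated assertion pattern above, shown once in isolation as a runnable sketch (the Series values are illustrative; in the tests, actual_ids comes from AddId output):

    import pandas as pd

    from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable

    cudf = gpu_only_import("cudf")
    is_cudf_available = not is_unavailable(cudf)

    expected_ids = pd.Series(["doc_id-0000000000"])  # illustrative value
    actual_ids = pd.Series(["doc_id-0000000000"])    # stand-in for AddId output

    # Normalize to pandas before comparing; a no-op on CPU-only runs.
    if is_cudf_available and isinstance(actual_ids, cudf.Series):
        actual_ids = actual_ids.to_pandas()
    assert all(expected_ids == actual_ids)

Note that list_to_dataset raises ImportError rather than skipping when a cudf-backed fixture is requested without cuDF, matching the "Raise error instead of skipping test" commit above.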
