Commit

add partition_on logic

Signed-off-by: Vibhu Jawa <[email protected]>

VibhuJawa committed Feb 4, 2025
1 parent c3fb61d commit 2272d3d
Showing 3 changed files with 172 additions and 7 deletions.
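
In brief: the commit threads a new optional partition_on argument from DocumentDataset.to_json and DocumentDataset.to_parquet through write_to_disk into _write_to_jsonl_or_parquet, so that output can be written into Hive-style <column>=<value> subdirectories. Combining partition_on with write_to_filename now raises a ValueError, and tests cover both output formats.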
4 changes: 4 additions & 0 deletions nemo_curator/datasets/doc_dataset.py
@@ -160,6 +160,7 @@ def to_json(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -170,6 +171,7 @@
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="jsonl",
)

@@ -178,6 +180,7 @@ def to_parquet(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -188,6 +191,7 @@
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="parquet",
)

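For orientation, a minimal usage sketch of the new parameter on these two methods; the data, output path, and column name below are illustrative, not taken from the commit:

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

# Hypothetical dataset with a "category" column to partition on.
ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2], "category": ["A", "B"], "text": ["x", "y"]}),
    npartitions=1,
)
dataset = DocumentDataset(ddf)

# Writes category=A/ and category=B/ subdirectories under the output path.
dataset.to_parquet(output_path="partitioned_output/", partition_on="category")
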
46 changes: 39 additions & 7 deletions nemo_curator/utils/distributed_utils.py
@@ -843,6 +843,7 @@ def write_to_disk(
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
output_type: str = "jsonl",
partition_on: Optional[str] = None,
):
"""
This function writes a Dask DataFrame to the specified file path.
@@ -879,6 +880,11 @@
f"write_using_filename is True but no {filename_col} column found in DataFrame"
)

if partition_on is not None and write_to_filename:
raise ValueError(
"Cannot use both partition_on and write_to_filename parameters simultaneously. "
)

if is_cudf_type(df):
import cudf

@@ -904,7 +910,12 @@
     # output_path is a directory
     else:
         if output_type == "jsonl" or output_type == "parquet":
-            _write_to_jsonl_or_parquet(df, output_path, output_type)
+            _write_to_jsonl_or_parquet(
+                df,
+                output_path=output_path,
+                output_type=output_type,
+                partition_on=partition_on,
+            )
         elif output_type == "bitext":
             if write_to_filename:
                 os.makedirs(output_path, exist_ok=True)
@@ -938,16 +949,37 @@ def _write_to_jsonl_or_parquet(
     df,
     output_path: str,
     output_type: Literal["jsonl", "parquet"] = "jsonl",
+    partition_on: Optional[str] = None,
 ):
     if output_type == "jsonl":
-        if is_cudf_type(df):
-            # See open issue here: https://github.com/rapidsai/cudf/issues/15211
-            # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+        if partition_on is not None:
+            unique_values = (
+                df[partition_on]
+                .unique()
+                .to_backend(backend="pandas")
+                .compute()
+                .to_list()
+            )
+            for value in unique_values:
+                os.makedirs(output_path, exist_ok=True)
+                partition_output_path = os.path.join(
+                    output_path, f"{partition_on}={value}"
+                )
+                df[df[partition_on] == value].to_json(
+                    partition_output_path,
+                    orient="records",
+                    lines=True,
+                    force_ascii=False,
+                )
         else:
-            df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            if is_cudf_type(df):
+                # See open issue here: https://github.com/rapidsai/cudf/issues/15211
+                # df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
+                df.to_json(output_path, orient="records", lines=True, force_ascii=False)
+            else:
+                df.to_json(output_path, orient="records", lines=True, force_ascii=False)
     elif output_type == "parquet":
-        df.to_parquet(output_path, write_index=False)
+        df.to_parquet(output_path, write_index=False, partition_on=partition_on)
     else:
         raise ValueError(f"Unknown output type: {output_type}")

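Aside: the Parquet branch above delegates to Dask's native partition_on support, while the JSONL branch emulates the same Hive-style layout by filtering on each unique value and writing it separately. A standalone sketch of the Dask behavior being relied on, with illustrative paths:

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2, 3, 4], "category": ["A", "B", "A", "B"]}),
    npartitions=2,
)
# Dask writes one <column>=<value> subdirectory per unique value, e.g.
# example_output/category=A/part.0.parquet and example_output/category=B/...
ddf.to_parquet("example_output", write_index=False, partition_on="category")
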
129 changes: 129 additions & 0 deletions tests/test_io.py
@@ -31,6 +31,9 @@
write_to_disk,
)
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.import_utils import gpu_only_import, is_unavailable

cudf = gpu_only_import("cudf")


def _generate_dummy_dataset(num_rows: int = 50) -> str:
@@ -293,3 +296,129 @@ def test_write_single_jsonl_file(self, tmp_path):

result = DocumentDataset.read_json(output_path)
assert json_df.equals(result.df.compute())


class TestPartitionOn:
def test_partition_on_and_write_to_filename_error(self, tmp_path):
"""Verify that using partition_on and write_to_filename together raises an error."""
df = pd.DataFrame(
{
"id": [1, 2, 3],
"file_name": ["f1", "f1", "f1"],
"category": ["A", "B", "A"],
}
)
ddf = dd.from_pandas(df, npartitions=1)
dataset = DocumentDataset(ddf)
with pytest.raises(
ValueError,
match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
):
dataset.to_json(
output_path=str(tmp_path / "output"),
write_to_filename=True, # Intentionally provided to trigger the error
partition_on="category",
)

@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_jsonl(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned JSONL dataset.
The function is expected to create subdirectories in the output directory
with names of the form 'category=<value>' for each unique partition column value.
"""
if backend == "cudf" and is_unavailable(cudf):
pytest.skip("cudf is not installed")

df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_jsonl"
dataset = DocumentDataset(ddf)
dataset.to_json(output_path=str(output_dir), partition_on="category")
# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

# For each partition directory, load the JSONL files and verify that all records have the correct partition value.
# (Here we assume the files are written with extension ".part")
for part_dir in output_dir.glob("category=*"):
# The partition value is taken from the directory name.
partition_value = part_dir.name.split("=")[-1]
jsonl_files = list(part_dir.glob("*.part"))
assert (
jsonl_files
), f"No JSONL files found in partition directory {part_dir}"
for file in jsonl_files:
with open(file, "r") as f:
for line in f:
record = json.loads(line)
if "category" in record:
# Compare as strings, to work with both integer and string partition values.
assert (
str(record["category"]) == partition_value
), f"Record partition value {record['category']} does not match directory {partition_value}"

@pytest.mark.parametrize("backend", ["pandas", "cudf"])
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_parquet(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned Parquet dataset.
The test writes a DataFrame partitioned on the 'category' column and then reads it back
using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
"""
if backend == "cudf" and is_unavailable(cudf):
pytest.skip("cudf is not installed")

df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_parquet"
dataset = DocumentDataset(ddf)
dataset.to_parquet(output_path=str(output_dir), partition_on="category")

# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

ddf_loaded = dd.read_parquet(str(output_dir))
df_loaded = ddf_loaded.compute().reset_index(drop=True)
df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
# To ensure a fair comparison, sort the dataframes by 'id' and reindex.
pd.testing.assert_frame_equal(
df.sort_values("id").reset_index(drop=True),
df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
check_dtype=False,
)
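
For completeness, a sketch of reading one partition of the JSONL output back. The path and the "*.part" glob follow the layout the tests above assert on; both are illustrative rather than part of the commit:

import dask.dataframe as dd

# Load only the rows whose partition column equalled "A" at write time.
subset = dd.read_json("output_jsonl/category=A/*.part", lines=True)
print(subset.compute())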
