Skip to content

Commit ee296ea

Browse files
ext.wanghao204 authored and HunterLine committed
Add support for json[l].gz, and make Ray dataset support reading the json[l].gz and json[l].zst formats
1 parent ae290f7 commit ee296ea

File tree

6 files changed

+164
-61
lines changed

6 files changed

+164
-61
lines changed

data_juicer/core/data/load_strategy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ def load_data(self, **kwargs):
631631

632632
# Use ray.data functions directly with PyArrow filesystem support
633633
# Ray's read functions support filesystem parameter via PyArrow
634-
if data_format in {"json", "jsonl"}:
634+
if data_format in {"json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"}:
635635
# For JSON, we need to use read_json_stream with filesystem
636636
from data_juicer.core.data.ray_dataset import read_json_stream
637637

data_juicer/core/data/ray_dataset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ def count(self) -> int:
355355

356356
@classmethod
357357
def read(cls, data_format: str, paths: Union[str, List[str]]) -> RayDataset:
358-
if data_format in {"json", "jsonl"}:
358+
if data_format in {"json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"}:
359359
return RayDataset.read_json(paths)
360360
elif data_format == "webdataset":
361361
return RayDataset.read_webdataset(paths)
@@ -453,7 +453,7 @@ def read_json_stream(
453453
include_paths: bool = False,
454454
ignore_missing_paths: bool = False,
455455
shuffle: Union[Literal["files"], None] = None,
456-
file_extensions: Optional[List[str]] = ["json", "jsonl"],
456+
file_extensions: Optional[List[str]] = ["json", "jsonl", "json.gz", "jsonl.gz", "json.zst", "jsonl.zst"],
457457
concurrency: Optional[int] = None,
458458
override_num_blocks: Optional[int] = None,
459459
**arrow_json_args,

data_juicer/format/json_formatter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class JsonFormatter(LocalFormatter):
99
Default suffixes is `['.json', '.jsonl', '.jsonl.zst']`
1010
"""
1111

12-
SUFFIXES = [".json", ".jsonl", ".jsonl.zst"]
12+
SUFFIXES = [".json", ".jsonl", ".json.gz", ".jsonl.gz", ".json.zst", ".jsonl.zst"]
1313

1414
def __init__(self, dataset_path, suffixes=None, **kwargs):
1515
"""

data_juicer/utils/file_utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import aiohttp
1313
import pandas as pd
14+
from datasets.utils.extract import GzipExtractor
1415
from datasets.utils.extract import ZstdExtractor as Extractor
1516

1617
from data_juicer.utils.common_utils import dict_to_hash
@@ -112,6 +113,12 @@ def find_files_with_suffix(
112113
# just like '.jsonl.zst'
113114
file_suffixes = [suffix.lower() for suffix in file.suffixes]
114115
suffix = "".join(file_suffixes[-2:])
116+
elif GzipExtractor.is_extractable(file):
117+
# support gzip-format file
118+
# and use the last 2 sub-suffixes as the final suffix
119+
# just like '.jsonl.gz'
120+
file_suffixes = [suffix.lower() for suffix in file.suffixes]
121+
suffix = "".join(file_suffixes[-2:])
115122

116123
if not suffixes or (suffix in suffixes):
117124
if suffix not in file_dict:
Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,117 @@
11
import os
22
import unittest
3+
import gzip
4+
import tempfile
5+
import shutil
36

47
from data_juicer.format.json_formatter import JsonFormatter
58
from data_juicer.format.load import load_formatter
69
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
710

11+
try:
12+
import zstandard as zstd # type: ignore
13+
14+
HAS_ZSTD = True
15+
except Exception:
16+
zstd = None
17+
HAS_ZSTD = False
18+
819

920
class JsonFormatterTest(DataJuicerTestCaseBase):
1021

1122
def setUp(self):
1223
super().setUp()
1324

14-
self._path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
15-
'data', 'structured')
16-
self._file = os.path.join(self._path, 'demo-dataset.jsonl')
25+
self._path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "data", "structured")
26+
self._file = os.path.join(self._path, "demo-dataset.jsonl")
1727
print(self._file)
28+
# create compressed variants for testing
29+
# create a temp directory to hold generated compressed files
30+
self._temp_dir = tempfile.mkdtemp()
31+
with open(self._file, "rb") as f:
32+
raw = f.read()
33+
34+
# .jsonl.gz
35+
self._jsonl_gz = os.path.join(self._temp_dir, "demo-dataset.jsonl.gz")
36+
with gzip.open(self._jsonl_gz, "wb") as f:
37+
f.write(raw)
38+
39+
# .json.gz (same content, different suffix)
40+
self._json_gz = os.path.join(self._temp_dir, "demo-dataset.json.gz")
41+
with gzip.open(self._json_gz, "wb") as f:
42+
f.write(raw)
43+
44+
# .json.zst and .jsonl.zst if zstandard available
45+
if HAS_ZSTD:
46+
self._jsonl_zst = os.path.join(self._temp_dir, "demo-dataset.jsonl.zst")
47+
self._json_zst = os.path.join(self._temp_dir, "demo-dataset.json.zst")
48+
cctx = zstd.ZstdCompressor()
49+
compressed = cctx.compress(raw)
50+
with open(self._jsonl_zst, "wb") as f:
51+
f.write(compressed)
52+
with open(self._json_zst, "wb") as f:
53+
f.write(compressed)
1854

1955
def test_json_file(self):
2056
formatter = JsonFormatter(self._file)
2157
ds = formatter.load_dataset()
2258
self.assertEqual(len(ds), 6)
23-
self.assertEqual(list(ds.features.keys()), ['text', 'meta'])
59+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
2460

2561
def test_json_path(self):
2662
formatter = JsonFormatter(self._path)
2763
ds = formatter.load_dataset()
2864
self.assertEqual(len(ds), 6)
29-
self.assertEqual(list(ds.features.keys()), ['text', 'meta'])
65+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
3066

3167
def test_load_formatter_with_file(self):
3268
"""Test load_formatter with a direct file path"""
3369
formatter = load_formatter(self._file)
3470
self.assertIsInstance(formatter, JsonFormatter)
3571
ds = formatter.load_dataset()
3672
self.assertEqual(len(ds), 6)
37-
self.assertEqual(list(ds.features.keys()), ['text', 'meta'])
73+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
3874

3975
def test_load_formatter_with_specified_suffix(self):
4076
"""Test load_formatter with specified suffixes"""
41-
formatter = load_formatter(self._path, suffixes=['.jsonl'])
77+
formatter = load_formatter(self._path, suffixes=[".jsonl"])
4278
self.assertIsInstance(formatter, JsonFormatter)
4379
ds = formatter.load_dataset()
4480
self.assertEqual(len(ds), 6)
45-
self.assertEqual(list(ds.features.keys()), ['text', 'meta'])
81+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
82+
83+
def tearDown(self):
84+
# cleanup temp dir and files
85+
if hasattr(self, "_temp_dir") and os.path.exists(self._temp_dir):
86+
shutil.rmtree(self._temp_dir)
87+
super().tearDown()
88+
89+
def test_jsonl_gz_file(self):
90+
formatter = JsonFormatter(self._jsonl_gz)
91+
ds = formatter.load_dataset()
92+
self.assertEqual(len(ds), 6)
93+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
94+
95+
def test_json_gz_file(self):
96+
formatter = JsonFormatter(self._json_gz)
97+
ds = formatter.load_dataset()
98+
self.assertEqual(len(ds), 6)
99+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
100+
101+
@unittest.skipUnless(HAS_ZSTD, "zstandard not installed")
102+
def test_json_zst_file(self):
103+
formatter = JsonFormatter(self._json_zst)
104+
ds = formatter.load_dataset()
105+
self.assertEqual(len(ds), 6)
106+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
107+
108+
@unittest.skipUnless(HAS_ZSTD, "zstandard not installed")
109+
def test_jsonl_zst_file(self):
110+
formatter = JsonFormatter(self._jsonl_zst)
111+
ds = formatter.load_dataset()
112+
self.assertEqual(len(ds), 6)
113+
self.assertEqual(list(ds.features.keys()), ["text", "meta"])
46114

47115

48-
if __name__ == '__main__':
49-
unittest.main()
116+
if __name__ == "__main__":
117+
unittest.main()

tests/utils/test_file_utils.py

Lines changed: 75 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,56 @@
11
import os
22
import unittest
33
import regex as re
4+
import gzip
45

56
from data_juicer.utils.file_utils import (
6-
find_files_with_suffix, is_absolute_path,
7-
add_suffix_to_filename, create_directory_if_not_exists, transfer_filename,
8-
copy_data
7+
find_files_with_suffix,
8+
is_absolute_path,
9+
add_suffix_to_filename,
10+
create_directory_if_not_exists,
11+
transfer_filename,
12+
copy_data,
913
)
1014
from data_juicer.utils.mm_utils import Fields
1115

1216
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
1317

18+
1419
class FileUtilsTest(DataJuicerTestCaseBase):
1520

1621
def setUp(self) -> None:
1722
super().setUp()
18-
self.temp_output_path = 'tmp/test_file_utils/'
23+
self.temp_output_path = "tmp/test_file_utils/"
1924
os.makedirs(self.temp_output_path)
2025

2126
def tearDown(self):
2227
if os.path.exists(self.temp_output_path):
23-
os.system(f'rm -rf {self.temp_output_path}')
28+
os.system(f"rm -rf {self.temp_output_path}")
2429
super().tearDown()
2530

2631
def test_find_files_with_suffix(self):
2732
# prepare test files
28-
fn_list = ['test1.txt', 'test2.txt', 'test3.md']
33+
fn_list = ["test1.txt", "test2.txt", "test3.md"]
2934
for fn in fn_list:
30-
with open(os.path.join(self.temp_output_path, fn), 'w') as f:
35+
with open(os.path.join(self.temp_output_path, fn), "w") as f:
3136
f.write(fn)
3237

33-
self.assertEqual(find_files_with_suffix(os.path.join(self.temp_output_path, 'test1.txt')),
34-
{'.txt': [os.path.join(self.temp_output_path, 'test1.txt')]})
38+
self.assertEqual(
39+
find_files_with_suffix(os.path.join(self.temp_output_path, "test1.txt")),
40+
{".txt": [os.path.join(self.temp_output_path, "test1.txt")]},
41+
)
3542
result = find_files_with_suffix(self.temp_output_path)
3643
expected = {
37-
'.txt': sorted([
38-
os.path.join(self.temp_output_path, 'test1.txt'),
39-
os.path.join(self.temp_output_path, 'test2.txt')
40-
]),
41-
'.md': [os.path.join(self.temp_output_path, 'test3.md')]
44+
".txt": sorted([os.path.join(self.temp_output_path, "test1.txt"), os.path.join(self.temp_output_path, "test2.txt")]),
45+
".md": [os.path.join(self.temp_output_path, "test3.md")],
4246
}
4347
for suffix in result:
4448
result[suffix] = sorted(result[suffix])
4549
self.assertEqual(result, expected)
4650

47-
result_txt = find_files_with_suffix(self.temp_output_path, 'txt')
51+
result_txt = find_files_with_suffix(self.temp_output_path, "txt")
4852
expected_txt = {
49-
'.txt': sorted([
50-
os.path.join(self.temp_output_path, 'test1.txt'),
51-
os.path.join(self.temp_output_path, 'test2.txt')
52-
])
53+
".txt": sorted([os.path.join(self.temp_output_path, "test1.txt"), os.path.join(self.temp_output_path, "test2.txt")])
5354
}
5455
for suffix in result_txt:
5556
result_txt[suffix] = sorted(result_txt[suffix])
@@ -60,10 +61,10 @@ def test_is_absolute_path(self):
6061
self.assertTrue(is_absolute_path(os.path.abspath(self.temp_output_path)))
6162

6263
def test_add_suffix_to_filename(self):
63-
self.assertEqual(add_suffix_to_filename('test.txt', '_suffix'), 'test_suffix.txt')
64-
self.assertEqual(add_suffix_to_filename('test.txt', ''), 'test.txt')
65-
self.assertEqual(add_suffix_to_filename('test', '_suffix'), 'test_suffix')
66-
self.assertEqual(add_suffix_to_filename('.git', '_suffix'), '.git_suffix')
64+
self.assertEqual(add_suffix_to_filename("test.txt", "_suffix"), "test_suffix.txt")
65+
self.assertEqual(add_suffix_to_filename("test.txt", ""), "test.txt")
66+
self.assertEqual(add_suffix_to_filename("test", "_suffix"), "test_suffix")
67+
self.assertEqual(add_suffix_to_filename(".git", "_suffix"), ".git_suffix")
6768

6869
def test_create_directory_if_not_exists(self):
6970
self.assertTrue(os.path.exists(self.temp_output_path))
@@ -76,55 +77,82 @@ def test_create_directory_if_not_exists(self):
7677

7778
def test_transfer_filename(self):
7879
# test existing file
79-
with open(os.path.join(self.temp_output_path, 'abc.jpg'), 'w') as f:
80-
f.write('test')
80+
with open(os.path.join(self.temp_output_path, "abc.jpg"), "w") as f:
81+
f.write("test")
8182
self.assertTrue(
8283
re.match(
83-
os.path.join(self.temp_output_path, Fields.multimodal_data_output_dir, 'op1', 'abc__dj_hash_#(.*?)#.jpg'),
84-
transfer_filename(os.path.join(self.temp_output_path, 'abc.jpg'), 'op1')))
84+
os.path.join(self.temp_output_path, Fields.multimodal_data_output_dir, "op1", "abc__dj_hash_#(.*?)#.jpg"),
85+
transfer_filename(os.path.join(self.temp_output_path, "abc.jpg"), "op1"),
86+
)
87+
)
8588
# test non-existing file
8689
self.assertTrue(
8790
re.match(
88-
os.path.join(self.temp_output_path, 'non-existing.jpg'),
89-
transfer_filename(os.path.join(self.temp_output_path, 'non-existing.jpg'), 'op1')))
91+
os.path.join(self.temp_output_path, "non-existing.jpg"),
92+
transfer_filename(os.path.join(self.temp_output_path, "non-existing.jpg"), "op1"),
93+
)
94+
)
9095
# test save_dir
9196
self.temp_output_path = os.path.abspath(self.temp_output_path)
9297
self.assertTrue(
9398
re.match(
94-
os.path.join(self.temp_output_path, 'tmp_save_dir', 'abc__dj_hash_#(.*?)#.jpg'),
95-
transfer_filename(os.path.join(self.temp_output_path, 'abc.jpg'), 'op1',
96-
save_dir=os.path.join(self.temp_output_path, 'tmp_save_dir'))))
99+
os.path.join(self.temp_output_path, "tmp_save_dir", "abc__dj_hash_#(.*?)#.jpg"),
100+
transfer_filename(
101+
os.path.join(self.temp_output_path, "abc.jpg"),
102+
"op1",
103+
save_dir=os.path.join(self.temp_output_path, "tmp_save_dir"),
104+
),
105+
)
106+
)
97107
# test env dir
98108
try:
99-
ori_env_dir = os.environ.get('DJ_PRODUCED_DATA_DIR', None)
100-
test_env_dir = os.path.join(self.temp_output_path, 'tmp_env_dir')
101-
os.environ['DJ_PRODUCED_DATA_DIR'] = test_env_dir
109+
ori_env_dir = os.environ.get("DJ_PRODUCED_DATA_DIR", None)
110+
test_env_dir = os.path.join(self.temp_output_path, "tmp_env_dir")
111+
os.environ["DJ_PRODUCED_DATA_DIR"] = test_env_dir
102112

103-
transfer_filename(os.path.join(self.temp_output_path, 'abc.jpg'), 'op1')
113+
transfer_filename(os.path.join(self.temp_output_path, "abc.jpg"), "op1")
104114
self.assertTrue(
105115
re.match(
106-
os.path.join(test_env_dir, 'op1', 'abc__dj_hash_#(.*?)#.jpg'),
107-
transfer_filename(os.path.join(self.temp_output_path, 'abc.jpg'), 'op1')))
116+
os.path.join(test_env_dir, "op1", "abc__dj_hash_#(.*?)#.jpg"),
117+
transfer_filename(os.path.join(self.temp_output_path, "abc.jpg"), "op1"),
118+
)
119+
)
108120
finally:
109121
if ori_env_dir:
110-
os.environ['DJ_PRODUCED_DATA_DIR'] = ori_env_dir
111-
elif 'DJ_PRODUCED_DATA_DIR' in os.environ:
112-
del os.environ['DJ_PRODUCED_DATA_DIR']
122+
os.environ["DJ_PRODUCED_DATA_DIR"] = ori_env_dir
123+
elif "DJ_PRODUCED_DATA_DIR" in os.environ:
124+
del os.environ["DJ_PRODUCED_DATA_DIR"]
113125

114126
def test_copy_data(self):
115-
tgt_fn = 'test.txt'
116-
ori_dir = os.path.join(self.temp_output_path, 'test1')
117-
tgt_dir = os.path.join(self.temp_output_path, 'test2')
127+
tgt_fn = "test.txt"
128+
ori_dir = os.path.join(self.temp_output_path, "test1")
129+
tgt_dir = os.path.join(self.temp_output_path, "test2")
118130

119131
self.assertFalse(copy_data(ori_dir, tgt_dir, tgt_fn))
120132

121133
os.makedirs(ori_dir, exist_ok=True)
122-
with open(os.path.join(ori_dir, tgt_fn), 'w') as f:
123-
f.write('test')
134+
with open(os.path.join(ori_dir, tgt_fn), "w") as f:
135+
f.write("test")
124136

125137
self.assertTrue(copy_data(ori_dir, tgt_dir, tgt_fn))
126138
self.assertTrue(os.path.exists(os.path.join(tgt_dir, tgt_fn)))
127139

140+
def test_find_files_with_suffix_gzip(self):
141+
# create a gzip compressed jsonl file and ensure it is detected as '.jsonl.gz'
142+
content = '{"text": "gzip test"}\n'
143+
gz_path = os.path.join(self.temp_output_path, "demo-dataset.jsonl.gz")
144+
with gzip.open(gz_path, "wb") as f:
145+
f.write(content.encode("utf-8"))
146+
147+
result = find_files_with_suffix(self.temp_output_path)
148+
149+
# normalize lists for comparison
150+
for suffix in result:
151+
result[suffix] = sorted(result[suffix])
152+
153+
self.assertIn(".jsonl.gz", result)
154+
self.assertEqual(result[".jsonl.gz"], [gz_path])
155+
128156

129-
if __name__ == '__main__':
157+
if __name__ == "__main__":
130158
unittest.main()

0 commit comments

Comments
 (0)