我们将编码规范定义在 .pre-commit-config.yaml
中。在向仓库贡献代码之前,请使用 pre-commit
工具对代码进行自动规范化。
# ===========install pre-commit tool===========
pip install pre-commit
cd <path_to_data_juicer>
# install pre-commit script for data_juicer
pre-commit install
# ===========check all files===========
git add .
pre-commit run --all-files
# commit after all checking are passed
git commit -m "<your_commit_message>"
注意:我们在github workflow配置了pre-commit的检查。如果您的PR中该检查没通过,请在本地①确保pre-commit 的相关依赖与项目配置一致(可通过pre-commit clean
和pre-commit install
完成);②push前执行了pre-commit run --all-files
.
- Data-Juicer 支持每个人灵活、便捷定义自己的算子。
- 在实现新的算子之前,请参考已有 算子池 以避免不必要的重复。
- 根据实现完整性,算子会被分类为3类:
- 📣📣📣 社区贡献者可在alpha状态后就提相应算子PR。此后该贡献者可以与Data-Juicer团队一起在后续PR中,将其渐进完善到beta和stable版本。我们非常欢迎共建,并会高亮致谢!
下面以 “TextLengthFilter” 的算子(过滤仅包含预期文本长度的样本语料)为例,展示相应开发构建过程。
class StatsKeys(object):
... # other keys
text_len = 'text_len'
-
() 在
data_juicer/ops/filter/
目录下创建一个新的算子文件text_length_filter.py
,内容如下:- 因为它是一个 Filter 算子,所以需要继承
base_op.py
中的Filter
基类,并用@OPERATORS.register_module(xx_op)
装饰器标记,以实现自动注册。 - 为了方便实现,我们可以按单样本处理的方式实现两个核心方法
compute_stats_single
和process_single
,它们的输入输出均为单个样本的字典结构。 - 【进阶】如果你比较熟悉 Data-Juicer 中的batch化处理,你也可以通过覆写
compute_stats_batched
和process_batched
方法直接实现它们的batch化版本,它的处理会比单样本版本稍快一些。它们的输入和输出则是按列存储的字典结构,其中包括多个样本 (详见下方 2.1.3 小节)。
import sys from jsonargparse.typing import PositiveInt from data_juicer.utils.constant import Fields, StatsKeys from ..base_op import OPERATORS, Filter @OPERATORS.register_module('text_length_filter') class TextLengthFilter(Filter): """Filter to keep samples with total text length within a specific range.""" def __init__(self, min_len: PositiveInt = 10, max_len: PositiveInt = sys.maxsize, *args, **kwargs): """ Initialization method. :param min_len: The min text length in the filtering. samples will be filtered if their text length is below this parameter. :param max_len: The max text length in the filtering. samples will be filtered if their text length exceeds this parameter. :param args: extra args :param kwargs: extra args """ super().__init__(*args, **kwargs) self.min_len = min_len self.max_len = max_len def compute_stats_single(self, sample): # check if it's computed already if StatsKeys.text_len in sample[Fields.stats]: return sample sample[Fields.stats][StatsKeys.text_len] = len(sample[self.text_key]) return sample def process_single(self, sample): if self.min_len <= sample[Fields.stats][StatsKeys.text_len] <= self.max_len: return True else: return False
- 因为它是一个 Filter 算子,所以需要继承
-
() 实现后,将其添加到
data_juicer/ops/filter
目录下__init__.py
文件中的算子字典中:
from . import (..., # other OPs
text_length_filter) # import this new OP module
# other OPs
from text_length_filter import TextLengthFilter # import this new OP class
__all__ = [
# other Ops
text_length_filter, # add this new Op to __all__
]
-
() 算子有
environments/science_requires.txt
中列举的包依赖时,需要在data_juicer/utils/auto_install_mapping.py
里的OPS_TO_PKG
中添加对应的依赖包,以支持算子粒度的依赖安装。 -
全部完成!现在您可以在自己的配置文件中使用新添加的算子:
# other configs
...
# process configs
process:
- text_length_filter: # add this op to your process list and set the parameters
min_len: 10
max_len: 1000
- ( 强烈推荐)为了增强代码鲁棒性、验证正确性和直观展示如何使用其功能,最好为新添加的算子进行单元测试。对于上面的
TextLengthFilter
算子,在tests/ops/filter/
中实现如test_text_length_filter.py
的测试文件:
import unittest
from data_juicer.ops.filter.text_length_filter import TextLengthFilter
from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase
class TextLengthFilterTest(DataJuicerTestCaseBase):
def test_func1(self):
pass
def test_func2(self):
pass
def test_func3(self):
pass
if __name__ == '__main__':
unittest.main()
-
( 强烈推荐)为了方便其他用户理解和使用,最好将新增的算子信息更新到相应的文档中,具体包括如下两个基本动作:
- 请在算子基类的doc string中补充基础信息,确保其完整可读(包括算子基本功能描述、入参、出参等)。无需用户麻烦地多处撰写,我们的
pre-commit
和sphinx构建脚本会自动抽取doc string形成算子池文档和API文档。 configs/config_all.yaml
:该全集配置文件保存了所有算子及参数的一个列表,作为一些自动化特性的信息来源以及用户参考可用算子的一个重要文档之一。因此,在新增算子后,请将其也添加到该文档process列表里(按算子类型分组并按字母序排序):
... - stopwords_filter: # filter text with stopword ratio smaller than a specific min value lang: en # consider stopwords in what language tokenization: false # whether to use model to tokenize documents min_ratio: 0.3 # the min ratio to filter text stopwords_dir: ./assets # directory to store stopwords dictionaries use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese words_aug_group_sizes: [2] # the group size of words to augment words_aug_join_char: "" # the join char between words to augment - text_length_filter: # filter text with length out of specific range min_len: 10 # the min length of filter range max_len: 10000 # the max length of filter range - token_num_filter: # filter text with total token number out of specific range hf_tokenizer: EleutherAI/pythia-6.9b-deduped # name of used Hugging Face tokenizer min_num: 10 # the min number of filter range max_num: 10000 # the max number of filter range ...
- 请在算子基类的doc string中补充基础信息,确保其完整可读(包括算子基本功能描述、入参、出参等)。无需用户麻烦地多处撰写,我们的
-
() 如果在算子中使用了 Hugging Face 模型,您可能希望利用 GPU 加速。为了实现这一点,请在算子的构造函数中声明
_accelerator = 'cuda'
,并确保compute_stats_single/batched
和process_single/batched
方法接受一个额外的位置参数rank
。# ... (same as above) @OPERATORS.register_module('text_length_filter') class TextLengthFilter(Filter): _accelerator = 'cuda' def __init__(self, min_len: PositiveInt = 10, max_len: PositiveInt = sys.maxsize, *args, **kwargs): # ... (same as above) def compute_stats_single(self, sample, rank=None): # ... (same as above) def process_single(self, sample, rank=None): # ... (same as above)
-
() 如果算子批量处理数据,输入不是一个样本而是一个batch,或者你想在单样本实现上直接激活batch化处理,需要声明
_batched_op = True
。 - 对于单样本实现中原来的compute_stats_single
和process_single
方法,你可以保持它们不变,Data-Juicer 会调用默认的batch化处理版本,它们会自动拆分单个样本以调用单样本版本的两个方法来支持batch化处理。你也可以自行实现更高效的batch化的版本。# ... (import some other libraries) OP_NAME = 'image_diffusion_mapper' @OPERATORS.register_module(OP_NAME) @LOADED_IMAGES.register_module(OP_NAME) class ImageDiffusionMapper(Mapper): _batched_op = True def __init__(self, # ... (OP parameters) *args, **kwargs): super().__init__(*args, **kwargs) def process_batched(self, samples): # ... (some codes)
-
() 在mapper算子中,我们提供了产生额外数据的存储路径生成接口,避免出现进程冲突和数据覆盖的情况。生成的存储路径格式为
{ORIGINAL_DATAPATH}/__dj__produced_data__/{OP_NAME}/{ORIGINAL_FILENAME}__dj_hash_#{HASH_VALUE}#.{EXT}
,其中HASH_VALUE
是算子初始化参数、每个样本中相关参数、进程ID和时间戳的哈希值。为了方便,可以在OP类初始化开头调用self.remove_extra_parameters(locals())
获取算子初始化参数,同时可以调用self.add_parameters
添加每个样本与生成额外数据相关的参数。例如,利用diffusion模型对图像进行增强的算子:# ... (import some library) OP_NAME = 'image_diffusion_mapper' @OPERATORS.register_module(OP_NAME) @LOADED_IMAGES.register_module(OP_NAME) class ImageDiffusionMapper(Mapper): def __init__(self, # ... (OP parameters) *args, **kwargs): super().__init__(*args, **kwargs) self._init_parameters = self.remove_extra_parameters(locals()) def process_single(self, sample): # ... (some codes) # captions[index] is the prompt for diffusion model related_parameters = self.add_parameters( self._init_parameters, caption=captions[index]) new_image_path = transfer_filename( origin_image_path, OP_NAME, **related_parameters) # ... (some codes)
针对一个数据源衍生出多个额外数据的情况,我们允许在生成的存储路径后面再加后缀。比如,根据关键帧将视频拆分成多个视频:
# ... (import some library) OP_NAME = 'video_split_by_key_frame_mapper' @OPERATORS.register_module(OP_NAME) @LOADED_VIDEOS.register_module(OP_NAME) class VideoSplitByKeyFrameMapper(Mapper): def __init__(self, # ... (OP parameters) *args, **kwargs): super().__init__(*args, **kwargs) self._init_parameters = self.remove_extra_parameters(locals()) def process_single(self, sample): # ... (some codes) split_video_path = transfer_filename( original_video_path, OP_NAME, **self._init_parameters) split_video_path = add_suffix_to_filename(split_video_path, f'_{count}') # ... (some codes)
- 如果我们的新算子中的部分中间变量的计算过程与已有的算子重复,那么可以将其添加到可融合算子中,以在数据处理时利用算子融合进行加速。(如
words_num_filter
与word_repetition_filter
都需要对输入文本进行分词) - 当算子融合(OP Fusion)功能开启时,这些重复的计算过程和中间变量是可以在算子之间的
context
中共享的,从而可以减少重复计算。 - 可通过如下步骤使包含共有中间变量的算子可进行算子融合(以
words_num_filter
算子为例)。
- (可选)如果新算子中产生了新的中间变量,需要在
utils/constant.py
中的InterVars
类中添加新的中间变量名称。通常需要在名称前加上DEFAULT_PREFIX
前缀。
class InterVars(object):
# text
lines = DEFAULT_PREFIX + 'lines'
words = DEFAULT_PREFIX + 'words' # 在这里添加新的中间变量
...
- (可选)第1步中添加的新的中间变量还需在
ops/op_fusion.py
中为其定义一个注册组,并添加到保存了所有注册组的列表中,方便算子融合模块追踪涉及到这些中间变量的算子。
...
# Type of intermediate vars
# text
INTER_LINES = Registry(InterVars.lines)
INTER_WORDS = Registry(InterVars.words) # 为新的中间变量定义注册组
# images
LOADED_IMAGES = Registry(InterVars.loaded_images)
# all
ALL_INTER_VARS = [INTER_LINES, INTER_WORDS, LOADED_IMAGES] # 并添加到注册组列表中
...
- 在涉及到该中间变量的算子前,将该算子注册到中间变量对应的注册组中,表示该算子中可能对该中间变量进行了计算与使用。
...
@OPERATORS.register_module(OP_NAME)
@INTER_WORDS.register_module(OP_NAME) # 将该算子注册到注册组中
class WordsNumFilter(Filter):
...
- 在算子计算该中间变量的过程中,可将计算逻辑修改为:
- 如果
context
参数为True,则表示已开启了算子融合,优先从context
中获取前序算子已经计算过的该中间变量的值 - 如果中间变量在
context
中不存在,则表示在该算子中首次对该中间变量进行计算,在计算完成后,定义一个唯一的key并将其存放到context
中,以供后续算子使用 - 如果
context
参数为False,则按照正常计算流程进行
- 如果
# 修改计算逻辑前
...
tokenizer = get_model(self.model_key)
words = get_words_from_document(
sample[self.text_key],
token_func=tokenizer.encode_as_pieces if tokenizer else None)
...
# 修改计算逻辑后
...
words_key = f'{InterVars.words}-{self.model_key}'
if context and words_key in sample[Fields.context]:
# 直接使用context中已有的中间变量值
words = sample[Fields.context][words_key]
else:
# 正常计算流程
tokenizer = get_model(self.model_key)
words = get_words_from_document(
sample[self.text_key],
token_func=tokenizer.encode_as_pieces if tokenizer else None)
if context:
# 第一次计算该中间变量后,放入context供后续算子使用
sample[Fields.context][words_key] = words
...
- 随着算子数量的增加,Data-Juicer的依赖也不断增多。为了防止Data-Juicer的依赖越来越重,我们为算子额外增加的依赖提供了一套延迟加载加上使用时安装依赖的策略。
LazyLoader
会检查加载的module对应的package有没有都安装,没有的话会动态自动安装。AUTOINSTALL
用于安装额外的补丁。如下样例:
# ... (import some library)
from data_juicer.utils.lazy_loader import LazyLoader, AUTOINSTALL
# lazy import
kenlm = LazyLoader('kenlm', 'kenlm')
sp = LazyLoader('sp', 'sentencepiece')
class PerplexityFilter(Filter):
def __init__(self,
# ... (OP parameters)
*args,
**kwargs):
# auto install before init
super().__init__(*args, **kwargs)
AUTOINSTALL.check(['fasttext-wheel'])
# ... (some codes)
def process_single(self, sample):
# ... (some codes)
- 至此,该算子已经能够在算子融合功能开启后,自动地与其他算子进行融合并共享共有的中间变量,减少重复计算,加快整体的数据处理速度
- 我们提供基于 jsonargparse 的简单配置以降低样板代码的成本。
- 我们提供大量的示例性菜谱以供参阅复用和扩展,数据菜谱Gallery。
- 📣📣📣 社区贡献者可提PR在数据菜谱Gallery中添加自定义的数据菜谱,促进传播、复用和相关技术演进。我们非常欢迎共建,并会高亮致谢!
- 全局配置对象可以通过以下方式初始化
# core.executor.py
self.cfg = init_configs()
-
其中可以指定和混合来自不同来源的函数参数,包括
- 硬编码默认值 将配置注册到解析器中或在类的
__init__
函数中指定 - json 格式的默认配置文件(yaml 或 jsonnet 超集)
- 环境变量
- POSIX-style 命令行参数, 例如
--project_name my_data_demo
或--project_name=my_data_demo
,包含配置文件
- 硬编码默认值 将配置注册到解析器中或在类的
-
最终解析的值是来自这些来源的混合。 并且覆盖顺序与上面的数字相同。
此外,还支持许多参数类型和相应的验证。
包含 Python内置类型、来自 Lib/typing 的类型,以及来自 jsonargparse 的 扩展类型,例如具有自定义限制的 restricted types
和 Paths
。
- 您可以在参数名称中自由使用点符号来定义层次结构, 例如
maximum_line_length_filter.min
. 更重要的是,默认情况下,我们自动注册已实现的运算符的 docstring。 也就是说,所有的结构配置始终与代码同步。 - 您可以通过运行脚本来获取层次化的帮助信息,例如:
$ python tools/process_data.py --help
usage: process_data.py [-h] [--config CONFIG] [--print_config[=flags]] [--project_name PROJECT_NAME] [--dataset_path DATASET_PATH] [--dataset_dir DATASET_DIR] [--export_path EXPORT_PATH] [--process PROCESS]
[--np NP] [--text_kes TEXT_KEYS] [--document_deduplicator CONFIG] [--document_deduplicator.hash_method HASH_METHOD] [--document_deduplicator.lowercase LOWERCASE]
[--document_deduplicator.ignore_non_character IGNORE_NON_CHARACTER] [--language_id_score_filter CONFIG] [--language_id_score_filter.lang LANG] [--words_num_filter CONFIG] [--words_num_filter.min MIN] [--words_num_filter.max MAX]
[--alphanumeric_filter CONFIG] [--alphanumeric_filter.min MIN] [--alphanumeric_filter.max MAX] [--average_line_length_filter CONFIG] [--average_line_length_filter.min MIN] [--average_line_length_filter.max MAX]
[--maximum_line_length_filter CONFIG] [--maximum_line_length_filter.min MIN] [--maximum_line_length_filter.max MAX] [--text_length_filter CONFIG] [--text_length_filter.min MIN] [--text_length_filter.max MAX]
[--remove_comments_mapper CONFIG] [--remove_comments_mapper.type TYPE] [--remove_comments_mapper.inline INLINE] [--remove_comments_mapper.multiline MULTILINE] [--remove_header_mapper CONFIG]
[--remove_header_mapper.before_section BEFORE_SECTION]
optional arguments:
-h, --help Show this help message and exit.
--config CONFIG Path to a configuration file.
--print_config[=flags]
Print the configuration after applying all other arguments and exit. The optional flags customizes the output and are one or more keywords separated by comma. The supported flags are: comments, skip_default, skip_null.
--project_name PROJECT_NAME
name of your data process project. (type: str, default: null)
--dataset_path DATASET_PATH
path to your dataset file, relative with respect to the config file’s location (type: Path_fr, default: null)
--dataset_dir DATASET_DIR
path to your dataset(s) within a directory, relative with respect to the config file’s location (type: Path_drw, default: null)
--export_path EXPORT_PATH
path to the output processed dataset, relative with respect to the config file’s location (type: Path_fc, default: null)
--process PROCESS, --process+ PROCESS
a list of several process operators with their arguments (type: List[Dict], default: null)
--np NP number of subprocess to process your dataset. (type: PositiveInt, default: null)
<class 'data_juicer.ops.filter.alphanumeric_filter.AlphanumericFilter'>:
--alphanumeric_filter CONFIG
Path to a configuration file.
--alphanumeric_filter.min MIN
the min filter rate in alphanumeric op. (type: ClosedUnitInterval, default: 0.0)
--alphanumeric_filter.max MAX
the max filter rate in alphanumeric op. (type: ClosedUnitInterval, default: 0.25)
<class 'data_juicer.ops.filter.text_length_filter.TextLengthFilter'>:
--text_length_filter CONFIG
Path to a configuration file.
--text_length_filter.min MIN
min text length in the filtering (type: int, default: 10)
--text_length_filter.max MAX
max text length in the filtering (type: int, default: 10000)
......