中文 | English
BlossomData是一个专为大模型训练打造的数据处理框架,旨在提供灵活、高效的数据处理解决方案。它内置丰富的算子库,支持不同模态、不同训练阶段的数据处理与合成。能够从单机环境无缝迁移到分布式环境,允许用户灵活扩展自定义算子,从而大幅降低数据处理流程的开发和计算成本。
pip install blossom-data
# 从源码安装
# pip install git+https://github.com/Azure99/BlossomData.git
在使用之前,请在config.yaml
文件中配置模型服务提供商的API密钥和相关参数。(可参考config.yaml.example
)
下面是一个非常实用的示例,仅依赖数学题目和参考答案,即可合成经过验证的长推理中文训练数据。
框架提供了大量内置算子,可以在blossom.op中查看。例如,当原始数据缺失答案时,可以使用ChatDistiller生成回答,然后通过ChatMultiReasoningFilter基于投票方式过滤掉潜在错误样本。
from blossom import *
from blossom.dataframe import *
from blossom.op import *
from blossom.schema import *
# 示例数学指令数据
data = [
ChatSchema(
messages=[
user("Suppose that $wz = 12-8i$, and $|w| = \\sqrt{13}$. What is $|z|$?"),
assistant("4"),
]
),
]
# 定义要使用的算子
ops = [
# 利用非推理模型将指令数据翻译为中文
ChatTranslator(model="deepseek-chat", target_language="Chinese"),
# 使用推理模型生成问题的回答,并使用非推理模型验证回答正确性
ChatVerifyDistiller(
model="deepseek-reasoner",
mode=ChatVerifyDistiller.Mode.LLM,
validation_model="deepseek-chat",
),
# 将reasoning_content合并到content中,以便用于训练
ChatReasoningContentMerger(),
]
dataset = create_dataset(data)
result = dataset.execute(ops).collect()
print(result)
Schema是框架中的基础数据结构,用于表示和处理不同类型的数据。所有Schema都继承自基础Schema类,提供了统一的接口和功能。
# 文本数据
text_data = TextSchema(content="这是一段文本内容")
# 对话数据
chat_data = ChatSchema(
messages=[
user("你好"),
assistant("你好,请问需要什么帮助?")
]
)
# 结构化数据
row_data = RowSchema(data={"name": "张三", "age": 30, "score": 95})
# 自定义数据
custom_data = CustomSchema(data=1)
每个 Schema 实例都包含以下通用字段:
id
: 唯一标识符type
: Schema类型标识failed
: 处理失败标志metadata
: dict类型的附加元数据
DataFrame是对数据的抽象表示,提供了对数据进行转换、过滤和聚合的接口。框架支持多种DataFrame实现,包括Local、Spark和Ray,使得同一套代码可以在不同的执行引擎中运行。
Dataset是对DataFrame的高级封装,提供了更加便捷的接口和额外的功能,特别是对算子的支持。Dataset是用户交互的主要接口,隐藏了底层执行引擎的复杂性。
为保证数据的类型和结构一致,框架推荐使用强类型Schema。对于已有数据集,可通过实现自定义DataHandler来定制不同格式数据的加载与保存逻辑。
# 创建数据集
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 或从文件系统加载,并指定执行引擎为Ray,使用默认的数据加载器
dataset = load_dataset(
"example/data/chat.jsonl",
engine=DatasetEngine.RAY,
data_handler=DefaultDataHandler(),
)
dataset = (
# 对数据重新分区
dataset.repartition(2)
# 一对一映射, 取第一轮对话的用户prompt并转换为文本数据
# .map(lambda x: TextSchema(content=x.messages[0].content))
# 过滤出语言为英文的对话
.filter(lambda x: x.metadata["language"] == "en")
# 排序,按反馈分数降序
.sort(lambda x: x.metadata["feedback"], ascending=False)
# 新增元信息,基于对话内容计算总上下文长度
.add_metadata(
lambda x: {
"context_length": sum(len(message.content) for message in x.messages)
}
)
# 最多保留4条数据
.limit(4)
# 执行一系列算子
.execute(
[
# 翻译算子,将数据翻译为中文
ChatTranslator(model="gpt-4o-mini", target_language="Chinese"),
# 蒸馏算子,将翻译后的数据使用模型重新生成回复,并发为4
ChatDistiller(model="gpt-4o-mini", parallel=4),
]
)
# 缓存数据集,避免后续多个分支操作的重复计算
.cache()
)
# 收集结果(数据量较大时,请使用write_json)
results = dataset.collect()
# 将结果写入文件
dataset.write_json("output.jsonl")
# 聚合操作
agg_result = dataset.aggregate(
Sum(lambda x: x.metadata["context_length"]),
Mean(lambda x: x.metadata["context_length"]),
Count(),
)
# 分组后聚合
grouped_count = dataset.group_by(lambda x: x.metadata["country"]).count().collect()
print(results)
print(agg_result)
print(grouped_count)
Operator(算子)是数据处理的核心单元,封装了特定的数据处理逻辑。框架提供了三种基本类型的算子:
- MapOperator: 一对一映射,对每个元素单独处理
- FilterOperator: 过滤操作,决定保留或删除元素
- TransformOperator: 多对多映射,可以对多个元素同时操作
from blossom.op import *
from blossom.util import loads_markdown_first_json
# 使用装饰器定义自定义算子,第一个参数为元素
@map_operator()
def uppercase_text(item):
item.content = item.content.upper()
return item
@filter_operator()
def filter_short_text(item):
return len(item.content) > 10
@transform_operator()
def batch_process(items):
return [item for item in items if "keyword" in item.content]
# 使用上下文调用模型的算子,名称以context开头,第一个参数为上下文,第二个参数为元素
# 传入parallel以指定并发数
@context_map_operator(parallel=4)
def translate_with_model(context, item):
result = context.chat_completion(
"gpt-4o-mini",
[user(f"Translate to Chinese: {item.content}")]
)
return TextSchema(content=result)
# 继承Operator实现自定义算子
# 对于map算子,需要实现process_item方法,通过self.context访问上下文
class SelfQA(MapOperator):
def process_item(self, item):
self_qa_prompt = (
"基于给定的文本,随意生成一个问题以及对应的长答案。\n"
"你的输出应该是一个json,包含question、answer两个字符串字段,不需要输出任何其他的无关解释。\n"
f"给定的文本:{item.content}"
)
raw_result = self.context.chat_completion("gpt-4o-mini", [user(self_qa_prompt)])
result = loads_markdown_first_json(raw_result)
return ChatSchema(
messages=[
user(result["question"]),
assistant(result["answer"]),
]
)
Context(上下文)提供了算子执行所需的环境和资源,包括配置信息和模型提供者的访问。它是算子与外部资源交互的桥梁,使得算子可以访问模型服务、配置参数等。
# 创建上下文
context = Context()
# 访问配置
config = context.get_config()
# 获取模型提供者
provider = context.get_model("gpt-4o-mini")
# 使用模型生成内容
response = context.chat_completion(
"gpt-4o-mini",
[user("你好,请问今天天气如何?")]
)
# 生成嵌入向量
embedding = context.embedding("text-embedding-3-small", "这是一段文本")