A scalable data preprocessing framework built on PySpark for PCMind-2.1-Kaiyuan-2B, a leading fully open-source language model with 2B total parameters (1.4B non-embedding parameters).
It provides a tree-structured pipeline for large-scale data preprocessing, where input files serve as leaf nodes and the final output as the root. The design emphasizes:
- User-friendly: Declarative YAML-based configuration
- Scalable: Built on PySpark for distributed processing
- Modular: Extensible pipeline nodes for custom operations
- Production-ready: Supports both local development and YARN cluster deployment
Please deploy Spark on your server first. Then install the Python dependencies:
```bash
pip install -r requirements.txt
```

Modify the paths accordingly in example/read_write.yaml and execute:

```bash
bash script/run_yarn.sh main.py example/read_write.yaml
```

This basic pipeline demonstrates reading from Parquet files and writing back to Parquet format.
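For orientation, such a read/write pipeline corresponds to a config of roughly the following shape (a minimal sketch with placeholder paths; see example/read_write.yaml for the actual file):

```yaml
# Minimal read -> write pipeline sketch; paths are placeholders.
spark:
  app_name: read_write_example

pipeline:
  type: ParquetWriter
  output_path: /path/to/output
  child_configs:
    - type: ParquetReader
      input_path: /path/to/input/*.parquet
```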
Note: No datasets are included in this repository; you need to obtain them yourself based on the dataset names.
The framework provides a comprehensive set of pipeline nodes. Some examples:
| Component | File | Description |
|---|---|---|
| Base Structure | base.py | Abstract classes for custom node implementation |
| Filtering | filter.py | Quality-based data filtering with configurable thresholds |
| Selection | selector.py | Column selection and data projection |
| Sorting | reorder.py | Single and multi-level sorting operations |
| Deduplication | deduplication/minhash.py | MinHash-based near-duplicate detection |
| Group Mixing | group_reorder.py | Stratified data mixing (see paper) |
| Quality Scoring | text_scorer.py | FastText-based text quality assessment |
| I/O Operations | reader.py, writer.py | Parquet, JSON, and custom format support |
Definitions of the other pipeline nodes can be found in datafiner/. All nodes inherit from base classes in base.py, making it straightforward to implement custom operations.
Starter templates for common operations. Some examples:
- read_write.yaml: Basic I/O pipeline (read from and write to Parquet files)
- dedup.yaml: Deduplication pipeline using MinHash
- filter.yaml: Quality filtering based on score metrics
- reorder.yaml: Data sorting and shuffling examples
Other example pipeline configurations can be found in example/.
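For instance, reorder.yaml wires a sorting node between a reader and a writer. The sketch below is only approximate: the node's registered name and parameters (Reorder, sort_cols) are assumptions, so check datafiner/reorder.py and the actual file for the real ones.

```yaml
# Hypothetical sketch of a sorting pipeline in the spirit of reorder.yaml.
spark:
  app_name: reorder_example

pipeline:
  type: ParquetWriter
  output_path: /output/sorted
  child_configs:
    - type: Reorder               # assumed name; see datafiner/reorder.py
      sort_cols: [quality_score]  # assumed parameter
      child_configs:
        - type: ParquetReader
          input_path: /input/*.parquet
```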
Two deployment modes are supported:
- run_local.sh: Local mode for development and testing
- run_yarn.sh: YARN cluster mode for production workloads
This directory contains the complete preprocessing pipeline used for PCMind-2.1-Kaiyuan-2B training data. The configuration is organized by processing stage:
A Chinese text cleaning pipeline that removes the following (see the sketch after this list):
- Toxic content
- Slang and informal language
- Low-quality advertisements
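The cleaning stage is built from the same kinds of nodes listed above. The sketch below only illustrates the shape of such a config: the scoring node's registered name and parameters (TextScorer, model_path, score_col) are assumptions, not the names used in the released configs.

```yaml
# Hypothetical cleaning sketch: score text quality, then drop low-scoring rows.
spark:
  app_name: zh_cleaning_example

pipeline:
  type: ParquetWriter
  output_path: /output/zh_cleaned
  child_configs:
    - type: Filter                # keep rows whose score clears the threshold
      filter_col: quality_score
      threshold: 0.5
      child_configs:
        - type: TextScorer        # assumed name; see datafiner/text_scorer.py
          model_path: /models/zh_quality.bin   # assumed parameter
          score_col: quality_score             # assumed parameter
          child_configs:
            - type: ParquetReader
              input_path: /input/zh_raw/*.parquet
```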
Near-duplicate removal for major datasets (see the sketch after this list):
- DCLM-Baseline: Deduplication of base training corpus
- Fineweb-Edu-Chinese-V2.1: Educational content deduplication
- FinePDF: Document-level deduplication
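A per-dataset dedup config follows the same reader -> dedup -> writer tree. The sketch below is hypothetical: the registered name and parameters of the MinHash node (MinHashDedup, text_col, threshold) are assumptions; consult datafiner/deduplication/minhash.py and the released configs for the actual ones.

```yaml
# Hypothetical near-duplicate removal sketch for one dataset.
spark:
  app_name: dclm_dedup_example

pipeline:
  type: ParquetWriter
  output_path: /output/dclm_baseline_dedup
  child_configs:
    - type: MinHashDedup          # assumed name; see deduplication/minhash.py
      text_col: text              # assumed parameter
      threshold: 0.8              # assumed similarity threshold
      child_configs:
        - type: ParquetReader
          input_path: /input/dclm_baseline/*.parquet
```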
Score-based data sampling around target quality percentiles
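How the percentile band is expressed is not shown in this README; purely as a loose illustration, one could imagine a Filter-like node with lower and upper bounds. Everything in the sketch below (BandFilter, lower_threshold, upper_threshold) is invented for illustration and does not reflect the released configs.

```yaml
# Hypothetical sketch only: sampling a band around a target quality percentile.
pipeline:
  type: ParquetWriter
  output_path: /output/sampled
  child_configs:
    - type: BandFilter              # invented name for illustration
      filter_col: quality_score
      lower_threshold: 0.6          # invented parameters; the real configs
      upper_threshold: 0.8          # express the percentile band differently
      child_configs:
        - type: ParquetReader
          input_path: /input/scored/*.parquet
```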
Tokenization pipelines for various source datasets (JSON, Parquet)
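A tokenization config typically just swaps in a reader for the source format and inserts a tokenizer node before the writer. The sketch below is hypothetical; the tokenizer node's registered name and parameters (Tokenizer, tokenizer_path, text_col) are assumptions.

```yaml
# Hypothetical tokenization sketch for a JSON source dataset.
spark:
  app_name: tokenize_example

pipeline:
  type: ParquetWriter
  output_path: /output/tokenized
  child_configs:
    - type: Tokenizer             # assumed name and parameters
      tokenizer_path: /models/tokenizer
      text_col: text
      child_configs:
        - type: JsonReader
          input_path: /input/source/*.jsonl
```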
Multi-phase training data preparation:
- mix.yaml: Data mixing strategies per training phase
- count.yaml: Token counting and statistics
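Conceptually, a mixing config reads several tokenized sources and reorders their union according to a grouping column. The sketch below is hypothetical: the group-mixing node's registered name and parameters (GroupReorder, group_col) are assumptions based on group_reorder.py.

```yaml
# Hypothetical data-mixing sketch for one training phase.
spark:
  app_name: phase1_mix_example

pipeline:
  type: ParquetWriter
  output_path: /output/phase1_mix
  child_configs:
    - type: GroupReorder          # assumed name; see datafiner/group_reorder.py
      group_col: source           # assumed parameter
      child_configs:
        - type: ParquetReader
          input_path: /input/tokenized/source_a/*.parquet
        - type: ParquetReader
          input_path: /input/tokenized/source_b/*.parquet
```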
Converting tokenized data back to text format for analysis
These configurations provide a complete recipe for reproducing the PCMind-2.1-Kaiyuan-2B training pipeline. Use them as reference for building custom preprocessing workflows.
You can modify the environment variables in the following scripts to suit your needs.
```bash
bash script/run_yarn.sh main.py /path/to/config.yaml
bash script/run_local.sh main.py /path/to/config.yaml
```

Example pipeline configuration:
```yaml
spark:
  app_name: my_preprocessing_pipeline

pipeline:
  type: ParquetWriter
  output_path: /output/path
  child_configs:
    - type: Filter
      filter_col: quality_score
      threshold: 0.7
      child_configs:
        - type: ParquetReader
          input_path: /input/path/*.parquet
```

Pipelines are defined as trees where:
- Leaf nodes: Data readers (ParquetReader, JsonReader, etc.)
- Internal nodes: Transformations (Filter, Dedup, Reorder, etc.)
- Root node: Data writers (ParquetWriter, etc.)
Extend the framework by inheriting from PipelineNode:
```python
from datafiner.base import PipelineNode
from datafiner.register import register


@register("CustomNode")
class CustomNode(PipelineNode):
    def __init__(self, spark, custom_param, child_configs=None):
        super().__init__(spark, child_configs)
        self.custom_param = custom_param

    def run(self):
        # Pull the output of the child node, then apply the custom transformation.
        df = self.children[0].run()
        transformed_df = df  # your custom logic here
        return transformed_df
```
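Once registered, the node can be referenced by its registered name in a pipeline config, for example (paths and parameter values are placeholders):

```yaml
pipeline:
  type: ParquetWriter
  output_path: /output/custom
  child_configs:
    - type: CustomNode            # the name passed to @register
      custom_param: some_value
      child_configs:
        - type: ParquetReader
          input_path: /input/*.parquet
```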
If you use this framework in your research, please cite:

```bibtex
@article{luo2025pcmind21kaiyuan2btechnicalreport,
  title={PCMind-2.1-Kaiyuan-2B Technical Report},
  author={Kairong Luo and Zhenbo Sun and Xinyu Shi and Shengqi Chen and Bowen Yu and Yunyi Chen and Chenyi Dang and Hengtao Tao and Hui Wang and Fangming Liu and Kaifeng Lyu and Wenguang Chen},
  year={2025},
  eprint={2512.07612},
  archivePrefix={arXiv},
  primaryClass={cs.CL},
  url={https://arxiv.org/abs/2512.07612},
}
```

This repository is licensed under the Apache-2.0 License with the following copyright notice:
Copyright 2025 Tsinghua University & Peng Cheng Laboratory
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.