Changes:
1. Consolidate op_fusion into the optimization framework
   - OptimizationManager now handles both enable_optimizer and legacy op_fusion configs
   - Legacy op_fusion automatically maps to optimizer strategies:
     - greedy -> filter_fusion
     - probe -> op_reorder + filter_fusion
   - Removed duplicate fuse_operators code from all executors
   - Added deprecation warnings for the op_fusion config
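The legacy-to-strategy mapping can be sketched roughly as follows (the function and constant names here are hypothetical, not the actual OptimizationManager API):

```python
import warnings

# Hypothetical mapping from legacy op_fusion modes to optimizer strategies,
# mirroring the consolidation described above.
LEGACY_FUSION_MAP = {
    "greedy": ["filter_fusion"],
    "probe": ["op_reorder", "filter_fusion"],
}

def resolve_strategies(cfg: dict) -> list:
    """Translate a legacy op_fusion config into optimizer strategy names."""
    if "op_fusion" in cfg:
        # Keep old configs working, but nudge users toward the new option.
        warnings.warn(
            "op_fusion is deprecated; use enable_optimizer instead",
            DeprecationWarning,
        )
        return LEGACY_FUSION_MAP.get(cfg["op_fusion"], [])
    return cfg.get("optimizer_strategies", [])
```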
2. Add a Pipeline Optimization design doc (docs/PipelineOptimization.md)
   - Documents Ray vs DJ optimization layers
   - Describes optimization strategies (filter pushdown, reordering, fusion)
   - Covers the DataConnector interface for data source integration
   - Includes a cost model and performance considerations
3. Fix Ray test directory issue (cherry-picked from release/v1.5.0)
   - Use the shared /workspace directory instead of the system /tmp
   - Ensures temp directories are accessible by all Ray workers
   - Fixes FileNotFoundError in distributed Ray mode
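A minimal sketch of the directory selection, assuming the shared mount lives at /workspace (the helper name is illustrative, not the actual code):

```python
import os
import tempfile

def make_shared_tmpdir(base="/workspace"):
    """Create a temp directory under a path visible to all Ray workers.

    Falls back to the system default temp location when the shared
    mount is absent, so local runs still work.
    """
    root = base if os.path.isdir(base) else None
    return tempfile.mkdtemp(prefix="dj_", dir=root)
```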
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Core fix:
- Fixed a critical bug in optimization_manager.py where fused operations
  (fused_filter, fused_mapper) weren't being created from AST nodes
- Added _extract_ops_recursive, _create_fused_filter, and _create_fused_mapper
  methods to properly construct FusedFilter/FusedMapper objects
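The idea behind a fused filter can be shown with a stripped-down sketch (this is not the actual FusedFilter implementation, just the short-circuiting principle it relies on):

```python
# A fused filter chains several predicate functions so a sample is
# dropped at the first failing check, turning N passes over the data
# into a single pass.
class FusedFilter:
    def __init__(self, filters):
        self.filters = filters  # callables: sample -> bool

    def keep(self, sample) -> bool:
        # all() short-circuits at the first filter that rejects.
        return all(f(sample) for f in self.filters)

    def run(self, samples):
        # One pass instead of len(self.filters) passes.
        return [s for s in samples if self.keep(s)]
```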
Benchmark improvements:
- Renamed perf_test_pipeline_comparison.py to run_test.py
- Added a --strategies flag to test specific optimization strategies
  (e.g., --strategies filter_fusion or --strategies op_reorder)
- Removed redundant shell script wrapper
- Added generate_test_data.py for synthetic test data generation
- Added optimizer_benchmark.yaml test config
- Simplified README
Verified: 1.24x speedup (19.6% improvement) with 10k samples
Fixes:
1. Removed a double-execution bug in FusedFilter.run()
2. Added context=True to enable sharing intermediate variables
3. Added stats field initialization
4. Skip filter fusion in Ray mode (Ray already parallelizes efficiently)
5. Simplified _should_skip_fusion to always use fusion for shared-registry filters
6. Fixed the av import to happen at module level
Implement op_reorder strategy that moves cheap filters before expensive
filters to reduce data volume early, improving pipeline performance by
up to 33% when aggressive filtering removes significant data.
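The reordering idea can be sketched with a toy cost model (the cost values and helper below are illustrative, not the real CHEAP_FILTERS/EXPENSIVE_FILTERS tables):

```python
# Illustrative static costs: cheap metadata-style filters get low values,
# model-backed filters get high ones; unknown filters land in the middle.
FILTER_COST = {
    "text_length_filter": 1,
    "words_num_filter": 2,
    "stopwords_filter": 10,
    "perplexity_filter": 100,
}

def reorder_ops(ops):
    """ops: list of (name, kind) pairs, kind in {'filter', 'mapper'}.

    Sort filters by estimated cost so cheap ones run first and shrink
    the data early; mappers keep their original positions.
    """
    sorted_filters = iter(sorted(
        (op for op in ops if op[1] == "filter"),
        key=lambda op: FILTER_COST.get(op[0], 50),
    ))
    # Re-slot the sorted filters into the original filter positions.
    return [op if op[1] == "mapper" else next(sorted_filters) for op in ops]
```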
Changes:
- Rewrite op_reorder_strategy.py with cost-based filter prioritization
- Define CHEAP_FILTERS (text_length, words_num, alphanumeric, etc.)
- Define EXPENSIVE_FILTERS (stopwords, flagged_words, perplexity, etc.)
- Preserve mapper order while reordering filters by cost
- Add 19 unit tests for op_reorder strategy
- Add benchmark config for testing reorder performance
- Rename fusion_strategies.md to optimization_strategies.md
- Add comprehensive documentation for all optimization strategies
Benchmark results (C4 dataset, 356K samples, 75% filtered):
- Baseline: 86.3s
- Optimized: 57.9s
- Speedup: 1.49x (+33%)
- Add optimizer_benchmark_combined.yaml with all 3 strategies
  (op_reorder, filter_fusion, mapper_fusion)
- Update all configs to use np: 12 (was 4)
- Remove redundant configs (heavy, mappers, reorder, words)
- Update optimization_strategies.md with:
  - Combined benchmark results (Default: +13%, Ray: +2.5%)
  - Guidance on when to use the Ray vs Default executor
  - Simplified benchmarking commands
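As a rough picture, the combined configuration enables all three strategies at once (the key names below are assumptions based on these notes; check the actual YAML schema in optimizer_benchmark_combined.yaml):

```python
# Illustrative combined-optimizer config, written as a Python dict for
# brevity; the real config lives in YAML.
optimizer_config = {
    "enable_optimizer": True,
    "optimizer_strategies": ["op_reorder", "filter_fusion", "mapper_fusion"],
    "np": 12,  # worker processes, raised from 4
}
```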
- Add _log_optimization_result() to show operation order changes
- Log BEFORE/AFTER operation chains for easy verification
- Log specific position changes when reordering occurs
- Log fusion count when operations are fused
- Fix singleton issue in apply_optimizations (always create a new manager)
- Update Ray benchmark results with realistic variance (~3-6%)
Add a new optimization strategy (op_pruning) that identifies and removes
redundant operations from the pipeline:
1. No-op filters: filters with pass-through conditions (min=0, max=inf)
   - text_length_filter, words_num_filter, alphanumeric_filter
   - special_characters_filter, character/word_repetition_filter
   - stopwords_filter, flagged_words_filter, suffix_filter, etc.
2. Duplicate operations: consecutive identical operations
3. No-op mappers: mappers with empty configs (no chars to remove, etc.)
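The pruning rules above reduce to a small amount of config inspection. A sketch, assuming range-style filter configs with min/max keys (the key names and helper are illustrative, not the OpPruningStrategy API):

```python
import math

def is_noop_range_filter(cfg: dict) -> bool:
    """A filter whose bounds are [0, inf) keeps every sample."""
    return cfg.get("min", 0) == 0 and cfg.get("max", math.inf) == math.inf

def prune_ops(ops):
    """ops: list of (name, cfg) pairs.

    Drops pass-through range filters and consecutive identical
    operations, keeping everything else in order.
    """
    pruned, prev = [], None
    for name, cfg in ops:
        if name.endswith("_filter") and is_noop_range_filter(cfg):
            continue  # no-op filter: keeps everything, so remove it
        if (name, cfg) == prev:
            continue  # consecutive duplicate operation
        pruned.append((name, cfg))
        prev = (name, cfg)
    return pruned
```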
Key changes:
- Add op_pruning_strategy.py with OpPruningStrategy class
- Add to default strategies (runs first, before op_reorder)
- Config comparison ignores internal attributes (accelerator, batch_size)
- Add 31 unit tests covering all prunable operation types
- Add benchmark config with intentionally redundant operations
- Refactor run_benchmark.py to use A/B testing framework in data_juicer/benchmark
- Move synthetic data generation from tools/optimizer_benchmark to
  data_juicer/benchmark/workloads/synthetic_data.py for reuse
- Simplify BenchmarkRunner to write the optimizer config directly to files
  instead of using environment variables
- Add op_pruning and all_optimizations strategies to strategy library
- Add synthetic workload definitions with auto-generation support
- Fix workload config paths to use demos/pipeline_optimization/configs
- Update README with new API documentation
Add operation probing capability to measure actual execution costs on
sample data for more accurate pipeline optimization decisions.
Changes:
- Add OpProber class to measure operation time and selectivity on samples
- Add optimizer_probe_enabled and optimizer_probe_samples config options
- Update op_reorder_strategy to use probed costs when available
- Add _reload_fresh_ops() to reset operation state after probing
- Add detailed BEFORE/AFTER logging for optimization results
- Add probe_benchmark.yaml config for demonstrating probing
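In essence, probing runs each operation on a small sample and records two numbers: per-sample cost and selectivity (fraction of samples kept). A sketch of that measurement, with an illustrative function rather than the actual OpProber class:

```python
import time

def probe_filter(filter_fn, samples):
    """Measure a filter's per-sample cost and selectivity on a probe set.

    Both numbers feed the reordering cost model: a cheap, highly
    selective filter should run as early as possible.
    """
    start = time.perf_counter()
    kept = sum(1 for s in samples if filter_fn(s))
    elapsed = time.perf_counter() - start
    n = max(len(samples), 1)
    return {
        "time_per_sample": elapsed / n,
        "selectivity": kept / n,  # fraction of samples that pass
    }
```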
- Add FusionProbeResult dataclass and probe_fusion_benefit() method to OpProber
  for empirically testing whether filter fusion provides a performance benefit
- Update FilterFusionStrategy to support optional probe-based fusion validation
  with a configurable speedup threshold (default 1.1x)
- Improve the Ray mode skip message with empirical justification: Ray's streaming
  executor pipelines many lightweight operations more efficiently than fewer
  heavy fused operations (tested: baseline 70s vs fusion 78s)
- Fix cfg.get() AttributeError on Namespace objects by using getattr() in:
  - data_juicer/core/data/ray_dataset.py (video_key, image_key, audio_key,
    auto_op_parallelism)
  - data_juicer/utils/ray_utils.py (custom_operator_paths)
- Update tests for new FilterFusionStrategy constructor parameters
- Re-align all the benchmark configs
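The getattr() fix works because argparse.Namespace carries plain attributes and has no dict-style .get() method; a minimal demonstration:

```python
from argparse import Namespace

# Code written against dict-like configs calls cfg.get("video_key"),
# which raises AttributeError on a Namespace. getattr() with a default
# works whether the attribute is present or missing.
cfg = Namespace(video_key="videos")

video_key = getattr(cfg, "video_key", "videos")  # attribute present
audio_key = getattr(cfg, "audio_key", "audios")  # missing -> default
```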
Performance validation:
- Local mode: 1.58x speedup with fusion (99s → 63s on 356K samples)
- Ray mode: Correctly skipped (streaming pipeline more efficient)
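The probe-based fusion decision boils down to a ratio check against the speedup threshold (a sketch; the function name is illustrative, while the 1.1x default comes from the notes above):

```python
def should_fuse(baseline_s: float, fused_s: float, threshold: float = 1.1) -> bool:
    """Fuse only when the measured fused runtime beats the unfused
    runtime by at least the configured speedup factor."""
    if fused_s <= 0:
        return False
    return baseline_s / fused_s >= threshold
```

With the numbers reported here, local mode fuses (99s vs 63s is a 1.58x speedup) and Ray mode does not (70s baseline vs 78s fused is a slowdown).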
fengrui-z (Collaborator) reviewed on Feb 26, 2026:

A few type annotation fixes.
On:

    strategy_config: Dict[str, Any] = None
    sample_ratio: float = 1.0
    sample_method: str = "random"
    executor_type: str = None  # 'default' or 'ray', None uses config value

Comment: executor_type: str = None should be Optional[str] = None to match WorkloadDefinition.
On:

    synthetic: bool = False  # Whether this uses synthetic data
    executor_type: Optional[str] = None  # 'default' or 'ray', None uses config value

    def __post_init__(self):

Comment: add executor_type validation in __post_init__, e.g.:

    if self.executor_type is not None and self.executor_type not in ('default', 'ray', 'ray_partitioned'):
        raise ValueError(f"Invalid executor_type: {self.executor_type}")

On:

    base_config = self._apply_sampling_config(base_config)

    # Apply executor type override if specified
    if self.config.executor_type:

Comment: add validation for the executor_type value:

    if self.config.executor_type:
        if self.config.executor_type not in ("default", "ray"):
            raise ValueError(f"Invalid executor_type: {self.config.executor_type}. Must be 'default' or 'ray'.")
        base_config["executor_type"] = self.config.executor_type
Summary
Introduces an automatic pipeline optimization framework that improves Data-Juicer processing performance through AST-based transformations and intelligent operation scheduling.
Key Features
Results
Usage
Files