
Optimization framework#702

Draft
cyruszhang wants to merge 105 commits into main from feat/cyrusz/optimization-framework

Conversation


@cyruszhang cyruszhang commented Jun 13, 2025

Summary

Introduces an automatic pipeline optimization framework that improves Data-Juicer processing performance through AST-based transformations and intelligent operation scheduling.

Key Features

  • Pipeline AST: Programmatic representation and manipulation of processing pipelines
  • Optimization Strategies:
    • op_reorder - Reorders filters (cheap first) to reduce data early
    • op_pruning - Removes no-op and duplicate operations
    • filter_fusion - Fuses filters sharing intermediate variables (limited benefit in Ray mode)
    • mapper_fusion - Combines consecutive mappers into single pass
  • Operation Probing: Empirical cost measurement for data-driven optimization
  • A/B Testing Framework: Statistical validation of optimization effectiveness
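The features above hinge on one idea: represent the pipeline as a data structure and let each strategy be a pure transformation over it. A minimal sketch of that shape — the names below (`OpNode`, `apply_strategies`, `drop_duplicates`) are illustrative, not the actual `data_juicer.core` API:

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class OpNode:
    """One pipeline operation in the AST: op name plus its config."""
    name: str
    config: dict


# A strategy is a pure AST -> AST transformation, so strategies compose.
Strategy = Callable[[List[OpNode]], List[OpNode]]


def apply_strategies(ast: List[OpNode], strategies: List[Strategy]) -> List[OpNode]:
    """Run each enabled strategy over the pipeline AST in order."""
    for strategy in strategies:
        ast = strategy(ast)
    return ast


def drop_duplicates(ast: List[OpNode]) -> List[OpNode]:
    """Toy op_pruning-style pass: remove consecutive identical operations."""
    result: List[OpNode] = []
    for node in ast:
        if not result or (result[-1].name, result[-1].config) != (node.name, node.config):
            result.append(node)
    return result


pipeline = [OpNode("clean_links_mapper", {}),
            OpNode("clean_links_mapper", {}),
            OpNode("text_length_filter", {"min_len": 10})]
optimized = apply_strategies(pipeline, [drop_duplicates])
```

Because every strategy takes and returns the same AST type, new strategies slot in without touching the executors.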

Results

  • Combined optimizations: 1.1x–1.22x speedup (20 ops, c4 dataset, 3 iterations)
python tools/optimizer_benchmark/run_benchmark.py \
      --recipe-path tools/optimizer_benchmark/configs/combined_benchmark.yaml \
      --dataset-path <your_dataset.jsonl> \
      --strategies all_optimizations \
      --executor default \
      --iterations 3 

Usage

Enable in your recipe config:

  • enable_optimizer: true
  • optimizer_strategies: ['op_reorder', 'filter_fusion']

Files

  • data_juicer/core/optimizer/ - Optimization strategies
  • data_juicer/core/pipeline_ast.py - AST infrastructure
  • data_juicer/benchmark/ - A/B testing framework
  • tools/optimizer_benchmark/ - Benchmark CLI tool

@cyruszhang added the dj:core (issues/PRs about the core functions of Data-Juicer) and dj:efficiency (efficiency issues and enhancements) labels Jun 15, 2025
@cyruszhang cyruszhang marked this pull request as draft February 23, 2026 18:05
@cyruszhang cyruszhang changed the title [WIP] Optimization framework Optimization framework Feb 23, 2026
cyruszhang and others added 2 commits February 23, 2026 10:16

Changes:
1. Consolidate op_fusion into optimization framework
   - OptimizationManager now handles both enable_optimizer and legacy op_fusion configs
   - Legacy op_fusion automatically maps to optimizer strategies:
     - greedy -> filter_fusion
     - probe -> op_reorder + filter_fusion
   - Removed duplicate fuse_operators code from all executors
   - Added deprecation warnings for op_fusion config

2. Add Pipeline Optimization design doc (docs/PipelineOptimization.md)
   - Documents Ray vs DJ optimization layers
   - Describes optimization strategies (filter pushdown, reordering, fusion)
   - Covers DataConnector interface for data source integration
   - Includes cost model and performance considerations

3. Fix Ray test directory issue (cherry-picked from release/v1.5.0)
   - Use shared /workspace directory instead of system /tmp
   - Ensures temp directories are accessible by all Ray workers
   - Fixes FileNotFoundError in distributed Ray mode

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
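The legacy mapping in item 1 above can be sketched as follows; the function name and exact warning text are illustrative, only the mode-to-strategy mapping comes from the commit message:

```python
import warnings

# greedy/probe are the legacy op_fusion modes named in the commit message.
LEGACY_OP_FUSION_TO_STRATEGIES = {
    "greedy": ["filter_fusion"],
    "probe": ["op_reorder", "filter_fusion"],
}


def strategies_from_legacy(op_fusion_mode: str) -> list:
    """Translate a deprecated op_fusion mode into optimizer strategies."""
    warnings.warn("op_fusion is deprecated; use optimizer_strategies instead",
                  DeprecationWarning)
    return LEGACY_OP_FUSION_TO_STRATEGIES[op_fusion_mode]
```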
  Core fix:
  - Fixed critical bug in optimization_manager.py where fused operations
    (fused_filter, fused_mapper) weren't being created from AST nodes
  - Added _extract_ops_recursive, _create_fused_filter, _create_fused_mapper
    methods to properly construct FusedFilter/FusedMapper objects

  Benchmark improvements:
  - Renamed perf_test_pipeline_comparison.py to run_test.py
  - Added --strategies flag to test specific optimization strategies
    (e.g., --strategies filter_fusion or --strategies op_reorder)
  - Removed redundant shell script wrapper
  - Added generate_test_data.py for synthetic test data generation
  - Added optimizer_benchmark.yaml test config
  - Simplified README

  Verified: 1.24x speedup (19.6% improvement) with 10k samples

  1. Removed double execution bug in FusedFilter.run()
  2. Added context=True to enable sharing intermediate variables
  3. Added stats field initialization
  4. Skip filter fusion in Ray mode (Ray already parallelizes efficiently)
  5. Simplified _should_skip_fusion to always use fusion for shared-registry filters
  6. Fixed the `av` import so it happens at module level
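The fused-filter idea behind fixes 1–2 can be sketched as a single pass that evaluates all predicates per sample and shares intermediate values through a context dict, so nothing is computed or executed twice. This is a hedged illustration; Data-Juicer's actual FusedFilter internals may differ:

```python
from typing import Callable, Dict, List

Predicate = Callable[[str, Dict], bool]


def word_count(text: str, ctx: Dict) -> int:
    # Intermediate variable shared between filters via the context.
    if "words" not in ctx:
        ctx["words"] = text.split()
    return len(ctx["words"])


def min_words(text: str, ctx: Dict) -> bool:
    return word_count(text, ctx) >= 2


def max_words(text: str, ctx: Dict) -> bool:
    return word_count(text, ctx) <= 5


def fused_filter(samples: List[str], predicates: List[Predicate]) -> List[str]:
    """One pass over the data: all predicates run per sample, stats shared."""
    kept = []
    for text in samples:
        ctx: Dict = {}  # fresh per-sample context
        if all(p(text, ctx) for p in predicates):
            kept.append(text)
    return kept
```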

  Implement op_reorder strategy that moves cheap filters before expensive
  ones to reduce data volume early, improving pipeline performance by
  up to 33% when aggressive filtering removes significant data.

  Changes:
  - Rewrite op_reorder_strategy.py with cost-based filter prioritization
    - Define CHEAP_FILTERS (text_length, words_num, alphanumeric, etc.)
    - Define EXPENSIVE_FILTERS (stopwords, flagged_words, perplexity, etc.)
    - Preserve mapper order while reordering filters by cost
  - Add 19 unit tests for op_reorder strategy
  - Add benchmark config for testing reorder performance
  - Rename fusion_strategies.md to optimization_strategies.md
  - Add comprehensive documentation for all optimization strategies

  Benchmark results (C4 dataset, 356K samples, 75% filtered):
  - Baseline: 86.3s
  - Optimized: 57.9s
  - Speedup: 1.49x (+33%)
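The cost-based reordering above can be sketched with the two-tier CHEAP/EXPENSIVE classification from the commit message; the tier contents and helper names are illustrative. A stable sort keeps relative order within each tier, and mapper positions are untouched:

```python
# Illustrative tiers; the real lists live in op_reorder_strategy.py.
CHEAP_FILTERS = {"text_length_filter", "words_num_filter", "alphanumeric_filter"}
EXPENSIVE_FILTERS = {"stopwords_filter", "flagged_words_filter", "perplexity_filter"}


def filter_cost(op_name: str) -> int:
    if op_name in CHEAP_FILTERS:
        return 0
    if op_name in EXPENSIVE_FILTERS:
        return 2
    return 1  # unknown filters sit between the two tiers


def reorder_ops(ops: list) -> list:
    """Reorder only the filters (cheap first); mappers keep their slots."""
    # sorted() is stable, so same-cost filters keep their relative order.
    filters = sorted((op for op in ops if op.endswith("_filter")), key=filter_cost)
    it = iter(filters)
    # Refill filter slots in order; non-filter slots pass through unchanged.
    return [next(it) if op.endswith("_filter") else op for op in ops]
```

For example, with `["clean_html_mapper", "perplexity_filter", "text_length_filter"]` the mapper stays first while the cheap length filter moves ahead of the expensive perplexity filter.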

  - Add optimizer_benchmark_combined.yaml with all 3 strategies
    (op_reorder, filter_fusion, mapper_fusion)
  - Update all configs to use np: 12 (was 4)
  - Remove redundant configs (heavy, mappers, reorder, words)
  - Update optimization_strategies.md with:
    - Combined benchmark results (Default: +13%, Ray: +2.5%)
    - When to use Ray vs Default executor guidance
    - Simplified benchmarking commands
  - Add _log_optimization_result() to show operation order changes
  - Log BEFORE/AFTER operation chains for easy verification
  - Log specific position changes when reordering occurs
  - Log fusion count when operations are fused
  - Fix singleton issue in apply_optimizations (always create new manager)
  - Update Ray benchmark results with realistic variance (~3-6%)

Add op_pruning, a new optimization strategy that identifies and removes
  redundant operations from the pipeline:

  1. No-op filters: Filters with pass-through conditions (min=0, max=inf)
     - text_length_filter, words_num_filter, alphanumeric_filter
     - special_characters_filter, character/word_repetition_filter
     - stopwords_filter, flagged_words_filter, suffix_filter, etc.

  2. Duplicate operations: Consecutive identical operations

  3. No-op mappers: Mappers with empty configs (no chars to remove, etc.)
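The "no-op filter" case above can be illustrated with a range check: a filter whose bounds are the pass-through defaults (min=0, max=inf) keeps every sample and can be pruned. The config keys below are hypothetical; the real per-operator prunability rules live in the strategy:

```python
import math


def is_noop_range_filter(config: dict) -> bool:
    """True if the min/max bounds cannot reject any sample."""
    lo = config.get("min_len", 0)
    hi = config.get("max_len", math.inf)
    return lo <= 0 and hi == math.inf


def prune_noops(ops: list) -> list:
    """Drop filters whose config makes them pass-through; keep everything else."""
    return [(name, cfg) for name, cfg in ops
            if not (name.endswith("_filter") and is_noop_range_filter(cfg))]
```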

  Key changes:
  - Add op_pruning_strategy.py with OpPruningStrategy class
  - Add to default strategies (runs first, before op_reorder)
  - Config comparison ignores internal attributes (accelerator, batch_size)
  - Add 31 unit tests covering all prunable operation types
  - Add benchmark config with intentionally redundant operations
  - Refactor run_benchmark.py to use A/B testing framework in data_juicer/benchmark
  - Move synthetic data generation from tools/optimizer_benchmark to
    data_juicer/benchmark/workloads/synthetic_data.py for reuse
  - Simplify BenchmarkRunner to write optimizer config directly to files
    instead of using environment variables
  - Add op_pruning and all_optimizations strategies to strategy library
  - Add synthetic workload definitions with auto-generation support
  - Fix workload config paths to use demos/pipeline_optimization/configs
  - Update README with new API documentation

  Add operation probing capability to measure actual execution costs on
  sample data for more accurate pipeline optimization decisions.

  Changes:
  - Add OpProber class to measure operation time and selectivity on samples
  - Add optimizer_probe_enabled and optimizer_probe_samples config options
  - Update op_reorder_strategy to use probed costs when available
  - Add _reload_fresh_ops() to reset operation state after probing
  - Add detailed BEFORE/AFTER logging for optimization results
  - Add probe_benchmark.yaml config for demonstrating probing
  - Add FusionProbeResult dataclass and probe_fusion_benefit() method to OpProber
    for empirically testing whether filter fusion provides performance benefit
  - Update FilterFusionStrategy to support optional probe-based fusion validation
    with configurable speedup threshold (default 1.1x)
  - Improve Ray mode skip message with empirical justification: Ray's streaming
    executor pipelines many lightweight operations more efficiently than fewer
    heavy fused operations (tested: baseline 70s vs fusion 78s)
  - Fix cfg.get() AttributeError on Namespace objects by using getattr() in:
    - data_juicer/core/data/ray_dataset.py (video_key, image_key, audio_key,
      auto_op_parallelism)
    - data_juicer/utils/ray_utils.py (custom_operator_paths)
  - Update tests for new FilterFusionStrategy constructor parameters
  - Re-align all the benchmark configs

  Performance validation:
  - Local mode: 1.58x speedup with fusion (99s → 63s on 356K samples)
  - Ray mode: Correctly skipped (streaming pipeline more efficient)
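The probing idea reduces to: run each op on a small sample, record wall time and selectivity (fraction of samples kept), and only commit to an optimization when the probe clears a speedup threshold (the 1.1x default above). A hedged sketch — OpProber's real interface may differ:

```python
import time
from typing import Callable, List, Tuple


def probe_filter(filter_fn: Callable[[str], bool],
                 sample: List[str]) -> Tuple[float, float]:
    """Return (seconds per sample, selectivity) measured on `sample`."""
    start = time.perf_counter()
    kept = sum(1 for text in sample if filter_fn(text))
    elapsed = time.perf_counter() - start
    n = max(len(sample), 1)
    return elapsed / n, kept / n


def fusion_is_worth_it(baseline_s: float, fused_s: float,
                       threshold: float = 1.1) -> bool:
    """Adopt fusion only if the probed speedup clears the threshold."""
    return baseline_s / fused_s >= threshold
```

Cheap, highly selective filters (low cost, low selectivity) are exactly the ones op_reorder should move to the front.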
Collaborator

@fengrui-z left a comment


A few type annotation fixes:

strategy_config: Dict[str, Any] = None
sample_ratio: float = 1.0
sample_method: str = "random"
executor_type: str = None # 'default' or 'ray', None uses config value
Collaborator


executor_type: str = None → should be Optional[str] = None to match WorkloadDefinition.

synthetic: bool = False # Whether this uses synthetic data
executor_type: Optional[str] = None # 'default' or 'ray', None uses config value

def __post_init__(self):
Collaborator


Add executor_type validation in __post_init__, e.g.:

if self.executor_type is not None and self.executor_type not in ('default', 'ray', 'ray_partitioned'):
    raise ValueError(f"Invalid executor_type: {self.executor_type}")

base_config = self._apply_sampling_config(base_config)

# Apply executor type override if specified
if self.config.executor_type:
Collaborator


Add validation for the executor_type value:

if self.config.executor_type:
    if self.config.executor_type not in ("default", "ray"):
        raise ValueError(f"Invalid executor_type: {self.config.executor_type}. Must be 'default' or 'ray'.")
    base_config["executor_type"] = self.config.executor_type
