Multi-Branch Execution (DAG feature enhancement) #915

@yxdyc

Before Submitting

  • I have asked the Data-Juicer Q&A Copilot (available on Doc Site, DingTalk, or Discord) about this feature, but still want to submit it as a formal request.

  • I have searched the Data-Juicer issues and found no similar feature requests.

Description

Users may need to run multiple processing branches in parallel from a common checkpoint. Concretely, they need to:

  1. Execute a common preprocessing pipeline once (e.g., minerU → deduplication)
  2. Use the common result as input for multiple parallel branches
  3. Apply different processing pipelines to each branch independently
  4. Export each branch to separate output files
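
The four steps above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: `run_pipeline`, `run_branches`, and the toy operators (`dedup`, `upper`, `keep_long`) are hypothetical names rather than Data-Juicer APIs, and the JSON Lines export format is assumed for the example. Branches run sequentially here, so the sketch shows the data flow, not the parallelism.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List

Sample = Dict[str, str]
Op = Callable[[List[Sample]], List[Sample]]


def run_pipeline(samples: List[Sample], ops: List[Op]) -> List[Sample]:
    """Apply each op in order and return the resulting samples."""
    for op in ops:
        samples = op(samples)
    return samples


def run_branches(samples, common_ops, branches, out_dir):
    """Run common_ops once, then feed the shared result to every branch."""
    common = run_pipeline(samples, common_ops)  # steps 1 and 2
    outputs = {}
    for name, ops in branches.items():
        # Step 3: each branch works on its own copy of the common result.
        result = run_pipeline([dict(s) for s in common], ops)
        # Step 4: one export file per branch.
        path = Path(out_dir) / f"{name}.jsonl"
        path.write_text("".join(json.dumps(s) + "\n" for s in result))
        outputs[name] = path
    return outputs


# Toy stand-ins for real operators (hypothetical).
def dedup(samples):
    seen, unique = set(), []
    for s in samples:
        if s["text"] not in seen:
            seen.add(s["text"])
            unique.append(s)
    return unique


def upper(samples):
    return [{**s, "text": s["text"].upper()} for s in samples]


def keep_long(samples):
    return [s for s in samples if len(s["text"]) > 3]


data = [{"text": "hello"}, {"text": "hello"}, {"text": "hi"}]
paths = run_branches(
    data,
    common_ops=[dedup],
    branches={"augment": [upper], "filter": [keep_long]},
    out_dir=tempfile.mkdtemp(),
)
```

The common pipeline (`dedup`) runs exactly once, and each branch receives a copy of its output, so one branch cannot mutate the data another branch sees.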

Use Case

Scenario: Data scientists and ML engineers need to apply different enhancement strategies to the same preprocessed dataset, especially in data-model co-development, dataset lineage, and cost-sensitive scenarios.

Example: After common preprocessing (mining, deduplication), you want to:

  • Apply data augmentation techniques in one branch
  • Apply HTML enhancement in another branch
  • Apply quality filtering in a third branch

All branches should run in parallel from the common checkpoint to save time and computational resources.

Who would use this:

  • Data engineers processing large datasets with multiple downstream applications
  • ML researchers experimenting with different data enhancement strategies
  • Teams needing to generate multiple dataset variants from a common base

Problem it solves:

  • Eliminates redundant execution of common preprocessing steps
  • Reduces total processing time through parallel branch execution
  • Provides a clean, declarative way to manage multiple processing pipelines
  • Enables efficient resource utilization when generating multiple dataset variants

Proposed Solution

Current Status

  • ✅ Common process execution with checkpointing
  • ✅ Sequential branch execution
  • ✅ Individual export paths per branch

Missing Features & Improvements Needed

  1. True Parallel Execution (High Priority)

    • Current: Branches execute sequentially in a for loop
    • Needed: Implement actual parallel execution using:
      • ThreadPoolExecutor for I/O-bound tasks (in CPython, the GIL keeps threads from speeding up CPU-bound Python code)
      • ProcessPoolExecutor for CPU-bound tasks and better isolation
      • Ray integration for distributed execution
      • Async/await pattern for I/O-bound operations
  2. DAG Execution Support (Medium Priority)

    • Current: BranchExecutor doesn't inherit from DAGExecutionMixin
    • Needed: Integrate DAG execution planning and monitoring:
      • Visualize branch execution as a DAG
      • Track execution status per branch
      • Enable better debugging and monitoring
  3. Event Logging Integration (Medium Priority)

    • Current: Missing EventLoggingMixin integration
    • Needed: Add event logging for:
      • Branch execution start/end events
      • Common process execution tracking
      • Error handling and recovery events
  4. Error Handling & Recovery (High Priority)

    • Current: Limited error handling
    • Needed:
      • Graceful failure handling per branch
      • Option to continue other branches if one fails
      • Retry mechanisms for failed branches
      • Better error reporting and diagnostics
  5. Resource Management (Medium Priority)

    • Current: No resource limits per branch
    • Needed:
      • Memory limits per branch
      • CPU allocation per branch
      • Progress tracking and resource usage monitoring
  6. Checkpoint Optimization (Low Priority)

    • Current: Basic checkpointing exists
    • Needed:
      • Incremental checkpoint support
      • Checkpoint validation and recovery
      • Shared checkpoint optimization
  7. Configuration Validation (Medium Priority)

    • Current: Basic validation exists
    • Needed:
      • Schema validation for branch configuration
      • Dependency checking between branches
      • Export path validation
  8. Testing & Documentation (High Priority)

    • Current: Basic documentation exists
    • Needed:
      • Unit tests for branch executor
      • Integration tests with various operators
      • Performance benchmarks
      • More comprehensive examples
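
As an illustration of item 7, the configuration checks could look like the sketch below. The configuration shape (a list of dicts with `name`, `process`, and `export_path` keys) and the function name `validate_branches` are assumptions made for this example; the actual branch schema may differ, and dependency checking between branches is not covered.

```python
from typing import Any, Dict, List


def validate_branches(branches: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: List[str] = []
    seen_names = set()
    seen_paths = set()
    for i, branch in enumerate(branches):
        label = branch.get("name") or f"<branch #{i}>"
        # Schema check: every branch needs these keys (assumed schema).
        for key in ("name", "process", "export_path"):
            if key not in branch:
                errors.append(f"{label}: missing required key '{key}'")
        if "process" in branch and not branch["process"]:
            errors.append(f"{label}: 'process' must list at least one operator")
        # Branch names must be unique.
        name = branch.get("name")
        if name is not None:
            if name in seen_names:
                errors.append(f"{label}: duplicate branch name")
            seen_names.add(name)
        # Two branches writing to the same file would overwrite each other.
        path = branch.get("export_path")
        if path is not None:
            if path in seen_paths:
                errors.append(f"{label}: export_path '{path}' is already used")
            seen_paths.add(path)
    return errors


config = [
    {"name": "augment", "process": ["op_a"], "export_path": "out/a.jsonl"},
    {"name": "filter", "process": [], "export_path": "out/a.jsonl"},
    {"process": ["op_b"]},
]
problems = validate_branches(config)
```

Collecting all problems instead of raising on the first one lets users fix a configuration in a single pass before any expensive common preprocessing starts.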

Implementation Approach

  1. Phase 1: Parallel Execution

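    # Use ThreadPoolExecutor or ProcessPoolExecutor
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # max_workers must be at least 1, so guard against an empty branch list
    with ThreadPoolExecutor(max_workers=max(1, len(self.branches))) as executor:
        futures = {
            executor.submit(self._execute_branch, branch, ...): branch
            for branch in self.branches
        }
        # Handle results and errors as each branch completes
        errors = []
        for future in as_completed(futures):
            branch = futures[future]
            try:
                future.result()
            except Exception as exc:
                errors.append((branch, exc))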
  2. Phase 2: Mixin Integration

    • Add DAGExecutionMixin and EventLoggingMixin to BranchExecutor
    • Implement DAG visualization for branch structure
    • Add event logging hooks
  3. Phase 3: Error Handling

    • Implement try/except blocks around branch execution
    • Add configuration option for failure behavior (fail-fast vs. continue)
    • Implement retry logic with exponential backoff
  4. Phase 4: Testing & Documentation

    • Write comprehensive test suite
    • Add performance benchmarks
    • Update documentation with best practices
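
Phases 1 and 3 can be combined in one sketch: parallel branch execution, a fail-fast vs. continue switch, and retries with exponential backoff. All names here (`run_with_retry`, `run_branches_parallel`, `fail_fast`, `base_delay`) are hypothetical and chosen for illustration; they are not existing Data-Juicer options. Branches are modeled as zero-argument callables.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Tuple


def run_with_retry(fn: Callable[[], Any], retries: int = 2,
                   base_delay: float = 0.01) -> Any:
    """Call fn, retrying on failure with delays of base_delay * 2**attempt."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)


def run_branches_parallel(
    branches: Dict[str, Callable[[], Any]], fail_fast: bool = False
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Run every branch in its own thread; return (results, errors)."""
    results: Dict[str, Any] = {}
    errors: Dict[str, Exception] = {}
    if not branches:
        return results, errors
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {pool.submit(run_with_retry, fn): name
                   for name, fn in branches.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                errors[name] = exc
                if fail_fast:
                    # Cancel branches that have not started yet, then stop.
                    for other in futures:
                        other.cancel()
                    raise
    return results, errors


attempts = {"flaky": 0}


def flaky():
    attempts["flaky"] += 1
    if attempts["flaky"] < 2:
        raise RuntimeError("transient failure")
    return "recovered"


def broken():
    raise ValueError("bad config")


results, errors = run_branches_parallel(
    {"ok": lambda: "done", "flaky": flaky, "broken": broken}
)
```

With `fail_fast=False`, a failing branch is recorded in `errors` while the others finish. With `fail_fast=True`, the first exhausted branch re-raises its exception. Swapping in `ProcessPoolExecutor` would additionally require the branch callables to be picklable, which rules out lambdas.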

Additional Context

Related Features (mostly implemented in other executors from #748)

  • DAG execution planning
  • Event logging system
  • Checkpoint management

Are you willing to submit a PR for this feature?

  • Yes, I'd like to help by submitting a PR!

Labels

dj:core (issues/PRs about the core functions of Data-Juicer), enhancement (New feature or request)
