Description
Before Submitting
- I have asked the Data-Juicer Q&A Copilot (available on Doc Site, DingTalk, or Discord) about this feature, but still want to submit it as a formal request.
- I have searched the Data-Juicer issues and found no similar feature requests.
Description
Users may need to execute multiple processing branches in parallel from a common checkpoint. Concretely,
- Execute a common preprocessing pipeline once (e.g., minerU → deduplication)
- Use the common result as input for multiple parallel branches
- Apply different processing pipelines to each branch independently
- Export each branch to separate output files
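The four steps above can be sketched in plain Python. This is only an illustration of the intended data flow, not Data-Juicer's API: `run_pipeline`, `run_branches`, the toy operators, and the branch names are all hypothetical, and samples are plain dicts.

```python
import json
import tempfile
from pathlib import Path


def run_pipeline(samples, ops):
    """Apply each op (a function from a list of samples to a list of samples) in order."""
    for op in ops:
        samples = op(samples)
    return samples


# Hypothetical stand-ins for real operators.
def deduplicate(samples):
    seen, out = set(), []
    for s in samples:
        if s["text"] not in seen:
            seen.add(s["text"])
            out.append(s)
    return out


def uppercase(samples):
    return [{**s, "text": s["text"].upper()} for s in samples]


def keep_long(samples):
    return [s for s in samples if len(s["text"]) >= 5]


def run_branches(samples, common_ops, branches, out_dir):
    """Run common_ops once, then run every branch on the shared result."""
    checkpoint = run_pipeline(samples, common_ops)  # executed exactly once
    exported = {}
    for name, ops in branches.items():
        # Each branch starts from a shallow copy of the checkpoint list.
        result = run_pipeline(list(checkpoint), ops)
        path = Path(out_dir) / f"{name}.jsonl"
        with path.open("w", encoding="utf-8") as f:
            for s in result:
                f.write(json.dumps(s) + "\n")
        exported[name] = path
    return exported


raw = [{"text": "hello"}, {"text": "hello"}, {"text": "hi"}]
branches = {"augmented": [uppercase], "filtered": [keep_long]}
out_dir = tempfile.mkdtemp()
paths = run_branches(raw, [deduplicate], branches, out_dir)
```

The loop over branches is sequential here to keep the sketch short; the point is that the common pipeline runs once and each branch writes to its own file.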
Use Case
Scenario: Data scientists and ML engineers need to apply different enhancement strategies to the same preprocessed dataset, especially in data-model co-development, dataset lineage tracking, and cost-sensitive scenarios.
Example: After common preprocessing (mining, deduplication), you want to:
- Apply data augmentation techniques in one branch
- Apply HTML enhancement in another branch
- Apply quality filtering in a third branch
All branches should run in parallel from the common checkpoint to save time and computational resources.
Who would use this:
- Data engineers processing large datasets with multiple downstream applications
- ML researchers experimenting with different data enhancement strategies
- Teams needing to generate multiple dataset variants from a common base
Problem it solves:
- Eliminates redundant execution of common preprocessing steps
- Reduces total processing time through parallel branch execution
- Provides a clean, declarative way to manage multiple processing pipelines
- Enables efficient resource utilization when generating multiple dataset variants
Proposed Solution
Current Status
- ✅ Common process execution with checkpointing
- ✅ Sequential branch execution
- ✅ Individual export paths per branch
Missing Features & Improvements Needed
- True Parallel Execution (High Priority)
  - Current: Branches execute sequentially in a for loop
  - Needed: Implement actual parallel execution using:
    - ThreadPoolExecutor for I/O-bound branches or work that releases the GIL
    - ProcessPoolExecutor for CPU-bound branches and better isolation
    - Ray integration for distributed execution
    - Async/await pattern for I/O-bound operations
- DAG Execution Support (Medium Priority)
  - Current: `BranchExecutor` doesn't inherit from `DAGExecutionMixin`
  - Needed: Integrate DAG execution planning and monitoring:
    - Visualize branch execution as a DAG
    - Track execution status per branch
    - Enable better debugging and monitoring
- Event Logging Integration (Medium Priority)
  - Current: Missing `EventLoggingMixin` integration
  - Needed: Add event logging for:
    - Branch execution start/end events
    - Common process execution tracking
    - Error handling and recovery events
- Error Handling & Recovery (High Priority)
  - Current: Limited error handling
  - Needed:
    - Graceful failure handling per branch
    - Option to continue other branches if one fails
    - Retry mechanisms for failed branches
    - Better error reporting and diagnostics
- Resource Management (Medium Priority)
  - Current: No resource limits per branch
  - Needed:
    - Memory limits per branch
    - CPU allocation per branch
    - Progress tracking and resource usage monitoring
- Checkpoint Optimization (Low Priority)
  - Current: Basic checkpointing exists
  - Needed:
    - Incremental checkpoint support
    - Checkpoint validation and recovery
    - Shared checkpoint optimization
- Configuration Validation (Medium Priority)
  - Current: Basic validation exists
  - Needed:
    - Schema validation for branch configuration
    - Dependency checking between branches
    - Export path validation
- Testing & Documentation (High Priority)
  - Current: Basic documentation exists
  - Needed:
    - Unit tests for branch executor
    - Integration tests with various operators
    - Performance benchmarks
    - More comprehensive examples
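To make the configuration validation item more concrete, here is a minimal sketch of what schema and export path checks could look like. The config keys (`branches`, `name`, `process`, `export_path`) and the function `validate_branch_config` are assumptions made for illustration and may not match the actual configuration format.

```python
def validate_branch_config(config):
    """Return a list of human-readable problems; an empty list means the config passed."""
    errors = []
    branches = config.get("branches")
    if not isinstance(branches, list) or not branches:
        return ["'branches' must be a non-empty list"]

    seen_names, seen_paths = set(), set()
    for i, branch in enumerate(branches):
        if not isinstance(branch, dict):
            errors.append(f"branch #{i} must be a mapping")
            continue

        # Branch names must be present and unique (hypothetical 'name' key).
        name = branch.get("name")
        if not isinstance(name, str) or not name:
            errors.append(f"branch #{i} needs a non-empty 'name'")
        elif name in seen_names:
            errors.append(f"duplicate branch name: {name}")
        else:
            seen_names.add(name)

        # Each branch carries its own operator list (hypothetical 'process' key).
        if not isinstance(branch.get("process"), list):
            errors.append(f"branch #{i} needs a 'process' list")

        # Two branches writing to the same file would overwrite each other.
        path = branch.get("export_path")
        if not isinstance(path, str) or not path:
            errors.append(f"branch #{i} needs an 'export_path'")
        elif path in seen_paths:
            errors.append(f"export_path used by more than one branch: {path}")
        else:
            seen_paths.add(path)
    return errors


good = {"branches": [
    {"name": "a", "process": [], "export_path": "out/a.jsonl"},
    {"name": "b", "process": [], "export_path": "out/b.jsonl"},
]}
bad = {"branches": [
    {"name": "a", "process": [], "export_path": "out/x.jsonl"},
    {"name": "a", "process": [], "export_path": "out/x.jsonl"},
]}
```

Collecting every problem before reporting, rather than raising on the first one, lets a user fix a config in one pass. Dependency checking between branches is not covered by this sketch.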
Implementation Approach
- Phase 1: Parallel Execution

  ```python
  # Use ThreadPoolExecutor or ProcessPoolExecutor
  from concurrent.futures import ThreadPoolExecutor, as_completed

  with ThreadPoolExecutor(max_workers=len(self.branches)) as executor:
      futures = {
          executor.submit(self._execute_branch, branch, ...): branch
          for branch in self.branches
      }
      # Handle results and errors
  ```
- Phase 2: Mixin Integration
  - Add `DAGExecutionMixin` and `EventLoggingMixin` to `BranchExecutor`
  - Implement DAG visualization for branch structure
  - Add event logging hooks
- Phase 3: Error Handling
  - Wrap branch execution in try/except blocks
  - Add configuration option for failure behavior (fail-fast vs. continue)
  - Implement retry logic with exponential backoff
- Phase 4: Testing & Documentation
  - Write comprehensive test suite
  - Add performance benchmarks
  - Update documentation with best practices
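Phases 1 and 3 could fit together roughly as follows. This is a standalone sketch rather than the `BranchExecutor` implementation: `run_with_retry`, `run_branches_parallel`, the `fail_fast` flag, and the toy branch callables are all hypothetical, and each branch is modeled as a zero-argument callable.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_with_retry(fn, retries=2, base_delay=0.01):
    """Call fn(); on failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: let the caller see the error
            time.sleep(base_delay * 2 ** attempt)


def run_branches_parallel(branches, fail_fast=False, retries=2):
    """branches maps a branch name to a zero-argument callable.

    Returns (results, errors), both keyed by branch name.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as executor:
        futures = {
            executor.submit(run_with_retry, fn, retries): name
            for name, fn in branches.items()
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                errors[name] = exc
                if fail_fast:
                    # Cancel branches that have not started yet;
                    # branches already running still finish.
                    for other in futures:
                        other.cancel()
                    break
    return results, errors


attempts = {"flaky": 0}


def flaky():
    attempts["flaky"] += 1
    if attempts["flaky"] < 2:
        raise RuntimeError("transient failure")
    return "recovered"


def broken():
    raise ValueError("always fails")


results, errors = run_branches_parallel(
    {"ok": lambda: "done", "flaky": flaky, "broken": broken}
)
```

With `fail_fast=False`, a failing branch is recorded in `errors` while the others complete. Swapping in `ProcessPoolExecutor` would require picklable branch callables, so the lambda above would need to become a module-level function.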
Additional Context
Related Features (mostly implemented in other executors from #748)
- DAG execution planning
- Event logging system
- Checkpoint management
Are you willing to submit a PR for this feature?
- Yes I'd like to help by submitting a PR!