fix(ops): fix GeneralFusedOP discarding Mapper results in fused pipeline #928
dubin555 wants to merge 2 commits into datajuicer:main
Conversation
In GeneralFusedOP.process_batched(), the Mapper branch assigned the result of op.process_batched() to `samples` (the method parameter) instead of `tmp_samples` (the working variable used throughout the loop). Since the method returns `tmp_samples` and subsequent ops read from `tmp_samples`, any Mapper that returns a new dict (rather than mutating in-place) had its results silently discarded. This caused data loss when using GeneralFusedOP with mappers like python_lambda_mapper (batched=True) or any custom mapper that creates a new output dict. In-place-mutating mappers (the majority) were unaffected, which masked the bug. The fix changes `samples = ...` to `tmp_samples = ...` on line 240, consistent with how the Filter branch already updates `tmp_samples`.
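The failure mode can be reproduced outside data-juicer with a small model of the loop. The function and mapper names below are illustrative stand-ins, not the actual source:

```python
def fused_buggy(samples, ops):
    # Paraphrase of the buggy loop: the result is bound to the *parameter*
    # name, so tmp_samples (and the return value) never see it.
    tmp_samples = samples
    for op in ops:
        samples = op(tmp_samples)  # result silently discarded
    return tmp_samples


def fused_fixed(samples, ops):
    # The fix: every result flows through the working variable.
    tmp_samples = samples
    for op in ops:
        tmp_samples = op(tmp_samples)
    return tmp_samples


def new_dict_upper(samples):
    # Returns a NEW dict -- the mapper style that exposed the bug.
    out = samples.copy()
    out["text"] = [t.upper() for t in samples["text"]]
    return out


def inplace_upper(samples):
    # Mutates in-place -- the majority style that masked the bug.
    samples["text"] = [t.upper() for t in samples["text"]]
    return samples


data = {"text": ["hello", "world"]}
print(fused_buggy(dict(data), [new_dict_upper])["text"])  # ['hello', 'world']
print(fused_fixed(dict(data), [new_dict_upper])["text"])  # ['HELLO', 'WORLD']
print(fused_buggy(dict(data), [inplace_upper])["text"])   # ['HELLO', 'WORLD']
```

The last line shows why in-place mappers hid the bug: they mutate the dict that `tmp_samples` already points at, so the wrong assignment goes unnoticed.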
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a significant bug within the `GeneralFusedOP` fused pipeline.

Highlights
Code Review
This pull request addresses a critical bug in GeneralFusedOP.process_batched() that caused results from certain Mappers to be silently discarded. The one-line fix correctly assigns the output of a processed batch to the working variable, ensuring that transformations are properly chained. The addition of test_op_fusion_mapper_bug.py provides excellent regression testing, with clear cases that reproduce the bug and validate the fix. The changes are correct and well-tested. I have a few suggestions to improve the new test code for better maintainability.
```python
def process_batched(self, samples, **kwargs):
    # Create a NEW dict instead of mutating in-place
    # This is what triggers the bug
    new_samples = {}
    for key in samples:
        if key == self.text_key:
            new_samples[key] = [t.upper() for t in samples[key]]
        else:
            new_samples[key] = samples[key]
    return new_samples
```
The implementation of process_batched can be made more concise and Pythonic. Instead of manually iterating over all keys to create a new dictionary, you can use samples.copy() for a shallow copy and then modify the required key. This improves readability and reduces boilerplate.
```python
def process_batched(self, samples, **kwargs):
    # Create a NEW dict instead of mutating in-place
    # This is what triggers the bug
    new_samples = samples.copy()
    new_samples[self.text_key] = [t.upper() for t in samples[self.text_key]]
    return new_samples
```
Good suggestion, updated to use samples.copy() — much cleaner. Also removed unused imports while at it.
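One thing worth noting about `samples.copy()`: it is a shallow copy, so the value lists are shared with the input dict. That is safe here because the mapper rebinds the text key to a brand-new list rather than mutating the shared one:

```python
samples = {"text": ["hello"], "meta": [{"id": 1}]}

new_samples = samples.copy()  # shallow: values are shared references
# Rebind the key to a new list; do NOT mutate samples["text"] itself
new_samples["text"] = [t.upper() for t in samples["text"]]

print(samples["text"])                         # ['hello'] -- input untouched
print(new_samples["text"])                     # ['HELLO']
print(new_samples["meta"] is samples["meta"])  # True -- other values shared
```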
```python
def process_batched(self, samples, **kwargs):
    # Also returns a new dict
    new_samples = {}
    for key in samples:
        if key == self.text_key:
            new_samples[key] = [t + self.suffix for t in samples[key]]
        else:
            new_samples[key] = samples[key]
    return new_samples
```
Similar to MockUpperCaseMapper, this method can be simplified by using dict.copy() to create a shallow copy, which is more concise and readable than iterating through the dictionary keys.
```python
def process_batched(self, samples, **kwargs):
    # Also returns a new dict
    new_samples = samples.copy()
    new_samples[self.text_key] = [t + self.suffix for t in samples[self.text_key]]
    return new_samples
```
Done, same change applied here.
```python
print(f"Result texts: {result['text']}")
print(f"Expected: ['HELLO_DONE', 'WORLD_DONE']")
```
These print statements appear to be for debugging. It's a best practice to remove such statements from test code to keep the test runner's output clean and focused on failures. The assertions are sufficient for verifying correctness. Please remove this and similar print statements in the other test cases in this file.
Agreed, removed all debug print statements. The assertions are sufficient.
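A middle ground, if the diagnostic output is still wanted on failure: fold it into the assertion message, so passing runs stay quiet. A sketch only, not the actual test code:

```python
result = {"text": ["HELLO_DONE", "WORLD_DONE"]}  # stand-in for the fused op's output
expected = ["HELLO_DONE", "WORLD_DONE"]
# The message is only rendered when the assertion fails
assert result["text"] == expected, f"got {result['text']}, expected {expected}"
```

In unittest style the same effect comes from the optional `msg` argument to `self.assertEqual`.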
- Use dict.copy() instead of manual key iteration in mock mappers
- Remove debug print statements from test methods
- Remove unused imports (deepcopy, OPERATORS, Filter)
Summary
Fix a bug in `GeneralFusedOP.process_batched()` where the Mapper branch assigns the result of `op.process_batched()` to `samples` (the method parameter) instead of `tmp_samples` (the working variable). Since the loop reads from `tmp_samples` and the method returns `tmp_samples`, any Mapper that returns a new dict rather than mutating in-place has its results silently discarded.

Problem
The Filter branch on line 245 correctly updates `tmp_samples`. The Mapper branch should do the same but doesn't, so any Mapper that returns a new dict (rather than mutating the input in-place) has its transformations silently dropped.
Affected scenarios:

- `python_lambda_mapper` with `batched=True` returning a new dict
- `general_fused_op` where transformations should accumulate

Fix
One-line change: `samples` → `tmp_samples`.

Tests
Added `tests/ops/test_op_fusion_mapper_bug.py` with three regression cases using mock Mappers:

| Input | Expected (after fix) | Result before fix |
| --- | --- | --- |
| `['hello', 'world']` | `['HELLO', 'WORLD']` | `['hello', 'world']` |
| `['hello', 'world']` | `['HELLO_DONE', 'WORLD_DONE']` | `['hello', 'world']` |
| `['HELLO', 'WORLD']` | `['hello_END', 'world_END']` | `['hello', 'world']` |

All three pass after the fix. Existing `test_op_fusion.py` tests (`test_border_cases`, `test_regular_config`) continue to pass.
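The chained-mapper regression case can be sketched the same way: with the fix, each new dict feeds the next op, so transformations accumulate across the fused chain. Names here are illustrative, not the actual test file:

```python
def make_suffix_mapper(suffix):
    # Each mapper returns a NEW dict, like the mock mappers reviewed above
    def mapper(samples):
        out = samples.copy()
        out["text"] = [t + suffix for t in samples["text"]]
        return out
    return mapper


def run_fused(samples, ops):
    # Model of the fixed loop: results flow through the working variable
    tmp_samples = samples
    for op in ops:
        tmp_samples = op(tmp_samples)
    return tmp_samples


chain = [make_suffix_mapper("_A"), make_suffix_mapper("_B")]
print(run_fused({"text": ["hello"]}, chain)["text"])  # ['hello_A_B']
```

Before the fix, only in-place mutations would survive such a chain; every new-dict result after the first would be dropped.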