
fix(ops): fix GeneralFusedOP discarding Mapper results in fused pipeline#928

Open
dubin555 wants to merge 2 commits into datajuicer:main from dubin555:oss-scout/verify-fb-data_juicer-ops-op_fusion-py-240

Conversation


@dubin555 dubin555 commented Mar 3, 2026

Summary

Fix a bug in GeneralFusedOP.process_batched() where the Mapper branch assigns the result of op.process_batched() to samples (the method parameter) instead of tmp_samples (the working variable). Since the loop reads from tmp_samples and the method returns tmp_samples, any Mapper that returns a new dict rather than mutating in-place has its results silently discarded.

Problem

# op_fusion.py:240 — BEFORE (buggy)
samples = op.process_batched(tmp_samples, **process_args)

The Filter branch on line 245 correctly updates tmp_samples:

tmp_samples = op.compute_stats_batched(tmp_samples, **process_args)

The Mapper branch should do the same but doesn't, so any Mapper that returns a new dict (rather than mutating the input in-place) has its transformations silently dropped.

Affected scenarios:

  • python_lambda_mapper with batched=True returning a new dict
  • Any custom/third-party Mapper that creates a new output dict instead of mutating in-place
  • Chained Mappers in general_fused_op where transformations should accumulate
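The failure mode can be reproduced outside data_juicer with a toy version of the loop. In this sketch the mappers and the loop are hypothetical stand-ins (plain functions, not data_juicer's Mapper or GeneralFusedOP); it only illustrates the aliasing:

```python
# Toy stand-ins for the two Mapper styles (hypothetical, not data_juicer API).
def inplace_mapper(samples):
    # Mutates the input dict; survives the bug because tmp_samples aliases it.
    samples["text"] = [t.upper() for t in samples["text"]]
    return samples

def new_dict_mapper(samples):
    # Returns a fresh dict; the buggy loop never sees it.
    new_samples = dict(samples)
    new_samples["text"] = [t.upper() for t in samples["text"]]
    return new_samples

def buggy_fused_loop(samples, ops):
    # Mirrors the pre-fix Mapper branch: the result is bound to `samples`,
    # but the loop keeps reading (and finally returns) `tmp_samples`.
    tmp_samples = samples
    for op in ops:
        samples = op(tmp_samples)
    return tmp_samples

print(buggy_fused_loop({"text": ["hello"]}, [new_dict_mapper]))  # {'text': ['hello']} — result lost
print(buggy_fused_loop({"text": ["hello"]}, [inplace_mapper]))   # {'text': ['HELLO']} — mutation masks the bug
```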

Fix

One-line change — `samples` → `tmp_samples`:

-                samples = op.process_batched(tmp_samples, **process_args)
+                tmp_samples = op.process_batched(tmp_samples, **process_args)
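The same toy loop with the corrected assignment shows results chaining as expected (simplified sketch; the op names are illustrative, not data_juicer API):

```python
def fixed_fused_loop(samples, ops):
    # Post-fix behavior: rebind tmp_samples each iteration, so both
    # in-place and new-dict mappers have their output carried forward.
    tmp_samples = samples
    for op in ops:
        tmp_samples = op(tmp_samples)
    return tmp_samples

upper = lambda s: {**s, "text": [t.upper() for t in s["text"]]}
done = lambda s: {**s, "text": [t + "_DONE" for t in s["text"]]}

print(fixed_fused_loop({"text": ["hello", "world"]}, [upper, done]))
# {'text': ['HELLO_DONE', 'WORLD_DONE']}
```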

Tests

Added tests/ops/test_op_fusion_mapper_bug.py with three regression cases using mock Mappers:

| Test | Input | Expected | Bug gave |
| --- | --- | --- | --- |
| Single new-dict mapper | `['hello', 'world']` | `['HELLO', 'WORLD']` | `['hello', 'world']` |
| Two chained new-dict mappers | `['hello', 'world']` | `['HELLO_DONE', 'WORLD_DONE']` | `['hello', 'world']` |
| In-place mapper + new-dict mapper | `['HELLO', 'WORLD']` | `['hello_END', 'world_END']` | `['hello', 'world']` |

All three pass after the fix. Existing test_op_fusion.py tests (test_border_cases, test_regular_config) continue to pass.
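For readers who want the shape of those regression cases without the data_juicer test harness, here is a condensed sketch of the chained-mapper case (a toy loop and lambda mappers stand in for the real Mapper subclasses and GeneralFusedOP; the actual tests live in tests/ops/test_op_fusion_mapper_bug.py):

```python
import unittest

def fused(samples, ops):
    # Simplified post-fix loop: rebind the working variable each step.
    tmp_samples = samples
    for op in ops:
        tmp_samples = op(tmp_samples)
    return tmp_samples

class TestChainedNewDictMappers(unittest.TestCase):
    def test_two_new_dict_mappers_results_chained(self):
        # Both mappers return new dicts; the fix makes their results chain.
        upper = lambda s: {**s, "text": [t.upper() for t in s["text"]]}
        done = lambda s: {**s, "text": [t + "_DONE" for t in s["text"]]}
        result = fused({"text": ["hello", "world"]}, [upper, done])
        self.assertEqual(result["text"], ["HELLO_DONE", "WORLD_DONE"])
```

Run with `python -m unittest` from the directory containing the file.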

In GeneralFusedOP.process_batched(), the Mapper branch assigned the
result of op.process_batched() to `samples` (the method parameter)
instead of `tmp_samples` (the working variable used throughout the
loop). Since the method returns `tmp_samples` and subsequent ops read
from `tmp_samples`, any Mapper that returns a new dict (rather than
mutating in-place) had its results silently discarded.

This caused data loss when using GeneralFusedOP with mappers like
python_lambda_mapper (batched=True) or any custom mapper that creates
a new output dict. In-place-mutating mappers (the majority) were
unaffected, which masked the bug.

The fix changes `samples = ...` to `tmp_samples = ...` on line 240,
consistent with how the Filter branch already updates `tmp_samples`.
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a significant bug within the GeneralFusedOP component, ensuring that data transformations applied by Mappers are correctly propagated through the processing pipeline. Previously, Mappers that returned new dictionaries instead of modifying data in-place would have their results ignored, leading to incorrect or incomplete data processing. The fix rectifies this by correctly assigning the output of Mapper operations, thereby preserving transformations and maintaining data integrity within fused operations.

Highlights

  • Bug Fix: Corrected a critical bug in GeneralFusedOP.process_batched that caused Mapper operation results to be silently discarded if the Mapper returned a new dictionary instead of mutating in-place.
  • Test Coverage: Introduced a new test file (tests/ops/test_op_fusion_mapper_bug.py) with three regression tests to specifically target and verify the fix for this Mapper result discarding issue, covering single, chained, and mixed Mapper scenarios.


Changelog
  • data_juicer/ops/op_fusion.py
    • Corrected the variable assignment for Mapper results within the process_batched method of GeneralFusedOP to ensure that the tmp_samples working variable is updated with the Mapper's output.
  • tests/ops/test_op_fusion_mapper_bug.py
    • Added a new test file containing unittest cases to reproduce the GeneralFusedOP Mapper bug.
    • Included mock Mapper implementations (MockUpperCaseMapper, MockSuffixMapper, MockInPlaceMapper) to simulate different Mapper behaviors (returning new dicts vs. in-place mutation).
    • Implemented three specific regression tests: test_two_new_dict_mappers_results_chained, test_single_mapper_result_returned, and test_inplace_mapper_masks_bug, to validate the fix under various scenarios.
Activity
  • No specific review comments or activities have been recorded for this pull request yet.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request addresses a critical bug in GeneralFusedOP.process_batched() that caused results from certain Mappers to be silently discarded. The one-line fix correctly assigns the output of a processed batch to the working variable, ensuring that transformations are properly chained. The addition of test_op_fusion_mapper_bug.py provides excellent regression testing, with clear cases that reproduce the bug and validate the fix. The changes are correct and well-tested. I have a few suggestions to improve the new test code for better maintainability.

Comment on lines +39 to +48
    def process_batched(self, samples, **kwargs):
        # Create a NEW dict instead of mutating in-place
        # This is what triggers the bug
        new_samples = {}
        for key in samples:
            if key == self.text_key:
                new_samples[key] = [t.upper() for t in samples[key]]
            else:
                new_samples[key] = samples[key]
        return new_samples

Severity: medium

The implementation of process_batched can be made more concise and Pythonic. Instead of manually iterating over all keys to create a new dictionary, you can use samples.copy() for a shallow copy and then modify the required key. This improves readability and reduces boilerplate.

    def process_batched(self, samples, **kwargs):
        # Create a NEW dict instead of mutating in-place
        # This is what triggers the bug
        new_samples = samples.copy()
        new_samples[self.text_key] = [t.upper() for t in samples[self.text_key]]
        return new_samples

@dubin555 (Author):

Good suggestion, updated to use samples.copy() — much cleaner. Also removed unused imports while at it.

Comment on lines +61 to +69
    def process_batched(self, samples, **kwargs):
        # Also returns a new dict
        new_samples = {}
        for key in samples:
            if key == self.text_key:
                new_samples[key] = [t + self.suffix for t in samples[key]]
            else:
                new_samples[key] = samples[key]
        return new_samples

Severity: medium

Similar to MockUpperCaseMapper, this method can be simplified by using dict.copy() to create a shallow copy, which is more concise and readable than iterating through the dictionary keys.

    def process_batched(self, samples, **kwargs):
        # Also returns a new dict
        new_samples = samples.copy()
        new_samples[self.text_key] = [t + self.suffix for t in samples[self.text_key]]
        return new_samples

@dubin555 (Author):

Done, same change applied here.

Comment on lines +122 to +123
        print(f"Result texts: {result['text']}")
        print(f"Expected: ['HELLO_DONE', 'WORLD_DONE']")

Severity: medium

These print statements appear to be for debugging. It's a best practice to remove such statements from test code to keep the test runner's output clean and focused on failures. The assertions are sufficient for verifying correctness. Please remove this and similar print statements in the other test cases in this file.

@dubin555 (Author):

Agreed, removed all debug print statements. The assertions are sufficient.

- Use dict.copy() instead of manual key iteration in mock mappers
- Remove debug print statements from test methods
- Remove unused imports (deepcopy, OPERATORS, Filter)