yihua
Contributor

@yihua yihua commented Oct 14, 2025

Describe the issue this Pull Request addresses

This PR fixes a correctness issue in the secondary index that arises when secondary index records are generated from an update operation based on the commit metadata. In some cases, e.g., a partition path update, both a delete and an update are produced for the same metadata record key in the secondary index. The secondary index is then corrupted after the partition path update because the delete on the secondary index record takes precedence.
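
To make the failure mode concrete, here is a minimal sketch in plain Java. It is not Hudi code: `IndexRecord`, `siKey`, `recordsForPartitionPathUpdate`, and `applyWithDeletePrecedence` are hypothetical names, the sample keys are made up, and the `secondary_key$record_key` layout is taken from the review discussion. The sketch assumes that a partition path update emits one delete and one update sharing the same secondary index key, and it models the faulty behavior as "any delete in the batch wins".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the problem; not Hudi's actual classes.
class PartitionPathUpdateSketch {

  // A secondary index record reduced to its key and a delete flag.
  static final class IndexRecord {
    final String key;
    final boolean isDelete;

    IndexRecord(String key, boolean isDelete) {
      this.key = key;
      this.isDelete = isDelete;
    }
  }

  // Key layout assumed from the discussion: secondary_key$record_key.
  static String siKey(String secondaryKey, String recordKey) {
    return secondaryKey + "$" + recordKey;
  }

  // Assumption: moving a record to another partition yields a delete (old
  // file group) and an update (new file group) with the same index key.
  static List<IndexRecord> recordsForPartitionPathUpdate(String secondaryKey, String recordKey) {
    String key = siKey(secondaryKey, recordKey);
    return Arrays.asList(new IndexRecord(key, true), new IndexRecord(key, false));
  }

  // Simplified faulty behavior: updates are applied first, then every delete
  // removes its key, so a delete always beats an update on the same key.
  static Set<String> applyWithDeletePrecedence(Set<String> index, List<IndexRecord> batch) {
    Set<String> result = new HashSet<>(index);
    for (IndexRecord r : batch) {
      if (!r.isDelete) {
        result.add(r.key);
      }
    }
    for (IndexRecord r : batch) {
      if (r.isDelete) {
        result.remove(r.key);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> index = new HashSet<>(Arrays.asList("city_a$uuid1"));
    Set<String> after =
        applyWithDeletePrecedence(index, recordsForPartitionPathUpdate("city_a", "uuid1"));
    // The record still exists in the table, but its index entry is gone.
    System.out.println(after.contains("city_a$uuid1")); // prints false
  }
}
```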

Summary and Changelog

The fix introduces a reduceByKeys operation on the secondary index records before they are sent to the MDT partition for updates, similar to the record index (RLI).

  • Refactors HoodieTableMetadataUtils#reduceByKeys so it can be reused by secondary index.
  • For streaming MDT writes: changes HoodieBackedTableMetadataWriter#streamWriteToMetadataPartitions to apply the reduceByKeys operation to secondary index records only, so that shuffling is limited to the secondary index records. MetadataIndexMapper is refactored, and implementation classes are added for the record index and the secondary index. Note that before this PR, each Spark task generated records for both the record index and the secondary index. After this PR, one transformation has Spark tasks generating record index records without reduceByKey, and another transformation has Spark tasks generating secondary index records, which requires reduceByKey. In streaming MDT writes, reduceByKeys is needed because the secondary key can change in addition to the partition path, so reduceByKeys is used to determine that.
  • For non-streaming MDT writes: SecondaryIndexRecordGenerationUtils#convertWriteStatsToSecondaryIndexRecords is changed to apply reduceByKeys operation on the secondary index records before returning.
  • Cleans up unused interface and methods.
  • Adds TestSecondaryIndexPruning#testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex as a functional test to cover the changes.
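
The reduce-by-key step described in the list above can be sketched in plain Java as follows. This is an illustration only: `IndexRecord`, `merge`, and `reduceByKey` are hypothetical names, and the merge rule (the non-delete record wins when a delete and an update share a key) is an assumption drawn from the issue description. The real HoodieTableMetadataUtils#reduceByKeys operates on distributed pairs of HoodieRecord and may apply additional rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of reducing index records by key.
class ReduceByKeySketch {

  static final class IndexRecord {
    final String key;
    final boolean isDelete;

    IndexRecord(String key, boolean isDelete) {
      this.key = key;
      this.isDelete = isDelete;
    }
  }

  // Assumed merge rule: prefer the non-delete record so an update is not
  // overridden by a delete on the same key. If both records have the same
  // kind, they are equivalent in this model and the first one is kept.
  static IndexRecord merge(IndexRecord a, IndexRecord b) {
    if (a.isDelete && !b.isDelete) {
      return b;
    }
    return a;
  }

  // Local stand-in for a distributed reduceByKey: at most one record per key
  // survives, in first-seen key order.
  static List<IndexRecord> reduceByKey(List<IndexRecord> records) {
    Map<String, IndexRecord> byKey = new LinkedHashMap<>();
    for (IndexRecord r : records) {
      byKey.merge(r.key, r, ReduceByKeySketch::merge);
    }
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    List<IndexRecord> reduced = reduceByKey(Arrays.asList(
        new IndexRecord("city_a$uuid1", true),    // delete from the old partition
        new IndexRecord("city_a$uuid1", false),   // update from the new partition
        new IndexRecord("city_b$uuid2", true)));  // plain delete, nothing to merge
    for (IndexRecord r : reduced) {
      System.out.println(r.key + " delete=" + r.isDelete);
    }
  }
}
```

Reducing before the write means the metadata partition sees a single record per key, so the outcome no longer depends on how a delete and an update for the same key are ordered or combined downstream.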

Impact

Fixes a correctness issue in handling deletes and updates in the secondary index.

Risk Level

low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:XS PR with lines of changes in <= 10 label Oct 14, 2025
@github-actions github-actions bot added size:M PR with lines of changes in (100, 300] and removed size:XS PR with lines of changes in <= 10 labels Oct 16, 2025
@yihua yihua changed the title from "fix!: Change how deletes are encoded in record index and secondary index" to "fix!: Handle deletes and updates properly in secondary index" Oct 16, 2025
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Oct 16, 2025
@yihua yihua changed the title from "fix!: Handle deletes and updates properly in secondary index" to "fix: Handle deletes and updates properly in secondary index" Oct 16, 2025
@hudi-bot

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan
Contributor

PR description please

*/
@ParameterizedTest
@CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
def testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex(tableType: HoodieTableType,
Contributor

do we need both table types?
can we just keep it to COW table.

Contributor Author

Partition path updates for a MERGE_ON_READ table would add log files for deletes and inserts after the global index, which also reads the file groups. So it would be good to have test coverage for the MERGE_ON_READ table type.

* @param writeStatus the write status to process
* @return list of metadata records
*/
protected abstract List<HoodieRecord> generateRecords(WriteStatus writeStatus);
Contributor

can we file a follow up ticket to fix this to be an iterator.
not sure how much benefit we might get.
but to generate SI records for one file group, we have to read prev version of file slice, and new version of file slice and then compare to generate the SI records. not sure if we can do much by converting this to an iterator.
but we can file a follow up to attend to.

but don't expect it to give us any material gains.

Contributor Author

I'm only refactoring the code here. We can improve the logic by using the iterator if needed in a separate PR.

@nsivabalan
Contributor

nsivabalan commented Oct 16, 2025

LGTM for the most part.
guess you plan to add more tests.

@yihua
Contributor Author

yihua commented Oct 17, 2025

LGTM for the most part. guess you plan to add more tests.

Yes, I'll add more tests and also fill out the PR description. Thanks for the initial review.

@nsivabalan
Contributor

sure. lmk once the patch is ready for review.

Contributor

@nsivabalan nsivabalan left a comment

hey @yihua : what more tests you are planning to add?
for regular RLI and SI, existing test should suffice.
for update partition path, I see you have added testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex in this patch.

@nsivabalan
Contributor

also, please open up the PR from draft state once you feel the patch is ready

@yihua
Contributor Author

yihua commented Oct 20, 2025

hey @yihua : what more tests you are planning to add? for regular RLI and SI, existing test should suffice. for update partition path, I see you have added testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex in this patch.

We can add more unit tests if needed; currently, the functional tests I added already cover the new changes.

metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), r));
}
- return recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
+ return metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
Contributor Author

@nsivabalan actually, for the secondary index, if there is both a delete and an update on the same metadata record key (e.g., secondary_key$record_key), we can remove both records, correct? This is because no other information is stored outside secondary_key$record_key, unlike RLI, which stores the location in the record.
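
The alternative raised in this comment, dropping both records when a delete and an update share a key, can be sketched as below. This only illustrates the question; the thread does not say whether the PR adopts it. `IndexRecord` and `reduceDroppingPairs` are hypothetical names, and the sketch assumes an index entry carries no payload beyond its key, so a delete plus an update on the same key leaves the existing entry unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "cancel out" idea; not Hudi's implementation.
class CancelOutSketch {

  static final class IndexRecord {
    final String key;
    final boolean isDelete;

    IndexRecord(String key, boolean isDelete) {
      this.key = key;
      this.isDelete = isDelete;
    }
  }

  // For each key, remember whether a delete (slot 0) and an update (slot 1)
  // were seen. Keys with both are dropped; other keys emit a single record.
  static List<IndexRecord> reduceDroppingPairs(List<IndexRecord> records) {
    Map<String, boolean[]> seen = new LinkedHashMap<>();
    for (IndexRecord r : records) {
      boolean[] flags = seen.computeIfAbsent(r.key, k -> new boolean[2]);
      flags[r.isDelete ? 0 : 1] = true;
    }
    List<IndexRecord> out = new ArrayList<>();
    for (Map.Entry<String, boolean[]> e : seen.entrySet()) {
      boolean sawDelete = e.getValue()[0];
      boolean sawUpdate = e.getValue()[1];
      if (sawDelete && sawUpdate) {
        continue; // delete and update cancel out; nothing is written for this key
      }
      out.add(new IndexRecord(e.getKey(), sawDelete));
    }
    return out;
  }

  public static void main(String[] args) {
    List<IndexRecord> reduced = reduceDroppingPairs(Arrays.asList(
        new IndexRecord("city_a$uuid1", true),
        new IndexRecord("city_a$uuid1", false),
        new IndexRecord("city_b$uuid2", true)));
    for (IndexRecord r : reduced) {
      System.out.println(r.key + " delete=" + r.isDelete);
    }
  }
}
```

This would not carry over to RLI, since, as the comment notes, RLI records store the location, and the surviving update is what records the new location.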

@yihua yihua marked this pull request as ready for review October 20, 2025 06:42