fix: Handle deletes and updates properly in secondary index #14090
base: master
Conversation
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
Force-pushed from 090e3b7 to 4c4f403
PR description please
 */
@ParameterizedTest
@CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
def testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex(tableType: HoodieTableType,
Do we need both table types? Can we just keep it to the COW table?
Partition path updates for a MERGE_ON_READ table would add log files for deletes and inserts after the global index lookup, which also reads the file groups. So it would be good to have test coverage for the MERGE_ON_READ table type.
 * @param writeStatus the write status to process
 * @return list of metadata records
 */
protected abstract List<HoodieRecord> generateRecords(WriteStatus writeStatus);
Can we file a follow-up ticket to change this to return an iterator? I'm not sure how much benefit we might get: to generate SI records for one file group, we have to read the previous version of the file slice and the new version of the file slice, then compare them to generate the SI records, so converting this to an iterator may not buy us much. We can file a follow-up to attempt it, but I don't expect it to give us any material gains.
I'm only refactoring the code here. We can improve the logic by using the iterator if needed in a separate PR.
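For context, the iterator-based variant discussed above could look roughly like the following. This is a minimal plain-Java sketch only: `LazyRecordGeneration`, the stand-in types, and the fake two-records-per-status generation are all hypothetical, not Hudi's actual classes or logic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: generating records lazily per write status via an
// Iterator, instead of materializing the full List up front. Strings stand
// in for WriteStatus and HoodieRecord.
public class LazyRecordGeneration {

  static Iterator<String> generateRecords(List<String> writeStatuses) {
    return new Iterator<String>() {
      private final Iterator<String> statuses = writeStatuses.iterator();
      private Iterator<String> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Advance to the next write status only when the current batch is
        // exhausted, so at most one status's records are held in memory.
        while (!current.hasNext() && statuses.hasNext()) {
          // Stand-in for reading prev/new file slices and diffing them.
          String status = statuses.next();
          current = Arrays.asList(status + "-r1", status + "-r2").iterator();
        }
        return current.hasNext();
      }

      @Override
      public String next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

As the reviewer notes, since record generation must read and compare whole file slices anyway, the laziness mainly bounds memory per write status rather than avoiding the reads.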
LGTM for the most part.
Yes, I'll add more tests and also fill out the PR description. Thanks for the initial review.
Sure. Let me know once the patch is ready for review.
Hey @yihua: what more tests are you planning to add? For regular RLI and SI, the existing tests should suffice. For the update-partition-path case, I see you have added testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex in this patch.
Also, please open up the PR from the draft state once you feel the patch is ready.
We can add more unit tests if needed; currently, the functional tests I added already cover the new changes.
  metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), r));
}
- return recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
+ return metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
@nsivabalan actually, for the secondary index, if there are a delete and an update on the same metadata record key (e.g., secondary_key$record_key), we can remove both records, correct? This is because no other information is stored outside secondary_key$record_key, unlike RLI, which stores the location in the record.
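To make that cancellation concrete, here is a minimal plain-Java sketch, not Hudi's actual API: `SiRecord`, `reduceByKeys`, and the key layout are stand-ins. Because a secondary index record carries no payload beyond its `secondaryKey$recordKey` key, a delete and an insert on the same key can simply drop each other, leaving the existing index entry untouched.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of reduce-by-key semantics for secondary index records.
public class SecondaryIndexReduce {

  static final class SiRecord {
    final String key;      // "secondaryKey$recordKey" — the only information carried
    final boolean deleted; // true for a delete record
    SiRecord(String key, boolean deleted) {
      this.key = key;
      this.deleted = deleted;
    }
  }

  static List<SiRecord> reduceByKeys(List<SiRecord> records) {
    // Group records by the full secondary index key.
    Map<String, List<SiRecord>> byKey = new LinkedHashMap<>();
    for (SiRecord r : records) {
      byKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
    }
    List<SiRecord> out = new ArrayList<>();
    for (List<SiRecord> group : byKey.values()) {
      boolean hasDelete = group.stream().anyMatch(r -> r.deleted);
      boolean hasInsert = group.stream().anyMatch(r -> !r.deleted);
      if (hasDelete && hasInsert) {
        continue; // delete + insert on the same key cancel out: no change needed
      }
      out.add(group.get(0)); // otherwise keep the single surviving record
    }
    return out;
  }
}
```

This is exactly where RLI differs: an RLI record also stores a file location, so a delete and an update with a new location cannot cancel and the reducer must pick the survivor instead.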
Describe the issue this Pull Request addresses
This PR addresses a correctness issue of the secondary index during secondary index record generation from the update operation based on the commit metadata. There are cases where a delete and another update happen on the same metadata record key in the secondary index, e.g., from a partition path update. In that case, the secondary index is corrupted after the partition path update because the delete on the secondary index record takes precedence.
Summary and Changelog
The fix is to introduce a `reduceByKeys` operation on the secondary index records before sending the records to the MDT partition for updates, similar to record index (RLI).
- The logic is extracted into `HoodieTableMetadataUtils#reduceByKeys` so it can be reused by the secondary index.
- `HoodieBackedTableMetadataWriter#streamWriteToMetadataPartitions` is changed to incur the `reduceByKeys` operation on secondary index records only, so the shuffling of records is limited to the secondary index records.
- `MetadataIndexMapper` is refactored and implementation classes are added for the record index and the secondary index. Note that before this PR, each Spark task generates records for both the record index and the secondary index. After this PR, there is one transformation with Spark tasks generating records for the record index without `reduceByKey`, and another transformation with Spark tasks generating records for the secondary index, which requires `reduceByKey`. In the streaming MDT writes, the reason to incur `reduceByKeys` is that the secondary key can change in addition to the partition path, so we need `reduceByKeys` to determine that.
- `SecondaryIndexRecordGenerationUtils#convertWriteStatsToSecondaryIndexRecords` is changed to apply the `reduceByKeys` operation on the secondary index records before returning.
- `TestSecondaryIndexPruning#testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex` is added as a functional test to cover the changes.
Impact
Fixes a correctness issue in handling deletes and updates in the secondary index
Risk Level
low
Documentation Update
N/A
Contributor's checklist