-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed #13064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
nsivabalan
merged 9 commits into
apache:branch-0.x
from
nsivabalan:branch-0.x-rollback-timestamp-ordering
Jun 5, 2025
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
e23cbe1
Adding locks to rollback planning and checking for completed rollback…
nsivabalan f20131a
fixing build failures
nsivabalan 2be6cb1
Fixing test
nsivabalan 8686f3b
Fixing clean also to take locks to do timestamp validation
nsivabalan ebc1964
Addressing feedback
nsivabalan 911c925
Fixing reloading of timeline with timestamp validation
nsivabalan cdf9466
Fixing clean planning to take lock as well
nsivabalan 61a7e2e
Addressing minor feedback
nsivabalan 31a39b9
Removing transaction manager from HoodieTable
nsivabalan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import org.apache.hudi.avro.model.HoodieRollbackPlan; | ||
| import org.apache.hudi.avro.model.HoodieSavepointMetadata; | ||
| import org.apache.hudi.client.timeline.TimestampUtils; | ||
| import org.apache.hudi.client.transaction.TransactionManager; | ||
| import org.apache.hudi.common.HoodiePendingRollbackInfo; | ||
| import org.apache.hudi.common.config.HoodieMetadataConfig; | ||
| import org.apache.hudi.common.engine.HoodieEngineContext; | ||
|
|
@@ -58,6 +59,7 @@ | |
| import org.apache.hudi.common.table.view.TableFileSystemView; | ||
| import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; | ||
| import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; | ||
| import org.apache.hudi.common.util.Either; | ||
| import org.apache.hudi.common.util.Functions; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.common.util.StringUtils; | ||
|
|
@@ -134,6 +136,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { | |
| private final HoodieTableMetadata metadata; | ||
| private final HoodieStorageLayout storageLayout; | ||
| private final boolean isMetadataTable; | ||
| private final TransactionManager txnManager; | ||
|
|
||
| private transient FileSystemViewManager viewManager; | ||
| protected final transient HoodieEngineContext context; | ||
|
|
@@ -153,6 +156,7 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo | |
| this.index = getIndex(config, context); | ||
| this.storageLayout = getStorageLayout(config); | ||
| this.taskContextSupplier = context.getTaskContextSupplier(); | ||
| this.txnManager = new TransactionManager(config, metaClient.getStorage()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure we introduce txnManager into hoodie table? It looks like it is only used for rollback, can we move it out? |
||
| } | ||
|
|
||
| public boolean isMetadataTable() { | ||
|
|
@@ -540,6 +544,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan | |
| * @param instantTime Instant Time for scheduling rollback | ||
| * @param instantToRollback instant to be rolled back | ||
| * @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false. | ||
| * @param isRestore {@code true} when invoked as part of restore. | ||
| * @return HoodieRollbackPlan containing info on rollback. | ||
| */ | ||
| public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, | ||
|
|
@@ -665,8 +670,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant, | |
| final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry | ||
| -> entry.getRollbackInstant().getTimestamp()) | ||
| .orElseGet(HoodieActiveTimeline::createNewInstantTime); | ||
| scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), | ||
| false); | ||
| scheduleRollback(commitTime, inflightInstant); | ||
| rollback(context, commitTime, inflightInstant, false, false); | ||
| getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant); | ||
| } | ||
|
|
@@ -681,11 +685,21 @@ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Functio | |
| final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry | ||
| -> entry.getRollbackInstant().getTimestamp()) | ||
| .orElseGet(HoodieActiveTimeline::createNewInstantTime); | ||
| scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), | ||
| false); | ||
| scheduleRollback(commitTime, inflightInstant); | ||
| rollback(context, commitTime, inflightInstant, true, false); | ||
| } | ||
|
|
||
| private void scheduleRollback(String commitTime, HoodieInstant inflightInstant) { | ||
| HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime, HoodieTimeline.ROLLBACK_ACTION); | ||
| try { | ||
| txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()); | ||
| scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(), | ||
| false); | ||
| } finally { | ||
| txnManager.endTransaction(Option.of(rollbackInstant)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Finalize the written data onto storage. Perform any final cleanups. | ||
| * | ||
|
|
@@ -897,15 +911,25 @@ public void validateInsertSchema() throws HoodieInsertException { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validates that the instantTime is latest in the write timeline. This method is specifically used to avoid additional timeline reloads. | ||
| * If the caller expects the validation to explicitly reload, please use {@link #validateForLatestTimestampInternal(Either, boolean, String)}. | ||
| * @param metaClient instance of {@link HoodieTableMetaClient} to use. | ||
| * @param instantTime instant time of interest. | ||
| */ | ||
| public void validateForLatestTimestampWithoutReload(HoodieTableMetaClient metaClient, String instantTime) { | ||
| validateForLatestTimestampInternal(Either.right(metaClient.getActiveTimeline()), metaClient.isMetadataTable(), instantTime); | ||
| } | ||
|
|
||
| /** | ||
| * Validates that the instantTime is latest in the write timeline. | ||
| * @param instantTime instant time of interest. | ||
| */ | ||
| public abstract void validateForLatestTimestamp(String instantTime); | ||
|
|
||
| protected void validateForLatestTimestampInternal(String instantTime) { | ||
| if (this.config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { | ||
| TimestampUtils.validateForLatestTimestamp(metaClient, instantTime); | ||
| protected void validateForLatestTimestampInternal(Either<HoodieTableMetaClient, HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable, String instantTime) { | ||
| if (this.config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { | ||
| TimestampUtils.validateForLatestTimestamp(metaClientOrActiveTimeline, isMetadataTable, instantTime); | ||
| } | ||
| } | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.