[HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed #13064

Open

wants to merge 7 commits into base: branch-0.x
Conversation

nsivabalan
Contributor

@nsivabalan nsivabalan commented Mar 31, 2025

Change Logs

  • If two concurrent writers plan a rollback at the same time, each could write its own rollback plan to the timeline. This patch fixes that. Also, following up on [HUDI-7507] Adding timestamp ordering validation before creating requested instant #11580, we are adding locks to rollback planning as well (a minimal sketch of the locking pattern follows below).
  • For cleaning, we added timestamp validation in PR 11580, but no locks were taken. We are adding locks to clean as well for performing the timestamp validation.

This is targeted against the 0.x branch.
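
To make the intended pattern concrete, here is a minimal, hedged sketch of lock-guarded rollback planning. All class and method names below (RollbackPlanningSketch, rollbackPlanAlreadyExists, writeRequestedRollbackPlan) are hypothetical placeholders, not the actual Hudi APIs; a ReentrantLock stands in for the transaction/lock manager.

  import java.util.concurrent.locks.ReentrantLock;

  // Hypothetical sketch: guard rollback planning with a table-level lock so that two
  // concurrent writers cannot both publish a rollback plan for the same commit.
  public class RollbackPlanningSketch {
    private final ReentrantLock tableLock = new ReentrantLock(); // stand-in for the lock provider

    public boolean scheduleRollback(String commitToRollback, String rollbackInstantTime) {
      tableLock.lock();
      try {
        // Re-read the timeline under the lock so we see plans written by other writers;
        // if a rollback plan for this commit already exists, bail out.
        if (rollbackPlanAlreadyExists(commitToRollback)) {
          return false;
        }
        // Validate that rollbackInstantTime is newer than everything on the timeline,
        // then write the rollback.requested instant.
        writeRequestedRollbackPlan(commitToRollback, rollbackInstantTime);
        return true;
      } finally {
        tableLock.unlock();
      }
    }

    private boolean rollbackPlanAlreadyExists(String commitToRollback) {
      return false; // placeholder: would consult the reloaded active timeline
    }

    private void writeRequestedRollbackPlan(String commitToRollback, String rollbackInstantTime) {
      // placeholder: would serialize the plan and create the rollback.requested instant
    }
  }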

Impact

Robust rollback planning even in the event of concurrent writers/planners.

Risk level (write none, low, medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

…s before adding new rollback requested to timeline
@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Mar 31, 2025
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Mar 31, 2025
  private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
      // Check for concurrent rollbacks, i.e. if the commit being rolled back has already been rolled back, we can bail out.
      HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(table.getMetaClient());
Contributor

do we need to rebuild the meta client?

Contributor Author

we could just reload the active timeline and leverage that.

Contributor Author

Actually, the reason is two call stacks down.
At L 152 we call validateForLatestTimestampWithoutReload() with the reloaded meta client as the argument.

Eventually this calls into

TimestampUtils.validateForLatestTimestamp

Within this method, we might need to reload the timeline in most cases; only in this rollback flow do we not need to reload it. But to reload the timeline, we need an instance of "HoodieTableMetaClient", so that we can do metaClient.reloadActiveTimeline().

In other words, we can't just take an activeTimeline as an argument to TimestampUtils.validateForLatestTimestamp.

I did think about adding two Optional arguments to this method, but felt it may not be elegant.
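
A hedged, self-contained illustration of the shape being discussed (the interfaces and methods here are stand-ins, not the real TimestampUtils or HoodieTableMetaClient signatures): the validator takes a meta client because it may need to trigger a timeline reload itself, while a "WithoutReload" entry point lets callers that have already reloaded skip the extra refresh.

  // Hypothetical sketch of why the validator takes a meta client rather than a timeline:
  // only the meta client can refresh/reload the timeline when the validator decides it must.
  final class TimestampValidationSketch {

    interface MetaClient {
      Timeline reloadActiveTimeline(); // refreshes and returns the latest timeline
      Timeline getActiveTimeline();    // returns the already-loaded timeline without a refresh
    }

    interface Timeline {
      String latestInstantTime();      // highest instant time currently on the timeline
    }

    // Common path: refresh the timeline before validating.
    static void validateForLatestTimestamp(MetaClient metaClient, String proposedInstantTime) {
      validate(metaClient.reloadActiveTimeline(), proposedInstantTime);
    }

    // Rollback path: the caller already reloaded the meta client, so skip the refresh.
    static void validateForLatestTimestampWithoutReload(MetaClient reloadedMetaClient, String proposedInstantTime) {
      validate(reloadedMetaClient.getActiveTimeline(), proposedInstantTime);
    }

    private static void validate(Timeline timeline, String proposedInstantTime) {
      // Instant times are fixed-width numeric strings, so lexicographic comparison works here.
      if (proposedInstantTime.compareTo(timeline.latestInstantTime()) <= 0) {
        throw new IllegalStateException("Instant time " + proposedInstantTime
            + " is not newer than the latest timeline instant " + timeline.latestInstantTime());
      }
    }
  }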

Contributor Author

I have attempted a fix to make this more elegant; you can check it out.

@nsivabalan nsivabalan changed the title [HUDI-7507] Fixing rollbacks for concurrent rollback planning [HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed Apr 3, 2025
      }
      table.validateForLatestTimestamp(cleanInstant.getTimestamp());
    } finally {
      if (!skipLocking) {
Contributor

is the lock only used for timestamp validation? so we still support concurrent cleaning?

Contributor Author

Timestamp validation just ensures that the timestamp chosen for this clean is higher than all other timestamps generated so far (based on the timeline).
So it does not mean concurrent cleaning.

We could have had a concurrent compaction instant that got added to the timeline around the same time the clean instant was generated, but with a higher timestamp than this clean instant. The clean planning just took a non-trivial amount of time, during which the compaction plan was added to the timeline.
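
A hedged sketch of the race described above (all names hypothetical): the clean instant time is chosen before planning, planning takes a while, and a concurrent compaction can land on the timeline with a higher timestamp in the meantime, so the validation under the lock aborts rather than writing an out-of-order clean.requested instant.

  // Hypothetical sketch: choose the clean timestamp up front, plan without the lock,
  // then validate ordering under the lock just before writing clean.requested.
  public class CleanSchedulingSketch {
    private final Object tableLock = new Object(); // stand-in for the real lock provider

    interface TimelineView {
      String newInstantTime();
      String latestInstantTime();
      void writeRequestedClean(String instantTime, Object plan);
    }

    public boolean scheduleClean(TimelineView timeline) {
      String cleanInstantTime = timeline.newInstantTime(); // chosen before planning
      Object cleanPlan = buildCleanPlan();                 // may take a non-trivial amount of time

      synchronized (tableLock) {
        // Re-check the timeline under the lock: if anything newer (e.g. a compaction
        // requested by another writer) showed up, abort instead of writing a
        // clean.requested instant with an out-of-order timestamp.
        if (cleanInstantTime.compareTo(timeline.latestInstantTime()) <= 0) {
          return false; // abort (or retry with a fresh instant time)
        }
        timeline.writeRequestedClean(cleanInstantTime, cleanPlan);
        return true;
      }
    }

    private Object buildCleanPlan() {
      return new Object(); // placeholder for the actual clean plan
    }
  }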

@@ -666,7 +670,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
         -> entry.getRollbackInstant().getTimestamp())
         .orElseGet(HoodieActiveTimeline::createNewInstantTime);
     scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
-        false);
+        false, false);
Contributor

Can the skipLocking flag be maintained outside of the hoodie table and all kinds of executors? Like this:

  if (skipLocking) {
    scheduleRollback(...)
  } else {
    txnManager.startTxn ...
    scheduleRollback(...)
    txnManager.endTxn ...
  }

I feel like the table and executor should not care about whether the lock should be used; that would make the code cleaner.

Contributor Author

For rollbacks, we can do that. But for cleaning, we just need the lock for timestamp validation, so this increases the locking granularity: the entire planning would be under a lock :(
And especially since clean planning could take a non-trivial amount of time, we are trying not to hold locks for the entire planning phase.

Ideally, if we standardized all planning to be under a lock, all this would smooth out. But locking is also costly, so we don't want to take locks just for code maintenance/structuring purposes.
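
To make the granularity trade-off concrete, a hedged comparison (hypothetical names, not the actual Hudi classes): pushing the lock outside means the whole, potentially slow, planning phase holds it; keeping the lock inside guards only the timestamp validation and the write of the requested instant.

  // Hypothetical comparison of lock scope for clean scheduling.
  public class LockGranularitySketch {
    private final Object lock = new Object(); // stand-in for the lock provider

    // Coarse-grained: the entire (potentially slow) planning phase holds the lock.
    public void scheduleCleanCoarse(String instantTime) {
      synchronized (lock) {
        Object plan = buildCleanPlan();          // non-trivial time spent while holding the lock
        validateTimestamp(instantTime);
        writeRequestedClean(instantTime, plan);
      }
    }

    // Fine-grained (roughly what the discussion above argues for with cleaning):
    // plan outside the lock, guard only the validation and the requested-instant write.
    public void scheduleCleanFine(String instantTime) {
      Object plan = buildCleanPlan();            // runs without the lock
      synchronized (lock) {
        validateTimestamp(instantTime);
        writeRequestedClean(instantTime, plan);
      }
    }

    private Object buildCleanPlan() { return new Object(); }
    private void validateTimestamp(String instantTime) { /* compare against a reloaded timeline */ }
    private void writeRequestedClean(String instantTime, Object plan) { /* write clean.requested */ }
  }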

Contributor

Can you elaborate again on why the cleaning service needs a timestamp check? I'm afraid the timestamp check in the 0.x branch would fail the cleaning a lot.

Contributor Author

hey @kbuci : Can you help clarify w/ the use-case here?

Contributor

At least, when the check fails, just abort the cleaning instead of throwing.

Contributor

Sure. Because of the scenario mentioned in https://issues.apache.org/jira/browse/HUDI-7507 under (S1), we need to make sure that when a clean is scheduled at a given instant time on the data table, there isn't a compaction plan on the MDT with a greater instant time. Using the guard in the validate-timestamp API (taking a lock + refreshing timelines + ensuring there are no ingestion/clustering/compaction instant times on the data table with a greater timestamp) will avoid this. Alternatively, we could use a different validate-timestamp API for clean (one that just checks for the compaction timestamp on the MDT), but then we would no longer be using a single validate-timestamp API for all operations.
If clean planning were to generate its instant time within a lock, then I think that should lessen the chance of having to abort due to a new, later ingestion write starting.
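
A hedged illustration of the specific ordering constraint (hypothetical names, not Hudi's API): a data-table operation at instant time t later writes a deltacommit to the metadata table at the same time t, so before scheduling a clean at t we check that no MDT compaction plan exists with a greater instant time.

  import java.util.Optional;

  // Hypothetical sketch of the S1 guard: the clean's instant time must not fall behind
  // an MDT compaction plan that has already been scheduled, because the clean will later
  // write a deltacommit to the MDT at that same instant time.
  final class CleanVsMdtCompactionGuard {

    static boolean safeToScheduleClean(String cleanInstantTime,
                                       Optional<String> latestMdtCompactionInstant) {
      // If an MDT compaction plan exists with a greater instant time, the clean's
      // deltacommit would land "behind" a plan that was supposed to cover it,
      // corrupting the MDT timeline ordering.
      return latestMdtCompactionInstant
          .map(compactionTime -> cleanInstantTime.compareTo(compactionTime) > 0)
          .orElse(true);
    }

    public static void main(String[] args) {
      // Clean chose 20250401000100, but an MDT compaction was already planned at 20250401000200.
      System.out.println(safeToScheduleClean("20250401000100", Optional.of("20250401000200"))); // false -> abort
      System.out.println(safeToScheduleClean("20250401000300", Optional.of("20250401000200"))); // true  -> proceed
      System.out.println(safeToScheduleClean("20250401000100", Optional.empty()));              // true  -> no MDT compaction pending
    }
  }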

Contributor

@danny0405 danny0405 Apr 8, 2025

we need to make sure that when a clean is scheduled at a given instant time on the data table, there isn't a compaction plan on the MDT with a greater instant time.

It sounds very restrictive and may break the Flink cleaning workflow; we may need to skip it for Flink, because Flink does not enable the MDT in the 0.x branch.

(S1) If Job 2 is an ingestion commit and Job 1 is an ingestion commit that also does compaction/log compaction on the MDT, then when Job 1 runs before Job 2, it can create a compaction plan for all instant times (up to (x)) that doesn't include instant time (x-1). Later, Job 2 will create instant time (x-1), but the timeline will be in a corrupted state since the compaction plan was supposed to include (x-1).

As for S1, how could the MDT compaction plan be generated when there are pending instants on the DT timeline with a smaller timestamp? Should we allow that?

Contributor

It sounds very restrictive and may break the Flink cleaning workflow; we may need to skip it for Flink, because Flink does not enable the MDT in the 0.x branch.

Yes, good point: for clean scheduling we can avoid doing the validateTimestamp check if the dataset has no MDT. I would prefer, though, that we skip it based on whether or not the dataset has an MDT rather than whether ingestion uses the Flink engine, since I'm not sure there's a straightforward way for the clean schedule call to infer the execution engine used by ingestion.

As for S1, how could the MDT compaction plan be generated when there are pending instants on the DT timeline with a smaller timestamp? Should we allow that?

Oh, so in S1 the MDT compaction plan is able to be scheduled since there is no inflight instant on the data table at that point in time (which is correct/expected behavior). But (without the validateTimestamp check) the other concurrent clean schedule call on the data table can generate a lower timestamp, which will be the same timestamp used on the MDT write (since an operation on the data table at instant time i will write a corresponding deltacommit to the MDT with instant time i).

@@ -746,7 +747,7 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
     LOG.info("Cleaner started");
     // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
     if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+      scheduleClean(cleanInstantTime);
Contributor Author

This caller was not taking a lock for scheduling the clean before this patch. I am standardizing all calls to scheduleClean to take locks in this patch.
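
A minimal hedged sketch of what "standardizing" could look like (names hypothetical, not the actual Hudi code): lock acquisition lives inside scheduleClean itself so every caller gets the same behavior, and skipLocking exists only for callers that are already inside a transaction.

  import java.util.concurrent.locks.ReentrantLock;

  // Hypothetical sketch: lock acquisition is owned by scheduleClean, so all callers
  // behave identically; skipLocking is for callers already holding the transaction lock.
  public class ScheduleCleanSketch {
    private final ReentrantLock txnLock = new ReentrantLock(); // stand-in for the transaction manager

    public void scheduleClean(String cleanInstantTime, boolean skipLocking) {
      if (!skipLocking) {
        txnLock.lock();
      }
      try {
        validateTimestampAndWriteRequestedClean(cleanInstantTime);
      } finally {
        if (!skipLocking) {
          txnLock.unlock();
        }
      }
    }

    private void validateTimestampAndWriteRequestedClean(String cleanInstantTime) {
      // placeholder: validate cleanInstantTime against a reloaded timeline,
      // then write the clean.requested instant
    }
  }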

@hudi-bot

hudi-bot commented Apr 4, 2025

CI report:

Bot commands

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Labels
size:L PR with lines of changes in (300, 1000]
4 participants