Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
Expand Down Expand Up @@ -746,7 +747,7 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
LOG.info("Cleaner started");
// proceed only if multiple clean schedules are enabled or if there are no pending cleans.
if (scheduleInline) {
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
scheduleClean(cleanInstantTime);
Comment thread
nsivabalan marked this conversation as resolved.
table.getMetaClient().reloadActiveTimeline();
}

Expand All @@ -769,6 +770,16 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
return metadata;
}

private void scheduleClean(String cleanInstantTime) {
HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, cleanInstantTime);
try {
txnManager.beginTransaction(Option.of(cleanInstant), Option.empty());
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
} finally {
txnManager.endTransaction(Option.of(cleanInstant));
}
}

/**
* Trigger archival for the table. This ensures that the number of commits do not explode
* and keep increasing unbounded over time.
Expand Down Expand Up @@ -1034,8 +1045,7 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
+ "(exists in active timeline: %s), with rollback plan: %s",
rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
false));
.orElseGet(() -> scheduleRollback(table, rollbackInstantTime, commitInstantOpt, skipLocking));
if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
Expand Down Expand Up @@ -1065,6 +1075,22 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
}
}

private Option<HoodieRollbackPlan> scheduleRollback(HoodieTable table, String rollbackInstantTime, Option<HoodieInstant> commitInstantOpt,
boolean skipLocking) {
HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, rollbackInstantTime, HoodieTimeline.ROLLBACK_ACTION);
try {
if (!skipLocking) {
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty());
}
return table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
false);
} finally {
if (!skipLocking) {
txnManager.endTransaction(Option.of(rollbackInstant));
}
}
}

/**
* Main API to rollback failed bootstrap.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -342,8 +343,8 @@ protected final void saveInternalSchema(HoodieTable table, String instantTime, H
protected abstract void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime);

protected void validateTimestampInternal(HoodieTableMetaClient metaClient, String instantTime) {
if (config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
if (config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(Either.left(metaClient), metaClient.isMetadataTable(), instantTime);
}
}

Expand Down Expand Up @@ -829,20 +830,7 @@ private Pair<String, Option<HoodieRestorePlan>> scheduleAndGetRestorePlan(final
* cleaned)
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
return clean(cleanInstantTime, true, false);
}

/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
* @param cleanInstantTime instant time for clean.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
@Deprecated
public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) throws HoodieIOException {
return clean(cleanInstantTime, true, false);
return clean(cleanInstantTime, true);
}

/**
Expand All @@ -853,9 +841,8 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) t
* of clean.
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false otherwise.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
return tableServiceClient.clean(cleanInstantTime, scheduleInline);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
package org.apache.hudi.client.timeline;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.ValidationUtils;

public class TimestampUtils {

public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
public static void validateForLatestTimestamp(Either<HoodieTableMetaClient, HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable, String instantTime) {
// validate that the instant for which requested is about to be created is the latest in the timeline.
if (!metaClient.isMetadataTable()) { // lets validate data table that timestamps are generated in monotically increasing order.
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry -> {
if (!isMetadataTable) { // lets validate data table that timestamps are generated in monotically increasing order.
HoodieActiveTimeline reloadedActiveTimeline = metaClientOrActiveTimeline.isLeft() ? metaClientOrActiveTimeline.asLeft().reloadActiveTimeline() : metaClientOrActiveTimeline.asRight();
reloadedActiveTimeline.getWriteTimeline().lastInstant().ifPresent(entry -> {
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
"Found later commit time " + entry + ", compared to the current instant " + instantTime + ", hence failing to create requested commit meta file");
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,7 @@ public Integer getWritesFileIdEncoding() {
return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}

public Boolean shouldEnableTimestampOrderinValidation() {
public Boolean shouldEnableTimestampOrderingValidation() {
return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.timeline.TimestampUtils;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -58,6 +59,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -134,6 +136,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
private final HoodieTableMetadata metadata;
private final HoodieStorageLayout storageLayout;
private final boolean isMetadataTable;
private final TransactionManager txnManager;

private transient FileSystemViewManager viewManager;
protected final transient HoodieEngineContext context;
Expand All @@ -153,6 +156,7 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo
this.index = getIndex(config, context);
this.storageLayout = getStorageLayout(config);
this.taskContextSupplier = context.getTaskContextSupplier();
this.txnManager = new TransactionManager(config, metaClient.getStorage());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we introduce txnManager into hoodie table? It looks like it is only used for rollback, can we move it out?

}

public boolean isMetadataTable() {
Expand Down Expand Up @@ -540,6 +544,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false.
* @param isRestore {@code true} when invoked as part of restore.
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
Expand Down Expand Up @@ -665,8 +670,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
scheduleRollback(commitTime, inflightInstant);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
Expand All @@ -681,11 +685,21 @@ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Functio
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
scheduleRollback(commitTime, inflightInstant);
rollback(context, commitTime, inflightInstant, true, false);
}

private void scheduleRollback(String commitTime, HoodieInstant inflightInstant) {
HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime, HoodieTimeline.ROLLBACK_ACTION);
try {
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty());
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
} finally {
txnManager.endTransaction(Option.of(rollbackInstant));
}
}

/**
* Finalize the written data onto storage. Perform any final cleanups.
*
Expand Down Expand Up @@ -897,15 +911,25 @@ public void validateInsertSchema() throws HoodieInsertException {
}
}

/**
* Validates that the instantTime is latest in the write timeline. This method is specifically used to avoid additional timeline reloads.
* If the caller expects the validation to explicitly reload, please use {@link #validateForLatestTimestampInternal(Either, boolean, String)}.
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
* @param instantTime instant time of interest.
*/
public void validateForLatestTimestampWithoutReload(HoodieTableMetaClient metaClient, String instantTime) {
validateForLatestTimestampInternal(Either.right(metaClient.getActiveTimeline()), metaClient.isMetadataTable(), instantTime);
}

/**
* Validates that the instantTime is latest in the write timeline.
* @param instantTime instant time of interest.
*/
public abstract void validateForLatestTimestamp(String instantTime);

protected void validateForLatestTimestampInternal(String instantTime) {
if (this.config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
protected void validateForLatestTimestampInternal(Either<HoodieTableMetaClient, HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable, String instantTime) {
if (this.config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClientOrActiveTimeline, isMetadataTable, instantTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand Down Expand Up @@ -113,7 +114,9 @@ protected Option<HoodieRollbackPlan> requestRollback(String startRollbackTime) {
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
if (!skipTimelinePublish) {
table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
if (!canProceedWithRollback(rollbackInstant)) {
return Option.empty();
}
if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
LOG.warn("Request Rollback found with instant time " + rollbackInstant + ", hence skipping scheduling rollback");
} else {
Expand All @@ -129,6 +132,23 @@ protected Option<HoodieRollbackPlan> requestRollback(String startRollbackTime) {
}
}

private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
// check for concurrent rollbacks. i.e if the commit being rolledback is already rolled back, we can bail out.
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(table.getMetaClient());
Comment thread
nsivabalan marked this conversation as resolved.
HoodieTimeline reloadedActiveTimeline = reloadedMetaClient.reloadActiveTimeline();
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
if (!reloadedActiveTimeline.filterInflightsAndRequested().containsInstant(instantToRollback.getTimestamp())
&& !reloadedActiveTimeline.filterCompletedInstants().containsInstant(instantToRollback.getTimestamp())) {
// if instant to rollback is already rolled back, we can bail out.
return false;
} else {
// since we had already reloaded the timeline above, lets avoid additional reload with validateForLatestTimestamp.
table.validateForLatestTimestampWithoutReload(reloadedMetaClient, rollbackInstant.getTimestamp());
}
}
return true;
}

@Override
public Option<HoodieRollbackPlan> execute() {
// Plan a new rollback action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -94,7 +95,7 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,

@Override
public void validateForLatestTimestamp(String instantTime) {
validateForLatestTimestampInternal(instantTime);
validateForLatestTimestampInternal(Either.left(metaClient), metaClient.isMetadataTable(), instantTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -106,7 +107,7 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext

@Override
public void validateForLatestTimestamp(String instantTime) {
validateForLatestTimestampInternal(instantTime);
validateForLatestTimestampInternal(Either.left(metaClient), metaClient.isMetadataTable(), instantTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -140,7 +141,13 @@ private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieReco
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers(), false);
HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime, HoodieTimeline.ROLLBACK_ACTION);
try {
txnManagerOption.ifPresent(txnManager -> txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()));
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers(), false);
} finally {
txnManagerOption.ifPresent(txnManager -> txnManager.endTransaction(Option.of(rollbackInstant)));
}
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
Expand Down
Loading