Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added hudi-client/.hudi-client.iml.icloud
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ protected Option<String> scheduleTableServiceInternal(String instantTime, Option
case CLEAN:
LOG.info("Scheduling cleaning at instant time: {}", instantTime);
Option<HoodieCleanerPlan> cleanerPlan = table
.scheduleCleaning(context, instantTime, extraMetadata);
.scheduleCleaning(context, instantTime, extraMetadata, true);
option = cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
break;
default:
Expand Down Expand Up @@ -1035,7 +1035,7 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
false));
false, skipLocking));
if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ protected final void saveInternalSchema(HoodieTable table, String instantTime, H
protected abstract void validateTimestamp(HoodieTableMetaClient metaClient, String instantTime);

protected void validateTimestampInternal(HoodieTableMetaClient metaClient, String instantTime) {
if (config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
if (config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
package org.apache.hudi.client.timeline;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;

public class TimestampUtils {

public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime) {
public static void validateForLatestTimestamp(HoodieTableMetaClient metaClient, String instantTime, boolean reloadTimeline) {
// Validate that the instant whose requested meta file is about to be created is the latest in the timeline.
if (!metaClient.isMetadataTable()) { // For the data table, validate that timestamps are generated in monotonically increasing order.
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry -> {
HoodieActiveTimeline reloadedActiveTimeline = reloadTimeline ? metaClient.reloadActiveTimeline() : metaClient.getActiveTimeline();
reloadedActiveTimeline.getWriteTimeline().lastInstant().ifPresent(entry -> {
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
"Found later commit time " + entry + ", compared to the current instant " + instantTime + ", hence failing to create requested commit meta file");
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,7 @@ public Integer getWritesFileIdEncoding() {
return props.getInteger(WRITES_FILEID_ENCODING, HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}

public Boolean shouldEnableTimestampOrderinValidation() {
public Boolean shouldEnableTimestampOrderingValidation() {
return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,12 @@ public abstract Option<HoodieClusteringPlan> scheduleClustering(HoodieEngineCont
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling cleaning
* @param extraMetadata additional metadata to write into plan
* @param skipLocking {@code true} if locking needs to be skipped. false otherwise.
* @return HoodieCleanerPlan, if there is anything to clean.
*/
public abstract Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata);
Option<Map<String, String>> extraMetadata, boolean skipLocking);

/**
* Executes a new clean action.
Expand All @@ -540,13 +541,16 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false.
* @param isRestore {@code true} when invoked as part of restore.
* @param skipLocking {@code true} when locking needs to be skipped. if not, lock is acquired to plan rollback.
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers,
boolean isRestore);
boolean isRestore,
boolean skipLocking);

/**
* Rollback the (inflight/committed) record changes with the given commit time.
Expand Down Expand Up @@ -666,7 +670,7 @@ private void rollbackInflightInstant(HoodieInstant inflightInstant,
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
false, false);
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
Expand All @@ -682,7 +686,7 @@ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant, Functio
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers(),
false);
false, false);
rollback(context, commitTime, inflightInstant, true, false);
}

Expand Down Expand Up @@ -897,15 +901,25 @@ public void validateInsertSchema() throws HoodieInsertException {
}
}

/**
* Validates that the instantTime is latest in the write timeline, using the active timeline already held by the
* supplied {@code metaClient}. This method is specifically used to avoid additional timeline reloads, e.g. when the
* caller has just reloaded the timeline itself.
* If the caller expects the validation to explicitly reload, please use
* {@link #validateForLatestTimestampInternal(HoodieTableMetaClient, String, boolean)} with {@code reloadTimeline} set to {@code true}.
*
* <p>The check is only performed when timestamp ordering validation is enabled in the write config and the write
* concurrency mode supports optimistic concurrency control; otherwise this call does nothing.
*
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
* @param instantTime instant time of interest.
*/
public void validateForLatestTimestampWithoutReload(HoodieTableMetaClient metaClient, String instantTime) {
// Pass reloadTimeline = false so that no timeline reload is triggered by this validation.
validateForLatestTimestampInternal(metaClient, instantTime, false);
}

/**
* Validates that the instantTime is latest in the write timeline.
* @param instantTime instant time of interest.
*/
public abstract void validateForLatestTimestamp(String instantTime);

protected void validateForLatestTimestampInternal(String instantTime) {
if (this.config.shouldEnableTimestampOrderinValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
protected void validateForLatestTimestampInternal(HoodieTableMetaClient metaClient, String instantTime, boolean reloadTimeline) {
if (this.config.shouldEnableTimestampOrderingValidation() && config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
TimestampUtils.validateForLatestTimestamp(metaClient, instantTime, reloadTimeline);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
Expand Down Expand Up @@ -56,14 +57,19 @@ public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I

private static final Logger LOG = LoggerFactory.getLogger(CleanPlanActionExecutor.class);
private final Option<Map<String, String>> extraMetadata;
private final TransactionManager txnManager;
private final boolean skipLocking;

public CleanPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
Option<Map<String, String>> extraMetadata,
boolean skipLocking) {
super(context, config, table, instantTime);
this.extraMetadata = extraMetadata;
this.txnManager = new TransactionManager(config, table.getStorage());
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
this.skipLocking = skipLocking;
}

private int getCommitsSinceLastCleaning() {
Expand Down Expand Up @@ -179,7 +185,7 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
table.validateForLatestTimestamp(cleanInstant.getTimestamp());
validateForLatestTimestamp(cleanInstant);
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
Expand All @@ -192,6 +198,19 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
return option;
}

private void validateForLatestTimestamp(HoodieInstant cleanInstant) {
try {
if (!skipLocking) {
txnManager.beginTransaction(Option.of(cleanInstant), Option.empty());
}
table.validateForLatestTimestamp(cleanInstant.getTimestamp());
} finally {
if (!skipLocking) {
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
txnManager.endTransaction(Option.of(cleanInstant));
}
}
}

@Override
public Option<HoodieCleanerPlan> execute() {
if (!needsCleaning(config.getCleaningTriggerStrategy())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
}
table.getMetaClient().reloadActiveTimeline();
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, newInstantTime, instantToRollback, false, false, true);
table.scheduleRollback(context, newInstantTime, instantToRollback, false, false, true, true);
table.getMetaClient().reloadActiveTimeline();
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
}
table.getMetaClient().reloadActiveTimeline();
String instantTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, instantTime, instantToRollback, false, false, true);
table.scheduleRollback(context, instantTime, instantToRollback, false, false, true, true);
table.getMetaClient().reloadActiveTimeline();
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand Down Expand Up @@ -50,6 +52,8 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O> extends BaseActionExecut
protected final HoodieInstant instantToRollback;
private final boolean skipTimelinePublish;
private final boolean shouldRollbackUsingMarkers;
private final TransactionManager txnManager;
private final boolean skipLocking;
protected final Boolean isRestore;

public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
Expand All @@ -62,12 +66,15 @@ public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean skipTimelinePublish,
boolean shouldRollbackUsingMarkers,
boolean isRestore) {
boolean isRestore,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.skipTimelinePublish = skipTimelinePublish;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers && !instantToRollback.isCompleted();
this.isRestore = isRestore;
this.txnManager = new TransactionManager(config, table.getStorage());
this.skipLocking = skipLocking;
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
}

/**
Expand Down Expand Up @@ -113,7 +120,9 @@ protected Option<HoodieRollbackPlan> requestRollback(String startRollbackTime) {
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
if (!skipTimelinePublish) {
table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
if (!canProceedWithRollback(rollbackInstant)) {
return Option.empty();
}
if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
LOG.warn("Request Rollback found with instant time " + rollbackInstant + ", hence skipping scheduling rollback");
} else {
Expand All @@ -129,9 +138,36 @@ protected Option<HoodieRollbackPlan> requestRollback(String startRollbackTime) {
}
}

private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
// check for concurrent rollbacks. i.e if the commit being rolledback is already rolled back, we can bail out.
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(table.getMetaClient());
Comment thread
nsivabalan marked this conversation as resolved.
HoodieTimeline reloadedActiveTimeline = reloadedMetaClient.reloadActiveTimeline();
Comment thread
nsivabalan marked this conversation as resolved.
Outdated
if (!reloadedActiveTimeline.filterInflightsAndRequested().containsInstant(instantToRollback.getTimestamp())
&& !reloadedActiveTimeline.filterCompletedInstants().containsInstant(instantToRollback.getTimestamp())) {
// if instant to rollback is already rolled back, we can bail out.
return false;
} else {
// since we had already reloaded the timeline above, lets avoid additional reload with validateForLatestTimestamp.
table.validateForLatestTimestampWithoutReload(reloadedMetaClient, rollbackInstant.getTimestamp());
}
}
return true;
}

/**
 * Plans a new rollback action for the instant to roll back.
 *
 * <p>Unless {@code skipLocking} is set, the planning runs inside a transaction owned by the rollback instant,
 * and the transaction is ended even when planning fails.
 *
 * @return the rollback plan produced by {@code requestRollback}; may be empty.
 */
@Override
public Option<HoodieRollbackPlan> execute() {
  // HoodieInstant is constructed as (state, action, timestamp). The action and the timestamp are both Strings,
  // so swapping them compiles but yields an instant whose action and timestamp are exchanged.
  HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime);
  try {
    if (!skipLocking) {
      txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty());
    }
    // Plan a new rollback action
    return requestRollback(instantTime);
  } finally {
    if (!skipLocking) {
      txnManager.endTransaction(Option.of(rollbackInstant));
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,18 +356,20 @@ public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling cleaning
* @param extraMetadata additional metadata to write into plan
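* @param skipLocking {@code true} if locking needs to be skipped. false otherwise.
* @return HoodieCleanerPlan, if there is anything to clean.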
*/
@Override
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata, boolean skipLocking) {
return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata, skipLocking).execute();
}

@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore,
boolean skipLocking) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers, isRestore).execute();
shouldRollbackUsingMarkers, isRestore, skipLocking).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ public Iterator<List<WriteStatus>> handleInsertsForLogCompaction(String instantT

@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore) {
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean isRestore,
boolean skipLocking) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers, isRestore).execute();
shouldRollbackUsingMarkers, isRestore, skipLocking).execute();
}

@Override
Expand Down
Loading