[DNM][HUDI-9030] Run Azure CI with table version 6 #12852

Draft. Wants to merge 3 commits into base: master.
Changes from all commits
@@ -143,7 +143,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<Integer> WRITE_TABLE_VERSION = ConfigProperty
.key("hoodie.write.table.version")
- .defaultValue(HoodieTableVersion.current().versionCode())
+ .defaultValue(HoodieTableVersion.SIX.versionCode())
.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
String.valueOf(HoodieTableVersion.current().versionCode())
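With this change, any writer that leaves `hoodie.write.table.version` unset now resolves to table version 6 instead of the latest version. A minimal sketch of pinning it explicitly, which is equivalent to the new default (the property key comes from the diff; the wrapper class is illustrative):

```java
import java.util.Properties;

class WriteTableVersionExample {
  static Properties writerProps() {
    Properties props = new Properties();
    // Explicitly pinning version 6; this also satisfies the
    // withValidValues(...) constraint declared above (SIX or current).
    props.setProperty("hoodie.write.table.version", "6");
    return props;
  }
}
```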
@@ -21,23 +21,29 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +52,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;

/**
@@ -164,7 +174,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));

if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
hoodieRollbackRequests.addAll(getRollbackRequestToAppendForVersionSix(partitionPath, instantToRollback, commitMetadataOptional.get(), table));
}
break;
default:
throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
@@ -181,6 +193,62 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSix(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
checkArgument(table.version().lesserThan(HoodieTableVersion.EIGHT));
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

// wStat.getPrevCommit() might not give the right commit time in the following
// scenario: if a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the (global) index might store the baseCommit of the base file and not the requested time, hence always get the
// baseCommit by listing the file slice.
// With multi-writers, rollbacks could be lazy, so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices().
Map<String, FileSlice> latestFileSlices = table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.requestedTime(), true)
.collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

List<HoodieWriteStat> hoodieWriteStats = Option.ofNullable(commitMetadata.getPartitionToWriteStats().get(partitionPath)).orElse(Collections.emptyList());
hoodieWriteStats = hoodieWriteStats.stream()
.filter(writeStat -> {
// Filter out stats without a prevCommit, since those are all inserts;
// check prevCommit for null before dereferencing it to avoid an NPE
boolean validForRollback = (writeStat != null) && (writeStat.getPrevCommit() != null)
&& !writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT) && latestFileSlices.containsKey(writeStat.getFileId());

if (!validForRollback) {
return false;
}

FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

// For sanity, the log-file base-instant time can never be greater than the commit on which we are rolling back
checkArgument(
compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, rollbackInstant.requestedTime()),
"Log-file base-instant cannot be greater than the instant being rolled back");

// A command block "rolling back" the preceding block ({@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK})
// within the latest file-slice is appended iff the base-instant of the log-file is _strictly_ less
// than the instant of the delta commit being rolled back. Otherwise, the log-file will be cleaned up
// in a different branch of the flow.
return compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN, rollbackInstant.requestedTime());
})
.collect(Collectors.toList());

for (HoodieWriteStat writeStat : hoodieWriteStats.stream().filter(
hoodieWriteStat -> !StringUtils.isNullOrEmpty(hoodieWriteStat.getFileId())).collect(Collectors.toList())) {
FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
String fileId = writeStat.getFileId();
String latestBaseInstant = latestFileSlice.getBaseInstantTime();
Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath());
Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap(
fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? writeStat.getTotalWriteBytes() : 1L);
hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
Collections.emptyList(), logFilesWithBlocksToRollback));
}
return hoodieRollbackRequests;
}

private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String baseFileExtension,
String partitionPath,
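For readers skimming the new helper: on this pre-version-8 path nothing is deleted outright; each request targets one file group and asks the rollback executor to append a ROLLBACK command block to its latest log file. A sketch of the request shape, using the same constructor as above (variable names are illustrative):

```java
// The size hint for the log file is taken from the write stat and clamped
// to 1 byte for empty stats, mirroring the singletonMap construction above.
HoodieRollbackRequest request = new HoodieRollbackRequest(
    partitionPath,              // partition of the affected file group
    fileId,                     // file group id from the write stat
    latestBaseInstant,          // base instant of the latest file slice
    Collections.emptyList(),    // no files to delete on this path
    Collections.singletonMap(logFilePath, logFileSizeBytes));
```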
@@ -576,7 +576,7 @@ public static void create(HoodieStorage storage, StoragePath metadataFolder, Pro
hoodieConfig.setDefaultValue(TIMELINE_PATH);
if (!hoodieConfig.contains(TIMELINE_LAYOUT_VERSION)) {
// Use latest Version as default unless forced by client
- hoodieConfig.setValue(TIMELINE_LAYOUT_VERSION, TimelineLayoutVersion.CURR_VERSION.toString());
+ hoodieConfig.setValue(TIMELINE_LAYOUT_VERSION, TimelineLayoutVersion.LAYOUT_VERSION_1.toString());
}
if (hoodieConfig.contains(BOOTSTRAP_BASE_PATH)) {
if (tableVersion.greaterThan(HoodieTableVersion.SEVEN)) {
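Because the default is only stamped when `TIMELINE_LAYOUT_VERSION` is absent, a client can still force a layout version at creation time. A minimal sketch, assuming the standard key `hoodie.timeline.layout.version` (not shown in this diff):

```java
import java.util.Properties;

class TimelineLayoutExample {
  static Properties forceLayoutVersion() {
    Properties props = new Properties();
    // An explicit value wins over the new LAYOUT_VERSION_1 default above.
    props.setProperty("hoodie.timeline.layout.version", "1");
    return props;
  }
}
```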
@@ -1386,7 +1386,7 @@ public Properties build() {
tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());

if (null == tableVersion) {
- tableVersion = HoodieTableVersion.current();
+ tableVersion = HoodieTableVersion.SIX;
}

tableConfig.setTableVersion(tableVersion);
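The effect on table creation: a property builder invoked without an explicit version now stamps `hoodie.table.version=6` into `hoodie.properties`. A hedged sketch (the builder entry point and setter names are modeled on Hudi's property-builder API and are assumptions here, not part of this diff):

```java
// tableVersion is never set, so build() falls back to HoodieTableVersion.SIX.
Properties tableProps = HoodieTableMetaClient.newTableBuilder()  // assumed entry point
    .setTableType(HoodieTableType.MERGE_ON_READ)
    .setTableName("trips")
    .build();
```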
@@ -281,13 +281,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (logBlock.isDataOrDeleteBlock()) {
- if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
- if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
- || getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
- // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
- continue;
- }
- }
+ // if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ //   if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
+ //       || getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
+ //     // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+ //     continue;
+ //   }
+ // }
if (compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THAN, this.latestInstantTime)) {
// Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
continue;
@@ -479,13 +479,13 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
continue;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
- if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
- if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
- || getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
- // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
- continue;
- }
- }
+ // if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ //   if (!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
+ //       || getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
+ //     // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+ //     continue;
+ //   }
+ // }
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
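Both scan paths comment out the same pre-version-8 guard rather than deleting it, consistent with this being a do-not-merge CI experiment. Restated as a standalone predicate for clarity, a sketch in which the two timeline parameters stand in for `getOrCreateCompletedInstantsTimeline()` and `getOrCreateInflightInstantsTimeline()`:

```java
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class UncommittedBlockCheck {
  /** Sketch of the disabled guard: true means the block came from a failed write. */
  static boolean isUncommittedBlock(HoodieTimeline completedTimeline,
                                    HoodieTimeline inflightTimeline,
                                    String instantTime) {
    // A block is uncommitted when its instant never completed (absent from the
    // completed timeline and not before its start) or is still inflight.
    return !completedTimeline.containsOrBeforeTimelineStarts(instantTime)
        || inflightTimeline.containsInstant(instantTime);
  }
}
```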