
[DNM][HUDI-9030] Run Azure CI with table version 6 #12852

Closed
wants to merge 9 commits into from
@@ -143,7 +143,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<Integer> WRITE_TABLE_VERSION = ConfigProperty
.key("hoodie.write.table.version")
.defaultValue(HoodieTableVersion.current().versionCode())
.defaultValue(HoodieTableVersion.SIX.versionCode())
Contributor

Oh, now I see why CI is failing.

Let's keep this as 8. But in those 15 to 20 odd test classes, we can add an option to override "hoodie.write.table.version", so that we can still expect a green CI for the rest of the test classes.

.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
String.valueOf(HoodieTableVersion.current().versionCode())
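
Following the review suggestion above, the global default could stay at the current version (8) while only the affected test classes override "hoodie.write.table.version". A minimal sketch of such a test-only override, assuming the tests can merge a plain map of extra options into their write configuration (the helper below is illustrative and not part of this PR):

import java.util.HashMap;
import java.util.Map;

public class TableVersionSixTestOverride {
  // Extra options a test class could merge into its Hudi write configuration
  // to force table version 6 while the production default stays at 8.
  public static Map<String, String> writeOptions() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.write.table.version", "6");
    return opts;
  }
}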
@@ -124,7 +124,10 @@ public Pair<Option<HoodieInstant>, Set<String>> getIncrementalPartitions(TableSe
.filter(this::filterCommitByTableType).flatMap(instant -> {
try {
String completionTime = instant.getCompletionTime();
if (completionTime.compareTo(leftBoundary) >= 0 && completionTime.compareTo(rightBoundary) < 0) {
// Just for test
if (completionTime.compareTo(leftBoundary) >= 0
&& ((instant.requestedTime().length() < completionTime.length() && instant.requestedTime().compareTo(rightBoundary) < 0)
|| completionTime.compareTo(rightBoundary) < 0)) {
HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, activeTimeline);
return metadata.getWriteStats().stream().map(HoodieWriteStat::getPartitionPath);
}
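
The widened condition above admits older-format instants whose requested time is shorter than the completion-time format, comparing their requested time against the right boundary instead. A standalone sketch of that predicate, with illustrative names rather than the actual Hudi method:

public class IncrementalBoundarySketch {
  // True when an instant falls in the [leftBoundary, rightBoundary) window.
  // A legacy instant (requested time shorter than the completion time) is admitted
  // on its requested time even if its completion time lies past the right boundary.
  static boolean isInRange(String requestedTime, String completionTime,
                           String leftBoundary, String rightBoundary) {
    boolean legacyFormat = requestedTime.length() < completionTime.length();
    return completionTime.compareTo(leftBoundary) >= 0
        && ((legacyFormat && requestedTime.compareTo(rightBoundary) < 0)
            || completionTime.compareTo(rightBoundary) < 0);
  }

  public static void main(String[] args) {
    // Short requested time; completion time past the right boundary: still included.
    System.out.println(isInRange("20240101120000", "20240101120000999",
        "20240101000000000", "20240101120000500")); // prints true
  }
}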
@@ -109,9 +109,9 @@ public Option<HoodieCompactionPlan> execute() {
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstantsAsStream()
.filter(instant -> compareTimestamps(instant.requestedTime(), GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
+ conflictingInstants);
// ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
// "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
// + conflictingInstants);
}

HoodieCompactionPlan plan = scheduleCompaction();
@@ -21,23 +21,29 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +52,18 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.getHoodieCommitMetadata;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;

/**
@@ -164,7 +174,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
// We do not know fileIds for inserts (first inserts are either log files or base files),
// delete all files for the corresponding failed commit, if present (same as COW)
hoodieRollbackRequests.addAll(getHoodieRollbackRequests(partitionPath, filesToDelete.get()));

if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) {
hoodieRollbackRequests.addAll(getRollbackRequestToAppendForVersionSix(partitionPath, instantToRollback, commitMetadataOptional.get(), table));
}
break;
default:
throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
@@ -181,6 +193,62 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

public static List<HoodieRollbackRequest> getRollbackRequestToAppendForVersionSix(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata, HoodieTable<?, ?, ?, ?> table) {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
checkArgument(table.version().lesserThan(HoodieTableVersion.EIGHT));
checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));

// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the index (global) might store the baseCommit of the base and not the requested, hence get the
// baseCommit always by listing the file slice
// With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
Map<String, FileSlice> latestFileSlices = table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.requestedTime(), true)
.collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));

List<HoodieWriteStat> hoodieWriteStats = Option.ofNullable(commitMetadata.getPartitionToWriteStats().get(partitionPath)).orElse(Collections.emptyList());
hoodieWriteStats = hoodieWriteStats.stream()
.filter(writeStat -> {
// Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
&& (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId());

if (!validForRollback) {
return false;
}

FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());

// For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back
checkArgument(
compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, rollbackInstant.requestedTime()),
"Log-file base-instant could not be less than the instant being rolled back");

// Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
// w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
// than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
// in a different branch of the flow.
return compareTimestamps(latestFileSlice.getBaseInstantTime(), LESSER_THAN, rollbackInstant.requestedTime());
})
.collect(Collectors.toList());

for (HoodieWriteStat writeStat : hoodieWriteStats.stream().filter(
hoodieWriteStat -> !StringUtils.isNullOrEmpty(hoodieWriteStat.getFileId())).collect(Collectors.toList())) {
FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
String fileId = writeStat.getFileId();
String latestBaseInstant = latestFileSlice.getBaseInstantTime();
Path fullLogFilePath = HadoopFSUtils.constructAbsolutePathInHadoopPath(table.getConfig().getBasePath(), writeStat.getPath());
Map<String, Long> logFilesWithBlocksToRollback = Collections.singletonMap(
fullLogFilePath.toString(), writeStat.getTotalWriteBytes() > 0 ? writeStat.getTotalWriteBytes() : 1L);
hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
Collections.emptyList(), logFilesWithBlocksToRollback));
}
return hoodieRollbackRequests;
}

private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String baseFileExtension,
String partitionPath,
@@ -90,7 +90,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
switch (type) {
case MERGE:
case CREATE:
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback);
return createRollbackRequestForCreateAndMerge(fileId, partitionPath, filePath, instantToRollback, filePathStr);
case APPEND:
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathStr);
default:
@@ -106,10 +106,8 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
}

protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId,
String partitionPath,
StoragePath filePath,
HoodieInstant instantToRollback) {
protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fileId, String partitionPath, StoragePath filePath,
HoodieInstant instantToRollback, String filePathToRollback) {
if (table.version().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
return new HoodieRollbackRequest(partitionPath, fileId, instantToRollback.requestedTime(),
Collections.singletonList(filePath.toString()), Collections.emptyMap());
@@ -120,7 +118,7 @@ protected HoodieRollbackRequest createRollbackRequestForCreateAndMerge(String fi
fileId = baseFileToDelete.getFileId();
baseInstantTime = baseFileToDelete.getCommitTime();
} else if (FSUtils.isLogFile(filePath)) {
throw new HoodieRollbackException("Log files should have only APPEND as IOTypes " + filePath);
return createRollbackRequestForAppend(fileId, partitionPath, filePath, instantToRollback, filePathToRollback);
}
Objects.requireNonNull(fileId, "Cannot find valid fileId from path: " + filePath);
Objects.requireNonNull(baseInstantTime, "Cannot find valid base instant from path: " + filePath);
@@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
@@ -62,6 +63,7 @@ public class SparkBroadcastManager extends EngineBroadcastManager {

private final transient HoodieEngineContext context;
private final transient HoodieTableMetaClient metaClient;
private final HoodieTableVersion tableVersion;

protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
protected Broadcast<SQLConf> sqlConfBroadcast;
@@ -71,6 +73,7 @@ public class SparkBroadcastManager extends EngineBroadcastManager {
public SparkBroadcastManager(HoodieEngineContext context, HoodieTableMetaClient metaClient) {
this.context = context;
this.metaClient = metaClient;
this.tableVersion = metaClient.getTableConfig().getTableVersion();
}

@Override
@@ -116,10 +119,8 @@ public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath ba
SparkParquetReader sparkParquetReader = parquetReaderBroadcast.getValue();
if (sparkParquetReader != null) {
List<Filter> filters = new ArrayList<>();
return Option.of(new SparkFileFormatInternalRowReaderContext(
sparkParquetReader,
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq()));
return Option.of(new SparkFileFormatInternalRowReaderContext(sparkParquetReader, JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(), tableVersion));
} else {
throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
}
@@ -34,6 +34,7 @@ import org.apache.hudi.util.CloseableInternalRowIterator
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
@@ -59,17 +60,16 @@ import scala.collection.mutable
* @param filters spark filters that might be pushed down into the reader
* @param requiredFilters filters that are required and should always be used, even in merging situations
*/
class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
filters: Seq[Filter],
requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader, filters: Seq[Filter],
requiredFilters: Seq[Filter], tableVersion: HoodieTableVersion) extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
private val serializerMap: mutable.Map[Schema, HoodieAvroSerializer] = mutable.Map()
private lazy val allFilters = filters ++ requiredFilters

override def supportsParquetRowIndex: Boolean = {
HoodieSparkUtils.gteqSpark3_5
HoodieSparkUtils.gteqSpark3_5 && tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
}

override def getFileRecordIterator(filePath: StoragePath,
@@ -576,7 +576,7 @@ public static void create(HoodieStorage storage, StoragePath metadataFolder, Pro
hoodieConfig.setDefaultValue(TIMELINE_PATH);
if (!hoodieConfig.contains(TIMELINE_LAYOUT_VERSION)) {
// Use latest Version as default unless forced by client
hoodieConfig.setValue(TIMELINE_LAYOUT_VERSION, TimelineLayoutVersion.CURR_VERSION.toString());
hoodieConfig.setValue(TIMELINE_LAYOUT_VERSION, TimelineLayoutVersion.LAYOUT_VERSION_1.toString());
}
if (hoodieConfig.contains(BOOTSTRAP_BASE_PATH)) {
if (tableVersion.greaterThan(HoodieTableVersion.SEVEN)) {
@@ -632,6 +632,8 @@ static boolean validateConfigVersion(ConfigProperty<?> configProperty, HoodieTab
// validate that the table version is greater than or equal to the config version
HoodieTableVersion firstVersion = HoodieTableVersion.fromReleaseVersion(configProperty.getSinceVersion().get());
boolean valid = tableVersion.greaterThan(firstVersion) || tableVersion.equals(firstVersion);
valid = valid || (tableVersion.lesserThan(HoodieTableVersion.EIGHT) && (configProperty.key().equals(KEY_GENERATOR_CLASS_NAME.key())
|| configProperty.key().equals(KEY_GENERATOR_TYPE.key())));
if (!valid) {
LOG.warn("Table version {} is lower than or equal to config's first version {}. Config {} will be ignored.",
tableVersion, firstVersion, configProperty.key());
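
A simplified sketch of the relaxed check above, not the actual HoodieTableConfig method and with stand-in key strings for KEY_GENERATOR_CLASS_NAME / KEY_GENERATOR_TYPE: a config introduced in a later release is normally ignored on older tables, but the key generator class/type settings remain honored on pre-version-8 tables.

public class ConfigVersionCheckSketch {
  // Illustrative only: versions are plain version codes, keys are assumed names.
  static boolean isConfigValid(int tableVersion, int sinceVersion, String configKey) {
    boolean valid = tableVersion >= sinceVersion;
    boolean keyGeneratorConfig = configKey.equals("hoodie.table.keygenerator.class")
        || configKey.equals("hoodie.table.keygenerator.type");
    return valid || (tableVersion < 8 && keyGeneratorConfig);
  }
}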
@@ -85,6 +85,7 @@
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_PATH;
import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.inferCorrectMergingBehavior;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -1386,7 +1387,7 @@ public Properties build() {
tableConfig.setValue(HoodieTableConfig.TYPE, tableType.name());

if (null == tableVersion) {
tableVersion = HoodieTableVersion.current();
tableVersion = HoodieTableVersion.SIX;
}

tableConfig.setTableVersion(tableVersion);