
[HUDI-8474] Metadata table upsert prepped optimized #13005

Open · wants to merge 5 commits into base: master
@@ -410,6 +410,19 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
*/
public abstract O upsertPreppedRecords(I preppedRecords, final String instantTime);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param instantTime Instant time of the commit
* @param partitionFileIdPairsHolderOpt Optional list of (partition path, file id) pairs the prepped records are tagged to; when present, engines may take an optimized write path
* @return Collection of WriteStatus to inspect errors and counts
*/
public O upsertPreppedRecords(I preppedRecords, final String instantTime, Option<List<Pair<String, String>>> partitionFileIdPairsHolderOpt) {
Review comment (Contributor):

Can the partitionFileIdPairsHolderOpt be huge? Can we design it as lazy fetched for each partition?

return upsertPreppedRecords(preppedRecords, instantTime);
}

/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal writes.
* <p>
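
The reviewer above asks whether partitionFileIdPairsHolderOpt could become large and whether it could be fetched lazily per partition. One way to explore that would be to pass a supplier instead of a materialized list, so the (partition path, file id) pairs are only computed when the optimized write path actually needs them. The sketch below is not part of this PR: the Supplier-based overload is hypothetical, written as it would sit alongside the existing overloads in the same write client class (imports for java.util.function.Supplier, Option, List and Pair assumed).

// Hypothetical lazy variant of the new overload (not in this PR): the caller hands over a
// Supplier, so the distinct/collect of (partition path, file id) pairs only runs if and when
// the optimized metadata-table upsert path builds its partitioner.
public O upsertPreppedRecords(I preppedRecords, final String instantTime,
    Option<Supplier<List<Pair<String, String>>>> partitionFileIdPairsSupplierOpt) {
  // Default behavior mirrors the eager overload added in this PR: ignore the supplier and
  // fall back to the plain prepped upsert.
  return upsertPreppedRecords(preppedRecords, instantTime);
}
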
@@ -1344,7 +1344,9 @@ public void close() throws Exception {
protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
Pair<HoodieData<HoodieRecord>, List<Pair<String, String>>> result = prepRecords(partitionRecordsMap);
HoodieData<HoodieRecord> preppedRecords = result.getKey();
List<Pair<String, String>> partitionFileIdPairs = result.getValue();
I preppedRecordInputs = convertHoodieDataToEngineSpecificData(preppedRecords);

BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
@@ -1382,7 +1384,8 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
} else {
engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
// The last argument is required so that we take the optimized write flow for the metadata table.
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime, Option.of(partitionFileIdPairs));
}

metadataMetaClient.reloadActiveTimeline();
@@ -1433,7 +1436,7 @@ protected abstract void bulkCommit(
* Tag each record with the location in the given partition.
* The record is tagged with respective file slice's location based on its record key.
*/
protected HoodieData<HoodieRecord> prepRecords(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
protected Pair<HoodieData<HoodieRecord>, List<Pair<String, String>>> prepRecords(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
// The result set
HoodieData<HoodieRecord> allPartitionRecords = engineContext.emptyHoodieData();
try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
@@ -1462,7 +1465,10 @@ protected HoodieData<HoodieRecord> prepRecords(Map<String, HoodieData<HoodieReco

allPartitionRecords = allPartitionRecords.union(rddSinglePartitionRecords);
}
return allPartitionRecords;

List<Pair<String, String>> partitionFileIdPairs = allPartitionRecords.map(record -> Pair.of(record.getPartitionPath(), record.getCurrentLocation().getFileId()))
.distinct().collectAsList();
return Pair.of(allPartitionRecords, partitionFileIdPairs);
}
}

@@ -1726,4 +1732,5 @@ public boolean isInitialized() {
}

protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();

}
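
For intuition, the partitionFileIdPairs list collected above holds one entry per metadata table file group touched by the commit, so it is typically small. The snippet below is illustrative only; the partition paths match real metadata table partitions, but the file ids are invented and do not reflect Hudi's actual file id format.

import java.util.Arrays;
import java.util.List;

import org.apache.hudi.common.util.collection.Pair;

// Illustrative only: a commit touching the "files" and "column_stats" partitions of the
// metadata table might collect pairs like these (file ids invented for the example).
List<Pair<String, String>> partitionFileIdPairs = Arrays.asList(
    Pair.of("files", "files-0000"),
    Pair.of("column_stats", "col-stats-0001"));
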
@@ -258,6 +258,11 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
public abstract HoodieWriteMetadata<O> upsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords);

/**
* Upserts the given prepared records at the supplied instantTime, additionally passing the
* (partition path, file id) pairs the records are tagged to so that engines can take an
* optimized write path. The default implementation ignores the pairs and delegates to the
* plain prepped upsert.
*/
public HoodieWriteMetadata<O> upsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
return upsertPrepped(context, instantTime, preppedRecords);
}

/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
@@ -126,7 +126,7 @@ public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> source
* are unknown across batches Inserts (which are new parquet files) are rolled back based on commit time. // TODO :
* Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata
*/
void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -122,7 +122,7 @@ protected void bulkCommit(String instantTime, String partitionName, HoodieData<H
protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap).getKey();
List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();

// Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported
@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -148,6 +149,20 @@ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
return postWrite(resultRDD, instantTime, table);
}

/**
* Upserts prepped records whose target file groups are already known; used by the metadata table writer to take the optimized write path.
*/
@Override
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), partitionFileIdPairsOpt);
HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
return postWrite(resultRDD, instantTime, table);
}

@Override
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
@@ -35,6 +35,8 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -43,6 +45,7 @@
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkMetadataTableUpsertCommitActionExecutor;
import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
@@ -126,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineCo
return new SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
ValidationUtils.checkArgument(partitionFileIdPairsOpt.isPresent(), "partitionFileIdPairsOpt is expected to be present for upsert prepped on the metadata table");
// Uses the optimized upsert partitioner for the metadata table, where all records are updates and their locations are known upfront
return new SparkMetadataTableUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords,
partitionFileIdPairsOpt.get()).execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords) {
@@ -104,7 +104,7 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
}

private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
@@ -155,7 +155,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
// Cache the tagged records, so we don't end up computing both
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (!config.isSourceRddPersisted() || inputRDD.getStorageLevel() == StorageLevel.NONE()) {
if (shouldPersistInputRecords(inputRDD)) {
HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
} else {
@@ -166,18 +166,13 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
LOG.info("Num spark partitions for inputRecords before triggering workload profile {}", inputRecordsWithClusteringUpdate.getNumPartitions());

context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :{}", workloadProfile);
WorkloadProfile workloadProfile = prepareWorkloadProfileAndSaveToInflight(inputRecordsWithClusteringUpdate);
Long sourceReadAndIndexDurationMs = null;
if (sourceReadAndIndexTimer.isPresent()) {
sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.get().endTimer();
LOG.info("Source read and index timer {}", sourceReadAndIndexDurationMs);
}
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(workloadProfile);
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);

context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
@@ -189,6 +184,20 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
return result;
}

protected WorkloadProfile prepareWorkloadProfileAndSaveToInflight(HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :{}", workloadProfile);
// partition using the insert partitioner
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
return workloadProfile;
}

protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> inputRDD) {
return !config.isSourceRddPersisted() || inputRDD.getStorageLevel() == StorageLevel.NONE();
}

/**
* Count the number of updates/inserts for each file in each partition.
*/
@@ -237,7 +246,7 @@ protected Partitioner getPartitioner(WorkloadProfile profile) {
}
}

private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
protected HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = HoodieJavaPairRDD.getJavaPairRDD(
dedupedRecords.mapToPair(record -> Pair.of(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Upsert commit action executor for the metadata table.
*
* @param <T> type of the record payload
*/
public class SparkMetadataTableUpsertCommitActionExecutor<T> extends SparkUpsertPreppedDeltaCommitActionExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);

private static final HashMap<String, WorkloadStat> EMPTY_MAP = new HashMap<>();
private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new WorkloadStat();

private final List<Pair<String, String>> mdtPartitionPathFileGroupIdList;

public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords, List<Pair<String, String>> mdtPartitionPathFileGroupIdList) {
super(context, config, table, instantTime, preppedRecords);
this.mdtPartitionPathFileGroupIdList = mdtPartitionPathFileGroupIdList;
}

@Override
protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> inputRDD) {
return inputRDD.getStorageLevel() == StorageLevel.NONE();
}

@Override
protected WorkloadProfile prepareWorkloadProfileAndSaveToInflight(HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate) {
// Create a placeholder workload profile; the guard to save it only when writing to the FILES partition of the metadata table is left commented out below.
WorkloadProfile workloadProfile = new WorkloadProfile(Pair.of(EMPTY_MAP, PLACEHOLDER_GLOBAL_STAT));
//if (mdtPartitionPathFileGroupIdList.size() == 1 && mdtPartitionPathFileGroupIdList.get(0).getKey().equals(FILES.getPartitionPath())) {
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
//}
return workloadProfile;
}

@Override
protected Partitioner getPartitioner(WorkloadProfile profile) {
List<BucketInfo> bucketInfoList = new ArrayList<>();
Map<String, Integer> fileIdToSparkPartitionIndexMap = new HashMap<>();
int counter = 0;
while (counter < mdtPartitionPathFileGroupIdList.size()) {
Pair<String, String> partitionPathFileIdPair = mdtPartitionPathFileGroupIdList.get(counter);
fileIdToSparkPartitionIndexMap.put(partitionPathFileIdPair.getValue(), counter);
bucketInfoList.add(new BucketInfo(BucketType.UPDATE, partitionPathFileIdPair.getValue(), partitionPathFileIdPair.getKey()));
counter++;
}
return new SparkMetadataTableUpsertPartitioner(bucketInfoList, fileIdToSparkPartitionIndexMap);
}

@Override
protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
return inputRecords;
}

}
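
The SparkMetadataTableUpsertPartitioner constructed in getPartitioner above is referenced but not included in this diff. Below is a minimal sketch of what such a partitioner might look like, assuming the keys it receives are the (HoodieKey, Option<HoodieRecordLocation>) tuples built in mapPartitionsAsRDD and that each record is routed by the file id it was tagged with during prepRecords; the class actually added by the PR may differ, for example in how it exposes bucket information to the executor.

package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;

import org.apache.spark.Partitioner;

import scala.Tuple2;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only, not the PR's actual implementation: every metadata table file group gets its own
// Spark partition, so each write task updates exactly one file group.
public class SparkMetadataTableUpsertPartitioner extends Partitioner {

  private final List<BucketInfo> bucketInfoList;
  private final Map<String, Integer> fileIdToSparkPartitionIndexMap;

  public SparkMetadataTableUpsertPartitioner(List<BucketInfo> bucketInfoList,
                                             Map<String, Integer> fileIdToSparkPartitionIndexMap) {
    this.bucketInfoList = bucketInfoList;
    this.fileIdToSparkPartitionIndexMap = new HashMap<>(fileIdToSparkPartitionIndexMap);
  }

  @Override
  public int numPartitions() {
    return bucketInfoList.size();
  }

  @Override
  @SuppressWarnings("unchecked")
  public int getPartition(Object key) {
    // Keys are the (HoodieKey, Option<HoodieRecordLocation>) pairs built in mapPartitionsAsRDD;
    // records are prepped, so the current location (and hence the file id) is always present.
    Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
        (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
    String fileId = keyLocation._2().get().getFileId();
    return fileIdToSparkPartitionIndexMap.get(fileId);
  }

  // Convenience accessor a commit action executor could use to map a Spark partition index back
  // to the bucket (file group) it writes to.
  public BucketInfo getBucketInfo(int bucketNumber) {
    return bucketInfoList.get(bucketNumber);
  }
}
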