[HUDI-8474] Metadata table upsert prepped optimized #13005

Closed
@@ -81,6 +81,7 @@
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -142,6 +143,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
protected Set<String> pendingInflightAndRequestedInstants = Collections.emptySet();

protected BaseHoodieTableServiceClient<?, ?, O> tableServiceClient;
protected boolean isMetadataTable;

/**
* Create a write client, with new hudi index.
@@ -172,6 +174,7 @@ public BaseHoodieWriteClient(HoodieEngineContext context,
this.index = createIndex(writeConfig);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
this.metrics.emitIndexTypeMetrics(config.getIndexType().ordinal());
this.isMetadataTable = HoodieTableMetadata.isMetadataTable(config.getBasePath());
}

@VisibleForTesting
@@ -428,6 +431,19 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
*/
public abstract O upsertPreppedRecords(I preppedRecords, final String instantTime);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
* This implementation requires that the input records are already tagged, and de-duped if needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param instantTime Instant time of the commit
* @param partitionFileIdPairsOpt Optional list of (partition path, file id) pairs to which the prepped records are routed; used for streaming writes to the metadata table
* @return Collection of WriteStatus to inspect errors and counts
*/
public O upsertPreppedRecords(I preppedRecords, final String instantTime, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
return upsertPreppedRecords(preppedRecords, instantTime);
}

/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal writes.
* <p>
@@ -1408,6 +1408,8 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
bulkInsertAndCommit(writeClient, instantTime, preppedRecordInputs, bulkInsertPartitioner);
} else {
engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
// TODO: pass the last argument, Option.of(partitionFileIdPairs), so that streaming writes to the metadata table can be supported.
upsertAndCommit(writeClient, instantTime, preppedRecordInputs);
}

@@ -260,6 +260,11 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
public abstract HoodieWriteMetadata<O> upsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords);

/**
 * Upserts the given prepped records, optionally routing them to the supplied (partition path, file id) pairs.
 * The default implementation ignores the routing hint and simply delegates to upsertPrepped(context, instantTime, preppedRecords).
 */
public HoodieWriteMetadata<O> upsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
return upsertPrepped(context, instantTime, preppedRecords);
}

/**
* Inserts the given prepared records into the Hoodie table, at the supplied instantTime.
* <p>
@@ -128,7 +128,7 @@ public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> source
* are unknown across batches Inserts (which are new parquet files) are rolled back based on commit time. // TODO :
* Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata
*/
void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -18,6 +18,8 @@

package org.apache.hudi.client;

import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
Expand Down Expand Up @@ -148,6 +150,21 @@ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
return postWrite(resultRDD, instantTime, table);
}

/**
* Used for streaming writes to the metadata table.
*/
@Override
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
ValidationUtils.checkArgument(isMetadataTable, "This version of upsert prepped can only be invoked for the metadata table");
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), partitionFileIdPairsOpt);
HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
return postWrite(resultRDD, instantTime, table);
}
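// For illustration, a hedged caller-side sketch of how this overload is meant to be driven for streaming writes
// to the metadata table. The write client, prepped batches and file ids below are assumed names for the sketch,
// not part of this diff (imports elided):
//
//   List<Pair<String, String>> partitionFileIdPairs = Arrays.asList(
//       Pair.of("files", "files-0000"),
//       Pair.of("record_index", "record-index-0001"));
//   String instantTime = metadataWriteClient.startCommit();
//   // every streamed batch of prepped records for this instant is routed straight to the file groups above
//   JavaRDD<WriteStatus> batch1 = metadataWriteClient.upsertPreppedRecords(preppedBatch1, instantTime, Option.of(partitionFileIdPairs));
//   JavaRDD<WriteStatus> batch2 = metadataWriteClient.upsertPreppedRecords(preppedBatch2, instantTime, Option.of(partitionFileIdPairs));
//   metadataWriteClient.commit(instantTime, batch1.union(batch2));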

@Override
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
@@ -35,6 +35,8 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -43,6 +45,7 @@
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkMetadataTableUpsertCommitActionExecutor;
import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
@@ -126,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineCo
return new SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords, Option<List<Pair<String, String>>> partitionFileIdPairsOpt) {
ValidationUtils.checkArgument(partitionFileIdPairsOpt.isPresent(), "PartitionFileIdPairs is expected to be present with upsert prepped for the metadata table");
// Uses optimized upsert partitioner for metadata table when all records are upsert and locations are known upfront
return new SparkMetadataTableUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords,
partitionFileIdPairsOpt.get()).execute();
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords) {
@@ -104,7 +104,7 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
}

private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
@@ -155,7 +155,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
// Cache the tagged records, so we don't end up computing both
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (!config.isSourceRddPersisted() || inputRDD.getStorageLevel() == StorageLevel.NONE()) {
if (shouldPersistInputRecords(inputRDD)) {
HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
} else {
@@ -166,17 +166,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
LOG.info("Num spark partitions for inputRecords before triggering workload profile {}", inputRecordsWithClusteringUpdate.getNumPartitions());

context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :{}", workloadProfile);
WorkloadProfile workloadProfile = prepareWorkloadProfile(inputRecordsWithClusteringUpdate);
Long sourceReadAndIndexDurationMs = null;
if (sourceReadAndIndexTimer.isPresent()) {
sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.get().endTimer();
LOG.info("Source read and index timer {}", sourceReadAndIndexDurationMs);
}
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(workloadProfile);

// partition using the insert partitioner
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);

context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
@@ -189,6 +187,23 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
return result;
}

/**
* Prepares workload profile.
* @param inputRecordsWithClusteringUpdate input records of interest.
* @return {@link WorkloadProfile} thus prepared.
*/
protected WorkloadProfile prepareWorkloadProfile(HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :{}", workloadProfile);
return workloadProfile;
}

protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> inputRDD) {
return !config.isSourceRddPersisted() || inputRDD.getStorageLevel() == StorageLevel.NONE();
}

/**
* Count the number of updates/inserts for each file in each partition.
*/
@@ -237,7 +252,7 @@ protected Partitioner getPartitioner(WorkloadProfile profile) {
}
}

private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
protected HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = HoodieJavaPairRDD.getJavaPairRDD(
dedupedRecords.mapToPair(record -> Pair.of(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.metadata.MetadataPartitionType.FILES;

/**
* Upsert commit action executor for the metadata table.
*
* @param <T> type of the record payload
*/
public class SparkMetadataTableUpsertCommitActionExecutor<T> extends SparkUpsertPreppedDeltaCommitActionExecutor<T> {
private static final Logger LOG = LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);

private static final HashMap<String, WorkloadStat> EMPTY_MAP = new HashMap<>();
private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new WorkloadStat();

private final List<Pair<String, String>> mdtPartitionPathFileGroupIdList;

public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime,
HoodieData<HoodieRecord<T>> preppedRecords, List<Pair<String, String>> mdtPartitionPathFileGroupIdList) {
super(context, config, table, instantTime, preppedRecords);
this.mdtPartitionPathFileGroupIdList = mdtPartitionPathFileGroupIdList;
}

@Override
protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> inputRDD) {
return inputRDD.getStorageLevel() == StorageLevel.NONE();
}

@Override
protected WorkloadProfile prepareWorkloadProfile(HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate) {
// all records are prepped and their locations are known upfront, so a placeholder workload profile suffices;
// the inflight file itself is only written for the final FILES-partition batch (see saveWorkloadProfileMetadataToInflight below).
return new WorkloadProfile(Pair.of(EMPTY_MAP, PLACEHOLDER_GLOBAL_STAT));
}

@Override
protected void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
// With streaming writes support, the metadata table may be written to multiple times for the same instant time,
// i.e. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), writeClient.upsert(batch2, t1), writeClient.commit(t1, ...).
// So the inflight file is generated only for the last batch of writes, which is known to contain only the FILES partition.
if (mdtPartitionPathFileGroupIdList.size() == 1 && mdtPartitionPathFileGroupIdList.get(0).getKey().equals(FILES.getPartitionPath())) {
super.saveWorkloadProfileMetadataToInflight(profile, instantTime);
}
}

@Override
protected Partitioner getPartitioner(WorkloadProfile profile) {
List<BucketInfo> bucketInfoList = new ArrayList<>();
Map<String, Integer> fileIdToSparkPartitionIndexMap = new HashMap<>();
int counter = 0;
while (counter < mdtPartitionPathFileGroupIdList.size()) {
Pair<String, String> partitionPathFileIdPair = mdtPartitionPathFileGroupIdList.get(counter);
fileIdToSparkPartitionIndexMap.put(partitionPathFileIdPair.getValue(), counter);
bucketInfoList.add(new BucketInfo(BucketType.UPDATE, partitionPathFileIdPair.getValue(), partitionPathFileIdPair.getKey()));
counter++;
}
return new SparkMetadataTableUpsertPartitioner(bucketInfoList, fileIdToSparkPartitionIndexMap);
}

@Override
protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
return inputRecords;
}

}
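// To make the FILES-partition guard in saveWorkloadProfileMetadataToInflight concrete, a hedged sketch of the
// two shapes of mdtPartitionPathFileGroupIdList this executor expects; the file ids are made-up examples and
// imports are elided:
//
//   // intermediate streaming batch: multiple metadata partitions, inflight file not yet written
//   List<Pair<String, String>> intermediateBatch = Arrays.asList(
//       Pair.of("record_index", "record-index-0001"),
//       Pair.of("column_stats", "col-stats-0002"));
//
//   // final batch of the instant: only the FILES partition, so the delta commit is transitioned to inflight here
//   List<Pair<String, String>> finalBatch = Collections.singletonList(
//       Pair.of(FILES.getPartitionPath(), "files-0000"));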
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;

import java.util.List;
import java.util.Map;

import scala.Tuple2;

/**
* Upsert partitioner to be used for the metadata table in Spark. All records are already prepped (their locations are known) with respect to the
* metadata table, so the upsert partitioner can be optimized by avoiding unnecessary computations.
* @param <T> type of the record payload
*/
public class SparkMetadataTableUpsertPartitioner<T> extends SparkHoodiePartitioner<T> {

private final List<BucketInfo> bucketInfoList;
private final int totalPartitions;
private final Map<String, Integer> fileIdToSparkPartitionIndexMap;

public SparkMetadataTableUpsertPartitioner(List<BucketInfo> bucketInfoList, Map<String, Integer> fileIdToSparkPartitionIndexMap) {
super(null, null); // passing null since these fields of {@link SparkHoodiePartitioner} are never used by this partitioner.
this.bucketInfoList = bucketInfoList;
this.totalPartitions = bucketInfoList.size();
this.fileIdToSparkPartitionIndexMap = fileIdToSparkPartitionIndexMap;
}

@Override
public int numPartitions() {
return totalPartitions;
}

@Override
public int getPartition(Object key) {
// all records written to the metadata table are prepped, so we just fetch the fileId from the incoming key
// and look it up in the pre-built map to find the Spark partition index.
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Tuple2<HoodieKey, Option<HoodieRecordLocation>>) key;
HoodieRecordLocation location = keyLocation._2().get();
return fileIdToSparkPartitionIndexMap.get(location.getFileId());
}

@Override
public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoList.get(bucketNumber);
}
}
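// A hedged, self-contained sketch of how this partitioner routes a prepped record; the bucket list, map and
// file ids below are made-up examples, not part of this diff (imports elided):
//
//   List<BucketInfo> buckets = Arrays.asList(
//       new BucketInfo(BucketType.UPDATE, "files-0000", "files"),
//       new BucketInfo(BucketType.UPDATE, "record-index-0001", "record_index"));
//   Map<String, Integer> fileIdToIndex = new HashMap<>();
//   fileIdToIndex.put("files-0000", 0);
//   fileIdToIndex.put("record-index-0001", 1);
//   SparkMetadataTableUpsertPartitioner<?> partitioner =
//       new SparkMetadataTableUpsertPartitioner<>(buckets, fileIdToIndex);
//
//   HoodieKey key = new HoodieKey("some-record-key", "record_index");
//   HoodieRecordLocation location = new HoodieRecordLocation("00000000000000000", "record-index-0001");
//   int sparkPartition = partitioner.getPartition(new Tuple2<>(key, Option.of(location)));
//   // sparkPartition == 1, i.e. the bucket backing file group "record-index-0001"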