[HUDI-9025] perf: improve append performance by reducing avro schema comparisons #12839

Open
wants to merge 1 commit into master
HoodieWriteConfig.java
@@ -829,6 +829,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service. So far Clustering and Compaction support incremental processing.");

public static final ConfigProperty<Boolean> ENGINE_SPECIFIC_SCHEMA_OPTIMIZED_ENABLE = ConfigProperty
.key("hoodie.write.schema.engine.specific.optimized.enabled")
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Whether to prepend meta fields with engine specific schema");

/**
* Config key with boolean value that indicates whether record being written during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2884,6 +2891,10 @@ public int getSecondaryIndexParallelism() {
return metadataConfig.getSecondaryIndexParallelism();
}

public boolean isEngineSpecificSchemaOptimizedEnable() {
return getBoolean(ENGINE_SPECIFIC_SCHEMA_OPTIMIZED_ENABLE);
}

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
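The flag defaults to true, so writers pick up the optimized path automatically. A minimal, hedged sketch (not part of this PR; the table path is illustrative and the property key is taken from the diff above) of how a job could opt out through the existing HoodieWriteConfig builder:

import java.util.Collections;

import org.apache.hudi.config.HoodieWriteConfig;

class DisableEngineSchemaOptimizationSketch {
  static HoodieWriteConfig buildConfig() {
    // Illustrative usage only: turn the new flag off for a single writer.
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")
        .withProps(Collections.singletonMap(
            "hoodie.write.schema.engine.specific.optimized.enabled", "false"))
        .build();
  }
}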
HoodieAppendHandle.java
@@ -26,6 +26,8 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.EngineSpecificRecord;
import org.apache.hudi.common.model.EngineSpecificRecordSchema;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -134,6 +136,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O

private final Properties recordProperties = new Properties();

private EngineSpecificRecordSchema engineSpecificWriterSchema;
private EngineSpecificRecordSchema engineSpecificWriterSchemaWithMetaFields;

/**
* This is used by log compaction only.
*/
@@ -246,6 +251,11 @@ private void init(HoodieRecord record) {
String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
this.writer = createLogWriter(instantTime, fileSliceOpt);
if (record instanceof EngineSpecificRecord) {
EngineSpecificRecord engineSpecificRecord = (EngineSpecificRecord) record;
engineSpecificWriterSchema = engineSpecificRecord.getEngineSpecificSchema(writeSchema);
engineSpecificWriterSchemaWithMetaFields = engineSpecificRecord.getEngineSpecificSchema(writeSchemaWithMetaFields);
}
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
@@ -283,6 +293,7 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
EngineSpecificRecordSchema engineSpecificRecordSchema = useWriterSchema ? engineSpecificWriterSchemaWithMetaFields : engineSpecificWriterSchema;
try {
// Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
// Whether it is an update or insert record.
@@ -302,8 +313,12 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {

// Prepend meta-fields into the record
MetadataValues metadataValues = populateMetadataFields(finalRecord);
HoodieRecord populatedRecord =
finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
HoodieRecord populatedRecord;
if (engineSpecificRecordSchema != null && config.isEngineSpecificSchemaOptimizedEnable()) {
populatedRecord = ((EngineSpecificRecord) finalRecord).prependMetaFields(engineSpecificRecordSchema, engineSpecificWriterSchemaWithMetaFields, metadataValues, recordProperties);
} else {
populatedRecord = finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
}

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
@@ -620,7 +635,12 @@ private void writeToBuffer(HoodieRecord<T> record) {
record.seal();
}
// fetch the ordering val first in case the record was deflated.
final Comparable<?> orderingVal = record.getOrderingValue(writeSchema, recordProperties);
Comparable<?> orderingVal;
if (engineSpecificWriterSchema != null && config.isEngineSpecificSchemaOptimizedEnable()) {
orderingVal = ((EngineSpecificRecord) record).getOrderingValue(engineSpecificWriterSchema, recordProperties);
} else {
orderingVal = record.getOrderingValue(writeSchema, recordProperties);
}
Option<HoodieRecord> indexedRecord = prepareRecord(record);
if (indexedRecord.isPresent()) {
// Skip the ignored record.
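For context on where the win comes from: previously the per-record calls in prepareRecord() and writeToBuffer() passed the raw Avro write schema down, and resolving the engine-specific view per record meant repeated Avro schema comparisons (per the PR title). With this change the handle resolves the engine-specific schemas once in init() and reuses them for every record. A hedged, self-contained sketch of that pattern (all names below are illustrative, not Hudi code):

import java.util.List;
import java.util.function.Function;

// Illustrative sketch only: hoist an expensive, equality-keyed schema conversion
// out of the per-record loop and resolve it once per handle.
class AppendHandleSketch<S> {
  private final S cachedEngineSchema; // plays the role of engineSpecificWriterSchema

  AppendHandleSketch(String avroSchemaJson, Function<String, S> parseSchemaFromAvro) {
    // One conversion per handle; no per-record Avro schema comparison afterwards.
    this.cachedEngineSchema = parseSchemaFromAvro.apply(avroSchemaJson);
  }

  void writeAll(List<String> records) {
    for (String record : records) {
      writeToBuffer(record, cachedEngineSchema); // hot path reuses the cached schema
    }
  }

  private void writeToBuffer(String record, S engineSchema) {
    // append the record to the current log block using the engine-native schema
  }
}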
HoodieSparkRecord.java
@@ -73,7 +73,7 @@
* </ul>
*
*/
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements EngineSpecificRecord<StructType> {

/**
* Record copy operation to avoid double copying. InternalRow do not need to copy twice.
@@ -468,4 +468,35 @@ private static Object getValue(StructType structType, String fieldName, Internal
throw new HoodieException(String.format("Field at %s is not present in %s", fieldName, structType));
}
}

@Override
public HoodieRecord prependMetaFields(EngineSpecificRecordSchema<StructType> recordSchema, EngineSpecificRecordSchema<StructType> targetSchema, MetadataValues metadataValues, Properties props) {
HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, recordSchema.getSchema());
updateMetadataValuesInternal(updatableRow, metadataValues);

return new HoodieSparkRecord(getKey(), updatableRow, targetSchema.getSchema(), getOperation(), this.currentLocation, this.newLocation, false);
}

@Override
public Comparable<?> getOrderingValue(EngineSpecificRecordSchema<StructType> recordSchema, Properties props) {
String orderingField = ConfigUtils.getOrderingField(props);
scala.Option<NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(recordSchema.getSchema(), orderingField);
if (cachedNestedFieldPath.isDefined()) {
NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
} else {
return 0;
}
}

@Override
public EngineSpecificRecordSchema<StructType> getEngineSpecificSchema(Schema schema) {
return new EngineSpecificRecordSchema<StructType>(schema) {
@Override
StructType parseSchemaFromAvro(Schema avroSchema) {
return HoodieInternalRowUtils.getCachedSchema(avroSchema);
}
};
}
}
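Taken together, a hedged sketch of the new Spark-side flow (Hudi imports omitted; sparkRecord, writeSchemaWithMetaFields, metadataValues and recordProperties are assumed to be in scope, mirroring the HoodieAppendHandle changes above):

// The Avro -> StructType conversion happens once, when the schema wrapper is created.
EngineSpecificRecordSchema<StructType> engineSchema =
    sparkRecord.getEngineSpecificSchema(writeSchemaWithMetaFields);

// The hot-path calls then work against the already converted StructType.
Comparable<?> orderingVal = sparkRecord.getOrderingValue(engineSchema, recordProperties);
HoodieRecord populated =
    sparkRecord.prependMetaFields(engineSchema, engineSchema, metadataValues, recordProperties);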
EngineSpecificRecord.java (new file)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

import org.apache.avro.Schema;

import java.util.Properties;

public interface EngineSpecificRecord<S/*engine specific schema*/> {

HoodieRecord prependMetaFields(EngineSpecificRecordSchema<S> recordSchema, EngineSpecificRecordSchema<S> targetSchema, MetadataValues metadataValues, Properties props);

Comparable<?> getOrderingValue(EngineSpecificRecordSchema<S> recordSchema, Properties props);

EngineSpecificRecordSchema<S> getEngineSpecificSchema(Schema schema);

}
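The interface is deliberately engine-agnostic. A hedged sketch of how another engine could implement it (everything below is hypothetical and only illustrates the contract; RowSchema stands in for an engine-native schema type such as Spark's StructType):

package org.apache.hudi.common.model;

import org.apache.avro.Schema;

import java.util.Properties;

// Hypothetical engine-native schema type, for illustration only.
class RowSchema {
}

// Hypothetical record type implementing the new contract; not part of this PR.
class ExampleEngineRecord implements EngineSpecificRecord<RowSchema> {

  @Override
  public HoodieRecord prependMetaFields(EngineSpecificRecordSchema<RowSchema> recordSchema,
                                        EngineSpecificRecordSchema<RowSchema> targetSchema,
                                        MetadataValues metadataValues, Properties props) {
    // A real engine would rewrite its native payload against targetSchema.getSchema(),
    // filling the Hudi meta columns from metadataValues.
    throw new UnsupportedOperationException("illustrative sketch only");
  }

  @Override
  public Comparable<?> getOrderingValue(EngineSpecificRecordSchema<RowSchema> recordSchema, Properties props) {
    // A real engine would look up the ordering field position in the cached engine schema.
    return 0;
  }

  @Override
  public EngineSpecificRecordSchema<RowSchema> getEngineSpecificSchema(Schema schema) {
    return new EngineSpecificRecordSchema<RowSchema>(schema) {
      @Override
      RowSchema parseSchemaFromAvro(Schema avroSchema) {
        // A real engine would convert (and ideally cache) the Avro schema here.
        return new RowSchema();
      }
    };
  }
}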
EngineSpecificRecordSchema.java (new file)
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model;

import org.apache.avro.Schema;

import java.io.Serializable;

public abstract class EngineSpecificRecordSchema<S> implements Serializable {

S schema;
Schema avroSchema;

public EngineSpecificRecordSchema(Schema avroSchema) {
this.avroSchema = avroSchema;
this.schema = parseSchemaFromAvro(avroSchema);
}

public S getSchema() {
return schema;
}

abstract S parseSchemaFromAvro(Schema avroSchema);
}