diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e57e9b29a834..e3bfa68449fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -829,6 +829,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Whether to enable incremental table service. So far Clustering and Compaction support incremental processing.");
 
+  public static final ConfigProperty<Boolean> ENGINE_SPECIFIC_SCHEMA_OPTIMIZED_ENABLE = ConfigProperty
+      .key("hoodie.write.schema.engine.specific.optimized.enabled")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Whether to prepend meta fields with engine specific schema");
+
   /**
    * Config key with boolean value that indicates whether record being written during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2884,6 +2891,10 @@ public int getSecondaryIndexParallelism() {
     return metadataConfig.getSecondaryIndexParallelism();
   }
 
+  public boolean isEngineSpecificSchemaOptimizedEnable() {
+    return getBoolean(ENGINE_SPECIFIC_SCHEMA_OPTIMIZED_ENABLE);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 038963168ce1..c723f04404e9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -26,6 +26,8 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.EngineSpecificRecord;
+import org.apache.hudi.common.model.EngineSpecificRecordSchema;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -134,6 +136,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
@@ ... @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
   private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
     Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
     Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
+    EngineSpecificRecordSchema engineSpecificRecordSchema = useWriterSchema ? engineSpecificWriterSchemaWithMetaFields : engineSpecificWriterSchema;
     try {
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
       // Whether it is an update or insert record.
@@ -302,8 +313,12 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
       // Prepend meta-fields into the record
       MetadataValues metadataValues = populateMetadataFields(finalRecord);
-      HoodieRecord populatedRecord =
-          finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
+      HoodieRecord populatedRecord;
+      if (engineSpecificRecordSchema != null && config.isEngineSpecificSchemaOptimizedEnable()) {
+        populatedRecord = ((EngineSpecificRecord) finalRecord).prependMetaFields(engineSpecificRecordSchema, engineSpecificWriterSchemaWithMetaFields, metadataValues, recordProperties);
+      } else {
+        populatedRecord = finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);
+      }
 
       // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
@@ -620,7 +635,12 @@ private void writeToBuffer(HoodieRecord<T> record) {
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable orderingVal = record.getOrderingValue(writeSchema, recordProperties);
+    Comparable orderingVal;
+    if (engineSpecificWriterSchema != null && config.isEngineSpecificSchemaOptimizedEnable()) {
+      orderingVal = ((EngineSpecificRecord) record).getOrderingValue(engineSpecificWriterSchema, recordProperties);
+    } else {
+      orderingVal = record.getOrderingValue(writeSchema, recordProperties);
+    }
     Option<HoodieRecord> indexedRecord = prepareRecord(record);
     if (indexedRecord.isPresent()) {
       // Skip the ignored record.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index a7d78f5496f2..95fab5844c58 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -73,7 +73,7 @@
  *
  *
  */
-public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements EngineSpecificRecord<StructType> {
 
   /**
    * Record copy operation to avoid double copying. InternalRow do not need to copy twice.
@@ -468,4 +468,35 @@ private static Object getValue(StructType structType, String fieldName, InternalRow row) {
       throw new HoodieException(String.format("Field at %s is not present in %s", fieldName, structType));
     }
   }
+
+  @Override
+  public HoodieRecord prependMetaFields(EngineSpecificRecordSchema<StructType> recordSchema, EngineSpecificRecordSchema<StructType> targetSchema, MetadataValues metadataValues, Properties props) {
+    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, recordSchema.getSchema());
+    updateMetadataValuesInternal(updatableRow, metadataValues);
+
+    return new HoodieSparkRecord(getKey(), updatableRow, targetSchema.getSchema(), getOperation(), this.currentLocation, this.newLocation, false);
+  }
+
+  @Override
+  public Comparable getOrderingValue(EngineSpecificRecordSchema<StructType> recordSchema, Properties props) {
+    String orderingField = ConfigUtils.getOrderingField(props);
+    scala.Option<NestedFieldPath> cachedNestedFieldPath =
+        HoodieInternalRowUtils.getCachedPosList(recordSchema.getSchema(), orderingField);
+    if (cachedNestedFieldPath.isDefined()) {
+      NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
+      return (Comparable) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public EngineSpecificRecordSchema<StructType> getEngineSpecificSchema(Schema schema) {
+    return new EngineSpecificRecordSchema<StructType>(schema) {
+      @Override
+      StructType parseSchemaFromAvro(Schema avroSchema) {
+        return HoodieInternalRowUtils.getCachedSchema(avroSchema);
+      }
+    };
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecord.java
new file mode 100644
index 000000000000..08057002d5dc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+
+import java.util.Properties;
+
+public interface EngineSpecificRecord<S> {
+
+  HoodieRecord prependMetaFields(EngineSpecificRecordSchema<S> recordSchema, EngineSpecificRecordSchema<S> targetSchema, MetadataValues metadataValues, Properties props);
+
+  Comparable getOrderingValue(EngineSpecificRecordSchema<S> recordSchema, Properties props);
+
+  EngineSpecificRecordSchema<S> getEngineSpecificSchema(Schema schema);
+
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecordSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecordSchema.java
new file mode 100644
index 000000000000..95e0af0b834b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EngineSpecificRecordSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+
+public abstract class EngineSpecificRecordSchema<S> implements Serializable {
+
+  S schema;
+  Schema avroSchema;
+
+  public EngineSpecificRecordSchema(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+    this.schema = parseSchemaFromAvro(avroSchema);
+  }
+
+  public S getSchema() {
+    return schema;
+  }
+
+  abstract S parseSchemaFromAvro(Schema avroSchema);
+}
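Usage note: with hoodie.write.schema.engine.specific.optimized.enabled left at its default of true, HoodieAppendHandle routes prependMetaFields and getOrderingValue through the engine-specific overloads whenever an engine-specific writer schema is available, so the Avro-to-engine schema conversion is done up front rather than repeated per record. For illustration only, the sketch below shows what another EngineSpecificRecordSchema implementation could look like; the class name AvroPassThroughRecordSchema and its identity conversion are hypothetical and not part of this change (the Spark implementation is the anonymous subclass above, which returns HoodieInternalRowUtils.getCachedSchema(avroSchema)).

```java
package org.apache.hudi.common.model;

import org.apache.avro.Schema;

// Hypothetical example, not part of this diff: an "identity" engine schema that keeps
// the Avro schema as the engine-native representation. It must live in the
// org.apache.hudi.common.model package because parseSchemaFromAvro is package-private.
public class AvroPassThroughRecordSchema extends EngineSpecificRecordSchema<Schema> {

  public AvroPassThroughRecordSchema(Schema avroSchema) {
    // The base constructor calls parseSchemaFromAvro(avroSchema) once and caches the
    // result, so getSchema() is a cheap field read on the per-record write path.
    super(avroSchema);
  }

  @Override
  Schema parseSchemaFromAvro(Schema avroSchema) {
    return avroSchema;
  }
}
```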