
Commit 467eac4

fix getField == null checks

Parent: 955f7b7

5 files changed, +11 −11 lines


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 1 addition & 1 deletion
@@ -344,7 +344,7 @@ public Pair<String, String> next() {
 
   private static HoodieSchema getRequestedSchemaForSecondaryIndex(HoodieTableMetaClient metaClient, HoodieSchema tableSchema, String secondaryKeyField) {
     String[] recordKeyFields;
-    if (tableSchema.getField(RECORD_KEY_METADATA_FIELD) != null) {
+    if (tableSchema.getField(RECORD_KEY_METADATA_FIELD).isPresent()) {
      recordKeyFields = new String[] {RECORD_KEY_METADATA_FIELD};
    } else {
      recordKeyFields = metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
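
Across all five files the fix has the same shape: callers stop comparing the result of getField against null and instead ask the returned option-style wrapper whether the field is present. A minimal standalone sketch of that before/after pattern, using java.util.Optional and a hypothetical Schema class as stand-ins for Hudi's actual HoodieSchema API:

import java.util.Map;
import java.util.Optional;

public class FieldPresenceSketch {
  // Hypothetical stand-in for a schema whose getField returns an option rather than a nullable field.
  static class Schema {
    private final Map<String, Integer> fieldPositions;

    Schema(Map<String, Integer> fieldPositions) {
      this.fieldPositions = fieldPositions;
    }

    Optional<Integer> getField(String name) {
      return Optional.ofNullable(fieldPositions.get(name));
    }
  }

  public static void main(String[] args) {
    Schema tableSchema = new Schema(Map.of("_hoodie_record_key", 2));

    // Old style: if (tableSchema.getField("_hoodie_record_key") != null) { ... }
    // New style: presence is expressed by the wrapper itself.
    if (tableSchema.getField("_hoodie_record_key").isPresent()) {
      System.out.println("record key metadata field is part of the schema");
    } else {
      System.out.println("fall back to the table's configured record key fields");
    }
  }
}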

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java

Lines changed: 4 additions & 4 deletions
@@ -31,7 +31,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -87,10 +86,11 @@ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, Str
   * @param partitionFieldAndValues the partition fields and their values, if any are required by the reader
   * @return a function for transforming the row
   */
-  protected UnaryOperator<InternalRow> getBootstrapProjection(Schema from, Schema to, List<Pair<String, Object>> partitionFieldAndValues) {
-    Map<Integer, Object> partitionValuesByIndex = partitionFieldAndValues.stream().collect(Collectors.toMap(pair -> to.getField(pair.getKey()).pos(), Pair::getRight));
+  protected UnaryOperator<InternalRow> getBootstrapProjection(HoodieSchema from, HoodieSchema to, List<Pair<String, Object>> partitionFieldAndValues) {
+    Map<Integer, Object> partitionValuesByIndex = partitionFieldAndValues.stream()
+        .collect(Collectors.toMap(pair -> to.getField(pair.getKey()).orElseThrow(() -> new IllegalArgumentException("Missing field: " + pair.getKey())).pos(), Pair::getRight));
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), getCachedSchema(to), Collections.emptyMap(), partitionValuesByIndex);
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()), getCachedSchema(to.toAvroSchema()), Collections.emptyMap(), partitionValuesByIndex);
     return row -> (InternalRow) unsafeRowWriter.apply(row);
   }
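
The projection change also illustrates how a missing field should now surface: rather than a NullPointerException from calling .pos() on a null field, the lookup fails fast with a descriptive IllegalArgumentException via orElseThrow. A standalone sketch of that field-position lookup, again with java.util.Optional standing in for the real return type (class and field names below are illustrative):

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class PartitionIndexSketch {
  // Maps each (fieldName, value) pair to (fieldPosition, value), failing fast on unknown fields.
  static Map<Integer, Object> byPosition(Map<String, Integer> schemaPositions,
                                         List<Map.Entry<String, Object>> fieldAndValues) {
    return fieldAndValues.stream()
        .collect(Collectors.toMap(
            entry -> Optional.ofNullable(schemaPositions.get(entry.getKey()))
                .orElseThrow(() -> new IllegalArgumentException("Missing field: " + entry.getKey())),
            Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Integer> positions = Map.of("region", 4, "date", 5);
    List<Map.Entry<String, Object>> values =
        List.of(Map.entry("region", (Object) "us-west"), Map.entry("date", (Object) "2024-01-01"));
    // Prints the partition values keyed by their field position in the schema.
    System.out.println(byPosition(positions, values));
  }
}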

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala

Lines changed: 4 additions & 4 deletions
@@ -152,10 +152,10 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
 
     //If we need to do position based merging with log files we will leave the row index column at the end
     val dataProjection = if (getShouldMergeUseRecordPosition) {
-      getBootstrapProjection(dataRequiredSchema.toAvroSchema, dataRequiredSchema.toAvroSchema, partitionFieldAndValues)
+      getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, partitionFieldAndValues)
     } else {
-      getBootstrapProjection(dataRequiredSchema.toAvroSchema,
-        HoodieAvroUtils.removeFields(dataRequiredSchema.toAvroSchema, rowIndexColumn), partitionFieldAndValues)
+      getBootstrapProjection(dataRequiredSchema,
+        HoodieSchemaUtils.removeFields(dataRequiredSchema, rowIndexColumn), partitionFieldAndValues)
     }
 
     //row index will always be the last column
@@ -209,7 +209,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
       }
     }
   } else {
-    val dataProjection = getBootstrapProjection(dataRequiredSchema.toAvroSchema, dataRequiredSchema.toAvroSchema, partitionFieldAndValues)
+    val dataProjection = getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, partitionFieldAndValues)
     new ClosableIterator[Any] {
       val combinedRow = new JoinedRow()

hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFil
     List<Pair<String, Object>> filterFieldsAndValues = new ArrayList<>(partitionFields.length);
     for (int i = 0; i < partitionFields.length; i++) {
       String field = partitionFields[i];
-      if (dataSchema.getField(field) != null) {
+      if (dataSchema.getField(field).isPresent()) {
        filterFieldsAndValues.add(Pair.of(field, readerContext.getRecordContext().convertPartitionValueToEngineType((Comparable) partitionValues[i])));
      }
    }

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java

Lines changed: 1 addition & 1 deletion
@@ -190,7 +190,7 @@ public void setSchemaHandler(FileGroupReaderSchemaHandler<RowData> schemaHandler
       return;
     }
     // primary key semantic is lost if not all primary key fields are included in the request schema.
-    boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k -> schemaHandler.getRequestedSchema().getField(k) == null);
+    boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k -> schemaHandler.getRequestedSchema().getField(k).isEmpty());
     if (pkSemanticLost) {
       return;
     }
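
In the Flink reader context the same idea runs in the negative direction: primary key semantics are considered lost as soon as any configured record key field is absent from the requested schema, now expressed with isEmpty() instead of == null. A standalone sketch of that anyMatch check, with java.util.Optional standing in for the actual getField return type (the RequestedSchema class and field names are illustrative):

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

public class PkSemanticSketch {
  // Hypothetical stand-in for a requested schema that exposes option-based field lookup.
  static class RequestedSchema {
    private final Set<String> fields;

    RequestedSchema(Set<String> fields) {
      this.fields = fields;
    }

    Optional<String> getField(String name) {
      return fields.contains(name) ? Optional.of(name) : Optional.empty();
    }
  }

  public static void main(String[] args) {
    RequestedSchema requested = new RequestedSchema(Set.of("id", "ts"));
    String[] recordKeys = {"id", "region"};

    // Primary key semantics are lost if any record key field is missing from the requested schema.
    boolean pkSemanticLost = Arrays.stream(recordKeys)
        .anyMatch(k -> requested.getField(k).isEmpty());
    System.out.println("pkSemanticLost = " + pkSemanticLost); // true, because "region" is absent
  }
}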
