[HUDI-9022] Handle records with custom delete markers for MOR #12843

Open · wants to merge 1 commit into base: master
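Summary: this change makes the MOR file group reader honor payload-level custom delete markers. When both DELETE_KEY and DELETE_MARKER (from DefaultHoodieRecordPayload) are configured and the reader schema contains the configured field, a log record whose field value matches the marker is merged as a delete instead of as an ordinary data record. The buffer classes are also renamed from Hoodie*FileGroupRecordBuffer to *HoodieFileGroupRecordBuffer. Below is a minimal configuration sketch; the field name "op" and marker value "d" are illustrative assumptions, not values taken from this PR.

import org.apache.hudi.common.config.TypedProperties;

import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;

public class CustomDeleteConfigSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Hypothetical CDC-style payload: each record carries an "op" field, and "d" marks a delete.
    props.setProperty(DELETE_KEY, "op");
    props.setProperty(DELETE_MARKER, "d");
    // Handed to the record buffer constructors below, these two properties enable the
    // isCustomDeleteRecord(...) check during log-block processing.
    System.out.println(props);
  }
}
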
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.table.read.PositionBasedHoodieFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}

@@ -72,13 +72,15 @@
 import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
-public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
+public abstract class BaseHoodieFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
   protected final Option<String> orderingFieldName;
@@ -96,14 +98,17 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected boolean enablePartialMerging = false;
   protected InternalSchema internalSchema;
   protected HoodieTableMetaClient hoodieTableMetaClient;
-
-  public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
-                                         HoodieTableMetaClient hoodieTableMetaClient,
-                                         RecordMergeMode recordMergeMode,
-                                         Option<String> partitionNameOverrideOpt,
-                                         Option<String[]> partitionPathFieldOpt,
-                                         TypedProperties props,
-                                         HoodieReadStats readStats) {
+  protected boolean shouldCheckCustomDeleteMarker = false;
+  protected String customDeleteMarkerKey;
+  protected String customDeleteMarkerValue;
+
+  protected BaseHoodieFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+                                            HoodieTableMetaClient hoodieTableMetaClient,
+                                            RecordMergeMode recordMergeMode,
+                                            Option<String> partitionNameOverrideOpt,
+                                            Option<String[]> partitionPathFieldOpt,
+                                            TypedProperties props,
+                                            HoodieReadStats readStats) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
@@ -142,6 +147,29 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
     }
+    this.shouldCheckCustomDeleteMarker = hasCustomDeleteConfigs(props, readerSchema);
+    if (shouldCheckCustomDeleteMarker) {
+      this.customDeleteMarkerKey = props.getProperty(DELETE_KEY);
+      this.customDeleteMarkerValue = props.getProperty(DELETE_MARKER);
+    }
   }
+
+  protected boolean hasCustomDeleteConfigs(TypedProperties props, Schema schema) {
+    // DELETE_KEY and DELETE_MARKER both should be set.
+    if (StringUtils.isNullOrEmpty(props.getProperty(DELETE_KEY))
+        || StringUtils.isNullOrEmpty(props.getProperty(DELETE_MARKER))) {
+      return false;
+    }
+    // Schema should have the DELETE_KEY field.
+    String deleteKeyField = props.getProperty(DELETE_KEY);
+    return schema.getField(deleteKeyField) != null;
+  }
+
+  protected boolean isCustomDeleteRecord(T record) {
+    Object deleteMarkerValue =
+        readerContext.getValue(record, readerSchema, customDeleteMarkerKey);
+    return deleteMarkerValue != null
+        && customDeleteMarkerValue.equals(deleteMarkerValue.toString());
+  }
 
   @Override

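For intuition, here is a self-contained sketch of the two checks added above, substituting an Avro GenericRecord for the engine-specific record type T and plain java.util.Properties for TypedProperties; the "op" and "d" names are assumptions carried over from the configuration sketch.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Properties;

import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;

public class CustomDeleteCheckSketch {
  public static void main(String[] args) {
    // Hypothetical reader schema containing the configured delete field "op".
    Schema schema = SchemaBuilder.record("Rec").fields()
        .requiredString("id")
        .requiredString("op")
        .endRecord();

    Properties props = new Properties();
    props.setProperty(DELETE_KEY, "op");   // illustrative field name
    props.setProperty(DELETE_MARKER, "d"); // illustrative marker value

    GenericRecord rec = new GenericData.Record(schema);
    rec.put("id", "key-001");
    rec.put("op", "d");

    // hasCustomDeleteConfigs: both properties set, and the schema has the delete field.
    String key = props.getProperty(DELETE_KEY);
    String marker = props.getProperty(DELETE_MARKER);
    boolean shouldCheck = key != null && !key.isEmpty()
        && marker != null && !marker.isEmpty()
        && schema.getField(key) != null;

    // isCustomDeleteRecord: the field value is non-null and its string form equals the marker.
    Object value = rec.get(key);
    boolean isDelete = shouldCheck && value != null && marker.equals(value.toString());

    System.out.println("treated as delete: " + isDelete); // prints: treated as delete: true
  }
}
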
@@ -151,14 +151,14 @@ private static HoodieFileGroupRecordBuffer getRecordBuffer(HoodieReaderContext r
     if (hasNoLogFiles) {
       return null;
     } else if (isSkipMerge) {
-      return new HoodieUnmergedFileGroupRecordBuffer<>(
+      return new UnmergedHoodieFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
     } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
-      return new HoodiePositionBasedFileGroupRecordBuffer<>(
+      return new PositionBasedHoodieFileGroupRecordBuffer<>(
          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
          Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
     } else {
-      return new HoodieKeyBasedFileGroupRecordBuffer<>(
+      return new KeyBasedHoodieFileGroupRecordBuffer<>(
          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
     }
   }

@@ -227,7 +227,7 @@ public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapDataFields() {
   static Pair<List<Schema.Field>, List<Schema.Field>> getDataAndMetaCols(Schema schema) {
     Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
         //if there are no data fields, then we don't want to think the temp col is a data col
-        .filter(f -> !Objects.equals(f.name(), HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+        .filter(f -> !Objects.equals(f.name(), PositionBasedHoodieFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
         .collect(Collectors.partitioningBy(f -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
     return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()),
         fieldsByMeta.getOrDefault(false, Collections.emptyList()));

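A small sketch of why the filter above matters: without dropping the temporary row-index column first, a schema holding only meta columns plus that column would appear to still have data fields. The meta-column list below is a two-entry stand-in for HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION; the temp column name matches the "_tmp_metadata_" + "row_index" constant defined later in this diff.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class FieldPartitionSketch {
  // Stand-ins for the real Hudi constants.
  static final List<String> META_COLUMNS = List.of("_hoodie_commit_time", "_hoodie_record_key");
  static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_row_index";

  static Map<Boolean, List<Schema.Field>> splitMetaAndData(Schema schema) {
    return schema.getFields().stream()
        // Drop the temp row-index column before deciding whether any data columns remain.
        .filter(f -> !Objects.equals(f.name(), ROW_INDEX_TEMPORARY_COLUMN_NAME))
        .collect(Collectors.partitioningBy(f -> META_COLUMNS.contains(f.name())));
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Rec").fields()
        .requiredString("_hoodie_commit_time")
        .requiredString("_hoodie_record_key")
        .requiredString(ROW_INDEX_TEMPORARY_COLUMN_NAME)
        .endRecord();
    // With only meta columns plus the temp column present, the data bucket stays empty.
    System.out.println("data fields: "
        + splitMetaAndData(schema).getOrDefault(false, Collections.emptyList()));
  }
}
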
@@ -35,7 +35,7 @@
 import java.util.List;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
-import static org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
+import static org.apache.hudi.common.table.read.PositionBasedHoodieFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
 
 /**
  * This class is responsible for handling the schema for the file group reader that supports positional merge.

@@ -23,6 +23,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -40,15 +41,18 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into a record key based map.
  * The records from the base file is accessed from an iterator object. These records are merged when the
  * {@link #hasNext} method is called.
  */
-public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+public class KeyBasedHoodieFileGroupRecordBuffer<T> extends BaseHoodieFileGroupRecordBuffer<T> {
 
-  public HoodieKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+  public KeyBasedHoodieFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                              HoodieTableMetaClient hoodieTableMetaClient,
                                              RecordMergeMode recordMergeMode,
                                              Option<String> partitionNameOverrideOpt,
@@ -79,7 +83,12 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
       Map<String, Object> metadata = readerContext.generateMetadataForRecord(
           nextRecord, recordsIteratorSchemaPair.getRight());
       String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
-      processNextDataRecord(nextRecord, metadata, recordKey);
+
+      if (shouldCheckCustomDeleteMarker && isCustomDeleteRecord(nextRecord)) {
+        processCustomDeleteRecord(nextRecord, metadata);
+      } else {
+        processNextDataRecord(nextRecord, metadata, recordKey);
+      }
     }
   }
 }
@@ -120,6 +129,18 @@ public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable rec
     }
   }
 
+  protected void processCustomDeleteRecord(T record, Map<String, Object> metadata) {
+    DeleteRecord deleteRecord = DeleteRecord.create(
+        new HoodieKey(
+            (String) metadata.get(INTERNAL_META_RECORD_KEY),
+            (String) metadata.get(INTERNAL_META_PARTITION_PATH)),
+        readerContext.getOrderingValue(
+            Option.of(record), metadata, readerSchema, orderingFieldName));
+    processNextDeletedRecord(
+        deleteRecord,
+        (String) metadata.get(INTERNAL_META_RECORD_KEY));
+  }
+
   @Override
   public boolean containsLogRecord(String recordKey) {
     return records.containsKey(recordKey);

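The conversion performed by processCustomDeleteRecord is small enough to show in isolation; in this sketch the key, partition path, and ordering value are hypothetical stand-ins for the metadata-map lookups and the readerContext.getOrderingValue(...) call above.

import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;

public class CustomDeleteConversionSketch {
  public static void main(String[] args) {
    String recordKey = "key-001";        // stand-in for metadata.get(INTERNAL_META_RECORD_KEY)
    String partitionPath = "2024/01/01"; // stand-in for metadata.get(INTERNAL_META_PARTITION_PATH)
    long orderingValue = 42L;            // stand-in for readerContext.getOrderingValue(...)

    // The record's payload is discarded: only its key and ordering value survive, so it
    // competes against other versions of the key exactly like an entry from a delete block.
    DeleteRecord deleteRecord =
        DeleteRecord.create(new HoodieKey(recordKey, partitionPath), orderingValue);
    System.out.println(deleteRecord.getRecordKey() + " @ " + deleteRecord.getOrderingValue());
  }
}
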
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -51,6 +52,7 @@
 import java.util.Set;
 import java.util.function.Function;
 
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 
@@ -60,16 +62,16 @@
  * Here the position means that record position in the base file. The records from the base file is accessed from an iterator object. These records are merged when the
  * {@link #hasNext} method is called.
  */
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedFileGroupRecordBuffer<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+public class PositionBasedHoodieFileGroupRecordBuffer<T> extends KeyBasedHoodieFileGroupRecordBuffer<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionBasedHoodieFileGroupRecordBuffer.class);
 
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
   protected final String baseFileInstantTime;
   private long nextRecordPosition = 0L;
   private boolean needToDoHybridStrategy = false;
 
-  public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+  public PositionBasedHoodieFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                                   HoodieTableMetaClient hoodieTableMetaClient,
                                                   RecordMergeMode recordMergeMode,
                                                   Option<String> partitionNameOverrideOpt,
@@ -132,13 +134,14 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
       }
 
       long recordPosition = recordPositions.get(recordIndex++);
-
       T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-      processNextDataRecord(
-          evolvedNextRecord,
-          readerContext.generateMetadataForRecord(evolvedNextRecord, schemaTransformerWithEvolvedSchema.getRight()),
-          recordPosition
-      );
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          evolvedNextRecord, schemaTransformerWithEvolvedSchema.getRight());
+      if (shouldCheckCustomDeleteMarker && isCustomDeleteRecord(evolvedNextRecord)) {
+        processCustomDeleteRecord(evolvedNextRecord, metadata, recordPosition);
+      } else {
+        processNextDataRecord(evolvedNextRecord, metadata, recordPosition);
+      }
     }
   }
 }
@@ -199,6 +202,16 @@ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException
     }
   }
 
+  protected void processCustomDeleteRecord(T record, Map<String, Object> metadata, long recordPosition) {
+    DeleteRecord deleteRecord = DeleteRecord.create(
+        new HoodieKey(
+            (String) metadata.get(INTERNAL_META_RECORD_KEY),
+            (String) metadata.get(INTERNAL_META_PARTITION_PATH)),
+        readerContext.getOrderingValue(
+            Option.of(record), metadata, readerSchema, orderingFieldName));
+    processNextDeletedRecord(deleteRecord, recordPosition);
+  }
+
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordPosition) {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);

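Both overloads of processCustomDeleteRecord end in processNextDeletedRecord; as records.get(recordPosition) above suggests, the difference is only the map key: the key-based buffer indexes log records by record key (a String), while this position-based buffer indexes them by base-file row position (a long). A schematic contrast, with a simplified value type standing in for Pair<Option<T>, Map<String, Object>>:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class BufferKeyingSketch {
  public static void main(String[] args) {
    Map<Serializable, String> records = new HashMap<>();

    // KeyBasedHoodieFileGroupRecordBuffer: indexed by record key.
    records.put("key-001", "delete for key-001");

    // PositionBasedHoodieFileGroupRecordBuffer: indexed by the row position the
    // record occupies in the base file, which is what enables positional merging.
    records.put(42L, "delete at base-file row 42");

    System.out.println(records);
  }
}
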
@@ -40,12 +40,12 @@
 import java.util.Iterator;
 import java.util.Map;
 
-public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+public class UnmergedHoodieFileGroupRecordBuffer<T> extends BaseHoodieFileGroupRecordBuffer<T> {
   // Used to order the records in the record map.
   private Long putIndex = 0L;
   private Long getIndex = 0L;
 
-  public HoodieUnmergedFileGroupRecordBuffer(
+  public UnmergedHoodieFileGroupRecordBuffer(
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
       RecordMergeMode recordMergeMode,