[MINOR] HoodieBackedTableMetadataWriter and HoodieTableMetadataUtil code cleanup #12841

Open · wants to merge 4 commits into base: master
@@ -131,15 +131,15 @@ public String create(
HoodieTimer timer = HoodieTimer.start();
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc))) {
return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
}
}

@ShellMethod(key = "metadata delete", value = "Remove the Metadata Table")
public String delete(@ShellOption(value = "--backup", help = "Backup the metadata table before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, new HoodieSparkEngineContext(jsc), backup);
String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, backup);
if (backup) {
return "Metadata Table has been deleted and backed up to " + backupPath;
} else {
@@ -150,8 +150,7 @@ public String delete(@ShellOption(value = "--backup", help = "Backup the metadat
@ShellMethod(key = "metadata delete-record-index", value = "Delete the record index from Metadata Table")
public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc),
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup);
String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup);
if (backup) {
return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath;
} else {
@@ -176,7 +175,7 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc))) {
// Empty
}
}
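The CLI changes above exercise both trimmed-down entry points. For reviewers who want to try them outside the CLI, here is a minimal sketch, assuming an existing `JavaSparkContext`, a `HoodieWriteConfig`, and a `HoodieTableMetaClient` for the data table are already wired up; the class name and import paths are illustrative, and only the two changed calls are taken from this diff:

```java
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.spark.api.java.JavaSparkContext;

public class MetadataTableCleanupSketch {

  // Recreate the metadata table with the trimmed-down create(...) call, then
  // delete it again (with backup) using the trimmed-down deleteMetadataTable(...).
  static void exerciseNewSignatures(JavaSparkContext jsc,
                                    HoodieWriteConfig writeConfig,
                                    HoodieTableMetaClient dataMetaClient) throws Exception {
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

    // Storage configuration is no longer passed explicitly; the writer derives
    // it from the engine context (see the constructor change in this PR).
    try (HoodieTableMetadataWriter writer =
             SparkHoodieBackedTableMetadataWriter.create(writeConfig, engineContext)) {
      // Creating the writer bootstraps the metadata table if it does not exist.
    }

    // deleteMetadataTable likewise drops the engine-context argument;
    // passing true backs the table up and returns the backup path.
    String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, true);
    System.out.println("Metadata table backed up to " + backupPath);
  }
}
```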
@@ -102,7 +102,7 @@ public void init() throws Exception {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.build();

try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf(), config, context)) {
try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) {
HoodieTestTable hoodieTestTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
@@ -103,7 +103,7 @@ public void init() throws Exception {
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
metaClient.getStorageConf(), config, context)) {
config, context)) {
HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
@@ -166,7 +166,7 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception {
// then bootstrap metadata table at instant 104
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc)).close();
SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc)).close();

assertTrue(HoodieCLI.storage.exists(metadataTableBasePath));

@@ -99,6 +99,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Objects.requireNonNull;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -164,30 +165,26 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
/**
* Hudi backed table metadata writer.
*
* @param storageConf Storage configuration to use for the metadata writer
* @param writeConfig Writer config
* @param failedWritesCleaningPolicy Cleaning policy on failed writes
* @param engineContext Engine context
* @param inflightInstantTimestamp Timestamp of any instant in progress
*/
protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
HoodieWriteConfig writeConfig,
protected HoodieBackedTableMetadataWriter(HoodieWriteConfig writeConfig,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
HoodieEngineContext engineContext,
Option<String> inflightInstantTimestamp) {
HoodieEngineContext engineContext) {
this.dataWriteConfig = writeConfig;
this.engineContext = engineContext;
this.storageConf = storageConf;
this.storageConf = engineContext.getStorageConf();
this.metrics = Option.empty();
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(storageConf.newInstance())
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(engineContext.getStorageConf().newInstance())
.setBasePath(dataWriteConfig.getBasePath())
.setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build();
this.enabledPartitionTypes = getEnabledPartitions(dataWriteConfig.getProps(), dataMetaClient);
if (writeConfig.isMetadataTableEnabled()) {
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
try {
initRegistry();
initialized = initializeIfNeeded(dataMetaClient, inflightInstantTimestamp);
initialized = initializeIfNeeded(dataMetaClient);
} catch (IOException e) {
LOG.error("Failed to initialize metadata table", e);
}
@@ -243,11 +240,9 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
* @throws IOException on errors
*/
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) throws IOException {
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient) throws IOException {
HoodieTimer timer = HoodieTimer.start();
List<MetadataPartitionType> metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.getValidValues().length);

@@ -315,7 +310,7 @@ private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws
if (reInitialize) {
metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, engineContext, false);
HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, false);
exists = false;
}

Expand Down Expand Up @@ -556,7 +551,7 @@ private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeCo

// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient,
dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
columnsToIndex);
@@ -597,25 +592,27 @@ protected abstract HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<
private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);
List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = getPartitionFilePathSizeTriplet();
int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
}

private List<Pair<String, Pair<String, Long>>> getPartitionFilePathSizeTriplet() throws IOException {
List<Pair<String, FileSlice>> partitionFileSlicePairs = getPartitionFileSlicePairs();
List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = new ArrayList<>();
partitionFileSlicePairs.forEach(entry -> {
if (entry.getValue().getBaseFile().isPresent()) {
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileLen())));
}
entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
if (entry.getValue().getLogFiles().count() > 0) {
entry.getValue().getLogFiles().forEach(logfile -> {
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
});
if (entry.getValue().getLogFiles().findAny().isPresent()) {
entry.getValue().getLogFiles().forEach(logfile -> partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize()))));
}
});
});

int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
return partitionFilePathSizeTriplet;
}

HoodieIndexDefinition getIndexDefinition(String indexName) {
@@ -667,7 +664,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
// Collect the list of latest base files present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();
HoodieData<HoodieRecord> records = null;
HoodieData<HoodieRecord> records;
if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
// for COW, we can only consider base files to initialize.
final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
@@ -676,13 +673,11 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
.map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
}

LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
+ partitions.size() + " partitions");
LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size());

// Collect record keys from the files in parallel
records = readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
false,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
@@ -691,13 +686,12 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
this.getClass().getSimpleName());
} else {
final List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(instant -> instant.requestedTime()).orElse(SOLO_COMMIT_TIMESTAMP);
String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
for (String partition : partitions) {
fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
}

LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in "
+ partitions.size() + " partitions");
LOG.info("Initializing record index from {} file slices in {} partitions", partitionFileSlicePairs.size(), partitions.size());
records = readRecordKeysFromFileSliceSnapshot(
engineContext,
partitionFileSlicePairs,
@@ -729,7 +723,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
* @param metaClient metaclient instance to use.
* @param dataWriteConfig write config to use.
* @param hoodieTable hoodie table instance of interest.
* @return
* @return {@link HoodieData} of {@link HoodieRecord} containing updates for record_index.
*/
private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
List<Pair<String, FileSlice>> partitionFileSlicePairs,
@@ -759,7 +753,7 @@ private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(Hood
Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
HoodieRecord record1 = (HoodieRecord) record;
return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId,
record1.getCurrentLocation().getInstantTime(), 0);
requireNonNull(record1.getCurrentLocation()).getInstantTime(), 0);
}).iterator();
});
}
@@ -917,7 +911,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
HoodieStorage storage = metadataMetaClient.getStorage();
try {
final List<StoragePathInfo> existingFiles = storage.listDirectEntries(partitionPath);
if (existingFiles.size() > 0) {
if (!existingFiles.isEmpty()) {
LOG.warn("Deleting all existing files found in MDT partition {}", partitionName);
storage.deleteDirectory(partitionPath);
ValidationUtils.checkState(!storage.exists(partitionPath),
@@ -1209,7 +1203,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
InstantGenerator datainstantGenerator = dataMetaClient.getInstantGenerator();
HoodieInstant restoreInstant = datainstantGenerator.createNewInstant(REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
HoodieInstant requested = datainstantGenerator.getRestoreRequestedInstant(restoreInstant);
HoodieRestorePlan restorePlan = null;
HoodieRestorePlan restorePlan;
try {
restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
@@ -1626,7 +1620,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {

for (String partition : metadata.fetchAllPartitionPaths()) {
StoragePath partitionPath = null;
StoragePath partitionPath;
if (StringUtils.isNullOrEmpty(partition) && !dataMetaClient.getTableConfig().isTablePartitioned()) {
partitionPath = new StoragePath(dataWriteConfig.getBasePath());
} else {
@@ -1637,7 +1631,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
if (!dirInfoMap.containsKey(partition)) {
// Entire partition has been deleted
partitionsToDelete.add(partitionId);
if (metadataFiles != null && metadataFiles.size() > 0) {
if (metadataFiles != null && !metadataFiles.isEmpty()) {
partitionFilesToDelete.put(partitionId, metadataFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList()));
}
} else {
@@ -1672,7 +1666,6 @@ private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceComm
.collect(Collectors.toList());
return readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
true,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
@@ -74,7 +74,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String ac
// Recreate MDT for insert_overwrite_table operation.
if (table.getConfig().isMetadataTableEnabled()
&& WriteOperationType.INSERT_OVERWRITE_TABLE == metadata.getOperationType()) {
HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), table.getContext(), false);
HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), false);
}

// MDT should be recreated if it has been deleted for insert_overwrite_table operation.
@@ -157,7 +157,7 @@ protected final void dropIndexOnRestore() {
for (String partitionPath : table.getMetaClient().getTableConfig().getMetadataPartitions()) {
if (MetadataPartitionType.shouldDeletePartitionOnRestore(partitionPath)) {
// setting backup to true as this delete is part of restore operation
HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(), context, partitionPath, true);
HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(), partitionPath, true);
}
}
}
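The same pattern applies to deleteMetadataTablePartition, which dropIndexOnRestore above now calls without an engine context. A minimal sketch of the reduced-argument call, assuming a meta client for the data table is already available; the wrapper class is illustrative and only the changed call is taken from this diff:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;

public class DropRecordIndexSketch {

  // Drop the record_index partition from the metadata table; backup=true keeps
  // a copy of the partition before deletion and the backup path is returned.
  static String dropRecordIndex(HoodieTableMetaClient dataMetaClient) throws Exception {
    return HoodieTableMetadataUtil.deleteMetadataTablePartition(
        dataMetaClient,
        MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
        true);
  }
}
```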