From 57904d890a18c59c432647a7ed5aeb13cee008b6 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 11 Feb 2025 14:53:43 +0530 Subject: [PATCH 1/4] Simplify metadata writer constructor --- .../hudi/cli/commands/MetadataCommand.java | 4 +- .../cli/commands/TestRestoresCommand.java | 2 +- .../cli/commands/TestRollbacksCommand.java | 2 +- .../cli/integ/ITTestSavepointsCommand.java | 2 +- .../HoodieBackedTableMetadataWriter.java | 18 +++------ .../client/HoodieFlinkTableServiceClient.java | 4 +- .../FlinkHoodieBackedTableMetadataWriter.java | 29 +++----------- .../apache/hudi/table/HoodieFlinkTable.java | 4 +- .../hudi/client/HoodieJavaWriteClient.java | 3 +- .../JavaHoodieBackedTableMetadataWriter.java | 25 +++--------- .../apache/hudi/table/HoodieJavaTable.java | 14 +++---- .../client/TestJavaHoodieBackedMetadata.java | 2 +- .../HoodieJavaClientTestHarness.java | 4 +- .../testutils/TestHoodieMetadataBase.java | 2 +- .../hudi/client/SparkRDDWriteClient.java | 5 +-- .../SparkHoodieBackedTableMetadataWriter.java | 39 +++++-------------- .../apache/hudi/table/HoodieSparkTable.java | 7 +--- .../hudi/client/TestClientRollback.java | 10 ++--- .../functional/TestHoodieMetadataBase.java | 2 +- .../index/bloom/TestHoodieBloomIndex.java | 8 ++-- .../hudi/io/TestHoodieTimelineArchiver.java | 4 +- .../org/apache/hudi/table/TestCleaner.java | 2 +- .../hudi/testutils/HoodieCleanerTestBase.java | 2 +- .../HoodieSparkClientTestHarness.java | 4 +- .../apache/hudi/utils/TestCompactionUtil.java | 3 +- .../CreateMetadataTableProcedure.scala | 2 +- .../InitMetadataTableProcedure.scala | 2 +- .../functional/TestHoodieBackedMetadata.java | 4 +- .../table/TestHoodieMergeOnReadTable.java | 4 +- .../TestManifestFileWriterSpark.java | 2 +- 30 files changed, 72 insertions(+), 143 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index cabd9fdcaf6fe..0fb59dcb38330 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -131,7 +131,7 @@ public String create( HoodieTimer timer = HoodieTimer.start(); HoodieWriteConfig writeConfig = getWriteConfig(); initJavaSparkContext(Option.of(master)); - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) { + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc))) { return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0); } } @@ -176,7 +176,7 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti if (!readOnly) { HoodieWriteConfig writeConfig = getWriteConfig(); initJavaSparkContext(Option.of(master)); - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) { + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc))) { // Empty } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java index 77930b929a229..0abdabef4abb0 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java +++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java @@ -102,7 +102,7 @@ public void init() throws Exception { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) .build(); - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf(), config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { HoodieTestTable hoodieTestTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)) .withPartitionMetaFiles(DEFAULT_PARTITION_PATHS) .addCommit("100") diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java index 5641c2c4aa059..fb79789354d4d 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java @@ -103,7 +103,7 @@ public void init() throws Exception { .withRollbackUsingMarkers(false) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( - metaClient.getStorageConf(), config, context)) { + config, context)) { HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)) .withPartitionMetaFiles(DEFAULT_PARTITION_PATHS) .addCommit("100") diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java index b0fbb9f8718a0..c368fefc288b7 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java @@ -166,7 +166,7 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception { // then bootstrap metadata table at instant 104 HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); - SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc)).close(); + SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc)).close(); assertTrue(HoodieCLI.storage.exists(metadataTableBasePath)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index ddeefe45e65c1..330ceea925ed4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -164,22 +164,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableM /** * Hudi backed table metadata writer. 
* - * @param storageConf Storage configuration to use for the metadata writer * @param writeConfig Writer config * @param failedWritesCleaningPolicy Cleaning policy on failed writes * @param engineContext Engine context - * @param inflightInstantTimestamp Timestamp of any instant in progress */ - protected HoodieBackedTableMetadataWriter(StorageConfiguration storageConf, - HoodieWriteConfig writeConfig, + protected HoodieBackedTableMetadataWriter(HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext engineContext, - Option inflightInstantTimestamp) { + HoodieEngineContext engineContext) { this.dataWriteConfig = writeConfig; this.engineContext = engineContext; - this.storageConf = storageConf; + this.storageConf = engineContext.getStorageConf(); this.metrics = Option.empty(); - this.dataMetaClient = HoodieTableMetaClient.builder().setConf(storageConf.newInstance()) + this.dataMetaClient = HoodieTableMetaClient.builder().setConf(engineContext.getStorageConf().newInstance()) .setBasePath(dataWriteConfig.getBasePath()) .setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build(); this.enabledPartitionTypes = getEnabledPartitions(dataWriteConfig.getProps(), dataMetaClient); @@ -187,7 +183,7 @@ protected HoodieBackedTableMetadataWriter(StorageConfiguration storageConf, this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy); try { initRegistry(); - initialized = initializeIfNeeded(dataMetaClient, inflightInstantTimestamp); + initialized = initializeIfNeeded(dataMetaClient); } catch (IOException e) { LOG.error("Failed to initialize metadata table", e); } @@ -243,11 +239,9 @@ public List getEnabledPartitionTypes() { * Initialize the metadata table if needed. 
* * @param dataMetaClient - meta client for the data table - * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset * @throws IOException on errors */ - protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, - Option inflightInstantTimestamp) throws IOException { + protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient) throws IOException { HoodieTimer timer = HoodieTimer.start(); List metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.getValidValues().length); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java index 3874acdd4297b..fbb01182532c9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java @@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.storage.StorageConfiguration; @@ -45,7 +44,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; -import org.apache.hudi.util.FlinkClientUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,7 +186,7 @@ protected HoodieTable createTable(HoodieWriteConfig config, StorageConfiguration */ private HoodieBackedTableMetadataWriter initMetadataWriter(Option latestPendingInstant) { return (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( - HadoopFSUtils.getStorageConf(FlinkClientUtil.getHadoopConf()), this.config, HoodieFlinkEngineContext.DEFAULT, latestPendingInstant); + this.config, HoodieFlinkEngineContext.DEFAULT); } public void initMetadataTable() { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index ad7aca9ec6a68..3285a6f8d1c69 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -57,34 +57,17 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter> { private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class); - public static HoodieTableMetadataWriter create(StorageConfiguration conf, HoodieWriteConfig writeConfig, + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context) { - return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, EAGER, context, Option.empty()); + return new FlinkHoodieBackedTableMetadataWriter(writeConfig, context, EAGER); } - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieEngineContext context, - Option inFlightInstantTimestamp) { - 
return new FlinkHoodieBackedTableMetadataWriter( - conf, writeConfig, EAGER, context, inFlightInstantTimestamp); + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + return new FlinkHoodieBackedTableMetadataWriter(writeConfig, context, failedWritesCleaningPolicy); } - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext context, - Option inFlightInstantTimestamp) { - return new FlinkHoodieBackedTableMetadataWriter( - conf, writeConfig, failedWritesCleaningPolicy, context, inFlightInstantTimestamp); - } - - FlinkHoodieBackedTableMetadataWriter(StorageConfiguration storageConf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext engineContext, - Option inFlightInstantTimestamp) { - super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inFlightInstantTimestamp); + FlinkHoodieBackedTableMetadataWriter(HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + super(writeConfig, failedWritesCleaningPolicy, engineContext); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index c902b73420ae3..1b2fe55203b0c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -101,9 +101,7 @@ protected Option getMetadataWriter( String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { if (config.isMetadataTableEnabled() || getMetaClient().getTableConfig().isMetadataTableAvailable()) { - return Option.of(FlinkHoodieBackedTableMetadataWriter.create( - getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), - Option.of(triggeringInstantTimestamp))); + return Option.of(FlinkHoodieBackedTableMetadataWriter.create(config, getContext(), failedWritesCleaningPolicy)); } else { return Option.empty(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 486b2d13557fc..d02537770006b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -231,8 +231,7 @@ private void initializeMetadataTable(Option inFlightInstantTimestamp) { return; } - try (HoodieTableMetadataWriter writer = JavaHoodieBackedTableMetadataWriter.create( - context.getStorageConf(), config, context, inFlightInstantTimestamp)) { + try (HoodieTableMetadataWriter writer = JavaHoodieBackedTableMetadataWriter.create(config, context)) { if (writer.isInitialized()) { writer.performTableServices(inFlightInstantTimestamp); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index f6d4699b7bc73..fdbc0fa574568 100644 --- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -49,16 +49,12 @@ public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetada /** * Hudi backed table metadata writer. * - * @param storageConf Storage configuration to use for the metadata writer * @param writeConfig Writer config * @param failedWritesCleaningPolicy Cleaning policy on failed writes * @param engineContext Engine context - * @param inflightInstantTimestamp Timestamp of any instant in progress */ - protected JavaHoodieBackedTableMetadataWriter(StorageConfiguration storageConf, HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext engineContext, - Option inflightInstantTimestamp) { - super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); + protected JavaHoodieBackedTableMetadataWriter(HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + super(writeConfig, failedWritesCleaningPolicy, engineContext); } @Override @@ -66,21 +62,12 @@ HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaCl return HoodieJavaTable.create(writeConfig, engineContext, metaClient); } - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieEngineContext context, - Option inflightInstantTimestamp) { - return new JavaHoodieBackedTableMetadataWriter( - conf, writeConfig, EAGER, context, inflightInstantTimestamp); + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context) { + return new JavaHoodieBackedTableMetadataWriter(writeConfig, context, EAGER); } - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext context, - Option inflightInstantTimestamp) { - return new JavaHoodieBackedTableMetadataWriter( - conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp); + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + return new JavaHoodieBackedTableMetadataWriter(writeConfig, context, failedWritesCleaningPolicy); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index a0ed4e65747c2..c230efca05e4d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -36,7 +36,6 @@ import org.apache.hudi.metadata.JavaHoodieBackedTableMetadataWriter; import org.apache.hudi.table.action.HoodieWriteMetadata; -import java.io.IOException; import java.util.List; public abstract class HoodieJavaTable @@ -81,22 +80,19 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con @Override protected Option getMetadataWriter(String triggeringInstantTimestamp, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { if (config.isMetadataTableEnabled() || 
metaClient.getTableConfig().isMetadataTableAvailable()) { - // Create the metadata table writer. First time after the upgrade this creation might trigger - // metadata table bootstrapping. Bootstrapping process could fail and checking the table - // existence after the creation is needed. - final HoodieTableMetadataWriter metadataWriter = JavaHoodieBackedTableMetadataWriter.create( - getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), - Option.of(triggeringInstantTimestamp)); // even with metadata enabled, some index could have been disabled // delete metadata partitions corresponding to such indexes deleteMetadataIndexIfNecessary(); - try { + // Create the metadata table writer. First time after the upgrade this creation might trigger + // metadata table bootstrapping. Bootstrapping process could fail and checking the table + // existence after the creation is needed. + try (HoodieTableMetadataWriter metadataWriter = JavaHoodieBackedTableMetadataWriter.create(config, getContext(), failedWritesCleaningPolicy)) { if (isMetadataTableExists || metaClient.getStorage().exists( HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) { isMetadataTableExists = true; return Option.of(metadataWriter); } - } catch (IOException e) { + } catch (Exception e) { throw new HoodieMetadataException("Checking existence of metadata table failed", e); } } else { diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index cfee5a022e991..699b5a2156709 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -2654,7 +2654,7 @@ public void testOutOfOrderCommits() throws Exception { // Execute compaction on metadata table. 
try (JavaHoodieBackedTableMetadataWriter metadataWriter = - (JavaHoodieBackedTableMetadataWriter) JavaHoodieBackedTableMetadataWriter.create(storageConf, client.getConfig(), context, Option.empty())) { + (JavaHoodieBackedTableMetadataWriter) JavaHoodieBackedTableMetadataWriter.create(client.getConfig(), context)) { Properties metadataProps = metadataWriter.getWriteConfig().getProps(); metadataProps.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3"); HoodieWriteConfig metadataWriteConfig = HoodieWriteConfig.newBuilder() diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index 25ec624b93d0e..40028f026083a 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -268,7 +268,7 @@ public void syncTableMetadata(HoodieWriteConfig writeConfig) { return; } // Open up the metadata table again, for syncing - try (HoodieTableMetadataWriter writer = JavaHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, context, Option.empty())) { + try (HoodieTableMetadataWriter writer = JavaHoodieBackedTableMetadataWriter.create(writeConfig, context)) { LOG.info("Successfully synced to metadata table"); } catch (Exception e) { throw new HoodieMetadataException("Error syncing to metadata table.", e); @@ -410,7 +410,7 @@ protected void validateFilesPerPartition(HoodieTestTable testTable, protected HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig clientConfig) { return (HoodieBackedTableMetadataWriter) JavaHoodieBackedTableMetadataWriter - .create(storageConf, clientConfig, new HoodieJavaEngineContext(storageConf), Option.empty()); + .create(clientConfig, new HoodieJavaEngineContext(storageConf)); } private void runFullValidation(HoodieWriteConfig writeConfig, diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java index fc74c10d2a65a..166a5647fd295 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java @@ -114,7 +114,7 @@ public void init(HoodieTableType tableType, Option writeConfi protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) { this.writeConfig = writeConfig; if (enableMetadataTable) { - metadataWriter = JavaHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, context, Option.empty()); + metadataWriter = JavaHoodieBackedTableMetadataWriter.create(writeConfig, context); // reload because table configs could have been updated metaClient = HoodieTableMetaClient.reload(metaClient); testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 58b2d98ffd78f..e7c1c4a0b97ba 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -18,7 +18,6 @@ package org.apache.hudi.client; -import org.apache.hudi.index.HoodieSparkIndexClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.SparkReleaseResources; @@ -39,6 +38,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieSparkIndexClient; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; @@ -300,8 +300,7 @@ private void initializeMetadataTable(Option inFlightInstantTimestamp, Ho metrics.emitMetadataEnablementMetrics(true, isMetadataColStatsAvailable, isMetadataBloomFilterAvailable, isMetadataRliAvailable); } - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create( - context.getStorageConf(), config, context, inFlightInstantTimestamp)) { + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(config, context)) { if (writer.isInitialized()) { writer.performTableServices(inFlightInstantTimestamp); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 863a4995f6e0a..20a78f4b9b39d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -18,7 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.hudi.index.HoodieSparkIndexClient; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -41,6 +40,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.index.HoodieSparkIndexClient; import org.apache.hudi.index.expression.HoodieSparkExpressionIndex; import org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata; import org.apache.hudi.metrics.DistributedRegistry; @@ -77,41 +77,20 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad * If the metadata table does not exist, an attempt is made to bootstrap it but there is no guaranteed that * table will end up bootstrapping at this time. * - * @param conf - * @param writeConfig - * @param context - * @param inflightInstantTimestamp Timestamp of an instant which is in-progress. This instant is ignored while - * attempting to bootstrap the table. 
+ * @param writeConfig write config + * @param context engine context - {@link HoodieSparkEngineContext} * @return An instance of the {@code HoodieTableMetadataWriter} */ - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieEngineContext context, - Option inflightInstantTimestamp) { - return new SparkHoodieBackedTableMetadataWriter( - conf, writeConfig, EAGER, context, inflightInstantTimestamp); - } - - public static HoodieTableMetadataWriter create(StorageConfiguration conf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext context, - Option inflightInstantTimestamp) { - return new SparkHoodieBackedTableMetadataWriter( - conf, writeConfig, failedWritesCleaningPolicy, context, inflightInstantTimestamp); + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context) { + return new SparkHoodieBackedTableMetadataWriter(writeConfig, context, EAGER); } - public static HoodieTableMetadataWriter create(StorageConfiguration conf, HoodieWriteConfig writeConfig, - HoodieEngineContext context) { - return create(conf, writeConfig, context, Option.empty()); + public static HoodieTableMetadataWriter create(HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + return new SparkHoodieBackedTableMetadataWriter(writeConfig, context, failedWritesCleaningPolicy); } - SparkHoodieBackedTableMetadataWriter(StorageConfiguration hadoopConf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext engineContext, - Option inflightInstantTimestamp) { - super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp); + SparkHoodieBackedTableMetadataWriter(HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) { + super(writeConfig, failedWritesCleaningPolicy, engineContext); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 0ff6d8ab551f7..106e1d5a231fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -105,16 +105,13 @@ protected Option getMetadataWriter( // Create the metadata table writer. First time after the upgrade this creation might trigger // metadata table bootstrapping. Bootstrapping process could fail and checking the table // existence after the creation is needed. 
- HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( - getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), - Option.of(triggeringInstantTimestamp)); - try { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, getContext(), failedWritesCleaningPolicy)) { if (isMetadataTableExists || metaClient.getStorage().exists( HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) { isMetadataTableExists = true; return Option.of(metadataWriter); } - } catch (IOException e) { + } catch (Exception e) { throw new HoodieMetadataException("Checking existence of metadata table failed", e); } } else { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 74c16125d3766..55b3a261878fa 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -411,7 +411,7 @@ public void testRollbackCommit() throws Exception { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build(); // HUDI-8815 - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); Map>> partitionToFilesNameLengthMap1 = new HashMap<>(); @@ -527,7 +527,7 @@ public void testFailedRollbackCommit( .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); - HoodieTableMetadataWriter metadataWriter = enableMetadataTable ? SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context) : null; + HoodieTableMetadataWriter metadataWriter = enableMetadataTable ? SparkHoodieBackedTableMetadataWriter.create(config, context) : null; HoodieTestTable testTable = enableMetadataTable ? HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)) : HoodieTestTable.of(metaClient); @@ -637,7 +637,7 @@ public void testAutoRollbackInflightCommit() throws Exception { .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build(); - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); Map>> partitionToFilesNameLengthMap1 = new HashMap<>(); @@ -735,7 +735,7 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); HoodieTableMetadataWriter metadataWriter = enableMetadataTable ? 
SparkHoodieBackedTableMetadataWriter.create( - metaClient.getStorageConf(), config, context) : null; + config, context) : null; HoodieTestTable testTable = enableMetadataTable ? HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)) : HoodieTestTable.of(metaClient); @@ -831,7 +831,7 @@ public void testFallbackToListingBasedRollbackForCompletedInstant() throws Excep .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build(); // create test table with all commits completed - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf(), config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); testTable.withPartitionMetaFiles(p1, p2, p3) .addCommit(commitTime1) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 1c7a929ee9112..42054eb42768c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -117,7 +117,7 @@ public void init(HoodieTableType tableType, Option writeConfi protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) throws IOException { this.writeConfig = writeConfig; if (enableMetadataTable) { - metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(writeConfig, context); // reload because table configs could have been updated metaClient = HoodieTableMetaClient.reload(metaClient); testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 496a922bb7d6b..dd0b2efc85e86 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -151,7 +151,7 @@ public void testLoadInvolvedFiles( makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable); HoodieBloomIndex index = new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context); HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context)); // Create some partitions, and put some files @@ -400,7 +400,7 @@ public void testTagLocationOnPartitionedTable( // Also create the metadata and config HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable); HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - 
metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context); HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context)); // Let's tag @@ -499,7 +499,7 @@ public void testTagLocationOnNonpartitionedTable( HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable); HoodieSparkTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context); HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context)); // Let's tag @@ -595,7 +595,7 @@ public void testCheckExists( HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking, useMetadataTable); HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); - metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context); HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, SCHEMA, metadataWriter, Option.of(context)); // Let's tag diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index c3b8375687e45..e5b1bd5c4980e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -152,7 +152,7 @@ public void init(HoodieTableType tableType) throws Exception { private void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) { if (enableMetadataTable) { - metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(writeConfig, context); // reload because table configs could have been updated metaClient = HoodieTableMetaClient.reload(metaClient); testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); @@ -396,7 +396,7 @@ private HoodieInstant triggerCommit( String file1P0C0 = UUID.randomUUID().toString(); String file1P1C0 = UUID.randomUUID().toString(); String commitTs = TimelineUtils.formatDate(Date.from(curDateTime.minusMinutes(minutesForCommit).toInstant())); - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { Map> part1ToFileId = Collections.unmodifiableMap(new HashMap>() { { put(p0, CollectionUtils.createImmutableList(file1P0C0)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index fa8231bbce3e9..2304e275b4e82 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -947,7 
+947,7 @@ public void testCleaningWithZeroPartitionPaths() throws Exception { // Make a commit, although there are no partitionPaths. // Example use-case of this is when a client wants to create a table // with just some commit metadata, but no data/partitionPaths. - try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) { + try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(config, context)) { HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); testTable.doWriteOperation("001", WriteOperationType.INSERT, Collections.emptyList(), 1); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java index 63ecf2b4f552e..816d597414525 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java @@ -220,7 +220,7 @@ public void commitWithMdt(String instantTime, Map> partToFi } protected HoodieTableMetadataWriter getMetadataWriter(HoodieWriteConfig config) { - return SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context); + return SparkHoodieBackedTableMetadataWriter.create(config, context); } protected HoodieTestTable tearDownTestTableAndReinit(HoodieTestTable testTable, HoodieWriteConfig config) throws Exception { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java index a4dee7afe2a0a..7c777e2ac19e2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java @@ -559,7 +559,7 @@ public void syncTableMetadata(HoodieWriteConfig writeConfig) { return; } // Open up the metadata table again, for syncing - try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(storageConf, writeConfig, context)) { + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(writeConfig, context)) { LOG.info("Successfully synced to metadata table"); } catch (Exception e) { throw new HoodieMetadataException("Error syncing to metadata table.", e); @@ -568,7 +568,7 @@ public void syncTableMetadata(HoodieWriteConfig writeConfig) { public HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig clientConfig) { return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter - .create(storageConf, clientConfig, new HoodieSparkEngineContext(jsc)); + .create(clientConfig, new HoodieSparkEngineContext(jsc)); } public HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 964e47b9e1a1d..48a010625c60b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -83,8 +83,7 @@ void beforeEach(Map options) 
throws IOException { this.metaClient = table.getMetaClient(); // initialize the metadata table path if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) { - FlinkHoodieBackedTableMetadataWriter.create(table.getStorageConf(), table.getConfig(), - table.getContext(), Option.empty()); + FlinkHoodieBackedTableMetadataWriter.create(table.getConfig(), table.getContext()); } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala index 4b81abe0d70c9..8906874d59f25 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala @@ -63,7 +63,7 @@ class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder w } val timer = HoodieTimer.start val writeConfig = getWriteConfig(basePath) - SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc)) + SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc)) Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)")) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala index 4864a70a9ad8d..72155e71339f9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala @@ -64,7 +64,7 @@ class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder wit val timer = HoodieTimer.start if (!readOnly) { val writeConfig = getWriteConfig(basePath) - SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf, writeConfig, new HoodieSparkEngineContext(jsc)) + SparkHoodieBackedTableMetadataWriter.create(writeConfig, new HoodieSparkEngineContext(jsc)) } val action = if (readOnly) "Opened" else "Initialized" diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 02a6a95360e23..d6f6812b08719 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -3437,7 +3437,7 @@ public void testOutOfOrderCommits() throws Exception { validateMetadata(client); // Execute compaction on metadata table. 
- metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, client.getConfig(), context); + metadataWriter = SparkHoodieBackedTableMetadataWriter.create(client.getConfig(), context); Properties metadataProps = ((SparkHoodieBackedTableMetadataWriter) metadataWriter).getWriteConfig().getProps(); metadataProps.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3"); HoodieWriteConfig metadataWriteConfig = HoodieWriteConfig.newBuilder() @@ -3800,7 +3800,7 @@ private List getAllFiles(HoodieTableMetadata metadata) throws Excep private HoodieBackedTableMetadataWriter> metadataWriter(SparkRDDWriteClient client) { return (HoodieBackedTableMetadataWriter>) SparkHoodieBackedTableMetadataWriter - .create(storageConf, client.getConfig(), new HoodieSparkEngineContext(jsc)); + .create(client.getConfig(), new HoodieSparkEngineContext(jsc)); } private HoodieTableMetadata metadata(SparkRDDWriteClient client) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 006a1bd80c90b..91c22c4f303e2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -376,7 +376,7 @@ public void testLogFileCountsAfterCompaction() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( - writeClient.getEngineContext().getStorageConf(), config, writeClient.getEngineContext())) { + config, writeClient.getEngineContext())) { HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable .of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, metadataWriter); @@ -488,7 +488,7 @@ public void testLogBlocksCountsAfterLogCompaction(boolean populateMetaFields, St metaClient = HoodieTableMetaClient.reload(metaClient); try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( - writeClient.getEngineContext().getStorageConf(), config, writeClient.getEngineContext())) { + config, writeClient.getEngineContext())) { HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable .of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, metadataWriter); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java index 3a750dda54a98..58f800bd4dcd6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestManifestFileWriterSpark.java @@ -91,7 +91,7 @@ private static void createTestDataForPartitionedTable(HoodieTableMetaClient meta final String instantTime = "100"; HoodieTestTable testTable = null; if (enableMetadata) { - HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConfiguration, writeConfig, context); + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(writeConfig, context); // reload because table configs could have been updated metaClient = HoodieTableMetaClient.reload(metaClient); testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context)); From 
2db7c334240fe261b384c44efc583e51a872936c Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 11 Feb 2025 15:10:49 +0530 Subject: [PATCH 2/4] Remove unused util method --- .../HoodieBackedTableMetadataWriter.java | 2 - .../metadata/HoodieTableMetadataUtil.java | 71 +------------------ .../metadata/TestHoodieTableMetadataUtil.java | 27 ++++--- 3 files changed, 16 insertions(+), 84 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 330ceea925ed4..a81fc9d40f4bb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -676,7 +676,6 @@ private Pair> initializeRecordIndexPartition() // Collect record keys from the files in parallel records = readRecordKeysFromBaseFiles( engineContext, - dataWriteConfig, partitionBaseFilePairs, false, dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(), @@ -1666,7 +1665,6 @@ private HoodieData getRecordIndexReplacedRecords(HoodieReplaceComm .collect(Collectors.toList()); return readRecordKeysFromBaseFiles( engineContext, - dataWriteConfig, partitionBaseFilePairs, true, dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(), diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 695b011e87557..4414ce2cde190 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -155,7 +155,6 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED; import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION; import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE; -import static org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN; import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath; import static org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD; import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION; @@ -2317,12 +2316,10 @@ public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo( } /** - * Reads the record keys from the base files and returns a {@link HoodieData} of {@link HoodieRecord} to be updated in the metadata table. - * Use {@link #readRecordKeysFromFileSlices(HoodieEngineContext, List, boolean, int, String, HoodieTableMetaClient, EngineType)} instead. + * Reads the record keys from the given base files and returns a {@link HoodieData} of {@link HoodieRecord} to be updated in the metadata table. + * Computation is parallelized based on tuples. 
*/ - @Deprecated public static HoodieData readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, - HoodieConfig config, List> partitionBaseFilePairs, boolean forDelete, int recordIndexMaxParallelism, @@ -2345,69 +2342,7 @@ public static HoodieData readRecordKeysFromBaseFiles(HoodieEngineC final String instantTime = baseFile.getCommitTime(); HoodieFileReader reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(basePath, configuration)) .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) - .getFileReader(config, dataFilePath); - return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime); - }); - } - - /** - * Reads the record keys from the given file slices and returns a {@link HoodieData} of {@link HoodieRecord} to be updated in the metadata table. - * If file slice does not have any base file, then iterates over the log files to get the record keys. - */ - public static HoodieData readRecordKeysFromFileSlices(HoodieEngineContext engineContext, - List> partitionFileSlicePairs, - boolean forDelete, - int recordIndexMaxParallelism, - String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) { - if (partitionFileSlicePairs.isEmpty()) { - return engineContext.emptyHoodieData(); - } - - engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices"); - final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism); - final StoragePath basePath = metaClient.getBasePath(); - final StorageConfiguration storageConf = metaClient.getStorageConf(); - return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> { - final String partition = partitionAndBaseFile.getKey(); - final FileSlice fileSlice = partitionAndBaseFile.getValue(); - if (!fileSlice.getBaseFile().isPresent()) { - List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) - .map(l -> l.getPath().toString()).collect(Collectors.toList()); - HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() - .withStorage(metaClient.getStorage()) - .withBasePath(basePath) - .withLogFilePaths(logFilePaths) - .withReaderSchema(HoodieAvroUtils.getRecordKeySchema()) - .withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse("")) - .withReverseReader(false) - .withMaxMemorySizeInBytes(storageConf.getLong( - MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) - .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) - .withPartition(fileSlice.getPartitionPath()) - .withOptimizedLogBlocksScan(storageConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), false)) - .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue())) - .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean( - DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) - .withRecordMerger(HoodieRecordUtils.createRecordMerger( - metaClient.getBasePath().toString(), - engineType, - Collections.emptyList(), // TODO: support different merger classes, which is currently only known to write config - metaClient.getTableConfig().getRecordMergeStrategyId())) - .withTableMetaClient(metaClient) - .build(); - ClosableIterator recordKeyIterator = 
ClosableIterator.wrap(mergedLogRecordScanner.getRecords().keySet().iterator()); - return getHoodieRecordIterator(recordKeyIterator, forDelete, partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime()); - } - final HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); - final String filename = baseFile.getFileName(); - StoragePath dataFilePath = filePath(basePath, partition, filename); - - final String fileId = baseFile.getFileId(); - final String instantTime = baseFile.getCommitTime(); - HoodieConfig hoodieConfig = getReaderConfigs(storageConf); - HoodieFileReader reader = HoodieIOFactory.getIOFactory(metaClient.getStorage()) - .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) - .getFileReader(hoodieConfig, dataFilePath); + .getFileReader(getReaderConfigs(configuration), dataFilePath); return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime); }); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java index 7947c0571c1ab..49c6a598657de 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -102,15 +101,15 @@ public void tearDown() throws IOException { @Test public void testReadRecordKeysFromBaseFilesWithEmptyPartitionBaseFilePairs() { HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); - List> partitionFileSlicePairs = Collections.emptyList(); - HoodieData result = HoodieTableMetadataUtil.readRecordKeysFromFileSlices( + List> partitionBaseFilePairs = Collections.emptyList(); + HoodieData result = HoodieTableMetadataUtil.readRecordKeysFromBaseFiles( engineContext, - partitionFileSlicePairs, + partitionBaseFilePairs, false, 1, - "activeModule", - metaClient, - EngineType.SPARK + metaClient.getBasePath(), + engineContext.getStorageConf(), + "activeModule" ); assertTrue(result.isEmpty()); } @@ -180,7 +179,7 @@ public void testReadRecordKeysFromBaseFilesWithValidRecords() throws Exception { String instant = "20230918120000000"; hoodieTestTable = hoodieTestTable.addCommit(instant); Set recordKeys = new HashSet<>(); - final List> partitionFileSlicePairs = new ArrayList<>(); + final List> partitionBaseFilePairs = new ArrayList<>(); // Generate 10 inserts for each partition and populate partitionBaseFilePairs and recordKeys. 
DATE_PARTITIONS.forEach(p -> { try { @@ -195,7 +194,7 @@ public void testReadRecordKeysFromBaseFilesWithValidRecords() throws Exception { engineContext); HoodieBaseFile baseFile = new HoodieBaseFile(hoodieTestTable.getBaseFilePath(p, fileId).toString(), fileId, instant, null); fileSlice.setBaseFile(baseFile); - partitionFileSlicePairs.add(Pair.of(p, fileSlice)); + partitionBaseFilePairs.add(Pair.of(p, baseFile)); recordKeys.addAll(hoodieRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet())); } catch (Exception e) { throw new RuntimeException(e); @@ -203,14 +202,14 @@ public void testReadRecordKeysFromBaseFilesWithValidRecords() throws Exception { }); // Call the method readRecordKeysFromBaseFiles with the created partitionBaseFilePairs. - HoodieData result = HoodieTableMetadataUtil.readRecordKeysFromFileSlices( + HoodieData result = HoodieTableMetadataUtil.readRecordKeysFromBaseFiles( engineContext, - partitionFileSlicePairs, + partitionBaseFilePairs, false, 1, - "activeModule", - metaClient, - EngineType.SPARK + metaClient.getBasePath(), + engineContext.getStorageConf(), + "activeModule" ); // Validate the result. List records = result.collectAsList(); From 3b0eb8442b2cada06f3ad4d5fc59b3ded74d86eb Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 12 Feb 2025 11:02:04 +0530 Subject: [PATCH 3/4] More cleanup in HoodieTableMetadataUtil --- .../hudi/cli/commands/MetadataCommand.java | 5 +- .../HoodieBackedTableMetadataWriter.java | 54 ++-- .../hudi/table/action/BaseActionExecutor.java | 4 +- .../upgrade/EightToSevenDowngradeHandler.java | 10 +- .../TestEightToSevenDowngradeHandler.java | 8 +- .../client/TestJavaHoodieBackedMetadata.java | 2 +- .../metadata/HoodieTableMetadataUtil.java | 294 +++++------------- .../DeleteMetadataTableProcedure.scala | 2 +- .../functional/TestHoodieBackedMetadata.java | 2 +- 9 files changed, 125 insertions(+), 256 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 0fb59dcb38330..cb1559f03cc44 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -139,7 +139,7 @@ public String create( @ShellMethod(key = "metadata delete", value = "Remove the Metadata Table") public String delete(@ShellOption(value = "--backup", help = "Backup the metadata table before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception { HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient(); - String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, new HoodieSparkEngineContext(jsc), backup); + String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, backup); if (backup) { return "Metadata Table has been deleted and backed up to " + backupPath; } else { @@ -150,8 +150,7 @@ public String delete(@ShellOption(value = "--backup", help = "Backup the metadat @ShellMethod(key = "metadata delete-record-index", value = "Delete the record index from Metadata Table") public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception { HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient(); - String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc), - 
MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup); + String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup); if (backup) { return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath; } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index a81fc9d40f4bb..271d6f4d279a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -99,6 +99,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Objects.requireNonNull; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS; import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH; import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED; @@ -309,7 +310,7 @@ private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws if (reInitialize) { metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1)); LOG.info("Deleting Metadata Table directory so that it can be re-initialized"); - HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, engineContext, false); + HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, false); exists = false; } @@ -550,7 +551,7 @@ private Pair, Pair>> initializeCo // during initialization, we need stats for base and log files. 
HoodieData records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(), + engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(), columnsToIndex); @@ -591,6 +592,14 @@ protected abstract HoodieData getExpressionIndexRecords(List> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception { HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName); ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + List>> partitionFilePathSizeTriplet = getPartitionFilePathSizeTriplet(); + int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); + Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient); + return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime)); + } + + private List>> getPartitionFilePathSizeTriplet() throws IOException { List> partitionFileSlicePairs = getPartitionFileSlicePairs(); List>> partitionFilePathSizeTriplet = new ArrayList<>(); partitionFileSlicePairs.forEach(entry -> { @@ -598,18 +607,12 @@ private Pair> initializeExpressionIndexPartiti partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileLen()))); } entry.getValue().getLogFiles().forEach(hoodieLogFile -> { - if (entry.getValue().getLogFiles().count() > 0) { - entry.getValue().getLogFiles().forEach(logfile -> { - partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize()))); - }); + if (entry.getValue().getLogFiles().findAny().isPresent()) { + entry.getValue().getLogFiles().forEach(logfile -> partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize())))); } }); }); - - int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); - int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism()); - Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient); - return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime)); + return partitionFilePathSizeTriplet; } HoodieIndexDefinition getIndexDefinition(String indexName) { @@ -661,7 +664,7 @@ private Pair> initializeRecordIndexPartition() // Collect the list of latest base files present in each partition List partitions = metadata.getAllPartitionPaths(); fsView.loadAllPartitions(); - HoodieData records = null; + HoodieData records; if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) { // for COW, we can only consider base files to initialize. 
final List> partitionBaseFilePairs = new ArrayList<>(); @@ -670,8 +673,7 @@ private Pair> initializeRecordIndexPartition() .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList())); } - LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " - + partitions.size() + " partitions"); + LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size()); // Collect record keys from the files in parallel records = readRecordKeysFromBaseFiles( @@ -684,13 +686,12 @@ private Pair> initializeRecordIndexPartition() this.getClass().getSimpleName()); } else { final List> partitionFileSlicePairs = new ArrayList<>(); - String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(instant -> instant.requestedTime()).orElse(SOLO_COMMIT_TIMESTAMP); + String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP); for (String partition : partitions) { fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs))); } - LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in " - + partitions.size() + " partitions"); + LOG.info("Initializing record index from {} file slices in {} partitions", partitionFileSlicePairs.size(), partitions.size()); records = readRecordKeysFromFileSliceSnapshot( engineContext, partitionFileSlicePairs, @@ -722,7 +723,7 @@ private Pair> initializeRecordIndexPartition() * @param metaClient metaclient instance to use. * @param dataWriteConfig write config to use. * @param hoodieTable hoodie table instance of interest. - * @return + * @return {@link HoodieData} of {@link HoodieRecord} containing updates for record_index. */ private static HoodieData readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext, List> partitionFileSlicePairs, @@ -752,7 +753,7 @@ private static HoodieData readRecordKeysFromFileSliceSnapshot(Hood Option.of(fileSlice)).getMergedRecords().stream().map(record -> { HoodieRecord record1 = (HoodieRecord) record; return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId, - record1.getCurrentLocation().getInstantTime(), 0); + requireNonNull(record1.getCurrentLocation()).getInstantTime(), 0); }).iterator(); }); } @@ -845,9 +846,10 @@ private List listAllPartitionsFromFilesystem(String initializatio // List all directories in parallel engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem"); List processedDirectories = engineContext.map(pathsToProcess, path -> { - HoodieStorage storage = new HoodieHadoopStorage(path, storageConf); - String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path); - return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants); + try (HoodieStorage storage = new HoodieHadoopStorage(path, storageConf)) { + String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path); + return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants); + } }, numDirsToList); // If the listing reveals a directory, add it to queue. 
If the listing reveals a hoodie partition, add it to @@ -910,7 +912,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata HoodieStorage storage = metadataMetaClient.getStorage(); try { final List existingFiles = storage.listDirectEntries(partitionPath); - if (existingFiles.size() > 0) { + if (!existingFiles.isEmpty()) { LOG.warn("Deleting all existing files found in MDT partition {}", partitionName); storage.deleteDirectory(partitionPath); ValidationUtils.checkState(!storage.exists(partitionPath), @@ -1202,7 +1204,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { InstantGenerator datainstantGenerator = dataMetaClient.getInstantGenerator(); HoodieInstant restoreInstant = datainstantGenerator.createNewInstant(REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime); HoodieInstant requested = datainstantGenerator.getRestoreRequestedInstant(restoreInstant); - HoodieRestorePlan restorePlan = null; + HoodieRestorePlan restorePlan; try { restorePlan = TimelineMetadataUtils.deserializeAvroMetadata( dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class); @@ -1619,7 +1621,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map> partitionFilesToDelete, List partitionsToDelete) throws IOException { for (String partition : metadata.fetchAllPartitionPaths()) { - StoragePath partitionPath = null; + StoragePath partitionPath; if (StringUtils.isNullOrEmpty(partition) && !dataMetaClient.getTableConfig().isTablePartitioned()) { partitionPath = new StoragePath(dataWriteConfig.getBasePath()); } else { @@ -1630,7 +1632,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map 0) { + if (metadataFiles != null && !metadataFiles.isEmpty()) { partitionFilesToDelete.put(partitionId, metadataFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList())); } } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index 5940129c91d70..8c1024c6775dc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -74,7 +74,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String ac // Recreate MDT for insert_overwrite_table operation. if (table.getConfig().isMetadataTableEnabled() && WriteOperationType.INSERT_OVERWRITE_TABLE == metadata.getOperationType()) { - HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), table.getContext(), false); + HoodieTableMetadataUtil.deleteMetadataTable(table.getMetaClient(), false); } // MDT should be recreated if it has been deleted for insert_overwrite_table operation. 
@@ -157,7 +157,7 @@ protected final void dropIndexOnRestore() { for (String partitionPath : table.getMetaClient().getTableConfig().getMetadataPartitions()) { if (MetadataPartitionType.shouldDeletePartitionOnRestore(partitionPath)) { // setting backup to true as this delete is part of restore operation - HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(), context, partitionPath, true); + HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(), partitionPath, true); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java index 52bbc830238a4..54f1135b20349 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java @@ -346,21 +346,17 @@ static void downgradeMetadataPartitions(HoodieEngineContext context, false); // Delete partitions. - List validPartitionPaths = deleteMetadataPartition(context, metaClient, metadataPartitions); + List validPartitionPaths = deleteMetadataPartition(metaClient, metadataPartitions); // Clean the configuration. tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, String.join(",", validPartitionPaths)); } - static List deleteMetadataPartition(HoodieEngineContext context, - HoodieTableMetaClient metaClient, + static List deleteMetadataPartition(HoodieTableMetaClient metaClient, List metadataPartitions) { metadataPartitions.stream() .filter(metadataPath -> !SUPPORTED_METADATA_PARTITION_PATHS.contains(metadataPath)) - .forEach(metadataPath -> - HoodieTableMetadataUtil.deleteMetadataTablePartition( - metaClient, context, metadataPath, true) - ); + .forEach(metadataPath -> HoodieTableMetadataUtil.deleteMetadataTablePartition(metaClient, metadataPath, true)); return metadataPartitions.stream() .filter(SUPPORTED_METADATA_PARTITION_PATHS::contains) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java index 300b00a4880ff..289305e6c996d 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToSevenDowngradeHandler.java @@ -104,19 +104,19 @@ void setUp() { void testDeleteMetadataPartition() { try (MockedStatic mockedMetadataUtils = mockStatic(HoodieTableMetadataUtil.class)) { List leftPartitionPaths = - EightToSevenDowngradeHandler.deleteMetadataPartition(context, metaClient, SAMPLE_METADATA_PATHS); + EightToSevenDowngradeHandler.deleteMetadataPartition(metaClient, SAMPLE_METADATA_PATHS); mockedMetadataUtils.verify( () -> HoodieTableMetadataUtil.deleteMetadataTablePartition( - metaClient, context, "expr_index_random", true), + metaClient, "expr_index_random", true), times(1)); mockedMetadataUtils.verify( () -> HoodieTableMetadataUtil.deleteMetadataTablePartition( - metaClient, context, "secondary_index_random", true), + metaClient, "secondary_index_random", true), times(1)); mockedMetadataUtils.verify( () -> HoodieTableMetadataUtil.deleteMetadataTablePartition( - metaClient, context, "partition_stats", true), + metaClient, "partition_stats", 
true), times(1)); assertArrayEquals(new String[] {"files", "column_stats"}, leftPartitionPaths.toArray()); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 699b5a2156709..9aa24fc6f83a9 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -2270,7 +2270,7 @@ public void testBootstrapWithTableNotFound() throws Exception { new StoragePath(getMetadataTableBasePath(writeConfig.getBasePath())); assertTrue(storage.exists(metadataTablePath), "metadata table should exist."); - deleteMetadataTable(metaClient, context, false); + deleteMetadataTable(metaClient, false); assertFalse(storage.exists(metadataTablePath), "metadata table should not exist after being deleted."); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 4414ce2cde190..fcab8926aca17 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -138,9 +138,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.BaseStream; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -194,8 +194,6 @@ public class HoodieTableMetadataUtil { public static final String PARTITION_NAME_SECONDARY_INDEX = "secondary_index"; public static final String PARTITION_NAME_SECONDARY_INDEX_PREFIX = "secondary_index_"; - private static final Set SUPPORTED_TYPES_PARTITION_STATS = new HashSet<>(Arrays.asList( - Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.NULL, Schema.Type.BYTES)); public static final Set SUPPORTED_META_FIELDS_PARTITION_STATS = new HashSet<>(Arrays.asList( HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), @@ -291,7 +289,10 @@ class ColumnStats { String fieldName = fieldNameFieldPair.getKey(); Schema fieldSchema = fieldNameFieldPair.getValue().schema(); ColumnStats colStats = allColumnStats.get(fieldName); - HoodieColumnRangeMetadata hcrm = HoodieColumnRangeMetadata.create( + // NOTE: Size and compressed size statistics are set to 0 to make sure we're not + // mixing up those provided by Parquet with the ones from other encodings, + // since those are not directly comparable + return HoodieColumnRangeMetadata.create( filePath, fieldName, colStats == null ? 
null : coerceToComparable(fieldSchema, colStats.minValue), @@ -304,7 +305,6 @@ class ColumnStats { 0L, 0L ); - return hcrm; }); return hoodieColumnRangeMetadataStream.collect( Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity())); @@ -312,7 +312,7 @@ class ColumnStats { public static Option getColumnStatsValueAsString(Object statsValue) { if (statsValue == null) { - LOG.info("Invalid column stats value: {}", statsValue); + LOG.info("Null column stats value"); return Option.empty(); } Class statsValueClass = statsValue.getClass(); @@ -334,7 +334,7 @@ public static Option getColumnStatsValueAsString(Object statsValue) { public static void deleteMetadataTable(String basePath, HoodieEngineContext context) { HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() .setBasePath(basePath).setConf(context.getStorageConf().newInstance()).build(); - deleteMetadataTable(dataMetaClient, context, false); + deleteMetadataTable(dataMetaClient, false); } /** @@ -347,7 +347,7 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont public static void deleteMetadataPartition(StoragePath basePath, HoodieEngineContext context, String partitionPath) { HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() .setBasePath(basePath).setConf(context.getStorageConf().newInstance()).build(); - deleteMetadataTablePartition(dataMetaClient, context, partitionPath, false); + deleteMetadataTablePartition(dataMetaClient, partitionPath, false); } /** @@ -378,7 +378,7 @@ public static boolean metadataPartitionExists(StoragePath basePath, HoodieEngine * @param commitMetadata - Commit action metadata * @param instantTime - Action instant time * @param dataMetaClient - HoodieTableMetaClient for data - * @param tableMetadata + * @param tableMetadata - {@link HoodieTableMetadata} instance for reading metadata table * @param metadataConfig - HoodieMetadataConfig * @param enabledPartitionTypes - Set of enabled MDT partitions to update * @param bloomFilterType - Type of generated bloom filter records @@ -654,8 +654,8 @@ private static void convertMetadataToExpressionIndexRecords(HoodieEngineContext /** * Finds all files that were deleted as part of a clean and creates metadata table records for them. 
* - * @param cleanMetadata - * @param instantTime + * @param cleanMetadata - Clean action metadata + * @param instantTime - Clean action instant time * @return a list of metadata table records */ public static List convertMetadataToFilesPartitionRecords(HoodieCleanMetadata cleanMetadata, @@ -818,8 +818,7 @@ public static HoodieData convertMetadataToRecordIndexRecords(Hoodi String basePath = dataTableMetaClient.getBasePath().toString(); HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat(); StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); - Option writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); - Option finalWriterSchemaOpt = writerSchemaOpt; + Option finalWriterSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); HoodieData recordIndexRecords = engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism) .flatMap(writeStatsByFileIdEntry -> { String fileId = writeStatsByFileIdEntry.getKey(); @@ -992,7 +991,7 @@ private static Map getLogRecords(List logFilePaths Collections.emptyList(), datasetMetaClient.getTableConfig().getRecordMergeStrategyId()); - HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() + try (HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() .withStorage(datasetMetaClient.getStorage()) .withBasePath(datasetMetaClient.getBasePath()) .withLogFilePaths(logFilePaths) @@ -1007,8 +1006,9 @@ private static Map getLogRecords(List logFilePaths .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) .withRecordMerger(recordMerger) .withTableMetaClient(datasetMetaClient) - .build(); - return mergedLogRecordScanner.getRecords(); + .build()) { + return mergedLogRecordScanner.getRecords(); + } } return Collections.emptyMap(); } @@ -1032,9 +1032,10 @@ public static Pair, Set> computeRevivedAndDeletedKeys(Set reduceByKeys(HoodieData recordIndexRecords, int parallelism) { @@ -1047,7 +1048,7 @@ public static HoodieData reduceByKeys(HoodieData rec return record2; } else if (!isRecord1Deleted && isRecord2Deleted) { return record1; - } else if (isRecord1Deleted && isRecord2Deleted) { + } else if (isRecord1Deleted) { // let's delete just 1 of them return record1; } else { @@ -1086,128 +1087,6 @@ public static Set getRecordKeys(List logFilePaths, HoodieTableMe return Collections.emptySet(); } - private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, - Map> partitionToFilesMap) { - InstantGenerator factory = dataTableMetaClient.getInstantGenerator(); - HoodieInstant rollbackInstant = factory.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, instantTime); - HoodieInstant requested = factory.getRollbackRequestedInstant(rollbackInstant); - try { - HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata( - dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class); - - rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> { - final String partitionId = getPartitionIdentifierForFilesPartition(rollbackRequest.getPartitionPath()); - partitionToFilesMap.computeIfAbsent(partitionId, s -> new HashMap<>()); - // fetch only log files that are expected to be RB'd in DT as part of this rollback. 
these log files will not be deleted, but rendered - // invalid once rollback is complete. - if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) { - Map logFiles = new HashMap<>(); - rollbackRequest.getLogBlocksToBeDeleted().forEach((k,v) -> { - String fileName = k.substring(k.lastIndexOf("/") + 1); - // rollback plan may not have size for log files to be rolled back. but while merging w/ original commits, the size will get adjusted. - logFiles.put(fileName, 1L); - }); - partitionToFilesMap.get(partitionId).putAll(logFiles); - } - }); - } catch (IOException e) { - throw new HoodieMetadataException("Parsing rollback plan for " + rollbackInstant + " failed "); - } - } - - /** - * Convert rollback action metadata to files partition records. - * Consider only new log files added. - */ - private static List convertMetadataToRollbackRecords(HoodieRollbackMetadata rollbackMetadata, - String instantTime, - HoodieTableMetaClient dataTableMetaClient) { - Map> partitionToAppendedFiles = new HashMap<>(); - processRollbackMetadata(rollbackMetadata, partitionToAppendedFiles); - reAddLogFilesFromRollbackPlan(dataTableMetaClient, instantTime, partitionToAppendedFiles); - return convertFilesToFilesPartitionRecords(Collections.emptyMap(), partitionToAppendedFiles, instantTime, "Rollback"); - } - - /** - * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. - *

- * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This - * function will extract this change file for each partition. - * - * @param rollbackMetadata {@code HoodieRollbackMetadata} - * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. - */ - private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map> partitionToAppendedFiles) { - rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { - // Has this rollback produced new files? - boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty(); - final String partition = pm.getPartitionPath(); - final String partitionId = getPartitionIdentifierForFilesPartition(partition); - - BiFunction fileMergeFn = (oldSize, newSizeCopy) -> { - // if a file exists in both written log files and rollback log files, we want to pick the one that is higher - // as rollback file could have been updated after written log files are computed. - return oldSize > newSizeCopy ? oldSize : newSizeCopy; - }; - - if (hasRollbackLogFiles) { - if (!partitionToAppendedFiles.containsKey(partitionId)) { - partitionToAppendedFiles.put(partitionId, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getAppendFiles() - pm.getRollbackLogFiles().forEach((path, size) -> { - String fileName = new StoragePath(path).getName(); - partitionToAppendedFiles.get(partitionId).merge(fileName, size, fileMergeFn); - }); - } - }); - } - - /** - * Convert rollback action metadata to files partition records. - */ - protected static List convertFilesToFilesPartitionRecords(Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, - String instantTime, String operation) { - List records = new ArrayList<>(partitionToDeletedFiles.size() + partitionToAppendedFiles.size()); - int[] fileChangeCount = {0, 0}; // deletes, appends - - partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> { - fileChangeCount[0] += deletedFiles.size(); - - Map filesAdded = Collections.emptyMap(); - if (partitionToAppendedFiles.containsKey(partitionName)) { - filesAdded = partitionToAppendedFiles.remove(partitionName); - } - - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partitionName, filesAdded, - deletedFiles); - records.add(record); - }); - - partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { - final String partition = getPartitionIdentifierForFilesPartition(partitionName); - fileChangeCount[1] += appendedFileMap.size(); - - // Validate that no appended file has been deleted - checkState( - !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), - "Rollback file cannot both be appended and deleted"); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, appendedFileMap, - Collections.emptyList()); - records.add(record); - }); - - LOG.info("Found at {} from {}. 
#partitions_updated={}, #files_deleted={}, #files_appended={}", - instantTime, operation, records.size(), fileChangeCount[0], fileChangeCount[1]); - - return records; - } - public static String getColumnStatsIndexPartitionIdentifier(String partitionName) { return getPartitionIdentifier(partitionName); } @@ -1278,7 +1157,6 @@ public static HoodieData convertFilesToColumnStatsRecords(HoodieEn Map> partitionToDeletedFiles, Map> partitionToAppendedFiles, HoodieTableMetaClient dataMetaClient, - HoodieMetadataConfig metadataConfig, int columnStatsIndexParallelism, int maxReaderBufferSize, List columnsToIndex) { @@ -1434,7 +1312,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); } finally { if (!fileSystemView.isPresent()) { - fsView.close(); + Objects.requireNonNull(fsView).close(); } } } @@ -1445,7 +1323,7 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta * @param metaClient - instance of {@link HoodieTableMetaClient} * @param fileSystemView - hoodie table file system view, which will be fetched from meta client if not already present * @param partition - name of the partition whose file groups are to be loaded - * @return + * @return list of latest file slices, including inflight, for all file groups in a given partition */ public static List getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient, Option fileSystemView, @@ -1565,7 +1443,7 @@ public static Map getColumnsToIndex(HoodieTableConfig tableConfi * @param tableSchemaLazyOpt lazy option of the table schema * @param isTableInitializing true if table is being initialized. * @param recordType Option of record type. Used to determine which types are valid to index - * @return list of columns that should be indexed + * @return map of column names with schema that should be indexed */ private static Map getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig, Lazy> tableSchemaLazyOpt, @@ -1576,7 +1454,7 @@ private static Map getColumnsToIndexWithoutRequiredMetaFields(Ho // if explicitly overriden if (isTableInitializing) { Map toReturn = new LinkedHashMap<>(); - columnsToIndex.stream().forEach(colName -> toReturn.put(colName, null)); + columnsToIndex.forEach(colName -> toReturn.put(colName, null)); return toReturn; } ValidationUtils.checkArgument(tableSchemaLazyOpt.get().isPresent(), "Table schema not found for the table while computing col stats"); @@ -1585,9 +1463,8 @@ private static Map getColumnsToIndexWithoutRequiredMetaFields(Ho Map colsToIndexSchemaMap = new LinkedHashMap<>(); columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName)) .map(colName -> Pair.of(colName, HoodieAvroUtils.getSchemaForField(tableSchema.get(), colName).getRight().schema())) - .filter(fieldNameSchemaPair -> { - return isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType); - }).forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue())); + .filter(fieldNameSchemaPair -> isColumnTypeSupported(fieldNameSchemaPair.getValue(), recordType)) + .forEach(entry -> colsToIndexSchemaMap.put(entry.getKey(), entry.getValue())); return colsToIndexSchemaMap; } // if not overridden @@ -1761,11 +1638,11 @@ public static Option tryResolveSchemaForTable(HoodieTableMetaClient data } /** - * Given a schema, coerces provided value to instance of {@link Comparable} such that + * Given a schema, coerces provided value to instance of 
{@link Comparable} such that * it could subsequently be used in column stats - * + *

* NOTE: This method has to stay compatible with the semantic of - * {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used in tandem + * {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used in tandem */ public static Comparable coerceToComparable(Schema schema, Object val) { if (val == null) { @@ -1876,7 +1753,7 @@ private static Float castToFloat(Object val) { } else if (val instanceof Long) { return ((Long) val).floatValue(); } else if (val instanceof Float) { - return ((Float)val).floatValue(); + return (Float) val; } else if (val instanceof Double) { return ((Double)val).floatValue(); } else if (val instanceof Boolean) { @@ -1898,7 +1775,7 @@ private static Double castToDouble(Object val) { } else if (val instanceof Float) { return ((Float)val).doubleValue(); } else if (val instanceof Double) { - return ((Double)val).doubleValue(); + return (Double) val; } else if (val instanceof Boolean) { return (Boolean) val ? 1.0d : 0.0d; } else { @@ -2044,12 +1921,11 @@ private static List getRollbackedCommits(HoodieInstant instant, HoodieAc * Delete the metadata table for the dataset and backup if required. * * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted - * @param context instance of {@link HoodieEngineContext}. * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the * directory with name metadata_. * @return The backup directory if backup was requested */ - public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) { + public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, boolean backup) { final StoragePath metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()); HoodieStorage storage = dataMetaClient.getStorage(); @@ -2094,16 +1970,15 @@ public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, H * This can be used to delete a partition so that it can be re-bootstrapped. * * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted - * @param context instance of {@code HoodieEngineContext}. * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the * directory with name metadata_. 
* @param partitionPath The partition to delete * @return The backup directory if backup was requested, null otherwise */ - public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, + public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, String partitionPath, boolean backup) { if (partitionPath.equals(MetadataPartitionType.FILES.getPartitionPath())) { - return deleteMetadataTable(dataMetaClient, context, backup); + return deleteMetadataTable(dataMetaClient, backup); } final StoragePath metadataTablePartitionPath = new StoragePath(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionPath); @@ -2131,7 +2006,7 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta } } catch (Exception e) { // If rename fails, we will try to delete the table instead - LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionPath), e); + LOG.error("Failed to backup MDT partition {} using rename", partitionPath, e); } } else { LOG.info("Deleting metadata table partition from {}", metadataTablePartitionPath); @@ -2298,7 +2173,7 @@ public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(HoodieRe public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo( String partition, int fileIdEncoding, long fileIdHighBits, long fileIdLowBits, int fileIndex, String originalFileId, Long instantTime) { - String fileId = null; + String fileId; if (fileIdEncoding == 0) { // encoding 0 refers to UUID based fileID final UUID uuid = new UUID(fileIdHighBits, fileIdLowBits); @@ -2435,8 +2310,8 @@ private static Stream collectAndProcessColumnMetadata(String parti fileColumnMetadata.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, Collectors.toList())); // Aggregate Column Ranges - Stream> partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream() - .map(entry -> FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue(), colsToIndexSchemaMap)); + Stream> partitionStatsRangeMetadata = columnMetadataMap.values().stream() + .map(hoodieColumnRangeMetadata -> FileFormatUtils.getColumnRangeInPartition(partitionPath, hoodieColumnRangeMetadata, colsToIndexSchemaMap)); // Create Partition Stats Records return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, partitionStatsRangeMetadata.collect(Collectors.toList()), false, isTightBound, indexPartitionOpt); @@ -2537,7 +2412,7 @@ private static HoodieData convertMetadataToPartitionStatsRecords(H final String partitionName = pair.getLeft(); return collectAndProcessColumnMetadata(pair.getRight(), partitionName, isShouldScanColStatsForTightBound(dataMetaClient), Option.empty(), colsToIndexSchemaMap); }) - .flatMap(recordStream -> recordStream.iterator()); + .flatMap(BaseStream::iterator); } catch (Exception e) { throw new HoodieException("Failed to generate column stats records for metadata table", e); } @@ -2584,7 +2459,7 @@ public static HoodieData convertMetadataToPartitionStatRecords(Hoo final String partitionName = partitionedWriteStat.get(0).getPartitionPath(); // Step 1: Collect Column Metadata for Each File part of current commit metadata List> fileColumnMetadata = partitionedWriteStat.stream() - .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, colsToIndex, tableSchema).stream()).collect(toList()); + .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, 
colsToIndex).stream()).collect(toList()); if (shouldScanColStatsForTightBound) { checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table"); @@ -2600,9 +2475,9 @@ public static HoodieData convertMetadataToPartitionStatRecords(Hoo List> partitionColumnMetadata = tableMetadata .getRecordsByKeyPrefixes(generateKeyPrefixes(colsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false) // schema and properties are ignored in getInsertValue, so simply pass as null - .map(record -> ((HoodieMetadataPayload)record.getData()).getColumnStatMetadata()) + .map(record -> record.getData().getColumnStatMetadata()) .filter(Option::isPresent) - .map(colStatsOpt -> colStatsOpt.get()) + .map(Option::get) .filter(stats -> fileNames.contains(stats.getFileName())) .map(HoodieColumnRangeMetadata::fromColumnStats).collectAsList(); if (!partitionColumnMetadata.isEmpty()) { @@ -2652,8 +2527,7 @@ public static List generateKeyPrefixes(List columnsToIndex, Stri private static List> translateWriteStatToFileStats(HoodieWriteStat writeStat, HoodieTableMetaClient datasetMetaClient, - List columnsToIndex, - Option writerSchemaOpt) { + List columnsToIndex) { if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getColumnStats().isPresent()) { Map> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getColumnStats().get(); return columnRangeMap.values().stream().collect(Collectors.toList()); @@ -2735,49 +2609,47 @@ public static Map combineFileSystemMetadata(Hood if (newer.filesystemMetadata != null) { validatePayload(newer.type, newer.filesystemMetadata); - newer.filesystemMetadata.forEach((key, fileInfo) -> { - combinedFileInfo.merge(key, fileInfo, - // Combine previous record w/ the new one, new records taking precedence over - // the old one + newer.filesystemMetadata.forEach((key, fileInfo) -> combinedFileInfo.merge(key, fileInfo, + // Combine previous record w/ the new one, new records taking precedence over + // the old one + // + // NOTE: That if previous listing contains the file that is being deleted by the tombstone + // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting + // listing as well as drop the tombstone itself. + // However, if file is not present in the previous record we have to persist tombstone + // record in the listing to make sure we carry forward information that this file + // was deleted. This special case could occur since the merging flow is 2-stage: + // - First we merge records from all of the delta log-files + // - Then we merge records from base-files with the delta ones (coming as a result + // of the previous step) + (oldFileInfo, newFileInfo) -> { + // NOTE: We can’t assume that MT update records will be ordered the same way as actual + // FS operations (since they are not atomic), therefore MT record merging should be a + // _commutative_ & _associative_ operation (ie one that would work even in case records + // will get re-ordered), which is + // - Possible for file-sizes (since file-sizes will ever grow, we can simply + // take max of the old and new records) + // - Not possible for is-deleted flags* // - // NOTE: That if previous listing contains the file that is being deleted by the tombstone - // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting - // listing as well as drop the tombstone itself. 
- // However, if file is not present in the previous record we have to persist tombstone - // record in the listing to make sure we carry forward information that this file - // was deleted. This special case could occur since the merging flow is 2-stage: - // - First we merge records from all of the delta log-files - // - Then we merge records from base-files with the delta ones (coming as a result - // of the previous step) - (oldFileInfo, newFileInfo) -> { - // NOTE: We can’t assume that MT update records will be ordered the same way as actual - // FS operations (since they are not atomic), therefore MT record merging should be a - // _commutative_ & _associative_ operation (ie one that would work even in case records - // will get re-ordered), which is - // - Possible for file-sizes (since file-sizes will ever grow, we can simply - // take max of the old and new records) - // - Not possible for is-deleted flags* - // - // *However, we’re assuming that the case of concurrent write and deletion of the same - // file is _impossible_ -- it would only be possible with concurrent upsert and - // rollback operation (affecting the same log-file), which is implausible, b/c either - // of the following have to be true: - // - We’re appending to failed log-file (then the other writer is trying to - // rollback it concurrently, before it’s own write) - // - Rollback (of completed instant) is running concurrently with append (meaning - // that restore is running concurrently with a write, which is also nut supported - // currently) - if (newFileInfo.getIsDeleted()) { - if (oldFileInfo.getIsDeleted()) { - LOG.warn("A file is repeatedly deleted in the files partition of the metadata table: {}", key); - return newFileInfo; - } - return null; + // *However, we’re assuming that the case of concurrent write and deletion of the same + // file is _impossible_ -- it would only be possible with concurrent upsert and + // rollback operation (affecting the same log-file), which is implausible, b/c either + // of the following have to be true: + // - We’re appending to failed log-file (then the other writer is trying to + // rollback it concurrently, before it’s own write) + // - Rollback (of completed instant) is running concurrently with append (meaning + // that restore is running concurrently with a write, which is also nut supported + // currently) + if (newFileInfo.getIsDeleted()) { + if (oldFileInfo.getIsDeleted()) { + LOG.warn("A file is repeatedly deleted in the files partition of the metadata table: {}", key); + return newFileInfo; } - return new HoodieMetadataFileInfo( - Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false); - }); - }); + return null; + } + return new HoodieMetadataFileInfo( + Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false); + })); } return combinedFileInfo; } @@ -2909,7 +2781,7 @@ public static class DirectoryInfo implements Serializable { // List of directories within this partition private final List subDirectories = new ArrayList<>(); // Is this a hoodie partition - private boolean isHoodiePartition = false; + private final boolean isHoodiePartition; public DirectoryInfo(String relativePath, List pathInfos, String maxInstantTime, Set pendingDataInstants) { this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala index 690570562924c..a6a1a8f0c30cc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala @@ -51,7 +51,7 @@ class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder w val metaClient = createMetaClient(jsc, basePath) try { - val metadataTableBasePath = deleteMetadataTable(metaClient, new HoodieSparkEngineContext(jsc), false) + val metadataTableBasePath = deleteMetadataTable(metaClient, false) metadataPaths = s"$metadataPaths,$metadataTableBasePath" Seq(Row(s"Deleted Metadata Table at '$metadataTableBasePath'")) } catch { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index d6f6812b08719..42cf905ee15c0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -2901,7 +2901,7 @@ public void testBootstrapWithTableNotFound() throws Exception { getMetadataTableBasePath(writeConfig.getBasePath())); assertTrue(storage.exists(metadataTablePath), "metadata table should exist."); - deleteMetadataTable(metaClient, context, false); + deleteMetadataTable(metaClient, false); assertFalse(storage.exists(metadataTablePath), "metadata table should not exist after being deleted."); From 7bb226b82766fdee2270b4ee14685b0c5dc918c5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 14 Feb 2025 17:58:47 +0530 Subject: [PATCH 4/4] fix mdt instantiation failure --- .../hudi/metadata/HoodieBackedTableMetadataWriter.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 271d6f4d279a4..778f8d0cf3217 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -846,10 +846,9 @@ private List listAllPartitionsFromFilesystem(String initializatio // List all directories in parallel engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem"); List processedDirectories = engineContext.map(pathsToProcess, path -> { - try (HoodieStorage storage = new HoodieHadoopStorage(path, storageConf)) { - String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path); - return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants); - } + HoodieStorage storage = new HoodieHadoopStorage(path, storageConf); + String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path); + return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants); }, numDirsToList); // If the listing reveals a directory, 
add it to queue. If the listing reveals a hoodie partition, add it to
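Taken together, PATCH 2/4 drops the unused HoodieConfig argument from readRecordKeysFromBaseFiles and deletes the unused readRecordKeysFromFileSlices helper, PATCH 3/4 removes the HoodieEngineContext parameter from deleteMetadataTable and deleteMetadataTablePartition along with other unused parameters, and PATCH 4/4 reverts the try-with-resources around HoodieHadoopStorage introduced in PATCH 3/4 so the storage handle used for partition listing is no longer closed inside the mapper. The sketch below is illustrative only, assuming a HoodieTableMetaClient for the data table is already built; the wrapper class and method are hypothetical and not part of this series. It shows a call site using the simplified deleteMetadataTablePartition signature from PATCH 3/4:

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;

public class DropRecordIndexExample {
  // Hypothetical helper: deletes the record_index partition of the metadata table after
  // backing it up, without threading a HoodieEngineContext through the call.
  public static String dropRecordIndex(HoodieTableMetaClient dataMetaClient) {
    // Returns the backup directory when backup is requested, null otherwise (per the updated javadoc).
    return HoodieTableMetadataUtil.deleteMetadataTablePartition(
        dataMetaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), true);
  }
}

Passing true for backup mirrors the default of the metadata delete-record-index CLI command in MetadataCommand.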