 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

+import static java.util.Objects.requireNonNull;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -309,7 +310,7 @@ private boolean metadataTableExists(HoodieTableMetaClient dataMetaClient) throws
     if (reInitialize) {
       metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
       LOG.info("Deleting Metadata Table directory so that it can be re-initialized");
-      HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, engineContext, false);
+      HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, false);
       exists = false;
     }

@@ -550,7 +551,7 @@ private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeCo

     // during initialization, we need stats for base and log files.
     HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-        engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
+        engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient,
         dataWriteConfig.getColumnStatsIndexParallelism(),
         dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
         columnsToIndex);
@@ -591,25 +592,27 @@ protected abstract HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<
   private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception {
     HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
     ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);
+    List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = getPartitionFilePathSizeTriplet();
+    int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
+    Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
+    return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
+  }
+
+  private List<Pair<String, Pair<String, Long>>> getPartitionFilePathSizeTriplet() throws IOException {
     List<Pair<String, FileSlice>> partitionFileSlicePairs = getPartitionFileSlicePairs();
     List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = new ArrayList<>();
     partitionFileSlicePairs.forEach(entry -> {
       if (entry.getValue().getBaseFile().isPresent()) {
         partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileLen())));
       }
       entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
-        if (entry.getValue().getLogFiles().count() > 0) {
-          entry.getValue().getLogFiles().forEach(logfile -> {
-            partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
-          });
+        if (entry.getValue().getLogFiles().findAny().isPresent()) {
+          entry.getValue().getLogFiles().forEach(logfile -> partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize()))));
         }
       });
     });
-
-    int fileGroupCount = dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
-    int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
-    Schema readerSchema = getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
-    return Pair.of(fileGroupCount, getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
+    return partitionFilePathSizeTriplet;
   }

   HoodieIndexDefinition getIndexDefinition(String indexName) {
@@ -661,7 +664,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
     // Collect the list of latest base files present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
     fsView.loadAllPartitions();
-    HoodieData<HoodieRecord> records = null;
+    HoodieData<HoodieRecord> records;
     if (dataMetaClient.getTableConfig().getTableType() == HoodieTableType.COPY_ON_WRITE) {
       // for COW, we can only consider base files to initialize.
       final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
@@ -670,8 +673,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
             .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
       }

-      LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
-          + partitions.size() + " partitions");
+      LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size());

       // Collect record keys from the files in parallel
       records = readRecordKeysFromBaseFiles(
@@ -684,13 +686,12 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
           this.getClass().getSimpleName());
     } else {
       final List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
-      String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(instant -> instant.requestedTime()).orElse(SOLO_COMMIT_TIMESTAMP);
+      String latestCommit = dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
       for (String partition : partitions) {
         fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
       }

-      LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in "
-          + partitions.size() + " partitions");
+      LOG.info("Initializing record index from {} file slices in {} partitions", partitionFileSlicePairs.size(), partitions.size());
       records = readRecordKeysFromFileSliceSnapshot(
           engineContext,
           partitionFileSlicePairs,
@@ -722,7 +723,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
    * @param metaClient metaclient instance to use.
    * @param dataWriteConfig write config to use.
    * @param hoodieTable hoodie table instance of interest.
-   * @return
+   * @return {@link HoodieData} of {@link HoodieRecord} containing updates for record_index.
    */
   private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
                                                                               List<Pair<String, FileSlice>> partitionFileSlicePairs,
@@ -752,7 +753,7 @@ private static HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(Hood
           Option.of(fileSlice)).getMergedRecords().stream().map(record -> {
         HoodieRecord record1 = (HoodieRecord) record;
         return HoodieMetadataPayload.createRecordIndexUpdate(record1.getRecordKey(), partition, fileId,
-            record1.getCurrentLocation().getInstantTime(), 0);
+            requireNonNull(record1.getCurrentLocation()).getInstantTime(), 0);
       }).iterator();
     });
   }
@@ -845,9 +846,10 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
     // List all directories in parallel
     engineContext.setJobStatus(this.getClass().getSimpleName(), "Listing " + numDirsToList + " partitions from filesystem");
     List<DirectoryInfo> processedDirectories = engineContext.map(pathsToProcess, path -> {
-      HoodieStorage storage = new HoodieHadoopStorage(path, storageConf);
-      String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path);
-      return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
+      try (HoodieStorage storage = new HoodieHadoopStorage(path, storageConf)) {
+        String relativeDirPath = FSUtils.getRelativePartitionPath(storageBasePath, path);
+        return new DirectoryInfo(relativeDirPath, storage.listDirectEntries(path), initializationTime, pendingDataInstants);
+      }
     }, numDirsToList);

     // If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
@@ -910,7 +912,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     HoodieStorage storage = metadataMetaClient.getStorage();
     try {
       final List<StoragePathInfo> existingFiles = storage.listDirectEntries(partitionPath);
-      if (existingFiles.size() > 0) {
+      if (!existingFiles.isEmpty()) {
         LOG.warn("Deleting all existing files found in MDT partition {}", partitionName);
         storage.deleteDirectory(partitionPath);
         ValidationUtils.checkState(!storage.exists(partitionPath),
@@ -1202,7 +1204,7 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
     InstantGenerator datainstantGenerator = dataMetaClient.getInstantGenerator();
     HoodieInstant restoreInstant = datainstantGenerator.createNewInstant(REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
     HoodieInstant requested = datainstantGenerator.getRestoreRequestedInstant(restoreInstant);
-    HoodieRestorePlan restorePlan = null;
+    HoodieRestorePlan restorePlan;
     try {
       restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
           dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
@@ -1608,7 +1610,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
       Map<String, List<String>> partitionFilesToDelete, List<String> partitionsToDelete) throws IOException {

     for (String partition : metadata.fetchAllPartitionPaths()) {
-      StoragePath partitionPath = null;
+      StoragePath partitionPath;
       if (StringUtils.isNullOrEmpty(partition) && !dataMetaClient.getTableConfig().isTablePartitioned()) {
         partitionPath = new StoragePath(dataWriteConfig.getBasePath());
       } else {
@@ -1619,7 +1621,7 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
       if (!dirInfoMap.containsKey(partition)) {
         // Entire partition has been deleted
         partitionsToDelete.add(partitionId);
-        if (metadataFiles != null && metadataFiles.size() > 0) {
+        if (metadataFiles != null && !metadataFiles.isEmpty()) {
           partitionFilesToDelete.put(partitionId, metadataFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList()));
         }
       } else {