Skip to content

Commit

Permalink
fix compilation issue
Browse files (browse the repository at this point in the history)
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 20, 2025
1 parent ee5d3e7 commit 2880aca
Show file tree
Hide file tree
Showing 39 changed files with 130 additions and 78 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -189,17 +189,17 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
if (lastClean.isPresent()) {
HoodieInstant cleanInstant = lastClean.get();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant cleanPlanInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), cleanInstant.requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
if (activeTimeline.isEmpty(cleanInstant)) {
activeTimeline.deleteEmptyInstantIfExists(cleanInstant);
HoodieInstant cleanPlanInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), cleanInstant.requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
try {
// Deserialize plan if it is non-empty
if (!activeTimeline.isEmpty(cleanPlanInstant)) {
return Option.of(TimelineMetadataUtils.deserializeCleanerPlan(activeTimeline.getContentStream(cleanPlanInstant)));
} else {
// Deserialize plan.
return Option.of(TimelineMetadataUtils.deserializeCleanerPlan(activeTimeline.getContentStream(cleanPlanInstant)));
} catch (IOException ex) {
// If it is empty we catch error and repair.
if (activeTimeline.isEmpty(cleanPlanInstant)) {
return Option.of(new HoodieCleanerPlan());
}
} catch (IOException ex) {
throw new HoodieIOException("Failed to parse cleaner plan", ex);
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -124,7 +125,8 @@ public void testAverageRecordSizeWithNonEmptyCommitTimeline(List<Pair<HoodieInst
try {
when(mockTimeline.getInstantDetails(hoodieInstant))
.thenReturn(org.apache.hudi.common.util.Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
when(mockCommitMetadataSerDe.deserialize(hoodieInstant, mockTimeline.getInstantContentStream(hoodieInstant), HoodieCommitMetadata.class))
when(mockCommitMetadataSerDe.deserialize(
hoodieInstant, mockTimeline.getInstantContentStream(hoodieInstant), any(), HoodieCommitMetadata.class))
.thenReturn(commitMetadata);
} catch (IOException e) {
throw new RuntimeException("Should not have failed", e);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -431,7 +431,8 @@ protected Pair<String, String> getPartitionAndBaseFilePathsFromLatestCommitMetad
String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
HoodieInstant instant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe()
.deserialize(instant, metaClient.getActiveTimeline().getInstantContentStream(instant), HoodieCommitMetadata.class);
.deserialize(instant, metaClient.getActiveTimeline().getInstantContentStream(instant),
() -> metaClient.getActiveTimeline().isEmpty(instant), HoodieCommitMetadata.class);
String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).filter(s -> s.getPath().endsWith(extension)).findAny()
.map(HoodieWriteStat::getPath).orElse(null);
Expand Down Expand Up @@ -557,7 +558,7 @@ protected void verifyClusteredFilesWithReplaceCommitMetadata(String partitionPat
HoodieReplaceCommitMetadata replaceCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(
replaceCommitInstant,
metaClient.getActiveTimeline().getInstantContentStream(replaceCommitInstant),
HoodieReplaceCommitMetadata.class);
() -> metaClient.getActiveTimeline().isEmpty(replaceCommitInstant), HoodieReplaceCommitMetadata.class);

List<String> filesFromReplaceCommit = new ArrayList<>();
replaceCommitMetadata.getPartitionToWriteStats()
Expand Down Expand Up @@ -882,15 +883,17 @@ protected void testCommitWritesRelativePaths(Function transformInputFn) throws E
HoodieInstant commitInstant = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, actionType, instantTime);
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe()
.deserialize(commitInstant, commitTimeline.getInstantContentStream(commitInstant), HoodieCommitMetadata.class);
.deserialize(commitInstant, commitTimeline.getInstantContentStream(commitInstant),
() -> commitTimeline.isEmpty(commitInstant),
HoodieCommitMetadata.class);
StoragePath basePath = metaClient.getBasePath();
Collection<String> commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values();

// Read from commit file
HoodieInstant instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, COMMIT_ACTION, instantTime);
HoodieCommitMetadata metadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
metaClient.reloadActiveTimeline().getInstantContentStream(instant),
HoodieCommitMetadata.class);
() -> metaClient.getActiveTimeline().isEmpty(instant), HoodieCommitMetadata.class);
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
// Compare values in both to make sure they are equal.
for (String pathName : paths.values()) {
Expand All @@ -913,9 +916,10 @@ protected void testMetadataStatsOnCommit(boolean populateMetaFields, Function tr

// Read from commit file
HoodieInstant instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, COMMIT_ACTION, instantTime0);
HoodieCommitMetadata metadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
createMetaClient().reloadActiveTimeline().getInstantContentStream(instant),
HoodieCommitMetadata.class);
HoodieActiveTimeline activeTimeline = createMetaClient().reloadActiveTimeline();
HoodieInstant finalInstant = instant;
HoodieCommitMetadata metadata = metaClient.getCommitMetadataSerDe().deserialize(
instant, activeTimeline.getInstantContentStream(instant), () -> activeTimeline.isEmpty(finalInstant), HoodieCommitMetadata.class);
int inserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
Expand All @@ -934,9 +938,11 @@ protected void testMetadataStatsOnCommit(boolean populateMetaFields, Function tr
metaClient = createMetaClient();
instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, COMMIT_ACTION, instantTime1);
// Read from commit file
HoodieActiveTimeline activeTimeline1 = metaClient.reloadActiveTimeline();
HoodieInstant finalInstant1 = instant;
metadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
metaClient.reloadActiveTimeline().getInstantContentStream(instant),
HoodieCommitMetadata.class);
activeTimeline1.getInstantContentStream(instant),
() -> activeTimeline1.isEmpty(finalInstant1), HoodieCommitMetadata.class);
inserts = 0;
int upserts = 0;
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -224,7 +224,7 @@ public void testArchivedInsertOverwriteWithoutClustering() throws Exception {
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
assertDoesNotThrow(() -> metaClient.getCommitMetadataSerDe().deserialize(
INSTANT_GENERATOR.createNewInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, newCommitTime),
Option.of(new ByteArrayInputStream(archived.getPlan().array())), HoodieCommitMetadata.class), "Insert overwrite without clustering should have a plan");
Option.of(new ByteArrayInputStream(archived.getPlan().array())), () -> true, HoodieCommitMetadata.class), "Insert overwrite without clustering should have a plan");

String newCommitTime2 = HoodieTestTable.makeNewCommitTime();
createReplace(newCommitTime2, WriteOperationType.INSERT_OVERWRITE_TABLE, false);
Expand All @@ -234,7 +234,7 @@ public void testArchivedInsertOverwriteWithoutClustering() throws Exception {
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived2.getAction());
assertDoesNotThrow(() -> metaClient.getCommitMetadataSerDe().deserialize(
INSTANT_GENERATOR.createNewInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, newCommitTime2),
Option.of(new ByteArrayInputStream(archived2.getPlan().array())), HoodieCommitMetadata.class),
Option.of(new ByteArrayInputStream(archived2.getPlan().array())), () -> true, HoodieCommitMetadata.class),
"Insert overwrite table without clustering should have a plan");
}

Expand Down Expand Up @@ -266,7 +266,8 @@ public void testArchivedCommit() throws Exception {
assertEquals(HoodieTimeline.COMMIT_ACTION, archived.getAction());
assertDoesNotThrow(() -> metaClient.getCommitMetadataSerDe().deserialize(
INSTANT_GENERATOR.createNewInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, newCommitTime),
Option.of(new ByteArrayInputStream(archived.getMetadata().array())), HoodieCommitMetadata.class));
Option.of(new ByteArrayInputStream(archived.getMetadata().array())),
() -> true, HoodieCommitMetadata.class));
}

@Test
Expand Down Expand Up @@ -299,7 +300,8 @@ public void testArchivedCompaction() throws Exception {
assertEquals(HoodieTimeline.COMMIT_ACTION, archived.getAction());
assertDoesNotThrow(() -> metaClient.getCommitMetadataSerDe().deserialize(
INSTANT_GENERATOR.createNewInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, newCommitTime),
Option.of(new ByteArrayInputStream(archived.getMetadata().array())), HoodieCommitMetadata.class));
Option.of(new ByteArrayInputStream(archived.getMetadata().array())),
() -> true, HoodieCommitMetadata.class));
assertDoesNotThrow(() -> CompactionUtils.getCompactionPlan(metaClient, Option.of(new ByteArrayInputStream(archived.getPlan().array()))));
}

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -1440,7 +1440,7 @@ public void testColStatsPrefixLookup() throws IOException {
metaClient.getActiveTimeline().getInstants().forEach(entry -> {
try {
HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe()
.deserialize(entry, metaClient.getActiveTimeline().getInstantContentStream(entry), HoodieCommitMetadata.class);
.deserialize(entry, metaClient.getActiveTimeline().getInstantContentStream(entry), () -> true, HoodieCommitMetadata.class);
String commitTime = entry.requestedTime();
if (!commitToPartitionsToFiles.containsKey(commitTime)) {
commitToPartitionsToFiles.put(commitTime, new HashMap<>());
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -961,7 +961,7 @@ private static HashMap<String, String> getLatestFileIDsToFullPath(String basePat
TimelineLayout layout = TimelineLayout.fromVersion(commitTimeline.getTimelineLayoutVersion());
for (HoodieInstant commit : commitsToReturn) {
HoodieCommitMetadata metadata =
layout.getCommitMetadataSerDe().deserialize(commit, commitTimeline.getInstantContentStream(commit), HoodieCommitMetadata.class);
layout.getCommitMetadataSerDe().deserialize(commit, commitTimeline.getInstantContentStream(commit), () -> true, HoodieCommitMetadata.class);
fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new StoragePath(basePath)));
}
return fileIdToFullPath;
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -114,6 +114,7 @@ public void testSavepoint(boolean enableMetadataTable,
Map<String, List<HoodieWriteStat>> partitionToWriteStats = metaClient.getCommitMetadataSerDe().deserialize(
commitsTimeline.lastInstant().get(),
commitsTimeline.getInstantContentStream(commitsTimeline.lastInstant().get()),
() -> true,
HoodieCommitMetadata.class)
.getPartitionToWriteStats();

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -371,7 +371,7 @@ private int getNumCompactions(HoodieTableMetaClient metaClient) {
try {
return s.getAction().equals(HoodieTimeline.COMMIT_ACTION)
&& metaClient.getCommitMetadataSerDe().deserialize(s,
timeline.getInstantContentStream(s), HoodieCommitMetadata.class)
timeline.getInstantContentStream(s), () -> true, HoodieCommitMetadata.class)
.getOperationType().equals(COMPACT);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -216,7 +216,7 @@ private void validateFilesAfterCleaning(
try {
HoodieInstant instant1 = timeline.filter(inst -> inst.requestedTime().equals(newInstant))
.firstInstant().get();
return layout.getCommitMetadataSerDe().deserialize(instant1, timeline.getInstantContentStream(instant1), HoodieCommitMetadata.class)
return layout.getCommitMetadataSerDe().deserialize(instant1, timeline.getInstantContentStream(instant1), () -> true, HoodieCommitMetadata.class)
.getWriteStats();
} catch (IOException e) {
return Collections.EMPTY_LIST;
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -193,7 +193,7 @@ private void testInsertAndCleanByVersions(
HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
for (HoodieInstant entry : timeline.getInstants()) {
HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(entry, timeline.getInstantContentStream(entry), HoodieCommitMetadata.class);
metaClient.getCommitMetadataSerDe().deserialize(entry, timeline.getInstantContentStream(entry), () -> true, HoodieCommitMetadata.class);

for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
if (!fileIdToVersions.containsKey(wstat.getFileId())) {
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -142,7 +142,7 @@ private static HashMap<String, String> getLatestFileIDsToFullPath(String basePat
TimelineLayout layout = TimelineLayout.fromVersion(commitTimeline.getTimelineLayoutVersion());
for (HoodieInstant commit : commitsToReturn) {
HoodieCommitMetadata metadata =
layout.getCommitMetadataSerDe().deserialize(commit, commitTimeline.getInstantContentStream(commit), HoodieCommitMetadata.class);
layout.getCommitMetadataSerDe().deserialize(commit, commitTimeline.getInstantContentStream(commit), () -> true, HoodieCommitMetadata.class);
fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new StoragePath(basePath)));
}
return fileIdToFullPath;
Expand Down Expand Up @@ -328,7 +328,7 @@ public static HoodieTableMetaClient createMetaClient(SparkSession spark, String
public static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
return Option.of(metaClient.getCommitMetadataSerDe().deserialize(instant, timeline.getInstantContentStream(instant), HoodieCommitMetadata.class));
return Option.of(metaClient.getCommitMetadataSerDe().deserialize(instant, timeline.getInstantContentStream(instant), () -> true, HoodieCommitMetadata.class));
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi.common.model;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand All @@ -30,6 +28,9 @@
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,7 +169,7 @@ public void testCommitMetadataSerde() throws Exception {
HoodieInstant instant = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", "1");
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata1 =
COMMIT_METADATA_SER_DE.deserialize(instant,
Option.of(new ByteArrayInputStream(serializedCommitMetadata)), org.apache.hudi.common.model.HoodieCommitMetadata.class);
Option.of(new ByteArrayInputStream(serializedCommitMetadata)), () -> true, org.apache.hudi.common.model.HoodieCommitMetadata.class);
assertEquals(2, commitMetadata1.partitionToWriteStats.size());
assertEquals(2, commitMetadata1.partitionToWriteStats.get("partition1").size());
assertEquals(2, commitMetadata1.partitionToWriteStats.get("partition1").size());
Expand All @@ -182,7 +183,7 @@ public void testCommitMetadataSerde() throws Exception {
byte[] v1Bytes = v1SerDe.serialize(commitMetadata1).get();
System.out.println(new String(v1Bytes));
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata2 =
COMMIT_METADATA_SER_DE.deserialize(legacyInstant, Option.of(new ByteArrayInputStream(v1Bytes)), org.apache.hudi.common.model.HoodieCommitMetadata.class);
COMMIT_METADATA_SER_DE.deserialize(legacyInstant, Option.of(new ByteArrayInputStream(v1Bytes)), () -> true, org.apache.hudi.common.model.HoodieCommitMetadata.class);
assertEquals(2, commitMetadata2.partitionToWriteStats.size());
assertEquals(2, commitMetadata2.partitionToWriteStats.get("partition1").size());
assertEquals(2, commitMetadata2.partitionToWriteStats.get("partition1").size());
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testEmptyMetadataSerDe() throws Exception {
assertTrue(serialized.isPresent());

// Deserialize
HoodieCommitMetadata deserialized = serDe.deserialize(instant, Option.of(new ByteArrayInputStream(serialized.get())), HoodieCommitMetadata.class);
HoodieCommitMetadata deserialized = serDe.deserialize(instant, Option.of(new ByteArrayInputStream(serialized.get())), () -> true, HoodieCommitMetadata.class);
// Verify
assertNotNull(deserialized);
assertEquals(0, deserialized.getPartitionToWriteStats().size());
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testPopulatedMetadataSerDe() throws Exception {

// Deserialize
HoodieCommitMetadata deserialized = serDe.deserialize(
instant, Option.of(new ByteArrayInputStream(serialized.get())), HoodieCommitMetadata.class);
instant, Option.of(new ByteArrayInputStream(serialized.get())), () -> true, HoodieCommitMetadata.class);

// Verify all fields
assertNotNull(deserialized);
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testReplaceCommitMetadataSerDe() throws Exception {
assertTrue(serialized.isPresent());

// Deserialize
HoodieReplaceCommitMetadata deserialized = serDe.deserialize(instant, Option.of(new ByteArrayInputStream(serialized.get())), HoodieReplaceCommitMetadata.class);
HoodieReplaceCommitMetadata deserialized = serDe.deserialize(instant, Option.of(new ByteArrayInputStream(serialized.get())), () -> true, HoodieReplaceCommitMetadata.class);

// Verify basic fields
assertNotNull(deserialized);
Expand Down
Loading

0 comments on commit 2880aca

Please sign in to comment.