Skip to content

Commit

Permalink
Serde take input stream as input
Browse files Browse the repository at this point in the history
  • Loading branch information
Davis-Zhang-Onehouse committed Feb 11, 2025
1 parent bb7913b commit 8552c5d
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,14 @@

import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;

import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.security.MessageDigest;
Expand Down Expand Up @@ -567,27 +559,6 @@ public InputStream getInstantContentStream(HoodieInstant instant) {
return getInstantReader().getContentStream(instant);
}

@Override
public <T> T deserializeInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException {
if (SpecificRecord.class.isAssignableFrom(clazz)) {
try (InputStream inputStream = getInstantContentStream(instant)) {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
DataFileStream<T> fileReader = new DataFileStream<>(inputStream, reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}
} else {
try (InputStream inputStream = getInstantContentStream(instant)) {
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
} catch (MismatchedInputException ex) {
if (ex.getMessage().startsWith("No content to map")) {
return ReflectionUtils.loadClass(clazz.getName());
}
throw ex;
}
}
}

@Override
public boolean isEmpty(HoodieInstant instant) {
return getInstantDetails(instant).get().length == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

/**
* Interface for serializing and deserializing commit metadata.
*/
public interface CommitMetadataSerDe extends Serializable {

<T> T deserialize(HoodieInstant instant, byte[] bytes, Class<T> clazz) throws IOException;
<T> T deserialize(HoodieInstant instant, InputStream instantStream, Class<T> clazz) throws IOException;

Option<byte[]> serialize(HoodieCommitMetadata commitMetadata) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -457,8 +456,6 @@ public interface HoodieTimeline extends HoodieInstantReader, Serializable {
*/
InputStream getInstantContentStream(HoodieInstant instant);

<T> T deserializeInstantContent(HoodieInstant instant, Class<T> clazz) throws IOException;

boolean isEmpty(HoodieInstant instant);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hudi.storage.StoragePathInfo;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
Expand All @@ -51,6 +52,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -200,6 +202,10 @@ public static HoodieCommitMetadata deserializeCommitMetadata(byte[] bytes) throw
return deserializeAvroMetadata(bytes, HoodieCommitMetadata.class);
}

public static HoodieCommitMetadata deserializeCommitMetadata(InputStream instantStream) throws IOException {
return deserializeAvroMetadata(instantStream, HoodieCommitMetadata.class);
}

public static HoodieReplaceCommitMetadata deserializeReplaceCommitMetadata(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
}
Expand All @@ -211,4 +217,12 @@ public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] by
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}

public static <T> T deserializeAvroMetadata(InputStream instantStream, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
DataFileStream<T> fileReader = new DataFileStream<>(instantStream, reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,28 @@
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;

import java.io.BufferedInputStream;
import java.io.IOException;

import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import java.io.InputStream;

public class CommitMetadataSerDeV1 implements CommitMetadataSerDe {

@Override
public <T> T deserialize(HoodieInstant instant, byte[] bytes, Class<T> clazz) throws IOException {
public <T> T deserialize(HoodieInstant instant, InputStream inputStream, Class<T> clazz) throws IOException {
try {
if (bytes.length == 0) {
BufferedInputStream bis = new BufferedInputStream(inputStream);
bis.mark(1);
boolean isEmpty = (bis.read() == -1);
if (isEmpty) {
return clazz.newInstance();
}
return fromJsonString(fromUTF8Bytes(bytes), clazz);
bis.reset();

// Use ObjectMapper to directly read from InputStream
// This avoids loading entire content into memory at once
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
} catch (Exception e) {
throw new IOException("unable to read commit metadata for instant " + instant + " bytes length: " + bytes.length, e);
throw new IOException("Unable to read commit metadata for instant " + instant, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,38 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.convertCommitMetadataToJsonBytes;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata;

public class CommitMetadataSerDeV2 implements CommitMetadataSerDe {

@Override
public <T> T deserialize(HoodieInstant instant, byte[] bytes, Class<T> clazz) throws IOException {
public <T> T deserialize(HoodieInstant instant, InputStream inputStream, Class<T> clazz) throws IOException {
try {
if (bytes.length == 0) {
// Check if stream is empty
inputStream.mark(1);
int firstByte = inputStream.read();
if (firstByte == -1) {
return clazz.newInstance();
}
inputStream.reset();

if (instant.isLegacy()) {
try {
return fromJsonString(fromUTF8Bytes(bytes), clazz);
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
} catch (Exception e) {
throw new IOException("unable to read legacy commit metadata for instant " + instant, e);
}
}
return fromJsonString(
fromUTF8Bytes(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)),
clazz);
return deserializeAvroMetadata(inputStream, clazz);
} catch (Exception e) {
throw new IOException("unable to read commit metadata for instant " + instant + " bytes length: " + bytes.length, e);
throw new IOException("unable to read commit metadata for instant " + instant, e);
}
}

public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
if (jsonStr == null || jsonStr.isEmpty()) {
// For empty commit file
return clazz.newInstance();
}
return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
public static <T> T fromJsonString(InputStream inputStream, Class<T> clazz) throws Exception {
return JsonUtils.getObjectMapper().readValue(inputStream, clazz);
}

@Override
Expand Down

0 comments on commit 8552c5d

Please sign in to comment.