Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance improvements #415

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions core/src/main/java/org/infinispan/protostream/ProtobufUtil.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.infinispan.protostream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -10,8 +9,8 @@

import org.infinispan.protostream.config.Configuration;
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
import org.infinispan.protostream.impl.JsonUtils;
import org.infinispan.protostream.impl.RandomAccessOutputStreamImpl;
import org.infinispan.protostream.impl.SerializationContextImpl;
import org.infinispan.protostream.impl.TagReaderImpl;
import org.infinispan.protostream.impl.TagWriterImpl;
Expand Down Expand Up @@ -70,18 +69,23 @@ private static <A> void write(ImmutableSerializationContext ctx, TagWriterImpl o
out.flush();
}

public static void writeTo(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
public static void writeTo(ImmutableSerializationContext ctx, RandomAccessOutputStream out, Object t) throws IOException {
write(ctx, TagWriterImpl.newInstance(ctx, out), t);
}

public static void writeTo(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
TagWriterImpl writer = out instanceof RandomAccessOutputStream raos ? TagWriterImpl.newInstance(ctx, raos) : TagWriterImpl.newInstance(ctx, out);
write(ctx, writer, t);
}

public static byte[] toByteArray(ImmutableSerializationContext ctx, Object t) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_ARRAY_BUFFER_SIZE);
RandomAccessOutputStream baos = new RandomAccessOutputStreamImpl(DEFAULT_ARRAY_BUFFER_SIZE);
writeTo(ctx, baos, t);
return baos.toByteArray();
}

public static ByteBuffer toByteBuffer(ImmutableSerializationContext ctx, Object t) throws IOException {
ByteArrayOutputStreamEx baos = new ByteArrayOutputStreamEx(DEFAULT_ARRAY_BUFFER_SIZE);
RandomAccessOutputStream baos = new RandomAccessOutputStreamImpl(DEFAULT_ARRAY_BUFFER_SIZE);
writeTo(ctx, baos, t);
return baos.getByteBuffer();
}
Expand Down Expand Up @@ -142,21 +146,26 @@ public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Objec
}

public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t, int bufferSize) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
RandomAccessOutputStream baos = new RandomAccessOutputStreamImpl(bufferSize);
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, baos), t);
return baos.toByteArray();
}

public static ByteBuffer toWrappedByteBuffer(ImmutableSerializationContext ctx, Object t) throws IOException {
ByteArrayOutputStreamEx baos = new ByteArrayOutputStreamEx(DEFAULT_ARRAY_BUFFER_SIZE);
RandomAccessOutputStream baos = new RandomAccessOutputStreamImpl(DEFAULT_ARRAY_BUFFER_SIZE);
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, baos), t);
return baos.getByteBuffer();
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
public static void toWrappedStream(ImmutableSerializationContext ctx, RandomAccessOutputStream out, Object t) throws IOException {
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out), t);
}

public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
TagWriter writer = out instanceof RandomAccessOutputStream raos ? TagWriterImpl.newInstance(ctx, raos) : TagWriterImpl.newInstance(ctx, out);
WrappedMessage.write(ctx, writer, t);
}

/**
* @deprecated since 5.0.10 Please use {@link #toWrappedStream(ImmutableSerializationContext, OutputStream, Object)} with a {@link java.io.BufferedOutputStream} instead
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.infinispan.protostream;

import java.io.Closeable;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A byte output stream that exposes positional arguments in order to allow existing bytes in the stream to be overwritten.
*
* @author Ryan Emerson
* @since 6.0
*/
public interface RandomAccessOutputStream extends Closeable {

/**
* @return the position in the stream where the next {@link #write(int)} will be written.
*/
int getPosition();

/**
* Set the position in the stream where the next {@link #write(int)} will be written.
*/
void setPosition(int position);

/**
* Ensure that the stream has sufficient capacity to hold at least `capacity` bytes.
*
* @param capacity the minimum number of bytes that should be writeable
*/
void ensureCapacity(int capacity);

/**
* Reset the position of the stream to zero, so that all currently accumulated output in the stream is discarded.
* The output stream can then be used again, reusing the already allocated buffer space.
*/
default void reset() {
setPosition(0);
}

/**
* @return a {@link ByteBuffer} representation of the stream
*/
ByteBuffer getByteBuffer();

/**
* @return a trimmed {@link byte[]} instance based upon the current {@link #getPosition()} of the array
*/
byte[] toByteArray();

/**
* @param position the position in the stream to read the byte from
* @return the byte associated with the specified position
*/
byte get(int position);

/**
* Write a single byte to the head of the stream.
*
* @param b the byte to be written
*/
default void write(int b) throws IOException {
write(getPosition(), b);
}

/**
* Write all bytes to the head of the stream.
*
* @param b the array of bytes to be written
*/
default void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

/**
* Write all bytes to the head of the stream.
*
* @param b the array of bytes to be written
* @param off the offset within the array to be written
* @param len the number of bytes from the array to be written
*/
default void write(byte[] b, int off, int len) throws IOException {
write(getPosition(), b, off, len);
}

/**
* Write a single byte to the specified position in the stream.
*
* @param position the position in the stream to write the byte to
* @param b the byte to be written
*/
void write(int position, int b) throws IOException;

/**
* Write all bytes to the specified position in the stream.
*
* @param position the position in the stream to write the bytes to
* @param b the array of bytes to be written
*/
default void write(int position, byte[] b) throws IOException {
write(position, b, 0, b.length);
}

/**
* Write all bytes to the specified position in the stream.
*
* @param position the position in the stream to write the bytes to
* @param b the array of bytes to be written
* @param off the offset within the array to be written
* @param len the number of bytes from the array to be written
*/
void write(int position, byte[] b, int off, int len) throws IOException;

/**
* Write all bytes in this stream to the provided {@link DataOutput} stream
* @param output the stream to write this stream's bytes to
*/
void copyTo(DataOutput output) throws IOException;
}
22 changes: 11 additions & 11 deletions core/src/main/java/org/infinispan/protostream/WrappedMessage.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package org.infinispan.protostream;

import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.Objects;

import org.infinispan.protostream.containers.ElementContainerAdapter;
import org.infinispan.protostream.containers.IndexedElementContainerAdapter;
import org.infinispan.protostream.containers.IterableElementContainerAdapter;
import org.infinispan.protostream.descriptors.WireType;
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
import org.infinispan.protostream.impl.EnumMarshallerDelegate;
import org.infinispan.protostream.impl.RandomAccessOutputStreamImpl;
import org.infinispan.protostream.impl.SerializationContextImpl;
import org.infinispan.protostream.impl.TagReaderImpl;
import org.infinispan.protostream.impl.TagWriterImpl;

import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.Objects;

/**
* A wrapper for messages, enums or primitive types that encodes the type of the inner object/value and also helps keep
* track of where the message ends. The need for this wrapper stems from two particular design choices in the Protocol
Expand Down Expand Up @@ -315,7 +315,7 @@ private static <T> void writeCustomObject(ImmutableSerializationContext ctx, Tag
if (t.getClass().isEnum()) {
((EnumMarshallerDelegate) marshallerDelegate).encode(WRAPPED_ENUM, (Enum<?>) t, out);
} else {
ByteArrayOutputStreamEx buffer = new ByteArrayOutputStreamEx();
RandomAccessOutputStream buffer = new RandomAccessOutputStreamImpl();
TagWriterImpl nestedCtx = TagWriterImpl.newInstance(ctx, buffer);
marshallerDelegate.marshall(nestedCtx, null, t);
nestedCtx.flush();
Expand All @@ -339,7 +339,7 @@ private static void writeContainer(ImmutableSerializationContext ctx, TagWriter
int containerSize = ((ElementContainerAdapter) containerMarshaller).getNumElements(container);
out.writeUInt32(WRAPPED_CONTAINER_SIZE, containerSize);

ByteArrayOutputStreamEx buffer = new ByteArrayOutputStreamEx();
RandomAccessOutputStream buffer = new RandomAccessOutputStreamImpl();
TagWriterImpl nestedCtx = TagWriterImpl.newInstance(ctx, buffer);
marshallerDelegate.marshall(nestedCtx, null, container);
nestedCtx.flush();
Expand All @@ -353,7 +353,7 @@ private static void writeContainer(ImmutableSerializationContext ctx, TagWriter
}

private static void writeContainerWrappingElements(BaseMarshaller containerMarshaller, int containerSize, Object container,
ImmutableSerializationContext ctx, TagWriter out, ByteArrayOutputStreamEx buffer) throws IOException {
ImmutableSerializationContext ctx, TagWriter out, RandomAccessOutputStream buffer) throws IOException {
if (containerMarshaller instanceof IterableElementContainerAdapter) {
Iterator<?> elements = ((IterableElementContainerAdapter) containerMarshaller).getElements(container);
for (int i = 0; i < containerSize; i++) {
Expand Down Expand Up @@ -392,7 +392,7 @@ private static void writeContainerWithoutWrappingElements(BaseMarshaller contain
}
}

private static void writeContainerElementWrapped(ImmutableSerializationContext ctx, TagWriter out, ByteArrayOutputStreamEx buffer, Object e) throws IOException {
private static void writeContainerElementWrapped(ImmutableSerializationContext ctx, TagWriter out, RandomAccessOutputStream buffer, Object e) throws IOException {
if (tryWritePrimitive(out, e, true)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import java.io.IOException;

import org.infinispan.protostream.ProtobufTagMarshaller;
import org.infinispan.protostream.RandomAccessOutputStream;
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
import org.infinispan.protostream.impl.Log;
import org.infinispan.protostream.impl.RandomAccessOutputStreamImpl;
import org.infinispan.protostream.impl.TagWriterImpl;

/**
Expand Down Expand Up @@ -51,15 +52,15 @@ protected final <T> void writeNestedMessage(BaseMarshallerDelegate<T> marshaller
}

public static class NestedWriter implements AutoCloseable {
final ByteArrayOutputStreamEx baos;
final RandomAccessOutputStream baos;
final TagWriterImpl writer;
private final ProtobufTagMarshaller.WriteContext ctx;
private final int fieldNumber;

public NestedWriter(ProtobufTagMarshaller.WriteContext ctx, int fieldNumber) {
this.ctx = ctx;
this.fieldNumber = fieldNumber;
this.baos = new ByteArrayOutputStreamEx();
this.baos = new RandomAccessOutputStreamImpl();
this.writer = TagWriterImpl.newNestedInstance(ctx, baos);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.infinispan.protostream.ImmutableSerializationContext;
import org.infinispan.protostream.MessageContext;
import org.infinispan.protostream.MessageMarshaller;
import org.infinispan.protostream.RandomAccessOutputStream;
import org.infinispan.protostream.TagWriter;
import org.infinispan.protostream.descriptors.Descriptor;
import org.infinispan.protostream.descriptors.FieldDescriptor;
Expand Down Expand Up @@ -506,7 +507,7 @@ public <E extends Enum<E>> void writeEnum(String fieldName, E value) throws IOEx

private void writeMessage(FieldDescriptor fd, Object value, Class<?> clazz) throws IOException {
BaseMarshallerDelegate marshallerDelegate = serCtx.getMarshallerDelegate(clazz);
ByteArrayOutputStreamEx nestedBaos = new ByteArrayOutputStreamEx();
RandomAccessOutputStream nestedBaos = new RandomAccessOutputStreamImpl();
TagWriterImpl nestedOut = TagWriterImpl.newNestedInstance(messageContext.out, nestedBaos);
marshallerDelegate.marshall(nestedOut, fd, value);
nestedOut.flush();
Expand Down Expand Up @@ -817,7 +818,7 @@ public <K, V> void writeMap(String fieldName, Map<? super K, ? super V> map, Cla
for (Map.Entry<? super K, ? super V> entry : map.entrySet()) {
Object key = entry.getKey();
validateElement(key, keyClass); // Protobuf 3 does not allow null keys
ByteArrayOutputStreamEx nestedBaos = new ByteArrayOutputStreamEx();
RandomAccessOutputStream nestedBaos = new RandomAccessOutputStreamImpl();
TagWriterImpl out = TagWriterImpl.newNestedInstance(messageContext.out, nestedBaos);
// Write the key as field 1
switch (md.getKeyType()) {
Expand Down Expand Up @@ -878,7 +879,7 @@ public <K, V> void writeMap(String fieldName, Map<? super K, ? super V> map, Cla
case MESSAGE -> {
// FIXME: there is too much nesting here. We can definitely improve things
BaseMarshallerDelegate<V> marshallerDelegate = serCtx.getMarshallerDelegate(valueClass);
ByteArrayOutputStreamEx mapValueBaos = new ByteArrayOutputStreamEx();
RandomAccessOutputStream mapValueBaos = new RandomAccessOutputStreamImpl();
TagWriterImpl mapValueOut = TagWriterImpl.newNestedInstance(out, mapValueBaos);
marshallerDelegate.marshall(mapValueOut, md, (V) value);
mapValueOut.flush();
Expand Down
Loading