diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/DefinitionLevelDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/DefinitionLevelDecoder.java index 28721897e5269..31bf62a3508f3 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/DefinitionLevelDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/DefinitionLevelDecoder.java @@ -13,7 +13,7 @@ */ package com.facebook.presto.parquet.batchreader.decoders; -import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder; import org.apache.parquet.io.ParquetDecodingException; import java.io.IOException; @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState; public class DefinitionLevelDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder { public DefinitionLevelDecoder(int valueCount, int bitWidth, InputStream inputStream) { @@ -40,16 +40,16 @@ public void readNext(int[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remainingToCopy, currentCount); - switch (mode) { + int chunkSize = Math.min(remainingToCopy, getCurrentCount()); + switch (getCurrentMode()) { case RLE: { - int rleValue = currentValue; + int rleValue = getDecodedInt(); int endIndex = destinationIndex + chunkSize; while (destinationIndex < endIndex) { values[destinationIndex] = rleValue; @@ -58,14 +58,15 @@ public void readNext(int[] values, int offset, int length) break; } case PACKED: { - System.arraycopy(currentBuffer, currentBuffer.length - currentCount, values, destinationIndex, chunkSize); + int[] decodedInts = getDecodedInts(); + System.arraycopy(decodedInts, decodedInts.length - getCurrentCount(), values, destinationIndex, chunkSize); destinationIndex += chunkSize; break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= chunkSize; + decrementCurrentCount(chunkSize); remainingToCopy -= chunkSize; } checkState(remainingToCopy == 0, "Failed to copy the requested number of DLs"); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/FlatDefinitionLevelDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/FlatDefinitionLevelDecoder.java index 71a86ccac5324..d8af284b9c976 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/FlatDefinitionLevelDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/FlatDefinitionLevelDecoder.java @@ -13,7 +13,7 @@ */ package com.facebook.presto.parquet.batchreader.decoders; -import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder; import org.apache.parquet.io.ParquetDecodingException; import java.io.IOException; @@ -25,7 +25,7 @@ * Definition Level decoder for non-nested types where the values are either 0 or 1 */ public class FlatDefinitionLevelDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder { public FlatDefinitionLevelDecoder(int valueCount, InputStream inputStream) { @@ -44,26 +44,26 @@ public int readNext(boolean[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remainingToCopy, currentCount); + int chunkSize = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + chunkSize; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - boolean rleValue = currentValue == 0; + boolean rleValue = getDecodedInt() == 0; while (destinationIndex < endIndex) { values[destinationIndex++] = rleValue; } - nonNullCount += currentValue * chunkSize; + nonNullCount += getDecodedInt() * chunkSize; break; } case PACKED: { - int[] buffer = currentBuffer; - for (int sourceIndex = buffer.length - currentCount; destinationIndex < endIndex; sourceIndex++, destinationIndex++) { + int[] buffer = getDecodedInts(); + for (int sourceIndex = buffer.length - getCurrentCount(); destinationIndex < endIndex; sourceIndex++, destinationIndex++) { final int value = buffer[sourceIndex]; values[destinationIndex] = value == 0; nonNullCount += value; @@ -71,9 +71,9 @@ public int readNext(boolean[] values, int offset, int length) break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= chunkSize; + decrementCurrentCount(chunkSize); remainingToCopy -= chunkSize; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/RepetitionLevelDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/RepetitionLevelDecoder.java index f8569b4448f1d..99d3997ff35f0 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/RepetitionLevelDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/RepetitionLevelDecoder.java @@ -14,7 +14,7 @@ package com.facebook.presto.parquet.batchreader.decoders; -import com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder; +import com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder; import it.unimi.dsi.fastutil.ints.IntList; import org.apache.parquet.io.ParquetDecodingException; import org.openjdk.jol.info.ClassLayout; @@ -25,9 +25,9 @@ import static io.airlift.slice.SizeOf.sizeOf; public class RepetitionLevelDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder { - private static final int INSTANCE_SIZE = ClassLayout.parseClass(BaseRLEBitPackedDecoder.class).instanceSize(); + private static final int INSTANCE_SIZE = ClassLayout.parseClass(GenericRLEDictionaryValuesDecoder.class).instanceSize(); private int remaining; private int currentOffsetPackedBuffer; @@ -53,29 +53,29 @@ public int readNext(IntList repetitionLevels, int batchSize) break; } - switch (mode) { + switch (getCurrentMode()) { case RLE: { - int rleValue = currentValue; + int rleValue = getDecodedInt(); if (rleValue == 0) { - int chunkSize = Math.min(remainingToCopy, currentCount); + int chunkSize = Math.min(remainingToCopy, getCurrentCount()); for (int i = 0; i < chunkSize; i++) { repetitionLevels.add(0); } - currentCount -= chunkSize; + decrementCurrentCount(chunkSize); remaining -= chunkSize; remainingToCopy -= chunkSize; } else { - remaining -= currentCount; - for (int i = 0; i < currentCount; i++) { + remaining -= getCurrentCount(); + for (int i = 0; i < getCurrentCount(); i++) { repetitionLevels.add(rleValue); } - currentCount = 0; + decrementCurrentCount(getCurrentCount()); } break; } case PACKED: { - final int[] localBuffer = currentBuffer; + final int[] localBuffer = getDecodedInts(); do { int rlValue = localBuffer[currentOffsetPackedBuffer]; currentOffsetPackedBuffer = currentOffsetPackedBuffer + 1; @@ -86,11 +86,11 @@ public int readNext(IntList repetitionLevels, int batchSize) remaining--; } while (currentOffsetPackedBuffer < endOffsetPackedBuffer && remainingToCopy > 0); - currentCount = endOffsetPackedBuffer - currentOffsetPackedBuffer; + decrementCurrentCount(endOffsetPackedBuffer - currentOffsetPackedBuffer); break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } } return batchSize - remainingToCopy; @@ -99,19 +99,19 @@ public int readNext(IntList repetitionLevels, int batchSize) @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + sizeOf(currentBuffer); + return INSTANCE_SIZE + sizeOf(getDecodedInts()); } private boolean ensureBlockAvailable() throws IOException { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { return false; } - currentCount = Math.min(remaining, currentCount); + decrementCurrentCount(Math.min(remaining, getCurrentCount())); currentOffsetPackedBuffer = 0; - endOffsetPackedBuffer = currentCount; + endOffsetPackedBuffer = getCurrentCount(); } return true; } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BinaryRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BinaryRLEDictionaryValuesDecoder.java index e0cd6726e02ff..fc94e38db3ef0 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BinaryRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BinaryRLEDictionaryValuesDecoder.java @@ -26,7 +26,7 @@ import static io.airlift.slice.SizeOf.sizeOf; public class BinaryRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements BinaryValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(BinaryRLEDictionaryValuesDecoder.class).instanceSize(); @@ -48,17 +48,17 @@ public ValueBuffer readNext(int length) int bufferSize = 0; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - final int rleValue = currentValue; + final int rleValue = getDecodedInt(); final int rleValueLength = dictionary.getLength(rleValue); while (destinationIndex < endIndex) { dictionaries[destinationIndex++] = rleValue; @@ -67,9 +67,9 @@ public ValueBuffer readNext(int length) break; } case PACKED: { - final int[] localBuffer = currentBuffer; + final int[] localBuffer = getDecodedInts(); final BinaryBatchDictionary localDictionary = dictionary; - for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++, destinationIndex++) { + for (int srcIndex = localBuffer.length - getCurrentCount(); destinationIndex < endIndex; srcIndex++, destinationIndex++) { int dictionaryId = localBuffer[srcIndex]; dictionaries[destinationIndex] = dictionaryId; bufferSize += localDictionary.getLength(dictionaryId); @@ -77,9 +77,9 @@ public ValueBuffer readNext(int length) break; } default: - throw new ParquetDecodingException("not a valid mode " + this.mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } @@ -110,14 +110,14 @@ public void skip(int length) { int remaining = length; while (remaining > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remaining, currentCount); - currentCount -= chunkSize; + int chunkSize = Math.min(remaining, getCurrentCount()); + decrementCurrentCount(chunkSize); remaining -= chunkSize; } checkState(remaining == 0, "Invalid read size request"); diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BooleanRLEValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BooleanRLEValuesDecoder.java index df1b6d2d280af..fa69969505170 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BooleanRLEValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BooleanRLEValuesDecoder.java @@ -11,51 +11,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.facebook.presto.parquet.batchreader.decoders.rle; -import com.facebook.presto.parquet.batchreader.BytesUtils; import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BooleanValuesDecoder; -import org.apache.parquet.Preconditions; import org.apache.parquet.io.ParquetDecodingException; import org.openjdk.jol.info.ClassLayout; -import java.nio.Buffer; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; public class BooleanRLEValuesDecoder + extends GenericRLEDictionaryValuesDecoder implements BooleanValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(BooleanRLEValuesDecoder.class).instanceSize(); private final ByteBuffer inputBuffer; - private MODE mode; - private int currentCount; - private byte currentValue; - private int currentByteOffset; - private byte currentByte; - public BooleanRLEValuesDecoder(ByteBuffer inputBuffer) { + super(Integer.MAX_VALUE, 1, new ByteArrayInputStream(inputBuffer.array(), inputBuffer.arrayOffset() + inputBuffer.position(), inputBuffer.remaining())); this.inputBuffer = requireNonNull(inputBuffer); - } - - // Copied from BytesUtils.readUnsignedVarInt(InputStream in) - public static int readUnsignedVarInt(ByteBuffer in) - { - int value = 0; - - int i; - int b = in.get(); - for (i = 0; (b & 128) != 0; i += 7) { - value |= (b & 127) << i; - b = in.get(); - } - - return value | b << i; + currentBuffer = null; + mode = Mode.RLE; } @Override @@ -64,64 +47,24 @@ public void readNext(byte[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { - readNext(); - if (currentCount == 0) { - break; + if (getCurrentCount() == 0) { + try { + if (!decode()) { + break; + } + } + catch (IOException e) { + throw new ParquetDecodingException("Error decoding boolean values", e); } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { - case RLE: { - byte rleValue = currentValue; - while (destinationIndex < endIndex) { - values[destinationIndex] = rleValue; - destinationIndex++; - } - break; - } - case PACKED: { - int remainingPackedBlock = numEntriesToFill; - if (currentByteOffset > 0) { - // read from the partial values remaining in current byte - int readChunk = Math.min(remainingPackedBlock, 8 - currentByteOffset); - - final byte inValue = currentByte; - for (int i = 0; i < readChunk; i++) { - values[destinationIndex++] = (byte) (inValue >> currentByteOffset & 1); - currentByteOffset++; - } - - remainingPackedBlock -= readChunk; - currentByteOffset = currentByteOffset % 8; - } - - final ByteBuffer localInputBuffer = inputBuffer; - while (remainingPackedBlock >= 8) { - BytesUtils.unpack8Values(localInputBuffer.get(), values, destinationIndex); - remainingPackedBlock -= 8; - destinationIndex += 8; - } - - if (remainingPackedBlock > 0) { - // read partial values from current byte until the requested length is satisfied - byte inValue = localInputBuffer.get(); - for (int i = 0; i < remainingPackedBlock; i++) { - values[destinationIndex++] = (byte) (inValue >> i & 1); - } - - currentByte = inValue; - currentByteOffset = remainingPackedBlock; - } - - break; - } - default: - throw new ParquetDecodingException("not a valid mode " + mode); + byte rleValue = (byte) getDecodedInt(); + while (destinationIndex < endIndex) { + values[destinationIndex++] = rleValue; } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } } @@ -131,52 +74,22 @@ public void skip(int length) { int remainingToSkip = length; while (remainingToSkip > 0) { - if (currentCount == 0) { - readNext(); - if (currentCount == 0) { - break; - } - } - - int numEntriesToSkip = Math.min(remainingToSkip, currentCount); - switch (mode) { - case RLE: - break; - case PACKED: { - int remainingPackedBlock = numEntriesToSkip; - if (currentByteOffset > 0) { - // read from the partial values remaining in current byte - int skipChunk = Math.min(remainingPackedBlock, 8 - currentByteOffset); - - currentByteOffset += skipChunk; - - remainingPackedBlock -= skipChunk; - currentByteOffset = currentByteOffset % 8; - } - - int fullBytes = remainingPackedBlock / 8; - - if (fullBytes > 0) { - ((Buffer) inputBuffer).position(inputBuffer.position() + fullBytes); - } - - remainingPackedBlock = remainingPackedBlock % 8; - - if (remainingPackedBlock > 0) { - // read partial values from current byte until the requested length is satisfied - currentByte = inputBuffer.get(); - currentByteOffset = remainingPackedBlock; + if (getCurrentCount() == 0) { + try { + if (!decode()) { + break; } - - break; } - default: - throw new ParquetDecodingException("not a valid mode " + mode); + catch (IOException e) { + throw new ParquetDecodingException("Error skipping boolean values", e); + } } - currentCount -= numEntriesToSkip; + + int numEntriesToSkip = Math.min(remainingToSkip, getCurrentCount()); + decrementCurrentCount(numEntriesToSkip); remainingToSkip -= numEntriesToSkip; } - checkState(remainingToSkip == 0, "Invalid read size request"); + checkState(remainingToSkip == 0, "Invalid skip size request"); } @Override @@ -184,30 +97,4 @@ public long getRetainedSizeInBytes() { return INSTANCE_SIZE + inputBuffer.array().length; } - - private void readNext() - { - Preconditions.checkArgument(inputBuffer.hasRemaining(), "Reading past RLE/BitPacking stream."); - int header = readUnsignedVarInt(inputBuffer); - mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; - switch (mode) { - case RLE: - currentCount = header >>> 1; - currentValue = inputBuffer.get(); - return; - case PACKED: - int numGroups = header >>> 1; - currentCount = numGroups * 8; - currentByteOffset = 0; - return; - default: - throw new ParquetDecodingException("not a valid mode " + mode); - } - } - - private enum MODE - { - RLE, - PACKED; - } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BaseRLEBitPackedDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/GenericRLEDictionaryValuesDecoder.java similarity index 62% rename from presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BaseRLEBitPackedDecoder.java rename to presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/GenericRLEDictionaryValuesDecoder.java index d6c58c4a84ad4..824a6279fc7a9 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/BaseRLEBitPackedDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/GenericRLEDictionaryValuesDecoder.java @@ -11,10 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.facebook.presto.parquet.batchreader.decoders.rle; -import org.apache.parquet.column.values.bitpacking.BytePacker; -import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; import org.openjdk.jol.info.ClassLayout; @@ -22,17 +21,17 @@ import java.io.IOException; import java.io.InputStream; -import static com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder.Mode.PACKED; -import static com.facebook.presto.parquet.batchreader.decoders.rle.BaseRLEBitPackedDecoder.Mode.RLE; +import static com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder.Mode.PACKED; +import static com.facebook.presto.parquet.batchreader.decoders.rle.GenericRLEDictionaryValuesDecoder.Mode.RLE; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.sizeOf; import static java.lang.Math.ceil; import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndianPaddedOnBitWidth; import static org.apache.parquet.bytes.BytesUtils.readUnsignedVarInt; -public abstract class BaseRLEBitPackedDecoder +public abstract class GenericRLEDictionaryValuesDecoder { - private static final int INSTANCE_SIZE = ClassLayout.parseClass(BaseRLEBitPackedDecoder.class).instanceSize(); + private static final int INSTANCE_SIZE = ClassLayout.parseClass(GenericRLEDictionaryValuesDecoder.class).instanceSize(); private final boolean rleOnlyMode; private final int bitWidth; @@ -44,7 +43,7 @@ public abstract class BaseRLEBitPackedDecoder protected int currentValue; protected int[] currentBuffer; - public BaseRLEBitPackedDecoder(int valueCount, int bitWidth, InputStream inputStream) + public GenericRLEDictionaryValuesDecoder(int valueCount, int bitWidth, InputStream inputStream) { checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; @@ -63,7 +62,7 @@ public BaseRLEBitPackedDecoder(int valueCount, int bitWidth, InputStream inputSt } } - public BaseRLEBitPackedDecoder(int rleValue, int rleValueCount) + public GenericRLEDictionaryValuesDecoder(int rleValue, int rleValueCount) { this.rleOnlyMode = true; this.bitWidth = 0; @@ -79,9 +78,22 @@ public long getRetainedSizeInBytes() return INSTANCE_SIZE + sizeOf(currentBuffer); } - protected boolean decode() - throws IOException + protected boolean decode() throws IOException { + if (this instanceof BooleanRLEValuesDecoder) { + // Boolean RLE specific logic + if (inputStream.available() <= 0) { + currentCount = 0; + return false; + } + + int header = readUnsignedVarInt(inputStream); + mode = RLE; // Boolean RLE is always RLE + currentValue = (header & 1) == 1 ? 1 : 0; + currentCount = header >>> 1; + return true; + } + if (rleOnlyMode) { // for RLE only mode there is nothing more to read return false; @@ -120,9 +132,71 @@ protected boolean decode() } } - public enum Mode + public int[] getDecodedInts() + { + return currentBuffer; + } + + public int getDecodedInt() + { + return currentValue; + } + + public Mode getCurrentMode() + { + return mode; + } + + public int getCurrentCount() + { + return currentCount; + } + + public void decrementCurrentCount(int amount) + { + currentCount -= amount; + } + + public interface BytePacker + { + void unpack8Values(byte[] input, int inputOffset, int[] output, int outputOffset); + } + + public enum Packer { + LITTLE_ENDIAN; + + public BytePacker newBytePacker(int bitWidth) + { + return new LittleEndianPacker(bitWidth); + } + } + + private static class LittleEndianPacker + implements BytePacker { - RLE, - PACKED + private final int bitWidth; + + LittleEndianPacker(int bitWidth) + { + this.bitWidth = bitWidth; + } + + @Override + public void unpack8Values(byte[] input, int inputOffset, int[] output, int outputOffset) + { + long packed = 0; + for (int i = 0; i < bitWidth; i += 8) { + packed |= ((long) input[inputOffset + (i / 8)]) << i; + } + long mask = (1L << bitWidth) - 1; + for (int i = 0; i < 8; i++) { + output[outputOffset + i] = (int) (packed & mask); + packed >>>= bitWidth; + } + } + } + + public enum Mode { + RLE, PACKED } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32RLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32RLEDictionaryValuesDecoder.java index 6ea3a032d169e..ad242e7e09e07 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32RLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int32RLEDictionaryValuesDecoder.java @@ -26,7 +26,7 @@ import static io.airlift.slice.SizeOf.sizeOf; public class Int32RLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements Int32ValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int32RLEDictionaryValuesDecoder.class).instanceSize(); @@ -46,17 +46,17 @@ public void readNext(int[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - final int rleValue = currentValue; + final int rleValue = getDecodedInt(); final int rleDictionaryValue = dictionary.decodeToInt(rleValue); while (destinationIndex < endIndex) { values[destinationIndex++] = rleDictionaryValue; @@ -64,19 +64,19 @@ public void readNext(int[] values, int offset, int length) break; } case PACKED: { - final int[] localCurrentBuffer = currentBuffer; + final int[] localCurrentBuffer = getDecodedInts(); final IntegerDictionary localDictionary = dictionary; - for (int sourceIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; sourceIndex++) { + for (int sourceIndex = localCurrentBuffer.length - getCurrentCount(); destinationIndex < endIndex; sourceIndex++) { int dictionaryValue = localDictionary.decodeToInt(localCurrentBuffer[sourceIndex]); values[destinationIndex++] = dictionaryValue; } break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } } @@ -88,14 +88,14 @@ public void skip(int length) checkArgument(length >= 0, "invalid length %s", length); int remaining = length; while (remaining > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remaining, currentCount); - currentCount -= chunkSize; + int chunkSize = Math.min(remaining, getCurrentCount()); + decrementCurrentCount(chunkSize); remaining -= chunkSize; } checkState(remaining == 0, "End of stream: Invalid skip size request: %s", length); @@ -104,6 +104,6 @@ public void skip(int length) @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(currentBuffer); + return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(getDecodedInts()); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64RLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64RLEDictionaryValuesDecoder.java index 51d6da3836278..68ba1cb07b912 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64RLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64RLEDictionaryValuesDecoder.java @@ -27,7 +27,7 @@ import static io.airlift.slice.SizeOf.sizeOf; public class Int64RLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements Int64ValuesDecoder, ShortDecimalValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int64RLEDictionaryValuesDecoder.class).instanceSize(); @@ -47,17 +47,17 @@ public void readNext(long[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - final int rleValue = currentValue; + final int rleValue = getDecodedInt(); final long rleDictionaryValue = dictionary.decodeToLong(rleValue); while (destinationIndex < endIndex) { values[destinationIndex++] = rleDictionaryValue; @@ -65,19 +65,19 @@ public void readNext(long[] values, int offset, int length) break; } case PACKED: { - final int[] localBuffer = currentBuffer; + final int[] localBuffer = getDecodedInts(); final LongDictionary localDictionary = dictionary; - for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++) { + for (int srcIndex = localBuffer.length - getCurrentCount(); destinationIndex < endIndex; srcIndex++) { long dictionaryValue = localDictionary.decodeToLong(localBuffer[srcIndex]); values[destinationIndex++] = dictionaryValue; } break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } @@ -91,14 +91,14 @@ public void skip(int length) checkArgument(length >= 0, "invalid length %s", length); int remaining = length; while (remaining > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remaining, currentCount); - currentCount -= chunkSize; + int chunkSize = Math.min(remaining, getCurrentCount()); + decrementCurrentCount(chunkSize); remaining -= chunkSize; } checkState(remaining == 0, "End of stream: Invalid skip size request: %s", length); @@ -107,6 +107,6 @@ public void skip(int length) @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(currentBuffer); + return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(getDecodedInts()); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java index c94d88f896805..2834f775534e1 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.java @@ -29,7 +29,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; public class Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements Int64TimeAndTimestampMicrosValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int64TimeAndTimestampMicrosRLEDictionaryValuesDecoder.class).instanceSize(); @@ -57,17 +57,17 @@ public void readNext(long[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - final int rleValue = currentValue; + final int rleValue = getDecodedInt(); final long rleDictionaryValue = MICROSECONDS.toMillis(dictionary.decodeToLong(rleValue)); while (destinationIndex < endIndex) { values[destinationIndex++] = packFunction.pack(rleDictionaryValue); @@ -75,9 +75,9 @@ public void readNext(long[] values, int offset, int length) break; } case PACKED: { - final int[] localBuffer = currentBuffer; + final int[] localBuffer = getDecodedInts(); final LongDictionary localDictionary = dictionary; - for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++) { + for (int srcIndex = localBuffer.length - getCurrentCount(); destinationIndex < endIndex; srcIndex++) { long dictionaryValue = localDictionary.decodeToLong(localBuffer[srcIndex]); long millisValue = MICROSECONDS.toMillis(dictionaryValue); values[destinationIndex++] = packFunction.pack(millisValue); @@ -85,10 +85,10 @@ public void readNext(long[] values, int offset, int length) break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } @@ -102,14 +102,14 @@ public void skip(int length) checkArgument(length >= 0, "invalid length %s", length); int remaining = length; while (remaining > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int chunkSize = Math.min(remaining, currentCount); - currentCount -= chunkSize; + int chunkSize = Math.min(remaining, getCurrentCount()); + decrementCurrentCount(chunkSize); remaining -= chunkSize; } @@ -119,6 +119,12 @@ public void skip(int length) @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(currentBuffer); + return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(getDecodedInts()); + } + + @FunctionalInterface + public interface PackFunction + { + long pack(long millis); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java index 05ced8bc80107..50cf29e59d263 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/LongDecimalRLEDictionaryValuesDecoder.java @@ -27,7 +27,7 @@ import static java.util.Objects.requireNonNull; public class LongDecimalRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements LongDecimalValuesDecoder { private final BinaryRLEDictionaryValuesDecoder delegate; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java index 16e4ee7e4653b..865f1649d72ce 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/ShortDecimalRLEDictionaryValuesDecoder.java @@ -25,7 +25,7 @@ import static java.util.Objects.requireNonNull; public class ShortDecimalRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements ShortDecimalValuesDecoder { private final BinaryRLEDictionaryValuesDecoder delegate; diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/TimestampRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/TimestampRLEDictionaryValuesDecoder.java index 869e3139465ef..2c40f0faca6be 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/TimestampRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/TimestampRLEDictionaryValuesDecoder.java @@ -26,7 +26,7 @@ import static io.airlift.slice.SizeOf.sizeOf; public class TimestampRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements TimestampValuesDecoder { private static final int INSTANCE_SIZE = ClassLayout.parseClass(TimestampRLEDictionaryValuesDecoder.class).instanceSize(); @@ -46,17 +46,17 @@ public void readNext(long[] values, int offset, int length) int destinationIndex = offset; int remainingToCopy = length; while (remainingToCopy > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int numEntriesToFill = Math.min(remainingToCopy, currentCount); + int numEntriesToFill = Math.min(remainingToCopy, getCurrentCount()); int endIndex = destinationIndex + numEntriesToFill; - switch (mode) { + switch (getCurrentMode()) { case RLE: { - final int rleValue = currentValue; + final int rleValue = getDecodedInt(); final long rleValueMillis = dictionary.decodeToLong(rleValue); while (destinationIndex < endIndex) { values[destinationIndex++] = rleValueMillis; @@ -64,18 +64,18 @@ public void readNext(long[] values, int offset, int length) break; } case PACKED: { - final int[] localBuffer = currentBuffer; + final int[] localBuffer = getDecodedInts(); final TimestampDictionary localDictionary = dictionary; - for (int srcIndex = currentBuffer.length - currentCount; destinationIndex < endIndex; srcIndex++) { + for (int srcIndex = localBuffer.length - getCurrentCount(); destinationIndex < endIndex; srcIndex++) { values[destinationIndex++] = localDictionary.decodeToLong(localBuffer[srcIndex]); } break; } default: - throw new ParquetDecodingException("not a valid mode " + mode); + throw new ParquetDecodingException("not a valid mode " + getCurrentMode()); } - currentCount -= numEntriesToFill; + decrementCurrentCount(numEntriesToFill); remainingToCopy -= numEntriesToFill; } checkState(remainingToCopy == 0, "End of stream: Invalid read size request"); @@ -88,14 +88,14 @@ public void skip(int length) checkArgument(length >= 0, "invalid length %s", length); int remaining = length; while (remaining > 0) { - if (currentCount == 0) { + if (getCurrentCount() == 0) { if (!decode()) { break; } } - int readChunkSize = Math.min(remaining, currentCount); - currentCount -= readChunkSize; + int readChunkSize = Math.min(remaining, getCurrentCount()); + decrementCurrentCount(readChunkSize); remaining -= readChunkSize; } checkState(remaining == 0, "End of stream: Invalid skip size request: %s", length); @@ -104,6 +104,6 @@ public void skip(int length) @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(currentBuffer); + return INSTANCE_SIZE + (dictionary == null ? 0 : dictionary.getRetainedSizeInBytes()) + sizeOf(getDecodedInts()); } } diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java index 30fdc0e93d77a..3731507417dc2 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/batchreader/decoders/rle/UuidRLEDictionaryValuesDecoder.java @@ -24,7 +24,7 @@ import static java.util.Objects.requireNonNull; public class UuidRLEDictionaryValuesDecoder - extends BaseRLEBitPackedDecoder + extends GenericRLEDictionaryValuesDecoder implements UuidValuesDecoder { private final BinaryRLEDictionaryValuesDecoder delegate; diff --git a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java index e42e9af8987da..c23c7f409a505 100644 --- a/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java +++ b/presto-parquet/src/test/java/com/facebook/presto/parquet/batchreader/decoders/TestValuesDecoders.java @@ -47,6 +47,7 @@ import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -595,14 +596,60 @@ public void testBooleanPlain() booleanBatchReadWithSkipHelper(89, 29, valueCount, booleanPlain(pageBytes), expectedValues); booleanBatchReadWithSkipHelper(1024, 1024, valueCount, booleanPlain(pageBytes), expectedValues); } + private static byte[] generateBooleanRLEData(List values) throws IOException + { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + int count = 0; + boolean currentValue = false; + + for (int value : values) { + boolean newValue = value == 1; + if (count == 0) { + currentValue = newValue; + count = 1; + } + else if (currentValue == newValue) { + count++; + } + else { + writeRLEBitPackedRun(outputStream, currentValue, count); + currentValue = newValue; + count = 1; + } + } + writeRLEBitPackedRun(outputStream, currentValue, count); + return outputStream.toByteArray(); + } + + private static void writeRLEBitPackedRun(ByteArrayOutputStream outputStream, boolean value, int count) throws IOException + { + int header = (count << 1) | (value ? 1 : 0); + while ((header & ~0x7F) != 0) { + outputStream.write((header & 0x7F) | 0x80); + header >>>= 7; + } + outputStream.write(header); + } + @Test + public void testMinimalBooleanRLE() throws IOException + { + List values = Arrays.asList(0, 1, 0); + byte[] dataPage = generateBooleanRLEData(values); + List expectedValues = new ArrayList<>(values); + + booleanBatchReadWithSkipHelper(1, 0, 3, booleanRLE(dataPage), expectedValues); + } @Test - public void testBooleanRLE() + public void testBooleanRLE() throws IOException { int valueCount = 2048; List values = new ArrayList<>(); + for (int i = 0; i < valueCount; i++) { + values.add(i % 2); // Alternating 0s and 1s + } - byte[] dataPage = generateDictionaryIdPage2048(1, values); + byte[] dataPage = generateBooleanRLEData(values); List expectedValues = new ArrayList<>(values);