Skip to content

Commit

Permalink
Python: Non-Cython fallback Avro parser (#8521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Sep 30, 2023
1 parent 9b193fa commit 66b57e5
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 82 deletions.
4 changes: 1 addition & 3 deletions build-module.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import shutil
from pathlib import Path

# Uncommend if your library can still function if extensions fail to compile.
allowed_to_fail = False
# allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1"
allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1"


def build_cython_extensions() -> None:
Expand Down
15 changes: 8 additions & 7 deletions pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
from abc import ABC, abstractmethod
from io import SEEK_CUR
from typing import (
Dict,
List,
Tuple,
Union,
cast,
)

Expand All @@ -30,10 +32,6 @@
class BinaryDecoder(ABC):
"""Decodes bytes into Python physical primitives."""

@abstractmethod
def __init__(self, input_stream: InputStream) -> None:
"""Create the decoder."""

@abstractmethod
def tell(self) -> int:
"""Return the current position."""
Expand Down Expand Up @@ -138,10 +136,13 @@ class StreamingBinaryDecoder(BinaryDecoder):
__slots__ = "_input_stream"
_input_stream: InputStream

def __init__(self, input_stream: InputStream) -> None:
def __init__(self, input_stream: Union[bytes, InputStream]) -> None:
"""Reader is a Python object on which we can call read, seek, and tell."""
super().__init__(input_stream)
self._input_stream = input_stream
if isinstance(input_stream, bytes):
# In the case of bytes, we wrap it into a BytesIO to make it a stream
self._input_stream = io.BytesIO(input_stream)
else:
self._input_stream = input_stream

def tell(self) -> int:
"""Return the current stream position."""
Expand Down
13 changes: 10 additions & 3 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@
)

from pyiceberg.avro.codecs import KNOWN_CODECS, Codec
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder

try:
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder as AvroDecoder
except ModuleNotFoundError:
import warnings

warnings.warn("Falling back to pure Python Avro decoder, missing Cython extension")
from pyiceberg.avro.decoder import StreamingBinaryDecoder as AvroDecoder # type: ignore
from pyiceberg.avro.encoder import BinaryEncoder
from pyiceberg.avro.reader import ReadableDecoder, Reader
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve
Expand Down Expand Up @@ -166,7 +173,7 @@ def __enter__(self) -> AvroFile[D]:
A generator returning the AvroStructs.
"""
with self.input_file.open() as f:
self.decoder = CythonBinaryDecoder(f.read())
self.decoder = AvroDecoder(f.read())
self.header = self._read_header()
self.schema = self.header.get_schema()
if not self.read_schema:
Expand Down Expand Up @@ -198,7 +205,7 @@ def _read_block(self) -> int:
if codec := self.header.compression_codec():
block_bytes = codec.decompress(block_bytes)

self.block = Block(reader=self.reader, block_records=block_records, block_decoder=CythonBinaryDecoder(block_bytes))
self.block = Block(reader=self.reader, block_records=block_records, block_decoder=AvroDecoder(block_bytes))
return block_records

def __next__(self) -> D:
Expand Down
89 changes: 35 additions & 54 deletions tests/avro/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import io
import itertools
import struct
from io import SEEK_SET
Expand All @@ -32,45 +31,38 @@
from pyiceberg.io import InputStream
from pyiceberg.types import DoubleType, FloatType

AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())]

CALLABLE_DECODER = Callable[[InputStream], ReadableDecoder]
AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder]


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_boolean_true(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x01")
decoder = decoder_class(mis)
def test_read_boolean_true(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x01")
assert decoder.read_boolean() is True


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_boolean_false(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00")
decoder = decoder_class(mis)
def test_read_boolean_false(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00")
assert decoder.read_boolean() is False


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_boolean(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00")
decoder = decoder_class(mis)
def test_skip_boolean(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00")
assert decoder.tell() == 0
decoder.skip_boolean()
assert decoder.tell() == 1


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_read_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")
assert decoder.read_int() == 12


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int_longer(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x8e\xd1\x87\x01")
decoder = decoder_class(mis)
def test_read_int_longer(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x8e\xd1\x87\x01")
assert decoder.read_int() == 1111111


Expand All @@ -88,27 +80,24 @@ def zigzag_encode(datum: int) -> bytes:
"decoder_class, expected_value",
list(itertools.product(AVAILABLE_DECODERS, [0, -1, 2**32, -(2**32), (2**63 - 1), -(2**63)])),
)
def test_read_int_custom_encode(decoder_class: CALLABLE_DECODER, expected_value: int) -> None:
def test_read_int_custom_encode(decoder_class: Callable[[bytes], ReadableDecoder], expected_value: int) -> None:
encoded = zigzag_encode(expected_value)
mis = io.BytesIO(encoded)
decoder = decoder_class(mis)
decoder = decoder_class(encoded)
decoded = decoder.read_int()
assert decoded == expected_value, f"Decoded value does not match decoded={decoded} expected={expected_value}"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_int(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_skip_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")
assert decoder.tell() == 0
decoder.skip_int()
assert decoder.tell() == 1


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_negative_bytes(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"")
decoder = decoder_class(mis)
def test_read_negative_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"")

with pytest.raises(ValueError) as exc_info:
decoder.read(-1)
Expand Down Expand Up @@ -148,70 +137,62 @@ def __exit__(

# InMemoryBinaryDecoder doesn't work for a byte at a time reading
@pytest.mark.parametrize("decoder_class", [StreamingBinaryDecoder])
def test_read_single_byte_at_the_time(decoder_class: CALLABLE_DECODER) -> None:
decoder = decoder_class(OneByteAtATimeInputStream())
def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(OneByteAtATimeInputStream()) # type: ignore
assert decoder.read(2) == b"\x01\x02"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_float(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = decoder_class(mis)
def test_read_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
assert decoder.read_float() == 19.25


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_float(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = decoder_class(mis)
def test_skip_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
assert decoder.tell() == 0
decoder.skip_float()
assert decoder.tell() == 4


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_double(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
decoder = decoder_class(mis)
def test_read_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40")
assert decoder.read_double() == 19.25


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_double(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
decoder = decoder_class(mis)
def test_skip_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40")
assert decoder.tell() == 0
decoder.skip_double()
assert decoder.tell() == 8


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_bytes(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x08\x01\x02\x03\x04")
decoder = decoder_class(mis)
def test_read_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x08\x01\x02\x03\x04")
actual = decoder.read_bytes()
assert actual == b"\x01\x02\x03\x04"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_utf8(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x04\x76\x6F")
decoder = decoder_class(mis)
def test_read_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x04\x76\x6F")
assert decoder.read_utf8() == "vo"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_utf8(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x04\x76\x6F")
decoder = decoder_class(mis)
def test_skip_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x04\x76\x6F")
assert decoder.tell() == 0
decoder.skip_utf8()
assert decoder.tell() == 3


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int_as_float(decoder_class: CALLABLE_DECODER) -> None:
mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = decoder_class(mis)
def test_read_int_as_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
reader = resolve(FloatType(), DoubleType())
assert reader.read(decoder) == 19.25
24 changes: 9 additions & 15 deletions tests/avro/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=protected-access
import io
import json
from typing import Callable

Expand All @@ -42,7 +41,6 @@
UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
from pyiceberg.io import InputStream
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry
from pyiceberg.schema import Schema
Expand All @@ -67,7 +65,7 @@
UUIDType,
)

AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())]
AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder]


def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
Expand Down Expand Up @@ -342,18 +340,16 @@ def test_uuid_reader() -> None:


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_read_struct(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
assert repr(result) == "Record[id=12]"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_read_struct_lambda(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")

struct = StructType(NestedField(1, "id", IntegerType(), required=True))
# You can also pass in an arbitrary function that returns a struct
Expand All @@ -364,9 +360,8 @@ def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecod


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_read_not_struct_type(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")

struct = StructType(NestedField(1, "id", IntegerType(), required=True))
with pytest.raises(ValueError) as exc_info:
Expand All @@ -376,9 +371,8 @@ def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDec


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct_exception_handling(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
mis = io.BytesIO(b"\x18")
decoder = decoder_class(mis)
def test_read_struct_exception_handling(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
decoder = decoder_class(b"\x18")

def raise_err(struct: StructType) -> None:
raise TypeError("boom")
Expand Down

0 comments on commit 66b57e5

Please sign in to comment.