diff --git a/build-module.py b/build-module.py index 208a123b90..d91375e8e6 100644 --- a/build-module.py +++ b/build-module.py @@ -19,9 +19,7 @@ import shutil from pathlib import Path -# Uncommend if your library can still function if extensions fail to compile. -allowed_to_fail = False -# allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1" +allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1" def build_cython_extensions() -> None: diff --git a/pyiceberg/avro/decoder.py b/pyiceberg/avro/decoder.py index 46359f66f4..b6349f38a7 100644 --- a/pyiceberg/avro/decoder.py +++ b/pyiceberg/avro/decoder.py @@ -14,12 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import io from abc import ABC, abstractmethod from io import SEEK_CUR from typing import ( Dict, List, Tuple, + Union, cast, ) @@ -30,10 +32,6 @@ class BinaryDecoder(ABC): """Decodes bytes into Python physical primitives.""" - @abstractmethod - def __init__(self, input_stream: InputStream) -> None: - """Create the decoder.""" - @abstractmethod def tell(self) -> int: """Return the current position.""" @@ -138,10 +136,13 @@ class StreamingBinaryDecoder(BinaryDecoder): __slots__ = "_input_stream" _input_stream: InputStream - def __init__(self, input_stream: InputStream) -> None: + def __init__(self, input_stream: Union[bytes, InputStream]) -> None: """Reader is a Python object on which we can call read, seek, and tell.""" - super().__init__(input_stream) - self._input_stream = input_stream + if isinstance(input_stream, bytes): + # In the case of bytes, we wrap it into a BytesIO to make it a stream + self._input_stream = io.BytesIO(input_stream) + else: + self._input_stream = input_stream def tell(self) -> int: """Return the current stream position.""" diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index 149366e8ab..fb9f32b405 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -35,7 +35,14 @@ ) from pyiceberg.avro.codecs import KNOWN_CODECS, Codec -from pyiceberg.avro.decoder_fast import CythonBinaryDecoder + +try: + from pyiceberg.avro.decoder_fast import CythonBinaryDecoder as AvroDecoder +except ModuleNotFoundError: + import warnings + + warnings.warn("Falling back to pure Python Avro decoder, missing Cython extension") + from pyiceberg.avro.decoder import StreamingBinaryDecoder as AvroDecoder # type: ignore from pyiceberg.avro.encoder import BinaryEncoder from pyiceberg.avro.reader import ReadableDecoder, Reader from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve @@ -166,7 +173,7 @@ def __enter__(self) -> AvroFile[D]: A generator returning the AvroStructs. """ with self.input_file.open() as f: - self.decoder = CythonBinaryDecoder(f.read()) + self.decoder = AvroDecoder(f.read()) self.header = self._read_header() self.schema = self.header.get_schema() if not self.read_schema: @@ -198,7 +205,7 @@ def _read_block(self) -> int: if codec := self.header.compression_codec(): block_bytes = codec.decompress(block_bytes) - self.block = Block(reader=self.reader, block_records=block_records, block_decoder=CythonBinaryDecoder(block_bytes)) + self.block = Block(reader=self.reader, block_records=block_records, block_decoder=AvroDecoder(block_bytes)) return block_records def __next__(self) -> D: diff --git a/tests/avro/test_decoder.py b/tests/avro/test_decoder.py index 0b35dcff0c..2eaf20cd4a 100644 --- a/tests/avro/test_decoder.py +++ b/tests/avro/test_decoder.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import io import itertools import struct from io import SEEK_SET @@ -32,45 +31,38 @@ from pyiceberg.io import InputStream from pyiceberg.types import DoubleType, FloatType -AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())] - -CALLABLE_DECODER = Callable[[InputStream], ReadableDecoder] +AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder] @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_boolean_true(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x01") - decoder = decoder_class(mis) +def test_read_boolean_true(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x01") assert decoder.read_boolean() is True @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_boolean_false(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00") - decoder = decoder_class(mis) +def test_read_boolean_false(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00") assert decoder.read_boolean() is False @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_boolean(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00") - decoder = decoder_class(mis) +def test_skip_boolean(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00") assert decoder.tell() == 0 decoder.skip_boolean() assert decoder.tell() == 1 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_read_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") assert decoder.read_int() == 12 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int_longer(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x8e\xd1\x87\x01") - decoder = decoder_class(mis) +def test_read_int_longer(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x8e\xd1\x87\x01") assert decoder.read_int() == 1111111 @@ -88,27 +80,24 @@ def zigzag_encode(datum: int) -> bytes: "decoder_class, expected_value", list(itertools.product(AVAILABLE_DECODERS, [0, -1, 2**32, -(2**32), (2**63 - 1), -(2**63)])), ) -def test_read_int_custom_encode(decoder_class: CALLABLE_DECODER, expected_value: int) -> None: +def test_read_int_custom_encode(decoder_class: Callable[[bytes], ReadableDecoder], expected_value: int) -> None: encoded = zigzag_encode(expected_value) - mis = io.BytesIO(encoded) - decoder = decoder_class(mis) + decoder = decoder_class(encoded) decoded = decoder.read_int() assert decoded == expected_value, f"Decoded value does not match decoded={decoded} expected={expected_value}" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_int(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_skip_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") assert decoder.tell() == 0 decoder.skip_int() assert decoder.tell() == 1 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_negative_bytes(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"") - decoder = decoder_class(mis) +def test_read_negative_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"") with pytest.raises(ValueError) as exc_info: decoder.read(-1) @@ -148,70 +137,62 @@ def __exit__( # InMemoryBinaryDecoder doesn't work for a byte at a time reading @pytest.mark.parametrize("decoder_class", [StreamingBinaryDecoder]) -def test_read_single_byte_at_the_time(decoder_class: CALLABLE_DECODER) -> None: - decoder = decoder_class(OneByteAtATimeInputStream()) +def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(OneByteAtATimeInputStream()) # type: ignore assert decoder.read(2) == b"\x01\x02" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_float(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00\x00\x9A\x41") - decoder = decoder_class(mis) +def test_read_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00\x00\x9A\x41") assert decoder.read_float() == 19.25 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_float(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00\x00\x9A\x41") - decoder = decoder_class(mis) +def test_skip_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00\x00\x9A\x41") assert decoder.tell() == 0 decoder.skip_float() assert decoder.tell() == 4 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_double(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40") - decoder = decoder_class(mis) +def test_read_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40") assert decoder.read_double() == 19.25 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_double(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40") - decoder = decoder_class(mis) +def test_skip_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40") assert decoder.tell() == 0 decoder.skip_double() assert decoder.tell() == 8 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_bytes(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x08\x01\x02\x03\x04") - decoder = decoder_class(mis) +def test_read_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x08\x01\x02\x03\x04") actual = decoder.read_bytes() assert actual == b"\x01\x02\x03\x04" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_utf8(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x04\x76\x6F") - decoder = decoder_class(mis) +def test_read_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x04\x76\x6F") assert decoder.read_utf8() == "vo" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_utf8(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x04\x76\x6F") - decoder = decoder_class(mis) +def test_skip_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x04\x76\x6F") assert decoder.tell() == 0 decoder.skip_utf8() assert decoder.tell() == 3 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int_as_float(decoder_class: CALLABLE_DECODER) -> None: - mis = io.BytesIO(b"\x00\x00\x9A\x41") - decoder = decoder_class(mis) +def test_read_int_as_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x00\x00\x9A\x41") reader = resolve(FloatType(), DoubleType()) assert reader.read(decoder) == 19.25 diff --git a/tests/avro/test_reader.py b/tests/avro/test_reader.py index 99a6499bf5..9da8d04593 100644 --- a/tests/avro/test_reader.py +++ b/tests/avro/test_reader.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=protected-access -import io import json from typing import Callable @@ -42,7 +41,6 @@ UUIDReader, ) from pyiceberg.avro.resolver import construct_reader -from pyiceberg.io import InputStream from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry from pyiceberg.schema import Schema @@ -67,7 +65,7 @@ UUIDType, ) -AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())] +AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder] def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None: @@ -342,18 +340,16 @@ def test_uuid_reader() -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_read_struct(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder) assert repr(result) == "Record[id=12]" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_read_struct_lambda(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) # You can also pass in an arbitrary function that returns a struct @@ -364,9 +360,8 @@ def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecod @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_read_not_struct_type(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) with pytest.raises(ValueError) as exc_info: @@ -376,9 +371,8 @@ def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDec @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_exception_handling(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None: - mis = io.BytesIO(b"\x18") - decoder = decoder_class(mis) +def test_read_struct_exception_handling(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: + decoder = decoder_class(b"\x18") def raise_err(struct: StructType) -> None: raise TypeError("boom")