Skip to content

Commit

Permalink
register table integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JoniKet committed Jan 29, 2025
1 parent 5cedb9a commit e8f1c9c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 92 deletions.
81 changes: 0 additions & 81 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,87 +204,6 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None:
catalog.create_table("table", schema=table_schema_simple)


@pytest.mark.parametrize("hive2_compatible", [True, False])
@patch("time.time", MagicMock(return_value=12345))  # freeze time so create/lastAccess timestamps are deterministic
def test_register_table(
    table_schema_with_all_types: Schema,
    hive_database: HiveDatabase,
    hive_table: HiveTable,
    hive2_compatible: bool,
    metadata_with_owner_location: str,
) -> None:
    """register_table translates an existing Iceberg metadata file into a Hive EXTERNAL_TABLE.

    Uses a fully mocked Thrift client, so this only verifies the exact
    ``HiveTable`` payload passed to ``create_table`` — no metastore is contacted.
    The metadata file carries ``owner=test``, which must surface as the Hive
    table owner.
    """
    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
    if hive2_compatible:
        # Re-create the catalog with the Hive 2.x compatibility flag; the expected
        # payload below is asserted to be identical in both modes.
        catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"})

    # Stub out every Thrift round-trip the register/exists path performs.
    catalog._client = MagicMock()
    catalog._client.__enter__().create_table.return_value = None
    catalog._client.__enter__().register_table.return_value = None
    catalog._client.__enter__().get_table.return_value = hive_table
    catalog._client.__enter__().get_database.return_value = hive_database

    catalog.register_table(("default", "table"), metadata_location=metadata_with_owner_location)

    # The full expected Thrift payload: schema columns, location, and the
    # Iceberg marker properties (EXTERNAL/table_type/metadata_location).
    catalog._client.__enter__().create_table.assert_called_with(
        HiveTable(
            tableName="table",
            dbName="default",
            owner="test",  # taken from the "owner" property in the metadata file
            createTime=12345,
            lastAccessTime=12345,
            retention=None,
            sd=StorageDescriptor(
                cols=[
                    FieldSchema(name="x", type="bigint", comment=None),  # Corrected columns
                    FieldSchema(name="y", type="bigint", comment="comment"),
                    FieldSchema(name="z", type="bigint", comment=None),
                ],
                location="s3://bucket/test/location",  # Corrected location
                inputFormat="org.apache.hadoop.mapred.FileInputFormat",
                outputFormat="org.apache.hadoop.mapred.FileOutputFormat",
                compressed=None,
                numBuckets=None,
                serdeInfo=SerDeInfo(
                    name=None,
                    serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                    parameters=None,
                    description=None,
                    serializerClass=None,
                    deserializerClass=None,
                    serdeType=None,
                ),
                bucketCols=None,
                sortCols=None,
                parameters=None,
                skewedInfo=None,
                storedAsSubDirectories=None,
            ),
            partitionKeys=None,
            parameters={"EXTERNAL": "TRUE", "table_type": "ICEBERG", "metadata_location": metadata_with_owner_location},
            viewOriginalText=None,
            viewExpandedText=None,
            tableType="EXTERNAL_TABLE",
            privileges=None,
            temporary=False,
            rewriteEnabled=None,
            creationMetadata=None,
            catName=None,
            ownerType=1,
            writeId=-1,
            isStatsCompliant=None,
            colStats=None,
            accessType=None,
            requiredReadCapabilities=None,
            requiredWriteCapabilities=None,
            id=None,
            fileMetadata=None,
            dictionary=None,
            txnId=None,
        )
    )
    # table_exists goes through the mocked get_table, which returns hive_table.
    assert catalog.table_exists(identifier="default.table")


@pytest.mark.parametrize("hive2_compatible", [True, False])
@patch("time.time", MagicMock(return_value=12345))
def test_create_table(
Expand Down
11 changes: 0 additions & 11 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,17 +1133,6 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
return metadata_location


@pytest.fixture(scope="session")
def metadata_with_owner_location(tmp_path_factory: pytest.TempPathFactory) -> str:
    """Write example V2 table metadata (with an ``owner`` property) to a temp file.

    Returns the string path of the written ``.metadata.json`` file.
    """
    from pyiceberg.io.pyarrow import PyArrowFileIO

    target = tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json"
    table_metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
    table_metadata.properties["owner"] = "test"

    output_file = PyArrowFileIO().new_output(location=str(target))
    ToOutputFile.table_metadata(table_metadata, output_file, overwrite=True)
    return str(target)


@pytest.fixture(scope="session")
def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand Down
100 changes: 100 additions & 0 deletions tests/integration/test_register_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.hive import (
HiveCatalog,
)
from pyiceberg.exceptions import NoSuchTableError, TableAlreadyExistsError
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.types import (
BooleanType,
DateType,
IntegerType,
NestedField,
StringType,
)

# Shared schema for every table created in this module. Field ids are
# deliberately non-contiguous (1, 2, 4, 10); all fields are optional.
TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
    NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
    NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
    NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
)


def _create_table(
    session_catalog: Catalog,
    identifier: str,
    format_version: int,
    location: str,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    schema: Schema = TABLE_SCHEMA,
) -> Table:
    """Create ``identifier`` fresh in ``session_catalog`` and return the new table.

    Any pre-existing table with the same identifier is dropped first so the
    test starts from a clean state; a missing table is not an error.
    """
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass  # nothing to clean up

    created = session_catalog.create_table(
        identifier=identifier,
        schema=schema,
        partition_spec=partition_spec,
        location=location,
        properties={"format-version": str(format_version)},
    )
    return created


@pytest.mark.integration
def test_hive_register_table(
    session_catalog: HiveCatalog,
) -> None:
    """A dropped Hive table can be restored by re-registering its metadata file."""
    identifier = "default.hive_register_table"
    location = "s3a://warehouse/default/hive_register_table"

    table = _create_table(session_catalog, identifier, 2, location)
    assert session_catalog.table_exists(identifier=identifier)

    # Remove the catalog entry; the metadata file stays behind in storage.
    session_catalog.drop_table(identifier=identifier)
    assert not session_catalog.table_exists(identifier=identifier)

    # Point the catalog back at the orphaned metadata file.
    session_catalog.register_table(("default", "hive_register_table"), metadata_location=table.metadata_location)
    assert session_catalog.table_exists(identifier=identifier)


@pytest.mark.integration
def test_hive_register_table_existing(
    session_catalog: HiveCatalog,
) -> None:
    """Registering over an existing Hive table raises TableAlreadyExistsError."""
    identifier = "default.hive_register_table_existing"
    location = "s3a://warehouse/default/hive_register_table_existing"

    table = _create_table(session_catalog, identifier, 2, location)
    assert session_catalog.table_exists(identifier=identifier)

    # The table was never dropped, so a second registration must be rejected.
    with pytest.raises(TableAlreadyExistsError):
        session_catalog.register_table(("default", "hive_register_table_existing"), metadata_location=table.metadata_location)


@pytest.mark.integration
def test_rest_register_table(
    session_catalog: Catalog,
) -> None:
    """A dropped REST-catalog table can be restored by re-registering its metadata file."""
    identifier = "default.rest_register_table"
    location = "s3a://warehouse/default/rest_register_table"

    table = _create_table(session_catalog, identifier, 2, location)
    assert session_catalog.table_exists(identifier=identifier)

    # Remove the catalog entry; the metadata file stays behind in storage.
    session_catalog.drop_table(identifier=identifier)
    assert not session_catalog.table_exists(identifier=identifier)

    # Point the catalog back at the orphaned metadata file.
    session_catalog.register_table(identifier=identifier, metadata_location=table.metadata_location)
    assert session_catalog.table_exists(identifier=identifier)


@pytest.mark.integration
def test_rest_register_table_existing(
    session_catalog: Catalog,
) -> None:
    """Registering over an existing REST-catalog table raises TableAlreadyExistsError."""
    identifier = "default.rest_register_table_existing"
    location = "s3a://warehouse/default/rest_register_table_existing"

    table = _create_table(session_catalog, identifier, 2, location)
    assert session_catalog.table_exists(identifier=identifier)

    # The table was never dropped, so a second registration must be rejected.
    with pytest.raises(TableAlreadyExistsError):
        session_catalog.register_table(identifier=identifier, metadata_location=table.metadata_location)

0 comments on commit e8f1c9c

Please sign in to comment.