diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 3705be5d35..912582b8c1 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -524,6 +524,104 @@ catalog: +### S3Tables Catalog + +The S3Tables Catalog leverages the catalog functionalities of the Amazon S3Tables service and requires an existing S3 Tables Bucket to operate. + +To use Amazon S3Tables as your catalog, you can configure pyiceberg using one of the following methods. Additionally, refer to the [AWS documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html) on configuring credentials to set up your AWS account credentials locally. + +If you intend to use the same credentials for both the S3Tables Catalog and S3 FileIO, you can configure the [`client.*` properties](configuration.md#unified-aws-credentials) to streamline the process. + +Note that the S3Tables Catalog manages the underlying table locations internally, which makes it incompatible with S3-like storage systems such as MinIO. If you specify the `s3tables.endpoint`, ensure that the `s3.endpoint` is configured accordingly. + +```yaml +catalog: + default: + type: s3tables + warehouse: arn:aws:s3tables:us-east-1:012345678901:bucket/pyiceberg-catalog +``` + +If you prefer to pass the credentials explicitly to the client instead of relying on environment variables, + +```yaml +catalog: + default: + type: s3tables + s3tables.access-key-id: + s3tables.secret-access-key: + s3tables.session-token: + s3tables.region: + s3tables.endpoint: http://localhost:9000 + s3.endpoint: http://localhost:9000 +``` + + + +!!! Note "Client-specific Properties" + `s3tables.*` properties are for S3TablesCatalog only. If you want to use the same credentials for both S3TablesCatalog and S3 FileIO, you can set the `client.*` properties. See the [Unified AWS Credentials](configuration.md#unified-aws-credentials) section for more details. + + + + + +| Key | Example | Description | +| -------------------------- | ------------------- | -------------------------------------------------------------------------- | +| s3tables.profile-name | default | Configure the static profile used to access the S3Tables Catalog | +| s3tables.region | us-east-1 | Set the region of the S3Tables Catalog | +| s3tables.access-key-id | admin | Configure the static access key id used to access the S3Tables Catalog | +| s3tables.secret-access-key | password | Configure the static secret access key used to access the S3Tables Catalog | +| s3tables.session-token | AQoDYXdzEJr... | Configure the static session token used to access the S3Tables Catalog | +| s3tables.endpoint | ... | Configure the AWS endpoint | +| s3tables.warehouse | arn:aws:s3tables... | Set the underlying S3 Table Bucket | + + + + + +!!! warning "Removed Properties" + The properties `profile_name`, `region_name`, `aws_access_key_id`, `aws_secret_access_key`, and `aws_session_token` were deprecated and removed in 0.8.0 + + + +An example usage of the S3Tables Catalog is shown below: + +```python +from pyiceberg.catalog.s3tables import S3TablesCatalog +import pyarrow as pa + + +table_bucket_arn: str = "..." +aws_region: str = "..." 
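+# The "..." values above are placeholders: pass your own table bucket ARN (for example the
+# arn:aws:s3tables:us-east-1:012345678901:bucket/pyiceberg-catalog value from the YAML snippet,
+# account id and bucket name being illustrative) and the AWS region that bucket lives in.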
+ +properties = {"s3tables.warehouse": table_bucket_arn, "s3tables.region": aws_region} +catalog = S3TablesCatalog(name="s3tables_catalog", **properties) + +database_name = "prod" + +catalog.create_namespace(namespace=database_name) + +pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), + pa.array([1, 2, 3, 4]), + pa.array([True, None, False, True]), + pa.array([None, "A", "B", "C"]), + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), +) + +identifier = (database_name, "orders") +table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) +table.append(pyarrow_table) +``` + ### Custom Catalog Implementations If you want to load any custom catalog implementation, you can set catalog configurations like the following: diff --git a/poetry.lock b/poetry.lock index ae19f3a0e2..68fda796c1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2584,13 +2584,13 @@ type = ["mypy (==1.14.1)"] [[package]] name = "moto" -version = "5.0.27" +version = "5.0.28" description = "A library that allows you to easily mock out tests based on AWS infrastructure" optional = false python-versions = ">=3.8" files = [ - {file = "moto-5.0.27-py3-none-any.whl", hash = "sha256:27042fd94c8def0166d9f2ae8d39d9488d4b3115542b5fca88566c0424549013"}, - {file = "moto-5.0.27.tar.gz", hash = "sha256:6c123de7e0e5e6508a10c399ba3ecf2d5143f263f8e804fd4a7091941c3f5207"}, + {file = "moto-5.0.28-py3-none-any.whl", hash = "sha256:2dfbea1afe3b593e13192059a1a7fc4b3cf7fdf92e432070c22346efa45aa0f0"}, + {file = "moto-5.0.28.tar.gz", hash = "sha256:4d3437693411ec943c13c77de5b0b520c4b0a9ac850fead4ba2a54709e086e8b"}, ] [package.dependencies] @@ -5350,6 +5350,7 @@ pyiceberg-core = ["pyiceberg-core"] ray = ["pandas", "pyarrow", "ray", "ray"] rest-sigv4 = ["boto3"] s3fs = ["s3fs"] +s3tables = ["boto3"] snappy = ["python-snappy"] sql-postgres = ["psycopg2-binary", "sqlalchemy"] sql-sqlite = ["sqlalchemy"] @@ -5358,4 +5359,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.9, !=3.9.7" -content-hash = "66b432ec3645ad122e1c668f795fee1160fae2fd10012e2dfd04c04af90806ec" +content-hash = "a121d29b25a7e835b1fb226ac804861bf308070b4bf4eab221cd23405059d9fc" diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index c3ea23c1d2..c79f4d00fd 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -116,6 +116,7 @@ class CatalogType(Enum): GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" + S3TABLES = "s3tables" def load_rest(name: str, conf: Properties) -> Catalog: @@ -162,12 +163,22 @@ def load_sql(name: str, conf: Properties) -> Catalog: ) from exc +def load_s3tables(name: str, conf: Properties) -> Catalog: + try: + from pyiceberg.catalog.s3tables import S3TablesCatalog + + return S3TablesCatalog(name, **conf) + except ImportError as exc: + raise NotInstalledError("AWS S3Tables support not installed: pip install 'pyiceberg[s3tables]'") from exc + + AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, + CatalogType.S3TABLES: load_s3tables, } @@ -914,8 +925,8 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - raise ValueError("No default path is set, 
please specify a location when creating a table") @staticmethod - def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: - ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) + def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str, overwrite: bool = False) -> None: + ToOutputFile.table_metadata(metadata, io.new_output(metadata_path), overwrite=overwrite) @staticmethod def _get_metadata_location(location: str, new_version: int = 0) -> str: diff --git a/pyiceberg/catalog/s3tables.py b/pyiceberg/catalog/s3tables.py new file mode 100644 index 0000000000..16da5346e0 --- /dev/null +++ b/pyiceberg/catalog/s3tables.py @@ -0,0 +1,378 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + InvalidTableName, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, CreateTableTransaction, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = "s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.warehouse" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{1,61}[a-z0-9]") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + +S3TABLES_FORMAT = "ICEBERG" + + +class S3TablesCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + + session = boto3.Session( + 
profile_name=properties.get(S3TABLES_PROFILE_NAME), + region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION), + botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION), + aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN), + ) + try: + self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT)) + except boto3.session.UnknownServiceError as e: + raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e + + try: + self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn) + except self.s3tables.exceptions.NotFoundException as e: + raise TableBucketNotFound(e) from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + + current_table: Optional[Table] + version_token: Optional[str] + try: + current_table, version_token = self._load_table_and_version(identifier=table_identifier) + except NoSuchTableError: + current_table = None + version_token = None + + if current_table: + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + overwrite=True, + ) + + # try to update metadata location which will fail if the versionToken changed meanwhile + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=version_token, + metadataLocation=updated_staged_table.metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}." 
+ ) from e + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) + else: + # table does not exist, create it + raise NotImplementedError("Creating a table on commit is currently not supported.") + + def create_table_transaction( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> CreateTableTransaction: + raise NotImplementedError("create_table_transaction currently not supported.") + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + if properties: + raise NotImplementedError("Setting namespace properties is not supported.") + valid_namespace: str = self._validate_namespace_identifier(namespace) + self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, namespace=[valid_namespace]) + + def _validate_namespace_identifier(self, namespace: Union[str, Identifier]) -> str: + namespace = self.identifier_to_database(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == S3TABLES_RESERVED_NAMESPACE: + raise InvalidNamespaceName( + "The specified namespace name is not valid. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html for naming rules." + ) + + return namespace + + def _validate_database_and_table_identifier(self, identifier: Union[str, Identifier]) -> Tuple[str, str]: + namespace, table_name = self.identifier_to_database_and_table(identifier) + + namespace = self._validate_namespace_identifier(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(table_name): + raise InvalidTableName( + "The specified table name is not valid. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html for naming rules." + ) + + return namespace, table_name + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + if location: + raise NotImplementedError("S3 Tables does not support user specified table locations.") + namespace, table_name = self._validate_database_and_table_identifier(identifier) + + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + # creating a new table with S3 Tables is a two step process. We first have to create an S3 Table with the + # S3 Tables API and then write the new metadata.json to the warehouseLocation associated with the newly + # created S3 Table. + try: + version_token = self.s3tables.create_table( + tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name, format=S3TABLES_FORMAT + )["versionToken"] + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchNamespaceError(f"Cannot create {namespace}.{table_name} because no such namespace exists.") from e + except self.s3tables.exceptions.ConflictException as e: + raise TableAlreadyExistsError( + f"Cannot create {namespace}.{table_name} because a table of the same name already exists in the namespace." 
+ ) from e + + try: + response = self.s3tables.get_table_metadata_location( + tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name + ) + warehouse_location = response["warehouseLocation"] + + metadata_location = self._get_metadata_location(location=warehouse_location) + metadata = new_table_metadata( + location=warehouse_location, + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + ) + + io = load_file_io(properties=self.properties, location=metadata_location) + # this triggers unsupported list operation error as S3 Table Buckets only support a subset of the S3 Bucket API, + # setting overwrite=True is a workaround for now since it prevents a call to list_objects + self._write_metadata(metadata, io, metadata_location, overwrite=True) + + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=namespace, + name=table_name, + versionToken=version_token, + metadataLocation=metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot create {namespace}.{table_name} because of a concurrent update to the table version {version_token}." + ) from e + except: + self.s3tables.delete_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + raise + + return self.load_table(identifier=identifier) + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + namespace = self._validate_namespace_identifier(namespace) + try: + self.s3tables.delete_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace) + except self.s3tables.exceptions.ConflictException as e: + raise NamespaceNotEmptyError(f"Namespace {namespace} is not empty.") from e + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + try: + response = self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchTableError(f"No table with identifier {identifier} exists.") from e + + version_token = response["versionToken"] + try: + self.s3tables.delete_table( + tableBucketARN=self.table_bucket_arn, + namespace=namespace, + name=table_name, + versionToken=version_token, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot delete {namespace}.{table_name} because of a concurrent update to the table version {version_token}." 
+ ) from e + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + if namespace: + # hierarchical namespaces are not supported + return [] + paginator = self.s3tables.get_paginator("list_namespaces") + + namespaces: List[Identifier] = [] + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn): + namespaces.extend(tuple(entry["namespace"]) for entry in page["namespaces"]) + + return namespaces + + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + namespace = self._validate_namespace_identifier(namespace) + paginator = self.s3tables.get_paginator("list_tables") + tables: List[Identifier] = [] + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn, namespace=namespace): + tables.extend((namespace, table["name"]) for table in page["tables"]) + return tables + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + namespace = self._validate_namespace_identifier(namespace) + response = self.s3tables.get_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace) + return { + "namespace": response["namespace"], + "createdAt": response["createdAt"], + "createdBy": response["createdBy"], + "ownerAccountId": response["ownerAccountId"], + } + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + table, _ = self._load_table_and_version(identifier) + return table + + def _load_table_and_version(self, identifier: Union[str, Identifier]) -> Tuple[Table, str]: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + + try: + response = self.s3tables.get_table_metadata_location( + tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name + ) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchTableError(f"No table with identifier {identifier} exists.") from e + + metadata_location = response.get("metadataLocation") + if not metadata_location: + raise S3TablesError("No table metadata found.") + + version_token = response["versionToken"] + + io = load_file_io(properties=self.properties, location=metadata_location) + file = io.new_input(metadata_location) + metadata = FromInputFile.table_metadata(file) + return Table( + identifier=(namespace, table_name), + metadata=metadata, + metadata_location=metadata_location, + io=self._load_file_io(metadata.properties, metadata_location), + catalog=self, + ), version_token + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + from_namespace, from_table_name = self._validate_database_and_table_identifier(from_identifier) + to_namespace, to_table_name = self._validate_database_and_table_identifier(to_identifier) + + version_token = self.s3tables.get_table( + tableBucketARN=self.table_bucket_arn, namespace=from_namespace, name=from_table_name + )["versionToken"] + + self.s3tables.rename_table( + tableBucketARN=self.table_bucket_arn, + namespace=from_namespace, + name=from_table_name, + newNamespaceName=to_namespace, + newName=to_table_name, + versionToken=version_token, + ) + + return self.load_table(to_identifier) + + def table_exists(self, identifier: Union[str, Identifier]) -> bool: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + try: + self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + except self.s3tables.exceptions.NotFoundException: + return False + return True + + def update_namespace_properties( 
+ self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT + ) -> PropertiesUpdateSummary: + # namespace properties are read only + raise NotImplementedError("Namespace properties are read only.") + + def purge_table(self, identifier: Union[str, Identifier]) -> None: + # purge is not supported as s3tables doesn't support delete operations + # table maintenance is automated + raise NotImplementedError + + def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: + raise NotImplementedError + + def drop_view(self, identifier: Union[str, Identifier]) -> None: + raise NotImplementedError + + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + raise NotImplementedError + + def view_exists(self, identifier: Union[str, Identifier]) -> bool: + raise NotImplementedError diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index 56574ff471..8367d91b79 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -112,6 +112,22 @@ class GenericDynamoDbError(DynamoDbError): pass +class S3TablesError(Exception): + pass + + +class InvalidNamespaceName(S3TablesError): + pass + + +class InvalidTableName(S3TablesError): + pass + + +class TableBucketNotFound(S3TablesError): + pass + + class CommitFailedException(Exception): """Commit failed, refresh and try again.""" diff --git a/pyproject.toml b/pyproject.toml index 7d28e89832..6e940e0058 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,7 +89,7 @@ pre-commit = "4.1.0" fastavro = "1.10.0" coverage = { version = "^7.4.2", extras = ["toml"] } requests-mock = "1.12.1" -moto = { version = "^5.0.2", extras = ["server"] } +moto = { version = "^5.0.28", extras = ["server"] } typing-extensions = "4.12.2" pytest-mock = "3.14.0" pyspark = "3.5.3" @@ -1211,6 +1211,7 @@ sql-postgres = ["sqlalchemy", "psycopg2-binary"] sql-sqlite = ["sqlalchemy"] gcsfs = ["gcsfs"] rest-sigv4 = ["boto3"] +s3tables = ["boto3"] pyiceberg-core = ["pyiceberg-core"] [tool.pytest.ini_options] diff --git a/tests/catalog/integration_test_s3tables.py b/tests/catalog/integration_test_s3tables.py new file mode 100644 index 0000000000..8cfcbe7b44 --- /dev/null +++ b/tests/catalog/integration_test_s3tables.py @@ -0,0 +1,286 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
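+# These integration tests run against a real Amazon S3 table bucket rather than a mock; the
+# fixtures below read the bucket ARN and region from the AWS_TEST_S3_TABLE_BUCKET_ARN and
+# AWS_REGION environment variables and fail fast if either is unset.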
+import pytest + +from pyiceberg.catalog.s3tables import S3TablesCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + """Set the environment variable AWS_TEST_S3_TABLE_BUCKET_ARN for an S3 table bucket to test.""" + import os + + table_bucket_arn = os.getenv("AWS_TEST_S3_TABLE_BUCKET_ARN") + if not table_bucket_arn: + raise ValueError( + "Please specify a table bucket arn to run the test by setting environment variable AWS_TEST_S3_TABLE_BUCKET_ARN" + ) + return table_bucket_arn + + +@pytest.fixture +def aws_region() -> str: + import os + + aws_region = os.getenv("AWS_REGION") + if not aws_region: + raise ValueError("Please specify an AWS region to run the test by setting environment variable AWS_REGION") + return aws_region + + +@pytest.fixture +def catalog(table_bucket_arn: str, aws_region: str) -> S3TablesCatalog: + properties = {"s3tables.warehouse": table_bucket_arn, "s3tables.region": aws_region} + return S3TablesCatalog(name="test_s3tables_catalog", **properties) + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None: + properties = {"s3tables.warehouse": f"{table_bucket_arn}-modified", "s3tables.region": "us-east-1"} + with pytest.raises(TableBucketNotFound): + S3TablesCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] + + +def test_drop_namespace(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.drop_namespace(namespace=database_name) + assert (database_name,) not in catalog.list_namespaces() + + +def test_create_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert table == catalog.load_table(identifier) + + +def test_create_table_in_invalid_namespace_raises_exception( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + with pytest.raises(NoSuchNamespaceError): + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + +def test_table_exists(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + 
catalog.create_namespace(namespace=database_name) + assert not catalog.table_exists(identifier=identifier) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.table_exists(identifier=identifier) + + +def test_rename_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + to_database_name = f"{database_name}new" + to_table_name = f"{table_name}new" + to_identifier = (to_database_name, to_table_name) + catalog.create_namespace(namespace=to_database_name) + catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier) + + assert not catalog.table_exists(identifier=identifier) + assert catalog.table_exists(identifier=to_identifier) + + +def test_list_tables(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.list_tables(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.list_tables(namespace=database_name) + + +def test_drop_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + catalog.drop_table(identifier=identifier) + + with pytest.raises(NoSuchTableError): + catalog.load_table(identifier=identifier) + + +def test_commit_new_column_to_table( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms + assert table.schema().columns[-1].name == "b" + + +def test_write_pyarrow_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + 
pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + assert table.scan().to_arrow().num_rows == pyarrow_table.num_rows + + +def test_commit_new_data_to_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + row_count = table.scan().to_arrow().num_rows + assert row_count + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + transaction.append(table.scan().to_arrow()) + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[-1].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[-1].timestamp_ms == original_table_last_updated_ms + assert table.scan().to_arrow().num_rows == 2 * row_count + + +@pytest.mark.xfail(raises=NotImplementedError, reason="create_table_transaction not implemented yet") +def test_create_table_transaction( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: str +) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + with catalog.create_table_transaction( + identifier, + table_schema_nested, + partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")), + ) as txn: + last_updated_metadata = txn.table_metadata.last_updated_ms + with txn.update_schema() as update_schema: + update_schema.add_column(path="b", field_type=IntegerType()) + + with txn.update_spec() as update_spec: + update_spec.add_identity("bar") + + txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + + table = catalog.load_table(identifier) + + assert table.schema().find_field("b").field_type == IntegerType() + assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"} + assert table.spec().last_assigned_field_id == 1001 + assert table.spec().fields_by_source_id(1)[0].name == "foo" + assert table.spec().fields_by_source_id(1)[0].field_id == 1000 + assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform() + assert table.spec().fields_by_source_id(2)[0].name == "bar" + assert table.spec().fields_by_source_id(2)[0].field_id == 1001 + assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform() + assert table.metadata.last_updated_ms > last_updated_metadata diff --git a/tests/catalog/test_s3tables.py b/tests/catalog/test_s3tables.py new file mode 100644 index 0000000000..8ae0ff1da0 --- 
/dev/null +++ b/tests/catalog/test_s3tables.py @@ -0,0 +1,307 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import string +from random import choice + +import boto3 +import pytest + +from pyiceberg.catalog import load_catalog +from pyiceberg.catalog.s3tables import S3TablesCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def aws_region() -> str: + return "us-east-1" + + +@pytest.fixture +def table_bucket_arn(monkeypatch: pytest.MonkeyPatch, moto_endpoint_url: str) -> str: + monkeypatch.setenv("AWS_ENDPOINT_URL", moto_endpoint_url) + + prefix = "pyiceberg-table-bucket-" + random_tag = "".join(choice(string.ascii_letters) for _ in range(12)) + name = (prefix + random_tag).lower() + table_bucket_arn = boto3.client("s3tables", endpoint_url=moto_endpoint_url).create_table_bucket(name=name)["arn"] + return table_bucket_arn + + +@pytest.fixture(params=["pyiceberg.io.fsspec.FsspecFileIO", "pyiceberg.io.pyarrow.PyArrowFileIO"]) +def file_io_impl(request: pytest.FixtureRequest) -> str: + return request.param + + +@pytest.fixture +def catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str, file_io_impl: str) -> S3TablesCatalog: + properties = { + "s3tables.warehouse": table_bucket_arn, + "s3tables.region": aws_region, + "py-io-impl": file_io_impl, + "s3tables.endpoint": moto_endpoint_url, + "s3.endpoint": moto_endpoint_url, + } + return S3TablesCatalog(name="test_s3tables_catalog", **properties) + + +def test_load_catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str) -> None: + properties = { + "type": "s3tables", + "s3tables.warehouse": table_bucket_arn, + "s3tables.region": aws_region, + "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO", + "s3tables.endpoint": moto_endpoint_url, + } + catalog = load_catalog(**properties) + assert isinstance(catalog, S3TablesCatalog) + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None: + properties = {"s3tables.warehouse": f"{table_bucket_arn}-modified", "s3tables.region": "us-east-1"} + with pytest.raises(TableBucketNotFound): + S3TablesCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TablesCatalog, 
database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] + + +def test_drop_namespace(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.drop_namespace(namespace=database_name) + assert (database_name,) not in catalog.list_namespaces() + + +def test_create_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert table == catalog.load_table(identifier) + + +def test_create_table_in_invalid_namespace_raises_exception( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + with pytest.raises(NoSuchNamespaceError): + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + +def test_table_exists(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.table_exists(identifier=identifier) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.table_exists(identifier=identifier) + + +def test_rename_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + to_database_name = f"{database_name}new" + to_table_name = f"{table_name}new" + to_identifier = (to_database_name, to_table_name) + catalog.create_namespace(namespace=to_database_name) + catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier) + + assert not catalog.table_exists(identifier=identifier) + assert catalog.table_exists(identifier=to_identifier) + + +def test_list_tables(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.list_tables(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.list_tables(namespace=database_name) + + +def test_drop_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + catalog.drop_table(identifier=identifier) + + with pytest.raises(NoSuchTableError): + catalog.load_table(identifier=identifier) + + +def test_commit_new_column_to_table( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + 
catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms + assert table.schema().columns[-1].name == "b" + + +def test_write_pyarrow_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + assert table.scan().to_arrow().num_rows == pyarrow_table.num_rows + + +def test_commit_new_data_to_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + row_count = table.scan().to_arrow().num_rows + assert row_count + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + transaction.append(table.scan().to_arrow()) + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[-1].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[-1].timestamp_ms == original_table_last_updated_ms + assert table.scan().to_arrow().num_rows == 2 * row_count + + +@pytest.mark.xfail(raises=NotImplementedError) +def test_create_table_transaction( + catalog: S3TablesCatalog, database_name: str, 
table_name: str, table_schema_nested: str +) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + with catalog.create_table_transaction( + identifier, + table_schema_nested, + partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")), + ) as txn: + last_updated_metadata = txn.table_metadata.last_updated_ms + with txn.update_schema() as update_schema: + update_schema.add_column(path="b", field_type=IntegerType()) + + with txn.update_spec() as update_spec: + update_spec.add_identity("bar") + + txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + + table = catalog.load_table(identifier) + + assert table.schema().find_field("b").field_type == IntegerType() + assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"} + assert table.spec().last_assigned_field_id == 1001 + assert table.spec().fields_by_source_id(1)[0].name == "foo" + assert table.spec().fields_by_source_id(1)[0].field_id == 1000 + assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform() + assert table.spec().fields_by_source_id(2)[0].name == "bar" + assert table.spec().fields_by_source_id(2)[0].field_id == 1001 + assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform() + assert table.metadata.last_updated_ms > last_updated_metadata