diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 5758628f48..645d18f645 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -550,6 +550,104 @@ catalog: +### S3Tables Catalog
+
+The S3Tables Catalog leverages the catalog functionality of the Amazon S3 Tables service and requires an existing S3 table bucket to operate.
+
+To use Amazon S3 Tables as your catalog, you can configure PyIceberg using one of the following methods. Additionally, refer to the [AWS documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html) on configuring credentials to set up your AWS account credentials locally.
+
+If you intend to use the same credentials for both the S3Tables Catalog and S3 FileIO, you can configure the [`client.*` properties](configuration.md#unified-aws-credentials) to streamline the process.
+
+Note that the S3Tables Catalog manages the underlying table locations internally, which makes it incompatible with S3-compatible storage systems such as MinIO. If you specify a custom `s3tables.endpoint`, make sure that `s3.endpoint` points to the same storage backend.
+
+```yaml
+catalog:
+  default:
+    type: s3tables
+    warehouse: arn:aws:s3tables:us-east-1:012345678901:bucket/pyiceberg-catalog
+```
+
+If you prefer to pass the credentials explicitly to the client instead of relying on environment variables, set them in the catalog properties:
+
+```yaml
+catalog:
+  default:
+    type: s3tables
+    s3tables.access-key-id: admin
+    s3tables.secret-access-key: password
+    s3tables.session-token: AQoDYXdzEJr...
+    s3tables.region: us-east-1
+    s3tables.endpoint: http://localhost:9000
+    s3.endpoint: http://localhost:9000
+```
+
+!!! Note "Client-specific Properties"
+    `s3tables.*` properties are for the S3TablesCatalog only. If you want to use the same credentials for both the S3TablesCatalog and S3 FileIO, you can set the `client.*` properties. See the [Unified AWS Credentials](configuration.md#unified-aws-credentials) section for more details.
+
+| Key                        | Example             | Description                                                                 |
+| -------------------------- | ------------------- | --------------------------------------------------------------------------- |
+| s3tables.profile-name      | default             | Configure the static profile used to access the S3Tables Catalog            |
+| s3tables.region            | us-east-1           | Set the region of the S3Tables Catalog                                      |
+| s3tables.access-key-id     | admin               | Configure the static access key id used to access the S3Tables Catalog      |
+| s3tables.secret-access-key | password            | Configure the static secret access key used to access the S3Tables Catalog  |
+| s3tables.session-token     | AQoDYXdzEJr...      | Configure the static session token used to access the S3Tables Catalog      |
+| s3tables.endpoint          | ...                 | Configure the S3 Tables service endpoint                                    |
+| s3tables.warehouse         | arn:aws:s3tables... | Set the ARN of the underlying table bucket                                  |
+
+An example usage of the S3Tables Catalog is shown below:
+
+```python
+from pyiceberg.catalog.s3tables import S3TablesCatalog
+import pyarrow as pa
+
+
+table_bucket_arn: str = "..."
+aws_region: str = "..."
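+# The placeholders above must point at an existing table bucket and its region:
+# the catalog validates the ARN on construction and raises TableBucketNotFound
+# if the bucket does not exist.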
+ +properties = {"s3tables.warehouse": table_bucket_arn, "s3tables.region": aws_region} +catalog = S3TablesCatalog(name="s3tables_catalog", **properties) + +namespace = "prod" + +catalog.create_namespace(namespace=namespace) + +pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), + pa.array([1, 2, 3, 4]), + pa.array([True, None, False, True]), + pa.array([None, "A", "B", "C"]), + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), +) + +identifier = (namespace, "orders") +table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) +table.append(pyarrow_table) +``` + ### Custom Catalog Implementations If you want to load any custom catalog implementation, you can set catalog configurations like the following: diff --git a/poetry.lock b/poetry.lock index 3ab87e4f0b..a931d3e02d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -58,7 +58,7 @@ description = "Happy Eyeballs for asyncio" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "aiohappyeyeballs-2.4.6-py3-none-any.whl", hash = "sha256:147ec992cf873d74f5062644332c539fcd42956dc69453fe5204195e560517e1"}, {file = "aiohappyeyeballs-2.4.6.tar.gz", hash = "sha256:9b05052f9042985d32ecbe4b59a77ae19c006a78f1344d7fdad69d28ded3d0b0"}, @@ -71,7 +71,7 @@ description = "Async http client/server framework (asyncio)" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "aiohttp-3.11.12-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:aa8a8caca81c0a3e765f19c6953416c58e2f4cc1b84829af01dd1c771bb2f91f"}, {file = "aiohttp-3.11.12-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:84ede78acde96ca57f6cf8ccb8a13fbaf569f6011b9a52f870c662d4dc8cd854"}, @@ -196,7 +196,7 @@ description = "aiosignal: a list of registered asynchronous callbacks" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")" +markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" files = [ {file = "aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5"}, {file = "aiosignal-1.3.2.tar.gz", hash = "sha256:a8c255c66fafb1e499c9351d0bf32ff2d8a0321595ebac3b93713656d2436f54"}, @@ -248,7 +248,7 @@ description = "Timeout context manager for asyncio programs" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and python_version <= \"3.10\"" +markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version <= \"3.10\"" files = [ {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, @@ 
-265,7 +265,7 @@ files = [ {file = "attrs-25.1.0-py3-none-any.whl", hash = "sha256:c75a69e28a550a7e93789579c22aa26b0f5b83b75dc4e08fe092980051e1090a"}, {file = "attrs-25.1.0.tar.gz", hash = "sha256:1c97078a80c814273a76b2a298a932eb681c87415c11dee0a6921de7f1b02c3e"}, ] -markers = {main = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")"} +markers = {main = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"} [package.extras] benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] @@ -467,7 +467,7 @@ files = [ {file = "boto3-1.37.1-py3-none-any.whl", hash = "sha256:4320441f904435a1b85e6ecb81793192e522c737cc9ed6566014e29f0a11cb22"}, {file = "boto3-1.37.1.tar.gz", hash = "sha256:96d18f7feb0c1fcb95f8837b74b6c8880e1b4e35ce5f8a8f8cb243a090c278ed"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3tables\""} [package.dependencies] botocore = ">=1.37.1,<1.38.0" @@ -488,7 +488,7 @@ files = [ {file = "botocore-1.37.1-py3-none-any.whl", hash = "sha256:c1db1bfc5d8c6b3b6d1ca6794f605294b4264e82a7e727b88e0fef9c2b9fbb9c"}, {file = "botocore-1.37.1.tar.gz", hash = "sha256:b194db8fb2a0ffba53568c364ae26166e7eec0445496b2ac86a6e142f3dd982f"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3tables\" or extra == \"s3fs\""} [package.dependencies] jmespath = ">=0.7.1,<2.0.0" @@ -1436,7 +1436,7 @@ description = "A list-like structure which implements collections.abc.MutableSeq optional = true python-versions = ">=3.8" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")" +markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" files = [ {file = "frozenlist-1.5.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5b6a66c18b5b9dd261ca98dffcb826a525334b2f29e7caa54e182255c5f6a65a"}, {file = "frozenlist-1.5.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d1b3eb7b05ea246510b43a7e53ed1653e55c2121019a97e60cad7efb881a97bb"}, @@ -2145,7 +2145,7 @@ files = [ {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"}, {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3tables\" or extra == \"s3fs\""} [[package]] name = "joserfc" @@ -2947,7 +2947,7 @@ description = "multidict implementation" optional = true 
python-versions = ">=3.8" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "multidict-6.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3380252550e372e8511d49481bd836264c009adb826b23fefcc5dd3c69692f60"}, {file = "multidict-6.1.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:99f826cbf970077383d7de805c0681799491cb939c25450b9b5b3ced03ca99f1"}, @@ -3562,7 +3562,7 @@ description = "Accelerated property cache" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "propcache-0.2.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6b3f39a85d671436ee3d12c017f8fdea38509e4f25b28eb25877293c98c243f6"}, {file = "propcache-0.2.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:39d51fbe4285d5db5d92a929e3e21536ea3dd43732c5b177c7ef03f918dff9f2"}, @@ -3862,7 +3862,7 @@ files = [ {file = "pyarrow-19.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:8464c9fbe6d94a7fe1599e7e8965f350fd233532868232ab2596a71586c5a429"}, {file = "pyarrow-19.0.1.tar.gz", hash = "sha256:3bf266b485df66a400f282ac0b6d1b500b9d2ae73314a153dbe97d6d5cc8a99e"}, ] -markers = {main = "extra == \"daft\" or extra == \"duckdb\" or extra == \"pandas\" or extra == \"pyarrow\" or extra == \"ray\""} +markers = {main = "extra == \"pyarrow\" or extra == \"pandas\" or extra == \"duckdb\" or extra == \"ray\" or extra == \"daft\""} [package.extras] test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] @@ -4868,7 +4868,7 @@ files = [ {file = "s3transfer-0.11.2-py3-none-any.whl", hash = "sha256:be6ecb39fadd986ef1701097771f87e4d2f821f27f6071c872143884d2950fbc"}, {file = "s3transfer-0.11.2.tar.gz", hash = "sha256:3b39185cb72f5acc77db1a58b6e25b977f28d20496b6e58d6813d75f464d632f"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3tables\""} [package.dependencies] botocore = ">=1.36.0,<2.0a.0" @@ -5586,7 +5586,7 @@ description = "Yet another URL library" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "yarl-1.18.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7df647e8edd71f000a5208fe6ff8c382a1de8edfbccdbbfe649d263de07d8c34"}, {file = "yarl-1.18.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c69697d3adff5aa4f874b19c0e4ed65180ceed6318ec856ebc423aa5850d84f7"}, @@ -5828,6 +5828,7 @@ pyiceberg-core = ["pyiceberg-core"] ray = ["pandas", "pyarrow", "ray", "ray"] rest-sigv4 = ["boto3"] s3fs = ["s3fs"] +s3tables = ["boto3"] snappy = ["python-snappy"] sql-postgres = ["psycopg2-binary", "sqlalchemy"] sql-sqlite = ["sqlalchemy"] @@ -5836,4 +5837,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.1" python-versions = "^3.9.2, !=3.9.7" -content-hash = "23cf7612e3b4bdda09d8f7e922c8c0773eb4b25931e6e2ec888c57eec6b9516b" +content-hash = "aa888fb8454d05cbb7ea23b4bf21d8354b51ea9b061f9657f284fa72c921831a" diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index cf649ba7d6..5a11a04bc3 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -117,6 +117,7 
@@ class CatalogType(Enum): DYNAMODB = "dynamodb" SQL = "sql" IN_MEMORY = "in-memory" + S3TABLES = "s3tables" def load_rest(name: str, conf: Properties) -> Catalog: @@ -172,6 +173,15 @@ def load_in_memory(name: str, conf: Properties) -> Catalog: raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-sqlite]'") from exc +def load_s3tables(name: str, conf: Properties) -> Catalog: + try: + from pyiceberg.catalog.s3tables import S3TablesCatalog + + return S3TablesCatalog(name, **conf) + except ImportError as exc: + raise NotInstalledError("AWS S3Tables support not installed: pip install 'pyiceberg[s3tables]'") from exc + + AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, @@ -179,6 +189,7 @@ def load_in_memory(name: str, conf: Properties) -> Catalog: CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, CatalogType.IN_MEMORY: load_in_memory, + CatalogType.S3TABLES: load_s3tables, } @@ -935,8 +946,8 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - raise ValueError("No default path is set, please specify a location when creating a table") @staticmethod - def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: - ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) + def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str, overwrite: bool = False) -> None: + ToOutputFile.table_metadata(metadata, io.new_output(metadata_path), overwrite=overwrite) @staticmethod def _parse_metadata_version(metadata_location: str) -> int: diff --git a/pyiceberg/catalog/s3tables.py b/pyiceberg/catalog/s3tables.py new file mode 100644 index 0000000000..50581c11ba --- /dev/null +++ b/pyiceberg/catalog/s3tables.py @@ -0,0 +1,387 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
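+"""Catalog implementation backed by the Amazon S3 Tables service.
+
+Tables are created through the S3 Tables API, which assigns each table a warehouse
+location where the Iceberg metadata is written. Commits pass the service's
+versionToken so that concurrent updates surface as a CommitFailedException.
+"""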
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+from botocore.exceptions import UnknownServiceError
+
+from pyiceberg.catalog import MetastoreCatalog, PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, CreateTableTransaction, Table
+from pyiceberg.table.locations import load_location_provider
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.warehouse"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = re.compile("[a-z0-9][a-z0-9_]{1,61}[a-z0-9]")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+S3TABLES_FORMAT = "ICEBERG"
+
+
+class S3TablesCatalog(MetastoreCatalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        if S3TABLES_TABLE_BUCKET_ARN not in self.properties:
+            raise S3TablesError(f"No table bucket ARN specified. Set it via the {S3TABLES_TABLE_BUCKET_ARN} property.")
+        self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
+
+        session = boto3.Session(
+            profile_name=properties.get(S3TABLES_PROFILE_NAME),
+            region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION),
+            aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        )
+        try:
+            self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT))
+        except UnknownServiceError as e:
+            raise S3TablesError(f"'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e
+
+        # fail fast if the configured table bucket does not exist
+        try:
+            self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn)
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise TableBucketNotFound(e) from e
+
+    def commit_table(
+        self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
+    ) -> CommitTableResponse:
+        table_identifier = table.name()
+        database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
+
+        current_table: Optional[Table]
+        version_token: Optional[str]
+        try:
+            current_table, version_token = self._load_table_and_version(identifier=table_identifier)
+        except NoSuchTableError:
+            current_table = None
+            version_token = None
+
+        if current_table:
+            updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
+            if updated_staged_table.metadata == current_table.metadata:
+                # no changes, do nothing
+                return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
+
+            self._write_metadata(
+                metadata=updated_staged_table.metadata,
+                io=updated_staged_table.io,
+                metadata_path=updated_staged_table.metadata_location,
+                overwrite=True,
+            )
+
+            # try to update the metadata location, which fails if the versionToken changed in the meantime
+            try:
+                self.s3tables.update_table_metadata_location(
+                    tableBucketARN=self.table_bucket_arn,
+                    namespace=database_name,
+                    name=table_name,
+                    versionToken=version_token,
+                    metadataLocation=updated_staged_table.metadata_location,
+                )
+            except self.s3tables.exceptions.ConflictException as e:
+                raise CommitFailedException(
+                    f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}."
+                ) from e
+            return CommitTableResponse(
+                metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
+            )
+        else:
+            # the table does not exist yet; creating a table on commit is not supported
+            raise NotImplementedError("Creating a table on commit is currently not supported.")
+
+    def create_table_transaction(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> CreateTableTransaction:
+        raise NotImplementedError("create_table_transaction is currently not supported.")
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
+        if properties:
+            raise NotImplementedError("Setting namespace properties is not supported.")
+        valid_namespace: str = self._validate_namespace_identifier(namespace)
+        self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, namespace=[valid_namespace])
+
+    def _validate_namespace_identifier(self, namespace: Union[str, Identifier]) -> str:
+        namespace = self.identifier_to_database(namespace)
+
+        if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == S3TABLES_RESERVED_NAMESPACE:
+            raise InvalidNamespaceName(
+                "The specified namespace name is not valid. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html for naming rules."
+            )
+
+        return namespace
+
+    def _validate_database_and_table_identifier(self, identifier: Union[str, Identifier]) -> Tuple[str, str]:
+        namespace, table_name = self.identifier_to_database_and_table(identifier)
+
+        namespace = self._validate_namespace_identifier(namespace)
+
+        if not S3TABLES_VALID_NAME_REGEX.fullmatch(table_name):
+            raise InvalidTableName(
+                "The specified table name is not valid. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html for naming rules."
+            )
+
+        return namespace, table_name
+
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> Table:
+        if location:
+            raise NotImplementedError("S3 Tables does not support user-specified table locations.")
+        namespace, table_name = self._validate_database_and_table_identifier(identifier)
+
+        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+
+        # Creating a new table with S3 Tables is a two-step process: first create an S3 Table through the
+        # S3 Tables API, then write the new metadata.json to the warehouseLocation associated with the
+        # newly created S3 Table.
+        try:
+            version_token = self.s3tables.create_table(
+                tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name, format=S3TABLES_FORMAT
+            )["versionToken"]
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise NoSuchNamespaceError(f"Cannot create {namespace}.{table_name} because no such namespace exists.") from e
+        except self.s3tables.exceptions.ConflictException as e:
+            raise TableAlreadyExistsError(
+                f"Cannot create {namespace}.{table_name} because a table of the same name already exists in the namespace."
+            ) from e
+
+        try:
+            response = self.s3tables.get_table_metadata_location(
+                tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name
+            )
+            warehouse_location = response["warehouseLocation"]
+
+            provider = load_location_provider(warehouse_location, properties)
+            metadata_location = provider.new_table_metadata_file_location()
+            metadata = new_table_metadata(
+                location=warehouse_location,
+                schema=schema,
+                partition_spec=partition_spec,
+                sort_order=sort_order,
+                properties=properties,
+            )
+
+            io = load_file_io(properties=self.properties, location=metadata_location)
+            # Writing the metadata triggers an unsupported-list-operation error, as S3 Table Buckets only
+            # support a subset of the S3 Bucket API. Setting overwrite=True is a workaround for now, since
+            # it prevents the call to list_objects.
+            self._write_metadata(metadata, io, metadata_location, overwrite=True)
+
+            try:
+                self.s3tables.update_table_metadata_location(
+                    tableBucketARN=self.table_bucket_arn,
+                    namespace=namespace,
+                    name=table_name,
+                    versionToken=version_token,
+                    metadataLocation=metadata_location,
+                )
+            except self.s3tables.exceptions.ConflictException as e:
+                raise CommitFailedException(
+                    f"Cannot create {namespace}.{table_name} because of a concurrent update to the table version {version_token}."
+                ) from e
+        except:
+            # clean up the newly created S3 Table if writing or committing the initial metadata fails
+            self.s3tables.delete_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name)
+            raise
+
+        return self.load_table(identifier=identifier)
+
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        namespace = self._validate_namespace_identifier(namespace)
+        try:
+            self.s3tables.delete_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace)
+        except self.s3tables.exceptions.ConflictException as e:
+            raise NamespaceNotEmptyError(f"Namespace {namespace} is not empty.") from e
+
+    def drop_table(self, identifier: Union[str, Identifier]) -> None:
+        raise NotImplementedError(
+            "S3 Tables does not support the delete_table operation. You can retry with the purge_table operation."
+ ) + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + if namespace: + # hierarchical namespaces are not supported + return [] + paginator = self.s3tables.get_paginator("list_namespaces") + + namespaces: List[Identifier] = [] + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn): + namespaces.extend(tuple(entry["namespace"]) for entry in page["namespaces"]) + + return namespaces + + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + namespace = self._validate_namespace_identifier(namespace) + paginator = self.s3tables.get_paginator("list_tables") + tables: List[Identifier] = [] + try: + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn, namespace=namespace): + tables.extend((namespace, table["name"]) for table in page["tables"]) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e + return tables + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + namespace = self._validate_namespace_identifier(namespace) + try: + response = self.s3tables.get_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchNamespaceError(f"Namespace {namespace} does not exist.") from e + return { + "namespace": response["namespace"], + "createdAt": response["createdAt"], + "createdBy": response["createdBy"], + "ownerAccountId": response["ownerAccountId"], + } + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + table, _ = self._load_table_and_version(identifier) + return table + + def _load_table_and_version(self, identifier: Union[str, Identifier]) -> Tuple[Table, str]: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + + try: + response = self.s3tables.get_table_metadata_location( + tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name + ) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchTableError(f"No table with identifier {identifier} exists.") from e + + metadata_location = response.get("metadataLocation") + if not metadata_location: + raise S3TablesError("No table metadata found.") + + version_token = response["versionToken"] + + io = load_file_io(properties=self.properties, location=metadata_location) + file = io.new_input(metadata_location) + metadata = FromInputFile.table_metadata(file) + return Table( + identifier=(namespace, table_name), + metadata=metadata, + metadata_location=metadata_location, + io=self._load_file_io(metadata.properties, metadata_location), + catalog=self, + ), version_token + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + from_namespace, from_table_name = self._validate_database_and_table_identifier(from_identifier) + to_namespace, to_table_name = self._validate_database_and_table_identifier(to_identifier) + + version_token = self.s3tables.get_table( + tableBucketARN=self.table_bucket_arn, namespace=from_namespace, name=from_table_name + )["versionToken"] + + self.s3tables.rename_table( + tableBucketARN=self.table_bucket_arn, + namespace=from_namespace, + name=from_table_name, + newNamespaceName=to_namespace, + newName=to_table_name, + versionToken=version_token, + ) + + return self.load_table(to_identifier) + + def table_exists(self, identifier: Union[str, Identifier]) -> bool: + namespace, table_name = 
self._validate_database_and_table_identifier(identifier) + try: + self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + except self.s3tables.exceptions.NotFoundException: + return False + return True + + def update_namespace_properties( + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT + ) -> PropertiesUpdateSummary: + # namespace properties are read only + raise NotImplementedError("Namespace properties are read only.") + + def purge_table(self, identifier: Union[str, Identifier]) -> None: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + try: + response = self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchTableError(f"No table with identifier {identifier} exists.") from e + + version_token = response["versionToken"] + try: + self.s3tables.delete_table( + tableBucketARN=self.table_bucket_arn, + namespace=namespace, + name=table_name, + versionToken=version_token, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot delete {namespace}.{table_name} because of a concurrent update to the table version {version_token}." + ) from e + + def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: + raise NotImplementedError + + def drop_view(self, identifier: Union[str, Identifier]) -> None: + raise NotImplementedError + + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + raise NotImplementedError + + def view_exists(self, identifier: Union[str, Identifier]) -> bool: + raise NotImplementedError diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index 56574ff471..8367d91b79 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -112,6 +112,22 @@ class GenericDynamoDbError(DynamoDbError): pass +class S3TablesError(Exception): + pass + + +class InvalidNamespaceName(S3TablesError): + pass + + +class InvalidTableName(S3TablesError): + pass + + +class TableBucketNotFound(S3TablesError): + pass + + class CommitFailedException(Exception): """Commit failed, refresh and try again.""" diff --git a/pyproject.toml b/pyproject.toml index 2732a434e6..00396da41a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,7 +91,7 @@ pre-commit = "4.1.0" fastavro = "1.10.0" coverage = { version = "^7.4.2", extras = ["toml"] } requests-mock = "1.12.1" -moto = { version = "^5.0.2", extras = ["server"] } +moto = { version = "^5.0.28", extras = ["server"] } typing-extensions = "4.12.2" pytest-mock = "3.14.0" pyspark = "3.5.5" @@ -304,6 +304,7 @@ sql-postgres = ["sqlalchemy", "psycopg2-binary"] sql-sqlite = ["sqlalchemy"] gcsfs = ["gcsfs"] rest-sigv4 = ["boto3"] +s3tables = ["boto3"] pyiceberg-core = ["pyiceberg-core"] [tool.pytest.ini_options] diff --git a/tests/catalog/integration_test_s3tables.py b/tests/catalog/integration_test_s3tables.py new file mode 100644 index 0000000000..7fd134b23c --- /dev/null +++ b/tests/catalog/integration_test_s3tables.py @@ -0,0 +1,286 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from pyiceberg.catalog.s3tables import S3TablesCatalog
+from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import IntegerType
+
+
+@pytest.fixture
+def database_name(database_name: str) -> str:
+    # naming rules prevent "-" in namespaces for s3 table buckets
+    return database_name.replace("-", "_")
+
+
+@pytest.fixture
+def table_name(table_name: str) -> str:
+    # naming rules prevent "-" in table names for s3 table buckets
+    return table_name.replace("-", "_")
+
+
+@pytest.fixture
+def table_bucket_arn() -> str:
+    """Set the environment variable AWS_TEST_S3_TABLE_BUCKET_ARN for an S3 table bucket to test."""
+    import os
+
+    table_bucket_arn = os.getenv("AWS_TEST_S3_TABLE_BUCKET_ARN")
+    if not table_bucket_arn:
+        raise ValueError(
+            "Please specify a table bucket ARN to run the test by setting environment variable AWS_TEST_S3_TABLE_BUCKET_ARN"
+        )
+    return table_bucket_arn
+
+
+@pytest.fixture
+def aws_region() -> str:
+    import os
+
+    aws_region = os.getenv("AWS_REGION")
+    if not aws_region:
+        raise ValueError("Please specify an AWS region to run the test by setting environment variable AWS_REGION")
+    return aws_region
+
+
+@pytest.fixture
+def catalog(table_bucket_arn: str, aws_region: str) -> S3TablesCatalog:
+    properties = {"s3tables.warehouse": table_bucket_arn, "s3tables.region": aws_region}
+    return S3TablesCatalog(name="test_s3tables_catalog", **properties)
+
+
+def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str, aws_region: str) -> None:
+    properties = {"s3tables.warehouse": f"{table_bucket_arn}-modified", "s3tables.region": aws_region}
+    with pytest.raises(TableBucketNotFound):
+        S3TablesCatalog(name="test_s3tables_catalog", **properties)
+
+
+def test_create_namespace(catalog: S3TablesCatalog, database_name: str) -> None:
+    catalog.create_namespace(namespace=database_name)
+    namespaces = catalog.list_namespaces()
+    assert (database_name,) in namespaces
+
+
+def test_load_namespace_properties(catalog: S3TablesCatalog, database_name: str) -> None:
+    catalog.create_namespace(namespace=database_name)
+    assert database_name in catalog.load_namespace_properties(database_name)["namespace"]
+
+
+def test_drop_namespace(catalog: S3TablesCatalog, database_name: str) -> None:
+    catalog.create_namespace(namespace=database_name)
+    assert (database_name,) in catalog.list_namespaces()
+    catalog.drop_namespace(namespace=database_name)
+    assert (database_name,) not in catalog.list_namespaces()
+
+
+def test_create_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    table = catalog.create_table(identifier=identifier,
schema=table_schema_nested)
+
+    assert table == catalog.load_table(identifier)
+
+
+def test_create_table_in_invalid_namespace_raises_exception(
+    catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema
+) -> None:
+    identifier = (database_name, table_name)
+
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+
+def test_table_exists(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    assert not catalog.table_exists(identifier=identifier)
+    catalog.create_table(identifier=identifier, schema=table_schema_nested)
+    assert catalog.table_exists(identifier=identifier)
+
+
+def test_rename_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+    to_database_name = f"{database_name}new"
+    to_table_name = f"{table_name}new"
+    to_identifier = (to_database_name, to_table_name)
+    catalog.create_namespace(namespace=to_database_name)
+    catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier)
+
+    assert not catalog.table_exists(identifier=identifier)
+    assert catalog.table_exists(identifier=to_identifier)
+
+
+def test_list_tables(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    assert not catalog.list_tables(namespace=database_name)
+    catalog.create_table(identifier=identifier, schema=table_schema_nested)
+    assert catalog.list_tables(namespace=database_name)
+
+
+def test_purge_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+    # drop_table raises NotImplementedError for S3 Tables, so tables are removed via purge_table
+    catalog.purge_table(identifier=identifier)
+
+    with pytest.raises(NoSuchTableError):
+        catalog.load_table(identifier=identifier)
+
+
+def test_commit_new_column_to_table(
+    catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema
+) -> None:
+    identifier = (database_name, table_name)
+
+    catalog.create_namespace(namespace=database_name)
+    table = catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+    last_updated_ms = table.metadata.last_updated_ms
+    original_table_metadata_location = table.metadata_location
+    original_table_last_updated_ms = table.metadata.last_updated_ms
+
+    transaction = table.transaction()
+    update = transaction.update_schema()
+    update.add_column(path="b", field_type=IntegerType())
+    update.commit()
+    transaction.commit_transaction()
+
+    updated_table_metadata = table.metadata
+    assert updated_table_metadata.current_schema_id == 1
+    assert len(updated_table_metadata.schemas) == 2
+    assert updated_table_metadata.last_updated_ms > last_updated_ms
+    assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location
+    assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms
+    assert table.schema().columns[-1].name == "b"
+
+
+def test_write_pyarrow_table(catalog: S3TablesCatalog,
database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + assert table.scan().to_arrow().num_rows == pyarrow_table.num_rows + + +def test_commit_new_data_to_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + row_count = table.scan().to_arrow().num_rows + assert row_count + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + transaction.append(table.scan().to_arrow()) + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[-1].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[-1].timestamp_ms == original_table_last_updated_ms + assert table.scan().to_arrow().num_rows == 2 * row_count + + +@pytest.mark.xfail(raises=NotImplementedError, reason="create_table_transaction not implemented yet") +def test_create_table_transaction( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: str +) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + with catalog.create_table_transaction( + identifier, + table_schema_nested, + partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")), + ) as txn: + last_updated_metadata = txn.table_metadata.last_updated_ms + with txn.update_schema() as update_schema: + update_schema.add_column(path="b", field_type=IntegerType()) + + with txn.update_spec() as update_spec: + update_spec.add_identity("bar") + + txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + + table = catalog.load_table(identifier) + + assert table.schema().find_field("b").field_type == IntegerType() + assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"} + assert table.spec().last_assigned_field_id == 1001 + assert 
table.spec().fields_by_source_id(1)[0].name == "foo"
+    assert table.spec().fields_by_source_id(1)[0].field_id == 1000
+    assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform()
+    assert table.spec().fields_by_source_id(2)[0].name == "bar"
+    assert table.spec().fields_by_source_id(2)[0].field_id == 1001
+    assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform()
+    assert table.metadata.last_updated_ms > last_updated_metadata
diff --git a/tests/catalog/test_s3tables.py b/tests/catalog/test_s3tables.py
new file mode 100644
index 0000000000..3b744cf152
--- /dev/null
+++ b/tests/catalog/test_s3tables.py
@@ -0,0 +1,332 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import string
+from random import choice
+
+import boto3
+import pytest
+
+from pyiceberg.catalog import load_catalog
+from pyiceberg.catalog.s3tables import S3TablesCatalog
+from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import IntegerType
+
+
+@pytest.fixture
+def database_name(database_name: str) -> str:
+    # naming rules prevent "-" in namespaces for s3 table buckets
+    return database_name.replace("-", "_")
+
+
+@pytest.fixture
+def table_name(table_name: str) -> str:
+    # naming rules prevent "-" in table names for s3 table buckets
+    return table_name.replace("-", "_")
+
+
+@pytest.fixture()
+def aws_region(_aws_credentials: None) -> str:
+    return "us-east-1"
+
+
+@pytest.fixture
+def table_bucket_arn(monkeypatch: pytest.MonkeyPatch, moto_endpoint_url: str, aws_region: str) -> str:
+    monkeypatch.setenv("AWS_ENDPOINT_URL", moto_endpoint_url)
+
+    prefix = "pyiceberg-table-bucket-"
+    random_tag = "".join(choice(string.ascii_letters) for _ in range(12))
+    name = (prefix + random_tag).lower()
+    table_bucket_arn = boto3.client("s3tables", endpoint_url=moto_endpoint_url, region_name=aws_region).create_table_bucket(
+        name=name
+    )["arn"]
+    return table_bucket_arn
+
+
+@pytest.fixture(params=["pyiceberg.io.fsspec.FsspecFileIO", "pyiceberg.io.pyarrow.PyArrowFileIO"])
+def file_io_impl(request: pytest.FixtureRequest) -> str:
+    return request.param
+
+
+@pytest.fixture
+def catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str, file_io_impl: str) -> S3TablesCatalog:
+    properties = {
+        "s3tables.warehouse": table_bucket_arn,
+        "s3tables.region": aws_region,
+        "py-io-impl": file_io_impl,
+        "s3tables.endpoint": moto_endpoint_url,
+        "s3.endpoint": moto_endpoint_url,
+    }
+    return S3TablesCatalog(name="test_s3tables_catalog", **properties)
+
+
+def test_load_catalog(table_bucket_arn: str, aws_region: str, moto_endpoint_url: str) ->
None: + properties = { + "type": "s3tables", + "s3tables.warehouse": table_bucket_arn, + "s3tables.region": aws_region, + "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO", + "s3tables.endpoint": moto_endpoint_url, + } + catalog = load_catalog(**properties) + assert isinstance(catalog, S3TablesCatalog) + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str, aws_region: str) -> None: + properties = {"s3tables.warehouse": f"{table_bucket_arn}-modified", "s3tables.region": aws_region} + with pytest.raises(TableBucketNotFound): + S3TablesCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] + + +def test_load_namespace_properties_for_invalid_namespace_raises_exception(catalog: S3TablesCatalog, database_name: str) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.load_namespace_properties(database_name) + + +def test_drop_namespace(catalog: S3TablesCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.drop_namespace(namespace=database_name) + assert (database_name,) not in catalog.list_namespaces() + + +def test_create_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert table == catalog.load_table(identifier) + + +def test_create_table_in_invalid_namespace_raises_exception( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + with pytest.raises(NoSuchNamespaceError): + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + +def test_table_exists(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.table_exists(identifier=identifier) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.table_exists(identifier=identifier) + + +def test_rename_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + to_database_name = f"{database_name}new" + to_table_name = f"{table_name}new" + to_identifier = (to_database_name, to_table_name) + catalog.create_namespace(namespace=to_database_name) + catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier) + + assert not catalog.table_exists(identifier=identifier) + assert catalog.table_exists(identifier=to_identifier) + + +def test_list_tables(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, 
table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.list_tables(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.list_tables(namespace=database_name) + + +def test_list_tables_for_invalid_namespace_raises_exception(catalog: S3TablesCatalog, database_name: str) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.list_tables(namespace=database_name) + + +def test_delete_table_raises_not_implemented( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + with pytest.raises(NotImplementedError) as exc: + catalog.drop_table(identifier=identifier) + exc.match("S3 Tables does not support the delete_table operation. You can retry with the purge_table operation.") + + +def test_purge_table(catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + catalog.purge_table(identifier=identifier) + + with pytest.raises(NoSuchTableError): + catalog.load_table(identifier=identifier) + + +def test_commit_new_column_to_table( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms + assert table.schema().columns[-1].name == "b" + + +def test_write_pyarrow_table(catalog: S3TablesCatalog, database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + assert table.scan().to_arrow().num_rows == pyarrow_table.num_rows + + +def test_commit_new_data_to_table(catalog: S3TablesCatalog, 
database_name: str, table_name: str) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + import pyarrow as pa + + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + + table = catalog.create_table(identifier=identifier, schema=pyarrow_table.schema) + table.append(pyarrow_table) + + row_count = table.scan().to_arrow().num_rows + assert row_count + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + transaction.append(table.scan().to_arrow()) + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[-1].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[-1].timestamp_ms == original_table_last_updated_ms + assert table.scan().to_arrow().num_rows == 2 * row_count + + +@pytest.mark.xfail(raises=NotImplementedError) +def test_create_table_transaction( + catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: str +) -> None: + identifier = (database_name, table_name) + catalog.create_namespace(namespace=database_name) + + with catalog.create_table_transaction( + identifier, + table_schema_nested, + partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")), + ) as txn: + last_updated_metadata = txn.table_metadata.last_updated_ms + with txn.update_schema() as update_schema: + update_schema.add_column(path="b", field_type=IntegerType()) + + with txn.update_spec() as update_spec: + update_spec.add_identity("bar") + + txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + + table = catalog.load_table(identifier) + + assert table.schema().find_field("b").field_type == IntegerType() + assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"} + assert table.spec().last_assigned_field_id == 1001 + assert table.spec().fields_by_source_id(1)[0].name == "foo" + assert table.spec().fields_by_source_id(1)[0].field_id == 1000 + assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform() + assert table.spec().fields_by_source_id(2)[0].name == "bar" + assert table.spec().fields_by_source_id(2)[0].field_id == 1001 + assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform() + assert table.metadata.last_updated_ms > last_updated_metadata