Skip to content

Commit

Permalink
Add view_exists method to REST Catalog (#1242)
Browse files Browse the repository at this point in the history
Part of the adding view support to the REST catalog: #818 

Todo:
- [x] Add tests
- [x] Add docs

Please let me know what the appropriate place to add docs would be

---------

Co-authored-by: Shiv Gupta <[email protected]>
  • Loading branch information
shiv-io and shivonchain authored Feb 1, 2025
1 parent 4fbcd6e commit 29c9a96
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 0 deletions.
13 changes: 13 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,19 @@ with table.manage_snapshots() as ms:
ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Views

PyIceberg supports view operations.

### Check if a view exists

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
catalog.view_exists("default.bar")
```

## Table Statistics Management

Manage table statistics with operations through the `Table` API:
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,17 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
bool: True if the table exists, False otherwise.
"""

@abstractmethod
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
"""Check if a view exists.
Args:
identifier (str | Identifier): View identifier.
Returns:
bool: True if the view exists, False otherwise.
"""

@abstractmethod
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]:
try:
return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name)
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,9 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

@staticmethod
def __is_iceberg_table(table: TableTypeDef) -> bool:
return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG
3 changes: 3 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
lock_component: LockComponent = LockComponent(
level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,8 @@ def update_namespace_properties(
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError
26 changes: 26 additions & 0 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class Endpoints:
rename_table: str = "tables/rename"
list_views: str = "namespaces/{namespace}/views"
drop_view: str = "namespaces/{namespace}/views/{view}"
view_exists: str = "namespaces/{namespace}/views/{view}"


class IdentifierKind(Enum):
Expand Down Expand Up @@ -906,6 +907,31 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:

return False

@retry(**_RETRY_ARGS)
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
"""Check if a view exists.
Args:
identifier (str | Identifier): View identifier.
Returns:
bool: True if the view exists, False otherwise.
"""
response = self._session.head(
self.url(Endpoints.view_exists, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
)
if response.status_code == 404:
return False
elif response.status_code in [200, 204]:
return True

try:
response.raise_for_status()
except HTTPError as exc:
self._handle_non_200_response(exc, {})

return False

@retry(**_RETRY_ARGS)
def drop_view(self, identifier: Union[str]) -> None:
response = self._session.delete(
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,5 +702,8 @@ def update_namespace_properties(
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError
3 changes: 3 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def list_views(self, namespace: Optional[Union[str, Identifier]] = None) -> List
def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError


@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
Expand Down
36 changes: 36 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,42 @@ def test_list_views_404(rest_mock: Mocker) -> None:
assert "Namespace does not exist" in str(e.value)


def test_view_exists_204(rest_mock: Mocker) -> None:
namespace = "examples"
view = "some_view"
rest_mock.head(
f"{TEST_URI}v1/namespaces/{namespace}/views/{view}",
status_code=204,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
assert catalog.view_exists((namespace, view))


def test_view_exists_404(rest_mock: Mocker) -> None:
namespace = "examples"
view = "some_view"
rest_mock.head(
f"{TEST_URI}v1/namespaces/{namespace}/views/{view}",
status_code=404,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
assert not catalog.view_exists((namespace, view))


def test_view_exists_multilevel_namespace_404(rest_mock: Mocker) -> None:
multilevel_namespace = "core.examples.some_namespace"
view = "some_view"
rest_mock.head(
f"{TEST_URI}v1/namespaces/{multilevel_namespace}/views/{view}",
status_code=404,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
assert not catalog.view_exists((multilevel_namespace, view))


def test_list_namespaces_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces",
Expand Down
17 changes: 17 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,23 @@ def test_table_v1_with_null_nested_namespace(session_catalog: Catalog, arrow_tab
session_catalog.drop_table(identifier)


@pytest.mark.integration
def test_view_exists(
spark: SparkSession,
session_catalog: Catalog,
) -> None:
identifier = "default.some_view"
spark.sql(
f"""
CREATE VIEW {identifier}
AS
(SELECT 1 as some_col)
"""
).collect()
assert session_catalog.view_exists(identifier)
session_catalog.drop_view(identifier) # clean up


@pytest.mark.integration
def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:
schema = Schema(
Expand Down

0 comments on commit 29c9a96

Please sign in to comment.