Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DVX-709: Implement caching for Connection and SourceTag #407

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions pyatlan/cache/abstract_asset_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 Atlan Pte. Ltd.
from __future__ import annotations

import threading
from abc import ABC, abstractmethod
from typing import Any

from pyatlan.errors import ErrorCode
from pyatlan.model.assets import Asset
from pyatlan.model.enums import AtlanConnectorType


class AbstractAssetCache(ABC):
    """
    Base class for reusable components that are common
    to all caches, where a cache is populated entry-by-entry.

    Each instance maintains three in-memory indexes:

    - ``name_to_guid``: name (as produced by ``get_name()``) -> GUID
    - ``guid_to_asset``: GUID -> cached asset
    - ``qualified_name_to_guid``: qualified name -> GUID
    """

    def __init__(self, client):
        """
        Create an empty cache.

        :param client: client stored on the cache for use by concrete lookups
        """
        self.client = client
        # Not acquired anywhere in this base class; it is made available
        # for concrete caches to use around their lookups.
        self.lock = threading.Lock()
        self.name_to_guid = {}
        self.guid_to_asset = {}
        self.qualified_name_to_guid = {}

    @classmethod
    @abstractmethod
    def get_cache(cls):
        """Abstract method to retrieve cache."""

    @abstractmethod
    def lookup_by_guid(self, guid: str):
        """Abstract method to lookup asset by guid."""

    @abstractmethod
    def lookup_by_qualified_name(self, qualified_name: str):
        """Abstract method to lookup asset by qualified name."""

    @abstractmethod
    def lookup_by_name(self, name: Any):
        """Abstract method to lookup asset by name."""

    @abstractmethod
    def get_name(self, asset: Asset):
        """Abstract method to get name from asset."""

    def is_guid_known(self, guid: str) -> bool:
        """
        Checks whether the provided Atlan-internal UUID is known.
        NOTE: will not refresh the cache itself to determine this.

        :param guid: Atlan-internal UUID of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return guid in self.guid_to_asset

    def is_qualified_name_known(self, qualified_name: str) -> bool:
        """
        Checks whether the provided Atlan-internal ID string is known.
        NOTE: will not refresh the cache itself to determine this.

        :param qualified_name: Atlan-internal ID string of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return qualified_name in self.qualified_name_to_guid

    def is_name_known(self, name: str) -> bool:
        """
        Checks whether the provided human-constructable name is known.
        NOTE: will not refresh the cache itself to determine this.

        :param name: human-constructable name of the object
        :returns: `True` if the object is known, `False` otherwise
        """
        return name in self.name_to_guid

    def cache(self, asset: Asset):
        """
        Add an entry to the cache.

        Nothing is cached when `asset` is empty (for example `None`), or
        when any of its name, GUID or qualified name is missing.

        :param asset: to be cached
        """
        # Bail out before touching any attribute: an empty asset (e.g. `None`)
        # has no `guid` / `qualified_name` to read.
        if not asset:
            return
        name = self.get_name(asset)
        if not all([name, asset.guid, asset.qualified_name]):
            return
        self.name_to_guid[name] = asset.guid
        self.guid_to_asset[asset.guid] = asset
        self.qualified_name_to_guid[asset.qualified_name] = asset.guid

    def _get_by_guid(self, guid: str, allow_refresh: bool = True):
        """
        Retrieve an asset from the cache by its UUID.
        If the asset is not found, it will be looked up and added to the cache.

        :param guid: UUID of the asset in Atlan
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the asset cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no UUID was provided for the asset to retrieve
        """
        if not guid:
            raise ErrorCode.MISSING_ID.exception_with_parameters()
        asset = self.guid_to_asset.get(guid)
        if not asset and allow_refresh:
            # Cache miss: let the concrete cache populate the entry, then retry.
            self.lookup_by_guid(guid)
            asset = self.guid_to_asset.get(guid)
        if not asset:
            raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
        return asset

    def _get_by_qualified_name(self, qualified_name: str, allow_refresh: bool = True):
        """
        Retrieve an asset from the cache by its unique Atlan-internal name.

        :param qualified_name: unique Atlan-internal name of the asset
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the object cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no qualified_name was provided for the object to retrieve
        """
        if not qualified_name:
            raise ErrorCode.MISSING_ID.exception_with_parameters()
        guid = self.qualified_name_to_guid.get(qualified_name)
        if not guid and allow_refresh:
            # Cache miss: let the concrete cache populate the entry, then retry.
            self.lookup_by_qualified_name(qualified_name)
            guid = self.qualified_name_to_guid.get(qualified_name)
        if not guid:
            raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
                qualified_name,
                AtlanConnectorType._get_connector_type_from_qualified_name(
                    qualified_name
                ),
            )
        # The GUID came from the cache's own index, so no further refresh is needed.
        return self._get_by_guid(guid=guid, allow_refresh=False)

    def _get_by_name(self, name: AbstractAssetName, allow_refresh: bool = True):
        """
        Retrieve an asset from the cache by its uniquely identifiable name.

        :param name: uniquely identifiable name of the asset in Atlan
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: the asset (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the object cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no name was provided for the object to retrieve
        """
        if not isinstance(name, AbstractAssetName):
            raise ErrorCode.MISSING_NAME.exception_with_parameters()
        # The name index is keyed by the string form of the name object.
        guid = self.name_to_guid.get(str(name))
        if not guid and allow_refresh:
            # Cache miss: let the concrete cache populate the entry, then retry.
            self.lookup_by_name(name)
            guid = self.name_to_guid.get(str(name))
        if not guid:
            raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
                name._TYPE_NAME, name
            )
        # The GUID came from the cache's own index, so no further refresh is needed.
        return self._get_by_guid(guid=guid, allow_refresh=False)


class AbstractAssetName(ABC):
    """
    Base class for reusable components common to all asset names
    used by the cache's find methods, such as AssetCache.get_by_name().

    Concrete names must provide both a constructor and a string form.
    """

    # Type label for the named asset; empty here, overridden by subclasses.
    _TYPE_NAME = ""

    @abstractmethod
    def __init__(self):
        """Abstract constructor; concrete names must override it."""

    @abstractmethod
    def __str__(self):
        """Abstract string form of the name; concrete names must override it."""
204 changes: 204 additions & 0 deletions pyatlan/cache/connection_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2024 Atlan Pte. Ltd.
from __future__ import annotations

import logging
import threading
from typing import Dict, Optional, Union

from pyatlan.cache.abstract_asset_cache import AbstractAssetCache, AbstractAssetName
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, Connection
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.model.fluent_search import FluentSearch
from pyatlan.model.search import Term

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Module-wide lock held by ConnectionCache.get_cache() while it looks up
# (or creates) the cache instance for the default client.
lock = threading.Lock()


class ConnectionCache(AbstractAssetCache):
    """
    Lazily-loaded cache for translating between
    a connection's simplified name and its details.

    - guid = UUID of the connection
        for eg: 9c677e77-e01d-40e0-85b7-8ba4cd7d0ea9
    - qualified_name = Atlan-internal name of the connection (with epoch)
        for eg: default/snowflake/1234567890
    - name = simple name of the form {{connectorType}}/{{connectorName}},
        for eg: snowflake/development
    """

    # Fields requested on every connection fetched for the cache.
    _SEARCH_FIELDS = [
        Connection.NAME,
        Connection.STATUS,
        Connection.CONNECTOR_NAME,
    ]
    SEARCH_ATTRIBUTES = [
        search_field.atlan_field_name for search_field in _SEARCH_FIELDS
    ]
    # One cache instance per client, keyed by the client's `cache_key`.
    caches: Dict[int, ConnectionCache] = {}

    def __init__(self, client: AtlanClient):
        """
        :param client: client used to run the lookups that populate the cache
        """
        super().__init__(client)

    @classmethod
    def get_cache(cls) -> ConnectionCache:
        """
        Return the cache associated with the default client,
        creating it the first time it is requested.

        :returns: the connection cache for the default client
        """
        # NOTE(review): function-scope import duplicates the module-level one;
        # presumably kept to sidestep a circular import -- confirm.
        from pyatlan.client.atlan import AtlanClient

        with lock:
            client = AtlanClient.get_default_client()
            key = client.cache_key
            if key in cls.caches:
                return cls.caches[key]
            cls.caches[key] = ConnectionCache(client=client)
            return cls.caches[key]

    @classmethod
    def get_by_guid(cls, guid: str, allow_refresh: bool = True) -> Connection:
        """
        Retrieve a connection from the cache by its UUID.
        If the asset is not found, it will be looked up and added to the cache.

        :param guid: UUID of the connection in Atlan
            for eg: 9c677e77-e01d-40e0-85b7-8ba4cd7d0ea9
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: connection (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no UUID was provided for the connection to retrieve
        """
        cache = cls.get_cache()
        return cache._get_by_guid(guid=guid, allow_refresh=allow_refresh)

    @classmethod
    def get_by_qualified_name(
        cls, qualified_name: str, allow_refresh: bool = True
    ) -> Connection:
        """
        Retrieve a connection from the cache by its unique Atlan-internal name.

        :param qualified_name: unique Atlan-internal name of the connection
            for eg: default/snowflake/1234567890
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: connection (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no qualified_name was provided for the connection to retrieve
        """
        cache = cls.get_cache()
        return cache._get_by_qualified_name(
            qualified_name=qualified_name, allow_refresh=allow_refresh
        )

    @classmethod
    def get_by_name(
        cls, name: ConnectionName, allow_refresh: bool = True
    ) -> Connection:
        """
        Retrieve a connection from the cache by its uniquely identifiable name.

        :param name: uniquely identifiable name of the connection in Atlan
            In the form of {{connectorType}}/{{connectorName}}
            for eg: snowflake/development
        :param allow_refresh: whether to allow a refresh of the cache (`True`) or not (`False`)
        :returns: connection (if found)
        :raises AtlanError: on any API communication problem if the cache needs to be refreshed
        :raises NotFoundError: if the connection cannot be found (does not exist) in Atlan
        :raises InvalidRequestError: if no name was provided for the connection to retrieve
        """
        cache = cls.get_cache()
        return cache._get_by_name(name=name, allow_refresh=allow_refresh)

    def _search_and_cache(self, criterion) -> None:
        """
        Search for active assets matching `criterion` and cache the first
        result of the current page when it is a connection.

        :param criterion: search condition identifying the connection
        """
        with self.lock:
            response = (
                FluentSearch(_includes_on_results=self.SEARCH_ATTRIBUTES)
                .where(Term.with_state("ACTIVE"))
                .where(Term.with_super_type_names("Asset"))
                .where(criterion)
                .execute(self.client)
            )
            first = (
                response.current_page()[0] if response.current_page() else None
            )
            if first and isinstance(first, Connection):
                self.cache(first)

    def lookup_by_guid(self, guid: str) -> None:
        """
        Search for the active connection with the given UUID and cache it.
        Does nothing when no UUID is given.

        :param guid: UUID of the connection to look up
        """
        if guid:
            self._search_and_cache(Connection.GUID.eq(guid))

    def lookup_by_qualified_name(self, connection_qn: str) -> None:
        """
        Search for the active connection with the given qualified name and cache it.
        Does nothing when no qualified name is given.

        :param connection_qn: unique Atlan-internal name of the connection to look up
        """
        if connection_qn:
            self._search_and_cache(Connection.QUALIFIED_NAME.eq(connection_qn))

    def lookup_by_name(self, name: ConnectionName) -> None:
        """
        Find connections by connector type and name, and cache the first match.
        Does nothing when `name` is not a `ConnectionName` or nothing is found;
        logs a warning when more than one connection matches.

        :param name: type/name identity of the connection to look up
        """
        if not isinstance(name, ConnectionName):
            return
        matches = self.client.asset.find_connections_by_name(
            name=name.name,
            connector_type=name.type,
            attributes=self.SEARCH_ATTRIBUTES,
        )
        if matches:
            if len(matches) > 1:
                LOGGER.warning(
                    (
                        "Found multiple connections of the same type "
                        "with the same name, caching only the first: %s"
                    ),
                    name,
                )
            self.cache(matches[0])

    def get_name(self, asset: Asset):
        """
        Build the cache name for an asset.

        :param asset: asset to derive the name from
        :returns: `{{type}}/{{name}}` string for a connection, `None` for anything else
        """
        return str(ConnectionName(asset)) if isinstance(asset, Connection) else None


class ConnectionName(AbstractAssetName):
    """
    Unique identity for a connection,
    in the form: {{type}}/{{name}}

    - For eg: snowflake/development
    """

    _TYPE_NAME = "Connection"

    def __init__(
        self,
        connection: Union[
            str,
            Optional[Connection],
        ] = None,
    ):
        """
        Build the identity from a connection or from its string form.

        :param connection: either a `Connection` (its `name` and `connector_name`
            are used) or a `{{type}}/{{name}}` string; for any other value, or a
            string without a `/`, both `name` and `type` stay `None`
        """
        self.name = None
        self.type = None

        if isinstance(connection, Connection):
            self.name = connection.name
            self.type = connection.connector_name
            return

        if isinstance(connection, str):
            # Only the first "/" separates type from name; the name itself
            # may contain further slashes.
            connector, separator, remainder = connection.partition("/")
            if separator:
                self.type = AtlanConnectorType(connector)  # type: ignore[call-arg]
                self.name = remainder

    def __hash__(self):
        """Hash on the same (name, type) pair that equality compares."""
        return hash((self.name, self.type))

    def __str__(self):
        """Render the identity as `{{type}}/{{name}}`."""
        return f"{self.type}/{self.name}"

    def __eq__(self, other):
        """Two connection names are equal when both name and type match."""
        if not isinstance(other, ConnectionName):
            return False
        return self.name == other.name and self.type == other.type
Loading
Loading