Skip to content

Commit

Permalink
Merge pull request #408 from atlanhq/DVX-648
Browse files Browse the repository at this point in the history
DVX-648: Added support for assigning a `SourceTag`  with a value to asset
  • Loading branch information
Aryamanz29 authored Oct 21, 2024
2 parents 0325f2f + c7a53fc commit 5b805d2
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 22 deletions.
38 changes: 38 additions & 0 deletions pyatlan/model/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
from __future__ import annotations

import json
from abc import ABC
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -251,8 +253,44 @@ def __init__(self, *args, **kwargs):
self._source_tag_attachements = [
SourceTagAttachment(**source_tag["attributes"])
for source_tag in self.attributes[attr_id]
if isinstance(source_tag, dict) and source_tag.get("attributes")
]

@classmethod
def of(
cls,
atlan_tag_name: AtlanTagName,
entity_guid: Optional[str] = None,
source_tag_attachment: Optional[SourceTagAttachment] = None,
) -> AtlanTag:
from pyatlan.cache.atlan_tag_cache import AtlanTagCache

"""
Construct an Atlan tag assignment for a specific entity.
:param atlan_tag_name: human-readable name of the Atlan tag
:param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned
:param source_tag_attachment: (optional) source-specific details for the tag
:return: an Atlan tag assignment with default settings for propagation and a specific entity assignment
"""
tag = AtlanTag(
type_name=atlan_tag_name,
propagate=True,
remove_propagations_on_entity_delete=True,
restrict_propagation_through_lineage=False,
restrict_propagation_through_hierarchy=False,
)
if entity_guid:
tag.entity_guid = entity_guid
tag.entity_status = EntityStatus.ACTIVE
if source_tag_attachment:
source_tag_attr_id = (
AtlanTagCache.get_source_tags_attr_id(atlan_tag_name.id) or ""
)
tag.attributes = {source_tag_attr_id: [source_tag_attachment]}
tag._source_tag_attachements.append(source_tag_attachment)
return tag


class AtlanTags(AtlanObject):
__root__: List[AtlanTag] = Field(
Expand Down
135 changes: 133 additions & 2 deletions pyatlan/model/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
from __future__ import annotations

from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union

from pydantic.v1 import BaseModel, Extra, Field, root_validator

from pyatlan.model.enums import (
AtlanConnectorType,
BadgeComparisonOperator,
BadgeConditionColor,
SourceCostUnitType,
)
from pyatlan.model.utils import to_camel_case
from pyatlan.utils import validate_required_fields
from pyatlan.utils import select_optional_set_fields, validate_required_fields

if TYPE_CHECKING:
from pyatlan.cache.source_tag_cache import SourceTagName


class AtlanObject(BaseModel):
Expand Down Expand Up @@ -161,6 +165,133 @@ class SourceTagAttachment(AtlanObject):
source_tag_sync_timestamp: Optional[datetime] = Field(default=None, description="")
source_tag_sync_error: Optional[str] = Field(default=None, description="")

@classmethod
def by_name(
cls,
name: SourceTagName,
source_tag_values: List[SourceTagAttachmentValue],
source_tag_sync_timestamp: Optional[datetime] = None,
is_source_tag_synced: Optional[bool] = None,
source_tag_sync_error: Optional[str] = None,
):
from pyatlan.cache.source_tag_cache import SourceTagCache

"""
Create a source-synced tag attachment with
a particular value when the attachment is synced to the source.
:param client: connectivity to an Atlan tenant
:param name: unique name of the source tag in Atlan
:param source_tag_values: value of the tag attachment from the source
:param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
:param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
:param source_tag_sync_error: error message if the tag attachment sync at the source failed
:returns: a SourceTagAttachment with the provided information
:raises AtlanError: on any error communicating via the underlying APIs
:raises NotFoundError: if the source-synced tag cannot be resolved
"""
tag = SourceTagCache.get_by_name(name)
tag_connector_name = AtlanConnectorType._get_connector_type_from_qualified_name(
tag.qualified_name or ""
)
return cls.of(
source_tag_name=tag.name,
source_tag_qualified_name=tag.qualified_name,
source_tag_guid=tag.guid,
source_tag_connector_name=tag_connector_name,
source_tag_values=source_tag_values,
**select_optional_set_fields(
dict(
is_source_tag_synced=is_source_tag_synced,
source_tag_sync_timestamp=source_tag_sync_timestamp,
source_tag_sync_error=source_tag_sync_error,
)
),
)

@classmethod
def by_qualified_name(
cls,
source_tag_qualified_name: str,
source_tag_values: List[SourceTagAttachmentValue],
source_tag_sync_timestamp: Optional[datetime] = None,
is_source_tag_synced: Optional[bool] = None,
source_tag_sync_error: Optional[str] = None,
):
from pyatlan.cache.source_tag_cache import SourceTagCache

"""
Create a source-synced tag attachment with a particular value when the attachment is synced to the source.
:param client: connectivity to an Atlan tenant
:param source_tag_qualified_name: unique name of the source tag in Atlan
:param source_tag_values: value of the tag attachment from the source
:param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
:param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
:param source_tag_sync_error: error message if the tag attachment sync at the source failed
:returns: a SourceTagAttachment with the provided information
:raises AtlanError: on any error communicating via the underlying APIs
:raises NotFoundError: if the source-synced tag cannot be resolved
"""
tag = SourceTagCache.get_by_qualified_name(source_tag_qualified_name)
tag_connector_name = AtlanConnectorType._get_connector_type_from_qualified_name(
source_tag_qualified_name or ""
)
return cls.of(
source_tag_name=tag.name,
source_tag_qualified_name=source_tag_qualified_name,
source_tag_guid=tag.guid,
source_tag_connector_name=tag_connector_name,
source_tag_values=source_tag_values,
**select_optional_set_fields(
dict(
is_source_tag_synced=is_source_tag_synced,
source_tag_sync_timestamp=source_tag_sync_timestamp,
source_tag_sync_error=source_tag_sync_error,
)
),
)

@classmethod
def of(
cls,
source_tag_name: Optional[str] = None,
source_tag_qualified_name: Optional[str] = None,
source_tag_guid: Optional[str] = None,
source_tag_connector_name: Optional[str] = None,
source_tag_values: Optional[List[SourceTagAttachmentValue]] = None,
is_source_tag_synced: Optional[bool] = None,
source_tag_sync_timestamp: Optional[datetime] = None,
source_tag_sync_error: Optional[str] = None,
):
"""
Quickly create a new SourceTagAttachment.
:param source_tag_name: simple name of the source tag
:param source_tag_qualified_name: unique name of the source tag in Atlan
:param source_tag_guid: unique identifier (GUID) of the source tag in Atlan
:param source_tag_connector_name: connector that is the source of the tag
:param source_tag_values: value of the tag attachment from the source
:param is_source_tag_synced: whether the tag attachment has been synced at the source (True) or not (False)
:param source_tag_sync_timestamp: time (epoch) when the tag attachment was synced at the source, in milliseconds
:param source_tag_sync_error: error message if the tag attachment sync at the source failed
:returns: a SourceTagAttachment with the provided information
"""
return SourceTagAttachment(
**select_optional_set_fields(
dict(
source_tag_name=source_tag_name,
source_tag_qualified_name=source_tag_qualified_name,
source_tag_guid=source_tag_guid,
source_tag_connector_name=source_tag_connector_name,
source_tag_value=source_tag_values,
is_source_tag_synced=is_source_tag_synced,
source_tag_sync_timestamp=source_tag_sync_timestamp,
source_tag_sync_error=source_tag_sync_error,
)
),
)


class StarredDetails(AtlanObject):
"""Description"""
Expand Down
8 changes: 8 additions & 0 deletions pyatlan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ def attributes_to_params(
return query_params


def select_optional_set_fields(params: Dict[str, Any]) -> Dict:
"""
Filter the provided parameters to include
only those fields that are not set to `None`.
"""
return {key: value for key, value in params.items() if value is not None}


def non_null(obj: Optional[object], def_value: object):
return obj if obj is not None else def_value

Expand Down
112 changes: 92 additions & 20 deletions tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
# Copyright 2022 Atlan Pte. Ltd.
from dataclasses import dataclass, field
from datetime import datetime
from time import time
from time import sleep, time
from typing import Generator, Set
from unittest.mock import patch

import pytest

from pyatlan.cache.source_tag_cache import SourceTagName
from pyatlan.client.asset import LOGGER, IndexSearchResults
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Asset, AtlasGlossaryTerm, Column, Table
from pyatlan.model.core import AtlanTag, AtlanTagName
from pyatlan.model.enums import AtlanConnectorType, CertificateStatus, SortOrder
from pyatlan.model.fields.atlan_fields import SearchableField
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
Expand All @@ -25,11 +27,13 @@
Term,
Wildcard,
)
from pyatlan.model.structs import SourceTagAttachment, SourceTagAttachmentValue

QUALIFIED_NAME = "qualifiedName"
ASSET_GUID = Asset.GUID.keyword_field_name
NOW_AS_TIMESTAMP = int(time() * 1000)
NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d")
EXISTING_TAG = "Issue"
EXISTING_SOURCE_SYNCED_TAG = "Confidential"

VALUES_FOR_TERM_QUERIES = {
Expand Down Expand Up @@ -134,39 +138,107 @@ def test_search(client: AtlanClient, asset_tracker, cls):
asset_tracker.missing_types.add(name)


def _assert_source_tag(tables, source_tag, source_tag_value):
assert tables and len(tables) > 0
for table in tables:
tags = table.atlan_tags
assert tags and len(tags) > 0
synced_tags = [tag for tag in tags if str(tag.type_name) == source_tag]
assert synced_tags and len(synced_tags) > 0
for st in synced_tags:
attachments = st.source_tag_attachements
assert attachments and len(attachments) > 0
for sta in attachments:
values = sta.source_tag_value
assert values and len(values) > 0
for value in values:
attached_value = value.tag_attachment_value
assert attached_value and attached_value == source_tag_value


def test_search_source_synced_assets(client: AtlanClient):
tables = [
table
for table in (
FluentSearch()
.select()
.where(Asset.TYPE_NAME.eq("Table"))
.where(CompoundQuery.asset_type(Table))
.where(
CompoundQuery.tagged_with_value(
EXISTING_SOURCE_SYNCED_TAG, "Highly Restricted"
)
)
.execute(client=client)
)
if isinstance(table, Table)
]
assert tables and len(tables) > 0
for table in tables:
assert isinstance(table, Table)
tags = table.atlan_tags
assert tags and len(tags) > 0
synced_tags = [
tag for tag in tags if str(tag.type_name) == EXISTING_SOURCE_SYNCED_TAG
]
assert synced_tags and len(synced_tags) > 0
for st in synced_tags:
attachments = st.source_tag_attachements
assert attachments and len(attachments) > 0
for sta in attachments:
values = sta.source_tag_value
assert values and len(values) > 0
for value in values:
attached_value = value.tag_attachment_value
assert attached_value and attached_value == "Highly Restricted"
_assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Highly Restricted")


def test_source_tag_assign_with_value(client: AtlanClient, table: Table):
# Make sure no tags are assigned initially
assert table.guid
table = client.asset.get_by_guid(guid=table.guid, asset_type=Table)
assert not table.atlan_tags
assert table.name and table.qualified_name

source_tag_name = SourceTagName(
"snowflake/development@@ANALYTICS/WIDE_WORLD_IMPORTERS/CONFIDENTIAL"
)
to_update = table.updater(table.qualified_name, table.name)
to_update.atlan_tags = [
AtlanTag.of(atlan_tag_name=AtlanTagName(EXISTING_TAG)),
AtlanTag.of(
atlan_tag_name=AtlanTagName(EXISTING_SOURCE_SYNCED_TAG),
source_tag_attachment=SourceTagAttachment.by_name(
name=source_tag_name,
source_tag_values=[
SourceTagAttachmentValue(tag_attachment_value="Not Restricted")
],
),
),
]
response = client.asset.save(to_update, replace_atlan_tags=True)

assert (tables := response.assets_updated(asset_type=Table)) and len(tables) == 1
assert (
tables
and len(tables) == 1
and tables[0].atlan_tags
and len(tables[0].atlan_tags) == 2
)
for tag in tables[0].atlan_tags:
assert str(tag.type_name) in (EXISTING_TAG, EXISTING_SOURCE_SYNCED_TAG)

# Make sure source tag is now attached
# to the table with the provided value
sleep(5)
tables = [
table
for table in (
FluentSearch()
.select()
.where(CompoundQuery.asset_type(Table))
.where(Table.QUALIFIED_NAME.eq(table.qualified_name))
.where(
CompoundQuery.tagged_with_value(
EXISTING_SOURCE_SYNCED_TAG, "Not Restricted"
)
)
.execute(client=client)
)
if isinstance(table, Table)
]

assert (
tables
and len(tables) == 1
and tables[0].atlan_tags
and len(tables[0].atlan_tags) == 2
)
for tag in tables[0].atlan_tags:
assert str(tag.type_name) in (EXISTING_TAG, EXISTING_SOURCE_SYNCED_TAG)
_assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Not Restricted")


def test_search_next_page(client: AtlanClient):
Expand Down

0 comments on commit 5b805d2

Please sign in to comment.