Skip to content

Commit

Permalink
DVX-624: Adds ability to search for source-synced tags by assigned …
Browse files Browse the repository at this point in the history
…`value`

- Added initial support for `Span, SpanNear, SpanWithin` and `SpanTerm` queries for textual fields.
  • Loading branch information
Aryamanz29 committed Sep 27, 2024
1 parent d08b711 commit e6be616
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 3 deletions.
106 changes: 104 additions & 2 deletions pyatlan/model/fluent_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,29 @@

import copy
import dataclasses
import logging
from typing import Dict, List, Optional, TypeVar, Union

from pyatlan.cache.atlan_tag_cache import AtlanTagCache
from pyatlan.client.asset import IndexSearchResults
from pyatlan.errors import ErrorCode
from pyatlan.model.aggregation import Aggregation
from pyatlan.model.assets import Referenceable
from pyatlan.model.assets import Referenceable, Tag
from pyatlan.model.enums import EntityStatus
from pyatlan.model.fields.atlan_fields import AtlanField
from pyatlan.model.search import DSL, Bool, IndexSearchRequest, Query, SortItem, Term
from pyatlan.model.search import (
DSL,
Bool,
IndexSearchRequest,
Query,
SortItem,
SpanNear,
SpanTerm,
SpanWithin,
Term,
)

LOGGER = logging.getLogger(__name__)

SelfQuery = TypeVar("SelfQuery", bound="CompoundQuery")

Expand Down Expand Up @@ -128,6 +142,94 @@ def tagged(
_min_somes=1,
).to_query()

@staticmethod
def tagged_with_value(
    atlan_tag_name: str, value: str, directly: bool = False
) -> Query:
    """
    Returns a query that will match assets that have a
    specific value for the specified tag (for source-synced tags).

    :param atlan_tag_name: human-readable name of the Atlan tag
    :param value: value the tag must carry for an asset to match the query
    :param directly: when `True`, the asset must have the tag and
    value directly assigned (otherwise even propagated tags with the value will suffice)
    :raises: AtlanError on any error communicating
    with the API to refresh the Atlan tag cache
    :returns: a query that will only match assets that have
    a particular value assigned for the given Atlan tag
    """
    text_field = "__classificationsText.text"
    tag_id = AtlanTagCache.get_id_for_name(atlan_tag_name) or ""
    client = AtlanClient.get_default_client()

    # Look up the source-synced tag asset(s) mapped to this Atlan tag
    synced_tags = [
        tag
        for tag in (
            FluentSearch()
            .select()
            .where(Tag.MAPPED_CLASSIFICATION_NAME.eq(tag_id))
            .execute(client=client)
        )
    ]
    if synced_tags:
        synced_tag_qn = synced_tags[0].qualified_name or ""
        if len(synced_tags) > 1:
            # The message must be a plain string: wrapping it in a 1-tuple
            # makes logging render it as "('...',)" instead of clean text.
            LOGGER.warning(
                "Multiple mapped source-synced tags "
                "found for tag %s -- using only the first: %s",
                atlan_tag_name,
                synced_tag_qn,
            )
    else:
        # Placeholder used when no mapped source-synced tag was found
        synced_tag_qn = "NON_EXISTENT"

    # Construct little spans: the value's space-separated tokens, in order
    # and adjacent, bracketed by the "tagAttachmentValue" and
    # "tagAttachmentKey" terms
    little_spans = [
        SpanTerm(field=text_field, value="tagAttachmentValue"),
        *(SpanTerm(field=text_field, value=token) for token in value.split(" ")),
        SpanTerm(field=text_field, value="tagAttachmentKey"),
    ]

    # Construct big spans: the tag ID followed (at any distance, given the
    # very large slop below) by the synced tag's qualified name
    big_spans = [
        SpanTerm(field=text_field, value=tag_id),
        SpanTerm(field=text_field, value=synced_tag_qn),
    ]

    # Construct final span query: the little span must occur within the big one
    span = SpanWithin(
        little=SpanNear(clauses=little_spans, slop=0, in_order=True),
        big=SpanNear(clauses=big_spans, slop=10000000, in_order=True),
    )

    # Without atlan tag propagation
    if directly:
        return (
            FluentSearch()
            .where(Referenceable.ATLAN_TAGS.eq(tag_id))
            .where(span)
            .to_query()
        )
    # With atlan tag propagation
    return (
        FluentSearch()
        .where_some(Referenceable.ATLAN_TAGS.eq(tag_id))
        .where_some(Referenceable.PROPAGATED_ATLAN_TAGS.eq(tag_id))
        .min_somes(1)
        .where(span)
        .to_query()
    )

@staticmethod
def assigned_term(qualified_names: Optional[List[str]] = None) -> Query:
"""
Expand Down
67 changes: 66 additions & 1 deletion pyatlan/model/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union

from pydantic.v1 import (
ConfigDict,
Expand Down Expand Up @@ -509,6 +509,71 @@ def to_dict(self):
return {self.type_name: terms}


@dataclass
class SpanWithin(Query):
    """Query of type ``span_within``, built from a ``little`` and a ``big`` query."""

    little: Optional[Query] = None
    big: Optional[Query] = None
    boost: Optional[float] = None
    type_name: Literal["span_within"] = "span_within"

    def to_dict(self):
        """
        Build the dictionary form of this query.

        :returns: a dict keyed by the query type name whose value holds
        only those of `little`, `big` and `boost` that are not `None`
        """
        candidates = (
            ("little", self.little),
            ("big", self.big),
            ("boost", self.boost),
        )
        # Unset (None) options are left out of the payload entirely
        body = {key: setting for key, setting in candidates if setting is not None}
        return {self.type_name: body}


@dataclass
class SpanTerm(Query):
    """Query of type ``span_term``, matching a single term in a field."""

    # Name of the field to search
    field: str
    # Term to look for in the field
    value: SearchFieldType
    # Optional boost to apply to the query
    boost: Optional[float] = None
    type_name: Literal["span_term"] = "span_term"

    def to_dict(self):
        """
        Build the dictionary form of this query.

        Without a boost the compact ``{field: value}`` form is produced. With
        a boost, the options are nested under the field name, since a
        ``boost`` key placed next to the field would be read as a second field.

        :returns: a dict keyed by the query type name
        """
        if self.boost is None:
            return {self.type_name: {self.field: self.value}}
        span_term = {self.field: {"value": self.value, "boost": self.boost}}
        return {self.type_name: span_term}


@dataclass
class SpanNear(Query):
    """Query of type ``span_near``, combining several clause queries."""

    clauses: Optional[Sequence[Query]] = None
    in_order: Optional[bool] = None
    slop: Optional[int] = None
    type_name: Literal["span_near"] = "span_near"

    def to_dict(self):
        """
        Build the dictionary form of this query.

        :returns: a dict keyed by the query type name whose value holds
        only those of `clauses`, `in_order` and `slop` that are not `None`
        """
        candidates = (
            ("clauses", self.clauses),
            ("in_order", self.in_order),
            ("slop", self.slop),
        )
        # Unset (None) options are left out of the payload entirely
        body = {key: setting for key, setting in candidates if setting is not None}
        return {self.type_name: body}


@dataclass
class Span(Query):
    """Query of type ``span``, wrapping a ``span_within`` and/or ``span_near`` query."""

    span_within: Optional[Query] = None
    span_near: Optional[Query] = None
    type_name: Literal["span"] = "span"

    def to_dict(self):
        """
        Build the dictionary form of this query.

        :returns: a dict keyed by the query type name whose value holds
        only those of `span_within` and `span_near` that are not `None`
        """
        candidates = (
            ("span_within", self.span_within),
            ("span_near", self.span_near),
        )
        # Unset (None) options are left out of the payload entirely
        body = {key: nested for key, nested in candidates if nested is not None}
        return {self.type_name: body}


@dataclass(config=ConfigDict(smart_union=True, extra="forbid")) # type: ignore
class Bool(Query):
must: List[Query] = Field(default_factory=list)
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ASSET_GUID = Asset.GUID.keyword_field_name
NOW_AS_TIMESTAMP = int(time() * 1000)
NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d")
EXISTING_SOURCE_SYNCED_TAG = "Confidential"

VALUES_FOR_TERM_QUERIES = {
"with_categories": "VBsYc9dUoEcAtDxZmjby6@mweSfpXBwfYWedQTvA3Gi",
Expand Down Expand Up @@ -133,6 +134,41 @@ def test_search(client: AtlanClient, asset_tracker, cls):
asset_tracker.missing_types.add(name)


def test_search_source_synced_assets(client: AtlanClient):
    """
    Search for tables carrying the source-synced tag with a given value and
    check that every returned table has that tag with that attached value.
    """
    search = FluentSearch().select().where(Asset.TYPE_NAME.eq("Table"))
    value_query = CompoundQuery.tagged_with_value(
        EXISTING_SOURCE_SYNCED_TAG, "Highly Restricted"
    )
    tables = [table for table in search.where(value_query).execute(client=client)]

    # At least one table must come back for the checks below to mean anything
    assert tables
    for table in tables:
        assert isinstance(table, Table)
        tags = table.atlan_tags
        assert tags
        matching_tags = [
            tag for tag in tags if str(tag.type_name) == EXISTING_SOURCE_SYNCED_TAG
        ]
        assert matching_tags
        for matching_tag in matching_tags:
            attachments = matching_tag.source_tag_attachements
            assert attachments
            for attachment in attachments:
                values = attachment.source_tag_value
                assert values
                for entry in values:
                    # Comparing against a non-empty literal also rules out
                    # a missing or empty attached value
                    assert entry.tag_attachment_value == "Highly Restricted"


def test_search_next_page(client: AtlanClient):
size = 15
dsl = DSL(
Expand Down

0 comments on commit e6be616

Please sign in to comment.