Skip to content

Commit

Permalink
Merge pull request #51 from atlanhq/get_lineage
Browse files Browse the repository at this point in the history
Get lineage
  • Loading branch information
ErnestoLoma authored May 15, 2023
2 parents e682638 + d2f81b3 commit 7587356
Show file tree
Hide file tree
Showing 10 changed files with 5,782 additions and 4 deletions.
11 changes: 11 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## 0.0.31 (May 15, 2023)

* Added the following classes to support lineage retrieval
* LineageRelation
* DirectedPair
* LineageGraph
* LineageResponse
* Added the get_lineage method to AtlanClient
* Modify create_typdef in client to handle creating EnumDef
* Refresh caches on any SDK-driven creates or deletes

## 0.0.30 (May 11, 2023)

* Fix problem where custom metadata created via the SDK failed to show up in the UI
Expand Down
23 changes: 22 additions & 1 deletion pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
GET_ALL_TYPE_DEFS,
GET_ENTITY_BY_GUID,
GET_ENTITY_BY_UNIQUE_ATTRIBUTE,
GET_LINEAGE,
GET_ROLES,
INDEX_SEARCH,
PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE,
Expand Down Expand Up @@ -67,12 +68,14 @@
AtlanTypeCategory,
CertificateStatus,
)
from pyatlan.model.lineage import LineageRequest, LineageResponse
from pyatlan.model.response import AssetMutationResponse
from pyatlan.model.role import RoleResponse
from pyatlan.model.search import DSL, IndexSearchRequest, Term
from pyatlan.model.typedef import (
ClassificationDef,
CustomMetadataDef,
EnumDef,
TypeDef,
TypeDefResponse,
)
Expand Down Expand Up @@ -472,6 +475,16 @@ def create_typedef(self, typedef: TypeDef) -> TypeDefResponse:
relationship_defs=[],
custom_metadata_defs=[typedef],
)
elif isinstance(typedef, EnumDef):
# Set up the request payload...
payload = TypeDefResponse(
classification_defs=[],
enum_defs=[typedef],
struct_defs=[],
entity_defs=[],
relationship_defs=[],
custom_metadata_defs=[],
)
else:
raise InvalidRequestException(
"Unable to create new type definitions of category: "
Expand Down Expand Up @@ -725,8 +738,10 @@ def find_connections_by_name(
self,
name: str,
connector_type: AtlanConnectorType,
attributes: list[str] = None,
attributes: Optional[list[str]] = None,
) -> list[Connection]:
if attributes is None:
attributes = []
query = (
Term.with_state("ACTIVE")
+ Term.with_type_name("CONNECTION")
Expand All @@ -740,3 +755,9 @@ def find_connections_by_name(
)
results = self.search(search_request)
return [asset for asset in results if isinstance(asset, Connection)]

def get_lineage(self, lineage_request: LineageRequest) -> LineageResponse:
raw_json = self._call_api(
GET_LINEAGE, None, lineage_request, exclude_unset=False
)
return LineageResponse(**raw_json)
2 changes: 2 additions & 0 deletions pyatlan/client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
BULK_HEADERS = "bulk/headers"

BULK_UPDATE = API(ENTITY_BULK_API, HTTPMethod.POST, HTTPStatus.OK)
# Lineage APIs
GET_LINEAGE = API(f"{BASE_URI}lineage/getlineage", HTTPMethod.POST, HTTPStatus.OK)
# Entity APIs
GET_ENTITY_BY_GUID = API(f"{ENTITY_API}guid", HTTPMethod.GET, HTTPStatus.OK)
GET_ENTITY_BY_UNIQUE_ATTRIBUTE = API(
Expand Down
6 changes: 6 additions & 0 deletions pyatlan/model/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,9 @@ def to_qualified_name(self):
class SortOrder(str, Enum):
ASCENDING = "asc"
DESCENDING = "desc"


class LineageDirection(str, Enum):
UPSTREAM = "INPUT"
DOWNSTREAM = "OUTPUT"
BOTH = "BOTH"
224 changes: 224 additions & 0 deletions pyatlan/model/lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
# Based on original code from https://github.com/apache/atlas (under Apache-2.0 license)
from collections import deque
from typing import TYPE_CHECKING, Any, Optional

from pydantic import Field

if TYPE_CHECKING:
from dataclasses import dataclass
else:
from pydantic.dataclasses import dataclass

from pyatlan.error import InvalidRequestError
from pyatlan.model.assets import Asset
from pyatlan.model.core import AtlanObject
from pyatlan.model.enums import LineageDirection


class LineageRelation(AtlanObject):
from_entity_id: Optional[str]
to_entity_id: Optional[str]
process_id: Optional[str]
relationship_id: Optional[str]

@property
def is_full_link(self):
return self.process_id is not None


@dataclass(frozen=True)
class DirectedPair:
process_guid: str
target_guid: str


@dataclass(frozen=True)
class LineageGraph:
downstream_list: dict[str, dict[DirectedPair, None]]
upstream_list: dict[str, dict[DirectedPair, None]]

@classmethod
def create(cls, relations: list[LineageRelation]) -> "LineageGraph":
downstream_list: dict[str, dict[DirectedPair, None]] = {}
upstream_list: dict[str, dict[DirectedPair, None]] = {}

def add_relation(relation: LineageRelation):
if (
relation.from_entity_id
and relation.process_id
and relation.to_entity_id
):
add_edges(
relation.from_entity_id, relation.process_id, relation.to_entity_id
)

def add_edges(source_guid: str, process_guid: str, target_guid: str):
if source_guid not in downstream_list:
downstream_list[source_guid] = {}
if target_guid not in upstream_list:
upstream_list[target_guid] = {}
downstream_list[source_guid][
DirectedPair(process_guid=process_guid, target_guid=target_guid)
] = None
upstream_list[target_guid][
DirectedPair(process_guid=process_guid, target_guid=source_guid)
] = None

for relation in relations:
if relation.is_full_link:
add_relation(relation)
else:
raise InvalidRequestError(
param="",
code="ATLAN-JAVA-400-013",
message="Lineage was retrieved using hideProces=false. "
"We do not provide a graph view in this case.",
)
return cls(downstream_list=downstream_list, upstream_list=upstream_list)

@staticmethod
def get_asset_guids(
guid: str, guids: dict[str, dict[DirectedPair, None]]
) -> list[str]:
if guid in guids:
return list({pair.target_guid: None for pair in guids[guid].keys()}.keys())
return []

@staticmethod
def get_process_guids(
guid: str, guids: dict[str, dict[DirectedPair, None]]
) -> list[str]:
if guid in guids:
return list({pair.process_guid: None for pair in guids[guid].keys()}.keys())
return []

def get_downstream_asset_guids(self, guid: str) -> list[str]:
return LineageGraph.get_asset_guids(guid, self.downstream_list)

def get_downstream_process_guids(self, guid: str) -> list[str]:
return LineageGraph.get_process_guids(guid, self.downstream_list)

def get_upstream_asset_guids(self, guid: str) -> list[str]:
return LineageGraph.get_asset_guids(guid, self.upstream_list)

def get_upstream_process_guids(self, guid: str) -> list[str]:
return LineageGraph.get_process_guids(guid, self.upstream_list)

def get_all_downstream_asset_guids_dfs(self, guid: str) -> list[str]:
visited: dict[str, None] = {}
stack: deque[str] = deque()
stack.append(guid)
while len(stack) > 0:
to_traverse = stack.pop()
if to_traverse not in visited:
visited[to_traverse] = None
for downstream_guid in self.get_downstream_asset_guids(to_traverse):
if downstream_guid not in visited:
stack.append(downstream_guid)
return list(visited.keys())

def get_all_upstream_asset_guids_dfs(self, guid: str) -> list[str]:
visited: dict[str, None] = {}
stack: deque[str] = deque()
stack.append(guid)
while len(stack) > 0:
to_traverse = stack.pop()
if to_traverse not in visited:
visited[to_traverse] = None
for upstream_guid in self.get_upstream_asset_guids(to_traverse):
if upstream_guid not in visited:
stack.append(upstream_guid)
return list(visited.keys())


class LineageResponse(AtlanObject):
base_entity_guid: str
lineage_direction: LineageDirection
lineage_depth: int
limit: int
offset: int
has_more_upstream_vertices: bool
has_more_downstream_vertices: bool
guid_entity_map: dict[str, Asset]
relations: list[LineageRelation]
vertex_children_info: Optional[dict[str, Any]]
graph: Optional[LineageGraph] = None

def get_graph(self):
if self.graph is None:
self.graph = LineageGraph.create(self.relations)
return self.graph

def get_all_downstream_asset_guids_dfs(
self, guid: Optional[str] = None
) -> list[str]:
return self.get_graph().get_all_downstream_asset_guids_dfs(
guid if guid else self.base_entity_guid
)

def get_all_downstream_assets_dfs(self, guid: Optional[str] = None) -> list[Asset]:
return [
self.guid_entity_map[guid]
for guid in self.get_graph().get_all_downstream_asset_guids_dfs(
guid if guid else self.base_entity_guid
)
]

def get_all_upstream_asset_guids_dfs(self, guid: Optional[str] = None) -> list[str]:
return self.get_graph().get_all_upstream_asset_guids_dfs(
guid if guid else self.base_entity_guid
)

def get_all_upstream_assets_dfs(self, guid: Optional[str] = None) -> list[Asset]:
return [
self.guid_entity_map[guid]
for guid in self.get_graph().get_all_upstream_asset_guids_dfs(
guid if guid else self.base_entity_guid
)
]

def get_downstream_asset_guids(self, guid: Optional[str] = None) -> list[str]:
return self.get_graph().get_downstream_asset_guids(
guid if guid else self.base_entity_guid
)

def get_downstream_assets(self, guid: Optional[str] = None) -> list[Asset]:
return [
self.guid_entity_map[guid]
for guid in self.get_graph().get_downstream_asset_guids(
guid if guid else self.base_entity_guid
)
]

def get_downstream_process_guids(self, guid: Optional[str] = None) -> list[str]:
return self.get_graph().get_downstream_process_guids(
guid if guid else self.base_entity_guid
)

def get_upstream_asset_guids(self, guid: Optional[str] = None) -> list[str]:
return self.get_graph().get_upstream_asset_guids(
guid if guid else self.base_entity_guid
)

def get_upstream_assets(self, guid: Optional[str] = None) -> list[Asset]:
return [
self.guid_entity_map[guid]
for guid in self.get_graph().get_upstream_asset_guids(
guid if guid else self.base_entity_guid
)
]

def get_upstream_process_guids(self, guid: Optional[str] = None) -> list[str]:
return self.get_graph().get_upstream_process_guids(
guid if guid else self.base_entity_guid
)


class LineageRequest(AtlanObject):
guid: str
depth: int = Field(default=0)
direction: LineageDirection = Field(default=LineageDirection.BOTH)
hide_process: bool = Field(default=True)
allow_deleted_process: bool = Field(default=False)
4 changes: 2 additions & 2 deletions pyatlan/model/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class Options(AtlanObject):
is_archived: Optional[bool] = Field(
None,
description="Whether the attribute has been deleted (true) or is still active (false).\n",
example=True
example=True,
)
archived_at: Optional[int] = Field(
None, description="When the attribute was deleted.\n"
Expand All @@ -148,7 +148,7 @@ class Options(AtlanObject):
is_new: Optional[bool] = Field(
True,
description="Whether the attribute is being newly created (true) or not (false).",
example=True
example=True,
)
cardinality: Optional[Cardinality] = Field(
"SINGLE",
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.30
0.0.31
9 changes: 9 additions & 0 deletions tests/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import AtlasGlossary, AtlasGlossaryTerm, Connection, Database
from pyatlan.model.enums import AtlanConnectorType
from pyatlan.model.lineage import LineageRequest

iter_count = count(1)

Expand Down Expand Up @@ -230,3 +231,11 @@ def test_find_connections_by_name(client: AtlanClient):
)
assert len(connections) == 1
assert connections[0].connector_name == AtlanConnectorType.SNOWFLAKE.value


def test_get_lineage(client: AtlanClient):
response = client.get_lineage(
LineageRequest(guid="75474eab-3105-4ef9-9f84-709e386a7d3e")
)
for guid, asset in response.guid_entity_map.items():
assert guid == asset.guid
Loading

0 comments on commit 7587356

Please sign in to comment.