Skip to content

Commit

Permalink
Merge pull request #44 from atlanhq/create_lineage
Browse files Browse the repository at this point in the history
Add create method to Process class
  • Loading branch information
ErnestoLoma authored May 8, 2023
2 parents 612049f + 30e2a82 commit bd5e405
Show file tree
Hide file tree
Showing 6 changed files with 442 additions and 3 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.27 (May 8, 2023)

* Add a create method to Process

## 0.0.26 (May 4, 2023)

* Add remove_terms method to AtlanClient
Expand Down
93 changes: 91 additions & 2 deletions pyatlan/generator/templates/entity.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from __future__ import annotations

import sys
from io import StringIO
import hashlib
from datetime import datetime
from typing import Any, ClassVar, Dict, List, Optional, TypeVar

Expand Down Expand Up @@ -65,12 +67,14 @@ def validate_single_required_field(field_names: list[str], values: list[Any]):
f"Only one of the following parameters are allowed: {', '.join(names)}"
)

def validate_required_fields(field_names:list[str], values:list[Any]):
for field_name, value in zip(field_names, values):
def validate_required_fields(field_names: list[str], values: list[Any]):
for field_name, value in zip(field_names, values):
if value is None:
raise ValueError(f"{field_name} is required")
if isinstance(value, str) and not value.strip():
raise ValueError(f"{field_name} cannot be blank")
if isinstance(value, list) and len(value) == 0:
raise ValueError(f"{field_name} cannot be an empty list")
{%- macro gen_properties(attribute_defs, additional_names=[]) %}
_convience_properties: ClassVar[list[str]] = [
{%- for attribute_def in attribute_defs %}
Expand Down Expand Up @@ -455,6 +459,70 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes
raise ValueError(
"One of admin_user, admin_groups or admin_roles is required"
)
{%- elif entity_def.name == "Process" %}
@staticmethod
def generate_qualified_name(
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
parent: Optional["Process"] = None,
process_id: Optional[str] = None,
) -> str:
def append_relationship(output: StringIO, relationship: Asset):
if relationship.guid:
output.write(relationship.guid)

def append_relationships(output: StringIO, relationships: list["Catalog"]):
for catalog in relationships:
append_relationship(output, catalog)

validate_required_fields(
["name", "connection_qualified_name", "inputs", "outputs"],
[name, connection_qualified_name, inputs, outputs],
)
if process_id and process_id.strip():
return f"{connection_qualified_name}/{process_id}"
buffer = StringIO()
buffer.write(name)
buffer.write(connection_qualified_name)
if parent:
append_relationship(buffer, parent)
append_relationships(buffer, inputs)
append_relationships(buffer, outputs)
ret_value = hashlib.md5(
buffer.getvalue().encode(), usedforsecurity=False
).hexdigest()
buffer.close()
return ret_value

@classmethod
def create(
cls,
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
process_id: Optional[str] = None,
parent: Optional[Process] = None,
) -> Process.Attributes:
qualified_name = Process.Attributes.generate_qualified_name(
name=name,
connection_qualified_name=connection_qualified_name,
process_id=process_id,
inputs=inputs,
outputs=outputs,
parent=parent,
)
connector_name = connection_qualified_name.split("/")[1]
return Process.Attributes(
name=name,
qualified_name=qualified_name,
connector_name=connector_name,
connection_qualified_name=connection_qualified_name,
inputs=inputs,
outputs=outputs,
)
{%- elif entity_def.name == "Readme" %}
@classmethod
# @validate_arguments()
Expand Down Expand Up @@ -755,6 +823,27 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes
raise ValueError(
"One of admin_user, admin_groups or admin_roles is required"
)
{%- elif entity_def.name == "Process" %}
@classmethod
def create(
cls,
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
process_id: Optional[str] = None,
parent: Optional[Process] = None,
) -> Process:
return Process(
attributes=Process.Attributes.create(
name=name,
connection_qualified_name=connection_qualified_name,
process_id=process_id,
inputs=inputs,
outputs=outputs,
parent=parent,
)
)
{%- elif entity_def.name == "Database" %}
@classmethod
# @validate_arguments()
Expand Down
89 changes: 89 additions & 0 deletions pyatlan/model/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
# Based on original code from https://github.com/apache/atlas (under Apache-2.0 license)
from __future__ import annotations

import hashlib
import sys
from datetime import datetime
from io import StringIO
from typing import Any, ClassVar, Dict, List, Optional, TypeVar
from urllib.parse import quote, unquote

Expand Down Expand Up @@ -79,6 +81,8 @@ def validate_required_fields(field_names: list[str], values: list[Any]):
raise ValueError(f"{field_name} is required")
if isinstance(value, str) and not value.strip():
raise ValueError(f"{field_name} cannot be blank")
if isinstance(value, list) and len(value) == 0:
raise ValueError(f"{field_name} cannot be an empty list")


SelfAsset = TypeVar("SelfAsset", bound="Asset")
Expand Down Expand Up @@ -3081,12 +3085,97 @@ class Attributes(Asset.Attributes):
None, description="", alias="meanings"
) # relationship

@staticmethod
def generate_qualified_name(
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
parent: Optional["Process"] = None,
process_id: Optional[str] = None,
) -> str:
def append_relationship(output: StringIO, relationship: Asset):
if relationship.guid:
output.write(relationship.guid)

def append_relationships(output: StringIO, relationships: list["Catalog"]):
for catalog in relationships:
append_relationship(output, catalog)

validate_required_fields(
["name", "connection_qualified_name", "inputs", "outputs"],
[name, connection_qualified_name, inputs, outputs],
)
if process_id and process_id.strip():
return f"{connection_qualified_name}/{process_id}"
buffer = StringIO()
buffer.write(name)
buffer.write(connection_qualified_name)
if parent:
append_relationship(buffer, parent)
append_relationships(buffer, inputs)
append_relationships(buffer, outputs)
ret_value = hashlib.md5(
buffer.getvalue().encode(), usedforsecurity=False
).hexdigest()
buffer.close()
return ret_value

@classmethod
def create(
cls,
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
process_id: Optional[str] = None,
parent: Optional[Process] = None,
) -> Process.Attributes:
qualified_name = Process.Attributes.generate_qualified_name(
name=name,
connection_qualified_name=connection_qualified_name,
process_id=process_id,
inputs=inputs,
outputs=outputs,
parent=parent,
)
connector_name = connection_qualified_name.split("/")[1]
return Process.Attributes(
name=name,
qualified_name=qualified_name,
connector_name=connector_name,
connection_qualified_name=connection_qualified_name,
inputs=inputs,
outputs=outputs,
)

attributes: "Process.Attributes" = Field(
None,
description="Map of attributes in the instance and their values. The specific keys of this map will vary by "
"type, so are described in the sub-types of this schema.\n",
)

@classmethod
def create(
cls,
name: str,
connection_qualified_name: str,
inputs: list["Catalog"],
outputs: list["Catalog"],
process_id: Optional[str] = None,
parent: Optional[Process] = None,
) -> Process:
return Process(
attributes=Process.Attributes.create(
name=name,
connection_qualified_name=connection_qualified_name,
process_id=process_id,
inputs=inputs,
outputs=outputs,
parent=parent,
)
)


class AtlasGlossaryCategory(Asset, type_name="AtlasGlossaryCategory"):
"""Description"""
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.26
0.0.27
50 changes: 50 additions & 0 deletions tests/integration/test_entity_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import random
import string
from typing import Callable, Generator

import pytest
import requests
Expand All @@ -18,7 +19,9 @@
Column,
Connection,
Database,
Process,
Readme,
S3Object,
Schema,
Table,
View,
Expand All @@ -43,6 +46,7 @@
"57f5463d-cc2a-4859-bf28-e4fa52002e8e",
}
TEMP_CONNECTION_GUID = "b3a5c49a-0c7c-4e66-8453-f4da8d9ce222"
S3_CONNECTION_GUID = "25f2dc21-cd53-47fe-bbed-be10759d087a"


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -189,6 +193,7 @@ def cleanup(atlan_host, headers, atlan_api_key):
"Connection",
"View",
"Column",
"Process",
"Readme",
]
for type_name in type_names:
Expand Down Expand Up @@ -825,3 +830,48 @@ def test_create_readme(client: AtlanClient):
assert len(reaadmes) == 1
assert (glossaries := response.assets_updated(asset_type=AtlasGlossary))
assert len(glossaries) == 1


@pytest.fixture()
def make_s3_object(
client: AtlanClient,
) -> Generator[Callable[[str], S3Object], None, None]:
created_guids = []
connection = client.get_asset_by_guid(S3_CONNECTION_GUID, Connection)

def _make_s3_object(name: str) -> S3Object:
s3_object = S3Object.create(
connection_qualified_name=connection.qualified_name,
name=name,
aws_arn=f"arn:aws:s3:::{name}",
)
s3_object = client.upsert(s3_object).assets_created(S3Object)[0]
created_guids.append(s3_object.guid)
return s3_object

yield _make_s3_object

for guid in created_guids:
client.purge_entity_by_guid(guid=guid)


def test_process_create(client: AtlanClient, make_s3_object: Callable[[str], S3Object]):
connection = client.get_asset_by_guid(S3_CONNECTION_GUID, Connection)

input_object = make_s3_object("Integration Source")

output_object = make_s3_object("Integration Target")

process = Process.create(
name="Integration Process Test",
connection_qualified_name=connection.qualified_name,
process_id="doit",
inputs=[input_object],
outputs=[output_object],
)

response = client.upsert(process)
assert (processes := response.assets_created(Process))
assert len(processes) == 1
assert (assets := response.assets_updated(S3Object))
assert len(assets) == 2
Loading

0 comments on commit bd5e405

Please sign in to comment.