Skip to content

Commit

Permalink
Merge pull request #230 from atlanhq/DVX-201
Browse files Browse the repository at this point in the history
Fix DVX-201 AssetUpdatePayload not being returned from AtlanEvent
  • Loading branch information
ErnestoLoma authored Jan 24, 2024
2 parents 3ce5c0f + 76b0b06 commit 381fd2e
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 25 deletions.
76 changes: 52 additions & 24 deletions pyatlan/model/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 Atlan Pte. Ltd.
from typing import Any, Optional
from typing import Any, Literal, Optional, Union

from pydantic import Field

Expand All @@ -23,9 +23,14 @@ def __init__(__pydantic_self__, **data: Any) -> None:
event_type: Optional[str] = Field(
description="Type of the event payload.", alias="type"
)
operation_type: Optional[str] = Field(
description="Type of the operation the event contains a payload for."
)
operation_type: Literal[
"ENTITY_CREATE",
"ENTITY_UPDATE",
"ENTITY_DELETE",
"BUSINESS_ATTRIBUTE_UPDATE",
"CLASSIFICATION_ADD",
"CLASSIFICATION_DELETE",
] = Field(description="Type of the operation the event contains a payload for.")
event_time: Optional[int] = Field(
description="Time (epoch) the event was triggered in the source system, in milliseconds."
)
Expand All @@ -37,46 +42,59 @@ def __init__(__pydantic_self__, **data: Any) -> None:
)


class AssetCreatePayload(AtlanEventPayload, operation_type="ENTITY_CREATE"):
pass
class AssetCreatePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_CREATE"] = Field(
description="Type of the operation the event contains a payload for."
)


class AssetUpdatePayload(AtlanEventPayload, operation_type="ENTITY_UPDATE"):
class AssetUpdatePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_UPDATE"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[Asset] = Field(
description="Details of what was updated on the asset."
)


class AssetDeletePayload(AtlanEventPayload, operation_type="ENTITY_DELETE"):
pass
class AssetDeletePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_DELETE"] = Field(
description="Type of the operation the event contains a payload for."
)


# TODO
# class CustomMetadataUpdatePayload(
# AtlanEventPayload,
# operation_type="BUSINESS_ATTRIBUTE_UPDATE",
# ):
# mutated_details: Optional[dict[str, CustomMetadataDict]] = Field(
# description="Map of custom metadata attributes and values defined on the asset."
# "The map is keyed by the human-readable name of the custom metadata set,"
# "and the values are a further mapping from human-readable attribute name"
# "to the value for that attribute as provided when updating this asset."
# )
class CustomMetadataUpdatePayload(
AtlanEventPayload,
):
operation_type: Literal["BUSINESS_ATTRIBUTE_UPDATE"] = Field(
description="Type of the operation the event contains a payload for."
)
# TODO: Need to create a more specific type
mutated_details: Optional[dict[str, Any]] = Field(
description="Map of custom metadata attributes and values defined on the asset."
"The map is keyed by the human-readable name of the custom metadata set,"
"and the values are a further mapping from human-readable attribute name"
"to the value for that attribute as provided when updating this asset."
)


class AtlanTagAddPayload(
AtlanEventPayload,
operation_type="CLASSIFICATION_ADD",
):
operation_type: Literal["CLASSIFICATION_ADD"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[AtlanTag] = Field(
description="Atlan tags that were added to the asset by this event."
)


class AtlanTagDeletePayload(
AtlanEventPayload,
operation_type="CLASSIFICATION_DELETE",
):
operation_type: Literal["CLASSIFICATION_DELETE"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[AtlanTag] = Field(
description="Atlan tags that were removed from the asset by this event."
)
Expand All @@ -96,8 +114,18 @@ class AtlanEvent(AtlanObject):
description="Timestamp (epoch) for when the event was created, in milliseconds."
)
spooled: Optional[bool] = Field(description="TBC")
payload: Optional[AtlanEventPayload] = Field(
description="Detailed contents (payload) of the event.", alias="message"
payload: Optional[
Union[
AssetCreatePayload,
AssetUpdatePayload,
CustomMetadataUpdatePayload,
AtlanTagAddPayload,
AtlanTagDeletePayload,
]
] = Field(
description="Detailed contents (payload) of the event.",
alias="message",
discriminator="operation_type",
)


Expand Down
156 changes: 155 additions & 1 deletion tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pyatlan.events.atlan_event_handler import is_validation_request, valid_signature
from pyatlan.model.assets import AtlasGlossaryTerm
from pyatlan.model.events import AtlanEvent
from pyatlan.model.events import AssetUpdatePayload, AtlanEvent

SIGNING = "abc123"

Expand Down Expand Up @@ -96,6 +96,154 @@
"isBase64Encoded": False,
}

JSON_ATLAN_EVENT = """
{
"source": {
},
"version": {
"version": "1.0.0",
"versionParts": [
1
]
},
"msgCompressionKind": "NONE",
"msgSplitIdx": 1,
"msgSplitCount": 1,
"msgSourceIP": "10.147.3.150",
"msgCreatedBy": "",
"msgCreationTime": 1705924521864,
"spooled": false,
"message": {
"type": "ENTITY_NOTIFICATION_V2",
"entity": {
"typeName": "AtlasGlossaryTerm",
"attributes": {
"popularityScore": 1.17549435e-38,
"assetMcMonitorNames": [
],
"lastSyncRunAt": 0,
"assetSodaLastSyncRunAt": 0,
"starredCount": 0,
"adminUsers": [
],
"assetMcIncidentQualifiedNames": [
],
"assetMcIncidentTypes": [
],
"assetSodaLastScanAt": 0,
"sourceUpdatedAt": 0,
"assetDbtJobLastRunArtifactsSaved": false,
"isEditable": true,
"announcementUpdatedAt": 0,
"sourceCreatedAt": 0,
"assetDbtJobLastRunDequedAt": 0,
"assetDbtTags": [
],
"qualifiedName": "8Wi1jGldVz1vEBXhGivg3@79FD59qksQ4G3Y6h5ZWTO",
"assetDbtJobLastRunNotificationsSent": false,
"assetMcMonitorTypes": [
],
"assetSodaCheckCount": 0,
"assetMcMonitorStatuses": [
],
"starredBy": [],
"name": "new-term",
"certificateUpdatedAt": 1703077797628,
"assetMcIncidentSeverities": [
],
"ownerUsers": [
"pskib"
],
"certificateStatus": "DRAFT",
"assetDbtJobLastRunHasSourcesGenerated": false,
"assetMcIncidentSubTypes": [
],
"isAIGenerated": false,
"assetDbtJobLastRunHasDocsGenerated": false,
"assetTags": [
],
"assetMcIncidentStates": [
],
"assetDbtJobLastRunUpdatedAt": 0,
"ownerGroups": [
],
"certificateUpdatedBy": "pskib",
"assetMcMonitorQualifiedNames": [
],
"assetDbtJobLastRunStartedAt": 0,
"isDiscoverable": true,
"isPartial": false,
"assetMcMonitorScheduleTypes": [
],
"viewerUsers": [
],
"assetMcIncidentNames": [
],
"userDescription": "test",
"adminRoles": [
],
"adminGroups": [
],
"assetDbtJobLastRunCreatedAt": 0,
"assetDbtJobNextRun": 0,
"assetMcLastSyncRunAt": 0,
"viewerGroups": [
],
"assetDbtJobLastRun": 0
},
"guid": "a5ed097d-93ea-4728-b3c3-ef441c3e6094",
"displayText": "new-term",
"isIncomplete": false,
"createdBy": "pskib",
"updatedBy": "pskib",
"createTime": 1703077797628,
"updateTime": 1705924521736,
"relationshipAttributes": {
"anchor": {
"guid": "579ae112-3f36-40ed-ad58-edcb6e719cf2",
"typeName": "AtlasGlossary",
"attributes": {
"certificateStatus": "DRAFT",
"__modifiedBy": "pskib",
"__state": "ACTIVE",
"__createdBy": "pskib",
"starredBy": [
],
"__modificationTimestamp": 1703077797628,
"name": "Test-Glossary",
"isPartial": false,
"assetIcon": "atlanGlossary",
"__timestamp": 1702635066946,
"assetDbtJobLastRun": 0
},
"uniqueAttributes": {
"qualifiedName": "79FD59qksQ4G3Y6h5ZWTO"
}
}
}
},
"operationType": "ENTITY_UPDATE",
"eventTime": 1705924521736,
"mutatedDetails": {
"typeName": "AtlasGlossaryTerm",
"attributes": {
"userDescription": "test"
},
"guid": "a5ed097d-93ea-4728-b3c3-ef441c3e6094",
"isIncomplete": false,
"provenanceType": 0,
"updatedBy": "pskib",
"updateTime": 1705924521736,
"version": 0,
"proxy": false
},
"headers": {
"x-atlan-request-id": "e0a11772-8f4b-4141-08e1-4e74998cb0d2",
"x-atlan-via-ui": "true"
}
}
}"""


def test_validation_payload():
body = VALIDATION_PAYLOAD.get("body")
Expand All @@ -117,3 +265,9 @@ def test_body():
assert atlan_event
assert atlan_event.payload
assert isinstance(atlan_event.payload.asset, AtlasGlossaryTerm)


def test_correct_payload_type_returned():
payload = json.loads(JSON_ATLAN_EVENT)
event = AtlanEvent(**payload)
assert isinstance(event.payload, AssetUpdatePayload)

0 comments on commit 381fd2e

Please sign in to comment.