Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DVX-201 AssetUpdatePayload not being returned from AtlanEvent #230

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions pyatlan/model/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 Atlan Pte. Ltd.
from typing import Any, Optional
from typing import Any, Literal, Optional, Union

from pydantic import Field

Expand All @@ -23,9 +23,14 @@ def __init__(__pydantic_self__, **data: Any) -> None:
event_type: Optional[str] = Field(
description="Type of the event payload.", alias="type"
)
operation_type: Optional[str] = Field(
description="Type of the operation the event contains a payload for."
)
operation_type: Literal[
"ENTITY_CREATE",
"ENTITY_UPDATE",
"ENTITY_DELETE",
"BUSINESS_ATTRIBUTE_UPDATE",
"CLASSIFICATION_ADD",
"CLASSIFICATION_DELETE",
] = Field(description="Type of the operation the event contains a payload for.")
event_time: Optional[int] = Field(
description="Time (epoch) the event was triggered in the source system, in milliseconds."
)
Expand All @@ -37,46 +42,59 @@ def __init__(__pydantic_self__, **data: Any) -> None:
)


class AssetCreatePayload(AtlanEventPayload, operation_type="ENTITY_CREATE"):
pass
class AssetCreatePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_CREATE"] = Field(
description="Type of the operation the event contains a payload for."
)


class AssetUpdatePayload(AtlanEventPayload, operation_type="ENTITY_UPDATE"):
class AssetUpdatePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_UPDATE"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[Asset] = Field(
description="Details of what was updated on the asset."
)


class AssetDeletePayload(AtlanEventPayload, operation_type="ENTITY_DELETE"):
pass
class AssetDeletePayload(AtlanEventPayload):
operation_type: Literal["ENTITY_DELETE"] = Field(
description="Type of the operation the event contains a payload for."
)


# TODO
# class CustomMetadataUpdatePayload(
# AtlanEventPayload,
# operation_type="BUSINESS_ATTRIBUTE_UPDATE",
# ):
# mutated_details: Optional[dict[str, CustomMetadataDict]] = Field(
# description="Map of custom metadata attributes and values defined on the asset."
# "The map is keyed by the human-readable name of the custom metadata set,"
# "and the values are a further mapping from human-readable attribute name"
# "to the value for that attribute as provided when updating this asset."
# )
class CustomMetadataUpdatePayload(
AtlanEventPayload,
):
operation_type: Literal["BUSINESS_ATTRIBUTE_UPDATE"] = Field(
description="Type of the operation the event contains a payload for."
)
# TODO: Need to create a more specific type
mutated_details: Optional[dict[str, Any]] = Field(
description="Map of custom metadata attributes and values defined on the asset."
"The map is keyed by the human-readable name of the custom metadata set,"
"and the values are a further mapping from human-readable attribute name"
"to the value for that attribute as provided when updating this asset."
)


class AtlanTagAddPayload(
AtlanEventPayload,
operation_type="CLASSIFICATION_ADD",
):
operation_type: Literal["CLASSIFICATION_ADD"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[AtlanTag] = Field(
description="Atlan tags that were added to the asset by this event."
)


class AtlanTagDeletePayload(
AtlanEventPayload,
operation_type="CLASSIFICATION_DELETE",
):
operation_type: Literal["CLASSIFICATION_DELETE"] = Field(
description="Type of the operation the event contains a payload for."
)
mutated_details: Optional[AtlanTag] = Field(
description="Atlan tags that were removed from the asset by this event."
)
Expand All @@ -96,8 +114,18 @@ class AtlanEvent(AtlanObject):
description="Timestamp (epoch) for when the event was created, in milliseconds."
)
spooled: Optional[bool] = Field(description="TBC")
payload: Optional[AtlanEventPayload] = Field(
description="Detailed contents (payload) of the event.", alias="message"
payload: Optional[
Union[
AssetCreatePayload,
AssetUpdatePayload,
CustomMetadataUpdatePayload,
AtlanTagAddPayload,
AtlanTagDeletePayload,
]
] = Field(
description="Detailed contents (payload) of the event.",
alias="message",
discriminator="operation_type",
)


Expand Down
156 changes: 155 additions & 1 deletion tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pyatlan.events.atlan_event_handler import is_validation_request, valid_signature
from pyatlan.model.assets import AtlasGlossaryTerm
from pyatlan.model.events import AtlanEvent
from pyatlan.model.events import AssetUpdatePayload, AtlanEvent

SIGNING = "abc123"

Expand Down Expand Up @@ -96,6 +96,154 @@
"isBase64Encoded": False,
}

JSON_ATLAN_EVENT = """
{
"source": {
},
"version": {
"version": "1.0.0",
"versionParts": [
1
]
},
"msgCompressionKind": "NONE",
"msgSplitIdx": 1,
"msgSplitCount": 1,
"msgSourceIP": "10.147.3.150",
"msgCreatedBy": "",
"msgCreationTime": 1705924521864,
"spooled": false,
"message": {
"type": "ENTITY_NOTIFICATION_V2",
"entity": {
"typeName": "AtlasGlossaryTerm",
"attributes": {
"popularityScore": 1.17549435e-38,
"assetMcMonitorNames": [
],
"lastSyncRunAt": 0,
"assetSodaLastSyncRunAt": 0,
"starredCount": 0,
"adminUsers": [
],
"assetMcIncidentQualifiedNames": [
],
"assetMcIncidentTypes": [
],
"assetSodaLastScanAt": 0,
"sourceUpdatedAt": 0,
"assetDbtJobLastRunArtifactsSaved": false,
"isEditable": true,
"announcementUpdatedAt": 0,
"sourceCreatedAt": 0,
"assetDbtJobLastRunDequedAt": 0,
"assetDbtTags": [
],
"qualifiedName": "8Wi1jGldVz1vEBXhGivg3@79FD59qksQ4G3Y6h5ZWTO",
"assetDbtJobLastRunNotificationsSent": false,
"assetMcMonitorTypes": [
],
"assetSodaCheckCount": 0,
"assetMcMonitorStatuses": [
],
"starredBy": [],
"name": "new-term",
"certificateUpdatedAt": 1703077797628,
"assetMcIncidentSeverities": [
],
"ownerUsers": [
"pskib"
],
"certificateStatus": "DRAFT",
"assetDbtJobLastRunHasSourcesGenerated": false,
"assetMcIncidentSubTypes": [
],
"isAIGenerated": false,
"assetDbtJobLastRunHasDocsGenerated": false,
"assetTags": [
],
"assetMcIncidentStates": [
],
"assetDbtJobLastRunUpdatedAt": 0,
"ownerGroups": [
],
"certificateUpdatedBy": "pskib",
"assetMcMonitorQualifiedNames": [
],
"assetDbtJobLastRunStartedAt": 0,
"isDiscoverable": true,
"isPartial": false,
"assetMcMonitorScheduleTypes": [
],
"viewerUsers": [
],
"assetMcIncidentNames": [
],
"userDescription": "test",
"adminRoles": [
],
"adminGroups": [
],
"assetDbtJobLastRunCreatedAt": 0,
"assetDbtJobNextRun": 0,
"assetMcLastSyncRunAt": 0,
"viewerGroups": [
],
"assetDbtJobLastRun": 0
},
"guid": "a5ed097d-93ea-4728-b3c3-ef441c3e6094",
"displayText": "new-term",
"isIncomplete": false,
"createdBy": "pskib",
"updatedBy": "pskib",
"createTime": 1703077797628,
"updateTime": 1705924521736,
"relationshipAttributes": {
"anchor": {
"guid": "579ae112-3f36-40ed-ad58-edcb6e719cf2",
"typeName": "AtlasGlossary",
"attributes": {
"certificateStatus": "DRAFT",
"__modifiedBy": "pskib",
"__state": "ACTIVE",
"__createdBy": "pskib",
"starredBy": [
],
"__modificationTimestamp": 1703077797628,
"name": "Test-Glossary",
"isPartial": false,
"assetIcon": "atlanGlossary",
"__timestamp": 1702635066946,
"assetDbtJobLastRun": 0
},
"uniqueAttributes": {
"qualifiedName": "79FD59qksQ4G3Y6h5ZWTO"
}
}
}
},
"operationType": "ENTITY_UPDATE",
"eventTime": 1705924521736,
"mutatedDetails": {
"typeName": "AtlasGlossaryTerm",
"attributes": {
"userDescription": "test"
},
"guid": "a5ed097d-93ea-4728-b3c3-ef441c3e6094",
"isIncomplete": false,
"provenanceType": 0,
"updatedBy": "pskib",
"updateTime": 1705924521736,
"version": 0,
"proxy": false
},
"headers": {
"x-atlan-request-id": "e0a11772-8f4b-4141-08e1-4e74998cb0d2",
"x-atlan-via-ui": "true"
}
}
}"""


def test_validation_payload():
body = VALIDATION_PAYLOAD.get("body")
Expand All @@ -117,3 +265,9 @@ def test_body():
assert atlan_event
assert atlan_event.payload
assert isinstance(atlan_event.payload.asset, AtlasGlossaryTerm)


def test_correct_payload_type_returned():
payload = json.loads(JSON_ATLAN_EVENT)
event = AtlanEvent(**payload)
assert isinstance(event.payload, AssetUpdatePayload)
Loading