diff --git a/localstack/services/events/models.py b/localstack/services/events/models.py index 3d42c621c6b41..d64c74bbaa7fd 100644 --- a/localstack/services/events/models.py +++ b/localstack/services/events/models.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Optional +from typing import Optional, TypeAlias, TypedDict from localstack.aws.api.core import ServiceException from localstack.aws.api.events import ( @@ -7,6 +7,8 @@ CreatedBy, EventBusName, EventPattern, + EventResourceList, + EventSourceName, ManagedBy, RoleArn, RuleDescription, @@ -102,3 +104,18 @@ class InvalidEventPatternException(Exception): def __init__(self, reason=None, message=None) -> None: self.reason = reason self.message = message or f"Event pattern is not valid. Reason: {reason}" + + +class FormattedEvent(TypedDict): + version: str + id: str + detail_type: Optional[str] # key "detail-type" is automatically interpreted as detail_type + source: Optional[EventSourceName] + account: str + time: str + region: str + resources: Optional[EventResourceList] + detail: dict[str, str | dict] + + +TransformedEvent: TypeAlias = FormattedEvent | dict | str diff --git a/localstack/services/events/provider.py b/localstack/services/events/provider.py index 7ff7ea0c40a71..80d9c8e7bb8ec 100644 --- a/localstack/services/events/provider.py +++ b/localstack/services/events/provider.py @@ -59,6 +59,7 @@ EventBus, EventBusDict, EventsStore, + FormattedEvent, Rule, RuleDict, TargetDict, @@ -125,7 +126,7 @@ def validate_event(event: PutEventsRequestEntry) -> None | PutEventsResultEntry: } -def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> dict: +def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> FormattedEvent: # See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html formatted_event = { "version": "0", @@ -685,7 +686,7 @@ def _process_entries( for target in rule.targets.values(): target_sender = self._target_sender_store[target["Arn"]] try: - target_sender.send_event(event) + target_sender.process_event(event) processed_entries.append({"EventId": event["id"]}) except Exception as error: processed_entries.append( diff --git a/localstack/services/events/target.py b/localstack/services/events/target.py index 6a22f4e08d059..0937bd9680ac6 100644 --- a/localstack/services/events/target.py +++ b/localstack/services/events/target.py @@ -7,10 +7,11 @@ from localstack.aws.api.events import ( Arn, - PutEventsRequestEntry, Target, + TargetInputPath, ) from localstack.aws.connect import connect_to +from localstack.services.events.models import FormattedEvent, TransformedEvent from localstack.utils import collections from localstack.utils.aws.arns import ( extract_service_from_arn, @@ -18,12 +19,20 @@ sqs_queue_url_for_arn, ) from localstack.utils.aws.client_types import ServicePrincipal +from localstack.utils.json import extract_jsonpath from localstack.utils.strings import to_bytes from localstack.utils.time import now_utc LOG = logging.getLogger(__name__) +def transform_event_with_target_input_path( + input_path: TargetInputPath, event: FormattedEvent +) -> TransformedEvent: + formatted_event = extract_jsonpath(event, input_path) + return formatted_event + + class TargetSender(ABC): def __init__( self, @@ -54,9 +63,15 @@ def client(self): return self._client @abstractmethod - def send_event(self, event: PutEventsRequestEntry): + def send_event(self, event: FormattedEvent | TransformedEvent): pass + def process_event(self, event: FormattedEvent): + """Processes the event and send it to the target.""" + if input_path := self.target.get("InputPath"): + event = transform_event_with_target_input_path(input_path, event) + self.send_event(event) + def _validate_input(self, target: Target): """Provide a default implementation that does nothing if no specific validation is needed.""" # TODO add For Lambda and Amazon SNS resources, EventBridge relies on resource-based policies. diff --git a/tests/aws/services/events/conftest.py b/tests/aws/services/events/conftest.py index fbb6df5269434..ec0ea6bcba1db 100644 --- a/tests/aws/services/events/conftest.py +++ b/tests/aws/services/events/conftest.py @@ -194,23 +194,12 @@ def _delete_log_group(): @pytest.fixture -def put_events_with_filter_to_sqs(aws_client, sqs_get_queue_arn, clean_up): +def create_sqs_events_target(aws_client, sqs_get_queue_arn): queue_urls = [] - event_bus_names = [] - rule_names = [] - target_ids = [] - - def _put_events_with_filter_to_sqs( - pattern: dict, - entries_asserts: list[Tuple[list[dict], bool]], - input_path: str = None, - input_transformer: dict[dict, str] = None, - ): - queue_name = f"queue-{short_uid()}" - rule_name = f"rule-{short_uid()}" - target_id = f"target-{short_uid()}" - bus_name = f"bus-{short_uid()}" + def _create_sqs_events_target(queue_name: str | None = None) -> tuple[str, str]: + if not queue_name: + queue_name = f"tests-queue-{short_uid()}" sqs_client = aws_client.sqs queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"] queue_urls.append(queue_url) @@ -231,6 +220,34 @@ def _put_events_with_filter_to_sqs( sqs_client.set_queue_attributes( QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)} ) + return queue_url, queue_arn + + yield _create_sqs_events_target + + for queue_url in queue_urls: + try: + aws_client.sqs.delete_queue(QueueUrl=queue_url) + except Exception as e: + LOG.debug("error cleaning up queue %s: %s", queue_url, e) + + +@pytest.fixture +def put_events_with_filter_to_sqs(aws_client, create_sqs_events_target, clean_up): + event_bus_names = [] + rule_names = [] + target_ids = [] + + def _put_events_with_filter_to_sqs( + pattern: dict, + entries_asserts: list[Tuple[list[dict], bool]], + input_path: str = None, + input_transformer: dict[dict, str] = None, + ): + rule_name = f"test-rule-{short_uid()}" + target_id = f"test-target-{short_uid()}" + bus_name = f"test-bus-{short_uid()}" + + queue_url, queue_arn = create_sqs_events_target() events_client = aws_client.events events_client.create_event_bus(Name=bus_name) @@ -262,7 +279,7 @@ def _put_events_with_filter_to_sqs( entry["EventBusName"] = bus_name message = _put_entries_assert_results_sqs( events_client, - sqs_client, + aws_client.sqs, queue_url, entries=entries, should_match=entry_asserts[1], @@ -274,14 +291,11 @@ def _put_events_with_filter_to_sqs( yield _put_events_with_filter_to_sqs - for queue_url, event_bus_name, rule_name, target_id in zip( - queue_urls, event_bus_names, rule_names, target_ids - ): + for event_bus_name, rule_name, target_id in zip(event_bus_names, rule_names, target_ids): clean_up( bus_name=event_bus_name, rule_name=rule_name, target_ids=target_id, - queue_url=queue_url, ) diff --git a/tests/aws/services/events/test_events_inputs.py b/tests/aws/services/events/test_events_inputs.py index 5ff7962ce7c8e..6619ec4c76b51 100644 --- a/tests/aws/services/events/test_events_inputs.py +++ b/tests/aws/services/events/test_events_inputs.py @@ -4,103 +4,127 @@ import pytest -from localstack.constants import TEST_AWS_ACCOUNT_ID, TEST_AWS_REGION_NAME from localstack.testing.pytest import markers -from localstack.utils.aws import arns from localstack.utils.strings import short_uid from tests.aws.services.events.conftest import sqs_collect_messages from tests.aws.services.events.helper_functions import is_v2_provider from tests.aws.services.events.test_events import EVENT_DETAIL, TEST_EVENT_PATTERN +EVENT_DETAIL_DUPLICATED_KEY = { + "command": "update-account", + "payload": {"acc_id": "0a787ecb-4015", "payload": {"message": "baz", "id": "123"}}, +} -class TestEventsInputPath: - @markers.aws.unknown - @pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet") - def test_put_events_with_input_path(self, aws_client, clean_up): - queue_name = f"queue-{short_uid()}" - rule_name = f"rule-{short_uid()}" - target_id = f"target-{short_uid()}" - bus_name = f"bus-{short_uid()}" - - queue_url = aws_client.sqs.create_queue(QueueName=queue_name)["QueueUrl"] - queue_arn = arns.sqs_queue_arn(queue_name, TEST_AWS_ACCOUNT_ID, TEST_AWS_REGION_NAME) - aws_client.events.create_event_bus(Name=bus_name) - aws_client.events.put_rule( - Name=rule_name, - EventBusName=bus_name, - EventPattern=json.dumps(TEST_EVENT_PATTERN), - ) - aws_client.events.put_targets( - Rule=rule_name, - EventBusName=bus_name, - Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}], +class TestEventsInputPath: + @markers.aws.validated + def test_put_events_with_input_path(self, put_events_with_filter_to_sqs, snapshot): + entries1 = [ + { + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(EVENT_DETAIL), + } + ] + entries_asserts = [(entries1, True)] + messages = put_events_with_filter_to_sqs( + pattern=TEST_EVENT_PATTERN, + entries_asserts=entries_asserts, + input_path="$.detail", ) - aws_client.events.put_events( - Entries=[ - { - "EventBusName": bus_name, - "Source": TEST_EVENT_PATTERN["source"][0], - "DetailType": TEST_EVENT_PATTERN["detail-type"][0], - "Detail": json.dumps(EVENT_DETAIL), - } + snapshot.add_transformers_list( + [ + snapshot.transform.key_value("MD5OfBody"), + snapshot.transform.key_value("ReceiptHandle"), ] ) + snapshot.match("message", messages) - messages = sqs_collect_messages(aws_client, queue_url, min_events=1, retries=3) - assert json.loads(messages[0].get("Body")) == EVENT_DETAIL + @markers.aws.validated + @pytest.mark.parametrize("event_detail", [EVENT_DETAIL, EVENT_DETAIL_DUPLICATED_KEY]) + def test_put_events_with_input_path_nested( + self, event_detail, put_events_with_filter_to_sqs, snapshot + ): + entries1 = [ + { + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(event_detail), + } + ] + entries_asserts = [(entries1, True)] + messages = put_events_with_filter_to_sqs( + pattern=TEST_EVENT_PATTERN, + entries_asserts=entries_asserts, + input_path="$.detail.payload", + ) - aws_client.events.put_events( - Entries=[ - { - "EventBusName": bus_name, - "Source": "dummySource", - "DetailType": TEST_EVENT_PATTERN["detail-type"][0], - "Detail": json.dumps(EVENT_DETAIL), - } + snapshot.add_transformers_list( + [ + snapshot.transform.key_value("MD5OfBody"), + snapshot.transform.key_value("ReceiptHandle"), ] ) + snapshot.match("message", messages) - messages = sqs_collect_messages(aws_client, queue_url, min_events=0, retries=1, wait_time=3) - assert messages == [] - - # clean up - clean_up(bus_name=bus_name, rule_name=rule_name, target_ids=target_id, queue_url=queue_url) - - @markers.aws.unknown - @pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet") - def test_put_events_with_input_path_multiple(self, aws_client, clean_up): - queue_name = "queue-{}".format(short_uid()) - queue_name_1 = "queue-{}".format(short_uid()) - rule_name = "rule-{}".format(short_uid()) - target_id = "target-{}".format(short_uid()) - target_id_1 = "target-{}".format(short_uid()) - bus_name = "bus-{}".format(short_uid()) + @markers.aws.validated + def test_put_events_with_input_path_max_level_depth( + self, put_events_with_filter_to_sqs, snapshot + ): + entries1 = [ + { + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(EVENT_DETAIL), + } + ] + entries_asserts = [(entries1, True)] + messages = put_events_with_filter_to_sqs( + pattern=TEST_EVENT_PATTERN, + entries_asserts=entries_asserts, + input_path="$.detail.payload.sf_id", + ) - queue_url = aws_client.sqs.create_queue(QueueName=queue_name)["QueueUrl"] - queue_arn = arns.sqs_queue_arn(queue_name, TEST_AWS_ACCOUNT_ID, TEST_AWS_REGION_NAME) + snapshot.add_transformers_list( + [ + snapshot.transform.key_value("MD5OfBody"), + snapshot.transform.key_value("ReceiptHandle"), + ] + ) + snapshot.match("message", messages) - queue_url_1 = aws_client.sqs.create_queue(QueueName=queue_name_1)["QueueUrl"] - queue_arn_1 = arns.sqs_queue_arn(queue_name_1, TEST_AWS_ACCOUNT_ID, TEST_AWS_REGION_NAME) + @markers.aws.validated + def test_put_events_with_input_path_multiple_targets( + self, + aws_client, + create_sqs_events_target, + events_create_event_bus, + events_put_rule, + snapshot, + ): + # prepare target queues + queue_url_1, queue_arn_1 = create_sqs_events_target() + queue_url_2, queue_arn_2 = create_sqs_events_target() - aws_client.events.create_event_bus(Name=bus_name) + bus_name = f"test-bus-{short_uid()}" + events_create_event_bus(Name=bus_name) - aws_client.events.put_rule( + rule_name = f"test-rule-{short_uid()}" + events_put_rule( Name=rule_name, EventBusName=bus_name, EventPattern=json.dumps(TEST_EVENT_PATTERN), ) + target_id_1 = f"target-{short_uid()}" + target_id_2 = f"target-{short_uid()}" aws_client.events.put_targets( Rule=rule_name, EventBusName=bus_name, Targets=[ - {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}, - { - "Id": target_id_1, - "Arn": queue_arn_1, - }, + {"Id": target_id_1, "Arn": queue_arn_1, "InputPath": "$.detail"}, + {"Id": target_id_2, "Arn": queue_arn_2}, ], ) @@ -115,35 +139,17 @@ def test_put_events_with_input_path_multiple(self, aws_client, clean_up): ] ) - messages = sqs_collect_messages(aws_client, queue_url, min_events=1, retries=3) - assert len(messages) == 1 - assert json.loads(messages[0].get("Body")) == EVENT_DETAIL + messages_queue_1 = sqs_collect_messages(aws_client, queue_url_1, min_events=1, retries=3) + messages_queue_2 = sqs_collect_messages(aws_client, queue_url_2, min_events=1, retries=3) - messages = sqs_collect_messages(aws_client, queue_url_1, min_events=1, retries=3) - assert len(messages) == 1 - assert json.loads(messages[0].get("Body")).get("detail") == EVENT_DETAIL - - aws_client.events.put_events( - Entries=[ - { - "EventBusName": bus_name, - "Source": "dummySource", - "DetailType": TEST_EVENT_PATTERN["detail-type"][0], - "Detail": json.dumps(EVENT_DETAIL), - } + snapshot.add_transformers_list( + [ + snapshot.transform.key_value("MD5OfBody"), + snapshot.transform.key_value("ReceiptHandle"), ] ) - - messages = sqs_collect_messages(aws_client, queue_url, min_events=0, retries=1, wait_time=3) - assert messages == [] - - # clean up - clean_up( - bus_name=bus_name, - rule_name=rule_name, - target_ids=[target_id, target_id_1], - queue_url=queue_url, - ) + snapshot.match("message-queue-1", messages_queue_1) + snapshot.match("message-queue-2", messages_queue_2) class TestEventsInputTransformers: diff --git a/tests/aws/services/events/test_events_inputs.snapshot.json b/tests/aws/services/events/test_events_inputs.snapshot.json index 3c89c16dc138d..c282663105c32 100644 --- a/tests/aws/services/events/test_events_inputs.snapshot.json +++ b/tests/aws/services/events/test_events_inputs.snapshot.json @@ -19,5 +19,131 @@ } ] } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path": { + "recorded-date": "08-05-2024, 13:54:10", + "recorded-content": { + "message": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "command": "update-account", + "payload": { + "acc_id": "0a787ecb-4015", + "sf_id": "baz" + } + } + } + ] + } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested": { + "recorded-date": "06-05-2024, 15:11:52", + "recorded-content": { + "message": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "acc_id": "0a787ecb-4015", + "sf_id": "baz" + } + } + ] + } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_max_level_depth": { + "recorded-date": "06-05-2024, 15:11:54", + "recorded-content": { + "message": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": "\"baz\"" + } + ] + } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_multiple_targets": { + "recorded-date": "06-05-2024, 15:22:58", + "recorded-content": { + "message-queue-1": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "command": "update-account", + "payload": { + "acc_id": "0a787ecb-4015", + "sf_id": "baz" + } + } + } + ], + "message-queue-2": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "version": "0", + "id": "", + "detail-type": "core.update-account-command", + "source": "core.update-account-command", + "account": "111111111111", + "time": "date", + "region": "", + "resources": [], + "detail": { + "command": "update-account", + "payload": { + "acc_id": "0a787ecb-4015", + "sf_id": "baz" + } + } + } + } + ] + } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested[event_detail0]": { + "recorded-date": "08-05-2024, 13:54:42", + "recorded-content": { + "message": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "acc_id": "0a787ecb-4015", + "sf_id": "baz" + } + } + ] + } + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested[event_detail1]": { + "recorded-date": "08-05-2024, 13:54:44", + "recorded-content": { + "message": [ + { + "MessageId": "", + "ReceiptHandle": "", + "MD5OfBody": "", + "Body": { + "acc_id": "0a787ecb-4015", + "payload": { + "message": "baz", + "id": "123" + } + } + } + ] + } } } diff --git a/tests/aws/services/events/test_events_inputs.validation.json b/tests/aws/services/events/test_events_inputs.validation.json index 61a8dd3b9cb4f..6b53b82b29016 100644 --- a/tests/aws/services/events/test_events_inputs.validation.json +++ b/tests/aws/services/events/test_events_inputs.validation.json @@ -1,4 +1,22 @@ { + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path": { + "last_validated_date": "2024-05-08T13:54:10+00:00" + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_max_level_depth": { + "last_validated_date": "2024-05-06T15:11:54+00:00" + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_multiple_targets": { + "last_validated_date": "2024-05-06T15:22:58+00:00" + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested": { + "last_validated_date": "2024-05-06T15:11:52+00:00" + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested[event_detail0]": { + "last_validated_date": "2024-05-08T13:54:42+00:00" + }, + "tests/aws/services/events/test_events_inputs.py::TestEventsInputPath::test_put_events_with_input_path_nested[event_detail1]": { + "last_validated_date": "2024-05-08T13:54:44+00:00" + }, "tests/aws/services/events/test_events_inputs.py::TestEventsInputTransformers::test_put_events_with_input_transformation_to_sqs": { "last_validated_date": "2024-03-26T15:48:35+00:00" } diff --git a/tests/aws/services/events/test_events_integrations.py b/tests/aws/services/events/test_events_integrations.py index 9c8659322d498..609cb09b2e218 100644 --- a/tests/aws/services/events/test_events_integrations.py +++ b/tests/aws/services/events/test_events_integrations.py @@ -634,9 +634,8 @@ def check_stream_status(): assert_valid_event(data) -@markers.aws.unknown +@markers.aws.needs_fixing # TODO: Reason add permission and correct policies @pytest.mark.parametrize("strategy", ["standard", "domain", "path"]) -@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet") def test_trigger_event_on_ssm_change(monkeypatch, aws_client, clean_up, strategy): monkeypatch.setattr(config, "SQS_ENDPOINT_STRATEGY", strategy)