Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Eventbridge v2: Add input path #10733

Merged
merged 9 commits into from May 13, 2024
19 changes: 18 additions & 1 deletion localstack/services/events/models.py
@@ -1,12 +1,14 @@
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, TypeAlias, TypedDict

from localstack.aws.api.core import ServiceException
from localstack.aws.api.events import (
Arn,
CreatedBy,
EventBusName,
EventPattern,
EventResourceList,
EventSourceName,
ManagedBy,
RoleArn,
RuleDescription,
Expand Down Expand Up @@ -102,3 +104,18 @@ class InvalidEventPatternException(Exception):
def __init__(self, reason=None, message=None) -> None:
self.reason = reason
self.message = message or f"Event pattern is not valid. Reason: {reason}"


class FormattedEvent(TypedDict):
version: str
id: str
detail_type: Optional[str] # key "detail-type" is automatically interpreted as detail_type
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maxhoheiser This type definition allows both detail-type and detail_type, but we should restrict it to the hyphen-only variant.

Python supports an alternative syntax for defining TypedDicts, which is recommended in such cases:

The functional syntax should also be used when any of the keys are not valid identifiers, for example because they are keywords or contain hyphens.
https://docs.python.org/3/library/typing.html#typing.TypedDict

All credits go to @dfangl for this suggestion 💯

source: Optional[EventSourceName]
account: str
time: str
region: str
resources: Optional[EventResourceList]
detail: dict[str, str | dict]


TransformedEvent: TypeAlias = FormattedEvent | dict | str
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand why we end up such a flexible type definition given the many possibilities using input templates. For readers, this type definition is quite confusing. Consider adding a comment for now.
Maybe, we find something clearer in the future to clarify the type after each step without too many XORs (especially in methods).

5 changes: 3 additions & 2 deletions localstack/services/events/provider.py
Expand Up @@ -59,6 +59,7 @@
EventBus,
EventBusDict,
EventsStore,
FormattedEvent,
Rule,
RuleDict,
TargetDict,
Expand Down Expand Up @@ -125,7 +126,7 @@ def validate_event(event: PutEventsRequestEntry) -> None | PutEventsResultEntry:
}


def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> dict:
def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> FormattedEvent:
# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
formatted_event = {
"version": "0",
Expand Down Expand Up @@ -685,7 +686,7 @@ def _process_entries(
for target in rule.targets.values():
target_sender = self._target_sender_store[target["Arn"]]
try:
target_sender.send_event(event)
target_sender.process_event(event)
processed_entries.append({"EventId": event["id"]})
except Exception as error:
processed_entries.append(
Expand Down
19 changes: 17 additions & 2 deletions localstack/services/events/target.py
Expand Up @@ -7,23 +7,32 @@

from localstack.aws.api.events import (
Arn,
PutEventsRequestEntry,
Target,
TargetInputPath,
)
from localstack.aws.connect import connect_to
from localstack.services.events.models import FormattedEvent, TransformedEvent
from localstack.utils import collections
from localstack.utils.aws.arns import (
extract_service_from_arn,
firehose_name,
sqs_queue_url_for_arn,
)
from localstack.utils.aws.client_types import ServicePrincipal
from localstack.utils.json import extract_jsonpath
from localstack.utils.strings import to_bytes
from localstack.utils.time import now_utc

LOG = logging.getLogger(__name__)


def transform_event_with_target_input_path(
input_path: TargetInputPath, event: FormattedEvent
) -> TransformedEvent:
formatted_event = extract_jsonpath(event, input_path)
return formatted_event


class TargetSender(ABC):
def __init__(
self,
Expand Down Expand Up @@ -54,9 +63,15 @@ def client(self):
return self._client

@abstractmethod
def send_event(self, event: PutEventsRequestEntry):
def send_event(self, event: FormattedEvent | TransformedEvent):
pass

def process_event(self, event: FormattedEvent):
"""Processes the event and send it to the target."""
if input_path := self.target.get("InputPath"):
event = transform_event_with_target_input_path(input_path, event)
self.send_event(event)

def _validate_input(self, target: Target):
"""Provide a default implementation that does nothing if no specific validation is needed."""
# TODO add For Lambda and Amazon SNS resources, EventBridge relies on resource-based policies.
Expand Down
54 changes: 34 additions & 20 deletions tests/aws/services/events/conftest.py
Expand Up @@ -194,23 +194,12 @@ def _delete_log_group():


@pytest.fixture
def put_events_with_filter_to_sqs(aws_client, sqs_get_queue_arn, clean_up):
def create_sqs_events_target(aws_client, sqs_get_queue_arn):
queue_urls = []
event_bus_names = []
rule_names = []
target_ids = []

def _put_events_with_filter_to_sqs(
pattern: dict,
entries_asserts: list[Tuple[list[dict], bool]],
input_path: str = None,
input_transformer: dict[dict, str] = None,
):
queue_name = f"queue-{short_uid()}"
rule_name = f"rule-{short_uid()}"
target_id = f"target-{short_uid()}"
bus_name = f"bus-{short_uid()}"

def _create_sqs_events_target(queue_name: str | None = None) -> tuple[str, str]:
if not queue_name:
queue_name = f"tests-queue-{short_uid()}"
sqs_client = aws_client.sqs
queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This on purpose does not use the sqs_create_queue fixture here, since it would need to be moved to helper functions because no cleanup and thus no factory would be required, which in turn would not valid this to be a fixture. This would in turn require passing in the aws_client as input.
I believe it is easier to read if the queue is created with the boot client here directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, fair point 👍

Why is no cleanup required here? Wouldn't the factory remove the need for an explicit cleanup after the yield here? It might have to do with the proper cleanup order 🤔

queue_urls.append(queue_url)
Expand All @@ -231,6 +220,34 @@ def _put_events_with_filter_to_sqs(
sqs_client.set_queue_attributes(
QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}
)
return queue_url, queue_arn

yield _create_sqs_events_target

for queue_url in queue_urls:
try:
aws_client.sqs.delete_queue(QueueUrl=queue_url)
except Exception as e:
LOG.debug("error cleaning up queue %s: %s", queue_url, e)


@pytest.fixture
def put_events_with_filter_to_sqs(aws_client, create_sqs_events_target, clean_up):
event_bus_names = []
rule_names = []
target_ids = []

def _put_events_with_filter_to_sqs(
pattern: dict,
entries_asserts: list[Tuple[list[dict], bool]],
input_path: str = None,
input_transformer: dict[dict, str] = None,
):
rule_name = f"test-rule-{short_uid()}"
target_id = f"test-target-{short_uid()}"
bus_name = f"test-bus-{short_uid()}"

queue_url, queue_arn = create_sqs_events_target()

events_client = aws_client.events
events_client.create_event_bus(Name=bus_name)
Expand Down Expand Up @@ -262,7 +279,7 @@ def _put_events_with_filter_to_sqs(
entry["EventBusName"] = bus_name
message = _put_entries_assert_results_sqs(
events_client,
sqs_client,
aws_client.sqs,
queue_url,
entries=entries,
should_match=entry_asserts[1],
Expand All @@ -274,14 +291,11 @@ def _put_events_with_filter_to_sqs(

yield _put_events_with_filter_to_sqs

for queue_url, event_bus_name, rule_name, target_id in zip(
queue_urls, event_bus_names, rule_names, target_ids
):
for event_bus_name, rule_name, target_id in zip(event_bus_names, rule_names, target_ids):
clean_up(
bus_name=event_bus_name,
rule_name=rule_name,
target_ids=target_id,
queue_url=queue_url,
)


Expand Down