Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Eventbridge v2: Add pattern matching #10664

Merged
merged 28 commits into from May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5c4a8fd
feat: add invalid event pattern exeption
maxhoheiser Apr 25, 2024
71dc9b3
feat: add pattern loader to pattern dict
maxhoheiser Apr 15, 2024
99841de
feat: add validate event pattern
maxhoheiser Apr 15, 2024
10c7b4a
feat: add filter event based on rule pattern
maxhoheiser Apr 15, 2024
8dc03db
feat: remove skip marker from passing tests targets
maxhoheiser Apr 15, 2024
6cc9171
feat: add comment about alphabetical order for target workers
maxhoheiser Apr 15, 2024
5360ecf
feat: remove skip marker from passing tests events
maxhoheiser Apr 15, 2024
f6ebd00
feat: remove skip marker from passing tets rules
maxhoheiser Apr 15, 2024
b663463
feat: add api test event pattern
maxhoheiser Apr 16, 2024
bf850f2
feat: remove skip marker from passing tets pattern
maxhoheiser Apr 15, 2024
b42c2e4
feat: test add snapshot to sqs target
maxhoheiser Apr 26, 2024
a93c0f9
refactor: rename process entry function
maxhoheiser Apr 29, 2024
4066cfd
fix: use api stub for pattern exception
maxhoheiser Apr 29, 2024
ede757d
feat: add experimental rule matching
maxhoheiser Apr 29, 2024
28354be
feat: set failed entry count directly
maxhoheiser Apr 29, 2024
f053084
feat: validate input pattern tests
maxhoheiser Apr 29, 2024
084c866
feat: clean test put events with time
maxhoheiser Apr 29, 2024
f86c5c6
feat: validate test put event without source
maxhoheiser Apr 29, 2024
e7fa9e1
refactor: events tests
maxhoheiser Apr 29, 2024
bad5497
fix: use dynamic event bus name
maxhoheiser Apr 29, 2024
989becd
feat: move test event pattern to correct test file
maxhoheiser Apr 29, 2024
3a99a86
feat: use only java pattern matching engine
maxhoheiser May 6, 2024
ca62eb2
feat: remove java engin env variable check
maxhoheiser May 6, 2024
55eb484
feat: add event validation
maxhoheiser May 6, 2024
12efbac
feat: skip unvalidated failing test
maxhoheiser May 6, 2024
b011d8a
fix: skip tests not supported by v1 provider
maxhoheiser May 6, 2024
81d63c0
refactor: reuse existing error type definition
maxhoheiser May 7, 2024
aae673a
feat: update test markers
maxhoheiser May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions localstack/services/events/event_ruler.py
Expand Up @@ -3,7 +3,6 @@
from functools import cache
from pathlib import Path

from localstack import config
from localstack.services.events.packages import event_ruler_package
from localstack.services.events.utils import InvalidEventPatternException
from localstack.utils.objects import singleton_factory
Expand Down Expand Up @@ -43,8 +42,6 @@ def matches_rule(event: str, rule: str) -> bool:
There is a single static boolean method Ruler.matchesRule(event, rule) -
both arguments are provided as JSON strings.
"""
if config.EVENT_RULE_ENGINE != "java":
maxhoheiser marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError("Set EVENT_RULE_ENGINE=java to enable the Java Event Ruler.")

start_jvm()
import jpype.imports # noqa F401: required for importing Java modules
Expand Down
8 changes: 8 additions & 0 deletions localstack/services/events/models_v2.py
Expand Up @@ -94,3 +94,11 @@ class ValidationException(ServiceException):
code: str = "ValidationException"
sender_fault: bool = True
status_code: int = 400


class InternalInvalidEventPatternException(Exception):
maxhoheiser marked this conversation as resolved.
Show resolved Hide resolved
reason: str

def __init__(self, reason=None, message=None) -> None:
self.reason = reason
self.message = message or f"Event pattern is not valid. Reason: {reason}"
134 changes: 114 additions & 20 deletions localstack/services/events/provider_v2.py
@@ -1,5 +1,7 @@
import base64
import json
import logging
from datetime import datetime, timezone
from typing import Optional

from localstack.aws.api import RequestContext, handler
Expand All @@ -16,14 +18,18 @@
EventPattern,
EventsApi,
EventSourceName,
InvalidEventPatternException,
LimitMax100,
ListEventBusesResponse,
ListRuleNamesByTargetResponse,
ListRulesResponse,
ListTargetsByRuleResponse,
NextToken,
PutEventsRequestEntry,
PutEventsRequestEntryList,
PutEventsResponse,
PutEventsResultEntry,
PutEventsResultEntryList,
PutPartnerEventsRequestEntryList,
PutPartnerEventsResponse,
PutRuleResponse,
Expand All @@ -43,14 +49,17 @@
TargetId,
TargetIdList,
TargetList,
TestEventPatternResponse,
)
from localstack.aws.api.events import EventBus as ApiTypeEventBus
from localstack.aws.api.events import Rule as ApiTypeRule
from localstack.services.events.event_bus import EventBusService, EventBusServiceDict
from localstack.services.events.event_ruler import matches_rule
from localstack.services.events.models_v2 import (
EventBus,
EventBusDict,
EventsStore,
InternalInvalidEventPatternException,
Rule,
RuleDict,
TargetDict,
Expand All @@ -60,6 +69,7 @@
from localstack.services.events.rule import RuleService, RuleServiceDict
from localstack.services.events.target import TargetSender, TargetSenderDict, TargetSenderFactory
from localstack.services.plugins import ServiceLifecycleHook
from localstack.utils.strings import long_uid

LOG = logging.getLogger(__name__)

Expand All @@ -79,6 +89,57 @@ def get_filtered_dict(name_prefix: str, input_dict: dict) -> dict:
return {name: value for name, value in input_dict.items() if name.startswith(name_prefix)}


def get_event_time(event: PutEventsRequestEntry) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: the event is not needed for a time conversion. Passing minimal knowledge is considered best practice. Logging could still happen at a different place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, but in this case the function not only serves to convert time but also conditionally set the time based if the time filed is set in events or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then the naming get_event_time is a bit misleading here. Probably worth testing/refactoring in a follow-up.

event_time = datetime.now(timezone.utc)
if event_timestamp := event.get("Time"):
try:
# use time from event if provided
event_time = event_timestamp.replace(tzinfo=timezone.utc)
except ValueError:
# use current time if event time is invalid
LOG.debug(
"Could not parse the `Time` parameter, falling back to current time for the following Event: '%s'",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this validated AWS behavior? Do we want a fallback here or should we fail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is from this fix in this pr: #8690
I cannot reproduce the bug since boto validates input and throws an error if it is not a string that can be parsed to a valid datetime object. I would keep it in for now, since it is mentioned that it has been an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Botocore validation can be disabled using parameter_validation=False (see botocore docs). If it's a known issue, it might be worth adding a test case in a follow-up.

event,
)
formatted_time_string = event_time.strftime("%Y-%m-%dT%H:%M:%SZ")
return formatted_time_string


def validate_event(event: PutEventsRequestEntry) -> None | PutEventsResultEntry:
if not event.get("Source"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter Source is not valid. Reason: Source is a required argument.",
}
elif not event.get("DetailType"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter DetailType is not valid. Reason: DetailType is a required argument.",
}
elif not event.get("Detail"):
return {
"ErrorCode": "InvalidArgument",
"ErrorMessage": "Parameter Detail is not valid. Reason: Detail is a required argument.",
}


def format_event(event: PutEventsRequestEntry, region: str, account_id: str) -> dict:
# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
formatted_event = {
"version": "0",
"id": str(long_uid()),
"detail-type": event.get("DetailType"),
"source": event.get("Source"),
"account": account_id,
"time": get_event_time(event),
"region": region,
"resources": event.get("Resources", []),
"detail": json.loads(event.get("Detail", "{}")),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the AWS docs, detail should always be a JSON object. Hence, I guess we can safely assume that other strings or base64-encoded content is not expected here, right?

detail — A JSON object that contains information about the event. For more information about what can be included in this field, see Event message detail field.

}

return formatted_event


class EventsProvider(EventsApi, ServiceLifecycleHook):
# api methods are grouped by resource type and sorted in hierarchical order
# each group is sorted alphabetically
Expand Down Expand Up @@ -303,6 +364,20 @@ def put_rule(
response = PutRuleResponse(RuleArn=rule_service.arn)
return response

@handler("TestEventPattern")
def test_event_pattern(
self, context: RequestContext, event_pattern: EventPattern, event: str, **kwargs
) -> TestEventPatternResponse:
"""Test event pattern uses EventBridge event pattern matching:
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
"""
try:
result = matches_rule(event, event_pattern)
except InternalInvalidEventPatternException as e:
raise InvalidEventPatternException(e.message) from e

return TestEventPatternResponse(Result=result)

#########
# Targets
#########
Expand Down Expand Up @@ -345,7 +420,7 @@ def put_targets(
rule_service = self.get_rule_service(context, rule, event_bus_name)
failed_entries = rule_service.add_targets(targets)
rule_arn = rule_service.arn
for target in targets:
for target in targets: # TODO only add successful targets
self.create_target_sender(target, region, account_id, rule_arn)

response = PutTargetsResponse(
Expand Down Expand Up @@ -384,9 +459,12 @@ def put_events(
endpoint_id: EndpointId = None,
**kwargs,
) -> PutEventsResponse:
failed_entries = self._put_entries(context, entries)
entries, failed_entry_count = self._process_entries(context, entries)

response = PutEventsResponse(FailedEntryCount=len(failed_entries), Entries=failed_entries)
response = PutEventsResponse(
Entries=entries,
FailedEntryCount=failed_entry_count,
)
return response

@handler("PutPartnerEvents")
Expand Down Expand Up @@ -578,25 +656,41 @@ def _delete_target_sender(self, ids: TargetIdList, rule) -> None:
except KeyError:
LOG.error(f"Error deleting target service {target_arn}.")

def _put_entries(self, context: RequestContext, entries: PutEventsRequestEntryList) -> list:
failed_entries = []
def _process_entries(
self, context: RequestContext, entries: PutEventsRequestEntryList
) -> tuple[PutEventsResultEntryList, int]:
processed_entries = []
failed_entry_count = 0
for event in entries:
event_bus_name = event.get("EventBusName", "default")
if event_failed_validation := validate_event(event):
processed_entries.append(event_failed_validation)
failed_entry_count += 1
continue
event = format_event(event, context.region, context.account_id)
store = self.get_store(context)
event_bus = self.get_event_bus(event_bus_name, store)
# TODO add pattern matching
try:
event_bus = self.get_event_bus(event_bus_name, store)
except ResourceNotFoundException:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is validated AWS behavior - see tests test_put_events_nonexistent_event_bus

# ignore events for non-existing event buses but add processed event
processed_entries.append({"EventId": event["id"]})
continue
matching_rules = [rule for rule in event_bus.rules.values()]
for rule in matching_rules:
for target in rule.targets.values():
target_sender = self._target_sender_store[target["Arn"]]
try:
target_sender.send_event(event)
except Exception as error:
failed_entries.append(
{
"Entry": event,
"ErrorCode": "InternalException",
"ErrorMessage": str(error),
}
)
return failed_entries
event_pattern = rule.event_pattern
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: event_pattern_str or str typing could clarify the type here (especially given we have both versions in our codebase)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now for the v2 provider, since we use only the Java matching engine we only ever have event pattern as a string, we don't parse it into a JSON object. In addition, the API string type is called EventPattern, therefor I would keep it without the str suffix.
Also, I think the naming convention of match_rule is misleading since we don't match the rule but the rule_pattern and only pass in the pattern. We should think of updating this in the event_ruler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now for the v2 provider, since we use only the Java matching engine we only ever have event pattern as a string, we don't parse it into a JSON object. In addition, the API string type is called EventPattern, therefor I would keep it without the str suffix.

It's a good thing we don't need JSON parsing for the event pattern anymore in the new provider, but right one line below we have the counter-example of transforming an event into an event_str 😬 . Looking over the many implementations in LocalStack, this str vs json duality was very confusing, and being explicit can help to clarify the API usage here.


How do you think does rule differ from rule_pattern?

the naming convention of match_rule is misleading since we don't match the rule but the rule_pattern and only pass in the pattern.

Background

The terminology Ruler.matchesRule(event, rule) in the event_ruler.py follows the official AWS event-ruler and aims to be more generic than just for EventBridge. Notice that different services use slightly different terminology:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json vs strg makes sense - the old fallacy if you build it you know it and think its obvious to everyone else

event_str = json.dumps(event)
if matches_rule(event_str, event_pattern):
for target in rule.targets.values():
target_sender = self._target_sender_store[target["Arn"]]
try:
target_sender.send_event(event)
processed_entries.append({"EventId": event["id"]})
except Exception as error:
processed_entries.append(
{
"ErrorCode": "InternalException",
"ErrorMessage": str(error),
}
)
failed_entry_count += 1
return processed_entries, failed_entry_count
6 changes: 5 additions & 1 deletion localstack/services/events/rule.py
Expand Up @@ -19,7 +19,11 @@
TargetIdList,
TargetList,
)
from localstack.services.events.models_v2 import Rule, TargetDict, ValidationException
from localstack.services.events.models_v2 import (
Rule,
TargetDict,
ValidationException,
)

TARGET_ID_REGEX = re.compile(r"^[\.\-_A-Za-z0-9]+$")
TARGET_ARN_REGEX = re.compile(r"arn:[\d\w:\-/]*")
Expand Down
6 changes: 5 additions & 1 deletion localstack/services/events/target.py
Expand Up @@ -7,6 +7,7 @@

from localstack.aws.api.events import (
Arn,
PutEventsRequestEntry,
Target,
)
from localstack.aws.connect import connect_to
Expand Down Expand Up @@ -53,7 +54,7 @@ def client(self):
return self._client

@abstractmethod
def send_event(self):
def send_event(self, event: PutEventsRequestEntry):
pass

def _validate_input(self, target: Target):
Expand Down Expand Up @@ -83,6 +84,8 @@ def _initialize_client(self) -> BaseClient:

TargetSenderDict = dict[Arn, TargetSender]

# Target Senders are ordered alphabetically by service name


class ApiGatewayTargetSender(TargetSender):
def send_event(self, event):
Expand Down Expand Up @@ -174,6 +177,7 @@ def send_event(self, event):

def _validate_input(self, target: Target):
super()._validate_input(target)
# TODO add validated test to check if RoleArn is mandatory
if not collections.get_safe(target, "$.RoleArn"):
raise ValueError("RoleArn is required for Kinesis target")
if not collections.get_safe(target, "$.KinesisParameters.PartitionKeyPath"):
Expand Down
49 changes: 45 additions & 4 deletions tests/aws/services/events/test_event_patterns.py
@@ -1,5 +1,6 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Tuple

Expand All @@ -8,7 +9,6 @@

from localstack.testing.aws.util import is_aws_cloud
from localstack.testing.pytest import markers
from tests.aws.services.events.helper_functions import is_v2_provider

THIS_FOLDER: str = os.path.dirname(os.path.realpath(__file__))
REQUEST_TEMPLATE_DIR = os.path.join(THIS_FOLDER, "event_pattern_templates")
Expand Down Expand Up @@ -80,7 +80,6 @@ def list_files_with_suffix(directory_path: str, suffix: str) -> List[str]:
# TODO: extend these test cases based on the open source docs + tests: https://github.com/aws/event-ruler
# For example, "JSON Array Matching", "And and Or Relationship among fields with Ruler", rule validation,
# and exception handling.
@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@pytest.mark.parametrize(
"request_template,label", request_template_tuples, ids=[t[1] for t in request_template_tuples]
)
Expand Down Expand Up @@ -118,7 +117,6 @@ def test_test_event_pattern(aws_client, snapshot, request_template, label):
assert response["Result"]


@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@markers.aws.validated
def test_test_event_pattern_with_multi_key(aws_client):
"""Test the special case of a duplicate JSON key separately because it requires working around the
Expand All @@ -140,7 +138,6 @@ def test_test_event_pattern_with_multi_key(aws_client):
assert response["Result"]


@pytest.mark.skipif(is_v2_provider(), reason="V2 provider does not support this feature yet")
@markers.aws.validated
def test_test_event_pattern_with_escape_characters(aws_client):
r"""Test the special case of using escape characters separately because it requires working around JSON escaping.
Expand All @@ -159,3 +156,47 @@ def test_test_event_pattern_with_escape_characters(aws_client):
EventPattern=event_pattern,
)
assert response["Result"]


@markers.aws.validated
def test_event_pattern_source(aws_client, snapshot, account_id, region_name):
response = aws_client.events.test_event_pattern(
Event=json.dumps(
{
"id": "1",
"source": "order",
"detail-type": "Test",
"account": account_id,
"region": region_name,
"time": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
}
),
EventPattern=json.dumps(
{
"source": ["order"],
"detail-type": ["Test"],
}
),
)
snapshot.match("eventbridge-test-event-pattern-response", response)

# negative test, source is not matched
response = aws_client.events.test_event_pattern(
Event=json.dumps(
{
"id": "1",
"source": "order",
"detail-type": "Test",
"account": account_id,
"region": region_name,
"time": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
}
),
EventPattern=json.dumps(
{
"source": ["shipment"],
"detail-type": ["Test"],
}
),
)
snapshot.match("eventbridge-test-event-pattern-response-no-match", response)