Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ThreadPoolExecutor publisher pattern for EventBridge (WIP) #10260

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
214 changes: 107 additions & 107 deletions localstack/services/events/provider.py
Expand Up @@ -41,14 +41,14 @@
from localstack.http import route
from localstack.services.edge import ROUTER
from localstack.services.events.models import EventsStore, events_stores
from localstack.services.events.publisher import EventTargetPublisher
from localstack.services.events.scheduler import JobScheduler
from localstack.services.moto import call_moto
from localstack.services.plugins import ServiceLifecycleHook
from localstack.utils.aws.arns import event_bus_arn, parse_arn
from localstack.utils.aws.client_types import ServicePrincipal
from localstack.utils.aws.message_forwarding import send_event_to_target
from localstack.utils.collections import pick_attributes
from localstack.utils.common import TMP_FILES, mkdir, save_file, truncate
from localstack.utils.common import TMP_FILES, mkdir, save_file
from localstack.utils.json import extract_jsonpath
from localstack.utils.strings import long_uid, short_uid
from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp
Expand All @@ -71,7 +71,8 @@ class ValidationException(ServiceException):

class EventsProvider(EventsApi, ServiceLifecycleHook):
def __init__(self):
apply_patches()
self._publisher = EventTargetPublisher()
apply_patches(self._publisher)

def on_after_init(self):
ROUTER.add(self.trigger_scheduled_rule)
Expand All @@ -81,6 +82,7 @@ def on_before_start(self):

def on_before_stop(self):
JobScheduler.shutdown()
self._publisher.shutdown()

@route("/_aws/events/rules/<path:rule_arn>/trigger")
def trigger_scheduled_rule(self, request: Request, rule_arn: str):
Expand Down Expand Up @@ -119,8 +121,8 @@ def test_event_pattern(
return TestEventPatternResponse(Result=False)
return TestEventPatternResponse(Result=True)

@staticmethod
def get_scheduled_rule_func(
self,
store: EventsStore,
rule_name: RuleName,
event_bus_name_or_arn: Optional[EventBusNameOrArn] = None,
Expand Down Expand Up @@ -168,23 +170,15 @@ def func(*args, **kwargs):

attr = pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"])

try:
send_event_to_target(
arn,
event,
target_attributes=attr,
role=target.get("RoleArn"),
target=target,
source_arn=rule.arn,
source_service=ServicePrincipal.events,
)
except Exception as e:
LOG.info(
"Unable to send event notification %s to target %s: %s",
truncate(event),
target,
e,
)
self._publisher.send_event_to_target(
arn,
event,
target_attributes=attr,
role=target.get("RoleArn"),
target=target,
source_arn=rule.arn,
source_service=ServicePrincipal.events,
)

return func

Expand Down Expand Up @@ -218,8 +212,8 @@ def convert_schedule_to_cron(schedule):
raise ValueError("Unable to parse events schedule expression: %s" % schedule)
return schedule

@staticmethod
def put_rule_job_scheduler(
self,
store: EventsStore,
name: Optional[RuleName],
state: Optional[RuleState],
Expand All @@ -235,7 +229,7 @@ def put_rule_job_scheduler(
LOG.error("Error parsing schedule expression: %s", e)
raise ValidationException("Parameter ScheduleExpression is not valid.") from e

job_func = EventsProvider.get_scheduled_rule_func(
job_func = self.get_scheduled_rule_func(
store, name, event_bus_name_or_arn=event_bus_name_or_arn
)
LOG.debug("Adding new scheduled Events rule with cron schedule %s", cron)
Expand Down Expand Up @@ -622,16 +616,27 @@ def process_event_with_input_transformer(input_transformer: Dict, event: Dict) -
return templated_event


def process_events(event: Dict, targets: list[Dict]):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff, especially around here, looks a little messy, but it mainly introduces a callable class, PutEventsHandler, whose __call__ method has the same implementation as events_handler_put_events. The class also absorbs a few functions as internal methods, since they are only used by the handler. Implementation details should remain largely the same, except that events are now sent through the executor-backed publisher instead of by calling send_event_to_target directly.

for target in targets:
arn = target["Arn"]
changed_event = filter_event_with_target_input_path(target, event)
if input_transformer := target.get("InputTransformer"):
changed_event = process_event_with_input_transformer(input_transformer, changed_event)
if target.get("Input"):
changed_event = json.loads(target.get("Input"))
try:
send_event_to_target(
def get_event_bus_name(event_bus_name_or_arn: Optional[EventBusNameOrArn] = None) -> str:
event_bus_name_or_arn = event_bus_name_or_arn or DEFAULT_EVENT_BUS_NAME
return event_bus_name_or_arn.split("/")[-1]


class PutEventsHandler:
def __init__(self, publisher: EventTargetPublisher):
self.publisher = publisher

def process_events(self, event: Dict, targets: list[Dict]):
for target in targets:
arn = target["Arn"]
changed_event = filter_event_with_target_input_path(target, event)
if input_transformer := target.get("InputTransformer"):
changed_event = process_event_with_input_transformer(
input_transformer, changed_event
)
if target.get("Input"):
changed_event = json.loads(target.get("Input"))

self.publisher.send_event_to_target(
arn,
changed_event,
pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"]),
Expand All @@ -640,89 +645,84 @@ def process_events(event: Dict, targets: list[Dict]):
source_service=ServicePrincipal.events,
source_arn=target.get("RuleArn"),
)
except Exception as e:
LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}")

# specific logic for put_events which forwards matching events to target listeners
def __call__(handler_self, self):
entries = self._get_param("Entries")

def get_event_bus_name(event_bus_name_or_arn: Optional[EventBusNameOrArn] = None) -> str:
event_bus_name_or_arn = event_bus_name_or_arn or DEFAULT_EVENT_BUS_NAME
return event_bus_name_or_arn.split("/")[-1]
# keep track of events for local integration testing
if config.is_local_test_mode():
TEST_EVENTS_CACHE.extend(entries)

events = list(map(lambda event: {"event": event, "uuid": str(long_uid())}, entries))

# specific logic for put_events which forwards matching events to target listeners
def events_handler_put_events(self):
entries = self._get_param("Entries")

# keep track of events for local integration testing
if config.is_local_test_mode():
TEST_EVENTS_CACHE.extend(entries)

events = list(map(lambda event: {"event": event, "uuid": str(long_uid())}, entries))

_dump_events_to_files(events)

for event_envelope in events:
event = event_envelope["event"]
event_bus_name = get_event_bus_name(event.get("EventBusName"))
event_bus = self.events_backend.event_buses.get(event_bus_name)
if not event_bus:
continue

matching_rules = [
r
for r in event_bus.rules.values()
if r.event_bus_name == event_bus_name and not r.scheduled_expression
]
if not matching_rules:
continue

event_time = datetime.datetime.utcnow()
if event_timestamp := event.get("Time"):
try:
# if provided, use the time from event
event_time = datetime.datetime.utcfromtimestamp(event_timestamp)
except ValueError:
# if we can't parse it, pass and keep using `utcnow`
LOG.debug(
"Could not parse the `Time` parameter, falling back to `utcnow` for the following Event: '%s'",
event,
)
_dump_events_to_files(events)

# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
formatted_event = {
"version": "0",
"id": event_envelope["uuid"],
"detail-type": event.get("DetailType"),
"source": event.get("Source"),
"account": self.current_account,
"time": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"region": self.region,
"resources": event.get("Resources", []),
"detail": json.loads(event.get("Detail", "{}")),
}
for event_envelope in events:
event = event_envelope["event"]
event_bus_name = get_event_bus_name(event.get("EventBusName"))
event_bus = self.events_backend.event_buses.get(event_bus_name)
if not event_bus:
continue

targets = []
for rule in matching_rules:
if filter_event_based_on_event_format(self, rule.name, event_bus_name, formatted_event):
rule_targets = self.events_backend.list_targets_by_rule(
rule.name, event_bus_arn(event_bus_name, self.current_account, self.region)
).get("Targets", [])
matching_rules = [
r
for r in event_bus.rules.values()
if r.event_bus_name == event_bus_name and not r.scheduled_expression
]
if not matching_rules:
continue

targets.extend([{"RuleArn": rule.arn} | target for target in rule_targets])
# process event
process_events(formatted_event, targets)
event_time = datetime.datetime.utcnow()
if event_timestamp := event.get("Time"):
try:
# if provided, use the time from event
event_time = datetime.datetime.utcfromtimestamp(event_timestamp)
except ValueError:
# if we can't parse it, pass and keep using `utcnow`
LOG.debug(
"Could not parse the `Time` parameter, falling back to `utcnow` for the following Event: '%s'",
event,
)

content = {
"FailedEntryCount": 0, # TODO: dynamically set proper value when refactoring
"Entries": list(map(lambda event: {"EventId": event["uuid"]}, events)),
}
# See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
formatted_event = {
"version": "0",
"id": event_envelope["uuid"],
"detail-type": event.get("DetailType"),
"source": event.get("Source"),
"account": self.current_account,
"time": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"region": self.region,
"resources": event.get("Resources", []),
"detail": json.loads(event.get("Detail", "{}")),
}

self.response_headers.update(
{"Content-Type": APPLICATION_AMZ_JSON_1_1, "x-amzn-RequestId": short_uid()}
)
targets = []
for rule in matching_rules:
if filter_event_based_on_event_format(
self, rule.name, event_bus_name, formatted_event
):
rule_targets = self.events_backend.list_targets_by_rule(
rule.name, event_bus_arn(event_bus_name, self.current_account, self.region)
).get("Targets", [])

targets.extend([{"RuleArn": rule.arn} | target for target in rule_targets])

# process event
handler_self.process_events(formatted_event, targets)

content = {
"FailedEntryCount": 0, # TODO: dynamically set proper value when refactoring
"Entries": list(map(lambda event: {"EventId": event["uuid"]}, events)),
}

self.response_headers.update(
{"Content-Type": APPLICATION_AMZ_JSON_1_1, "x-amzn-RequestId": short_uid()}
)

return json.dumps(content), self.response_headers
return json.dumps(content), self.response_headers


def apply_patches():
MotoEventsHandler.put_events = events_handler_put_events
def apply_patches(publisher: EventTargetPublisher):
MotoEventsHandler.put_events = PutEventsHandler(publisher)
64 changes: 64 additions & 0 deletions localstack/services/events/publisher.py
@@ -0,0 +1,64 @@
import logging
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Dict

from localstack.utils.aws.message_forwarding import send_event_to_target as send_event_to_target_
from localstack.utils.common import truncate

LOG = logging.getLogger(__name__)


def send_event_to_target(
    target_arn: str,
    event: Dict,
    target_attributes: Dict = None,
    asynchronous: bool = True,
    target: Dict = None,
    role: str = None,
    source_arn: str = None,
    source_service: str = None,
):
    """
    Forward ``event`` to ``target_arn`` and log, rather than raise, on failure.

    Every argument is handed unchanged to ``send_event_to_target_`` (the helper
    imported from ``localstack.utils.aws.message_forwarding``). Any ``Exception``
    raised by that call is caught here and reported at INFO level, so the caller
    never sees it and this function always returns ``None``.
    """
    # Collected up front so the guarded section below is only the delivery call.
    delivery_options = dict(
        target_attributes=target_attributes,
        asynchronous=asynchronous,
        target=target,
        role=role,
        source_arn=source_arn,
        source_service=source_service,
    )

    try:
        send_event_to_target_(target_arn, event, **delivery_options)
    except Exception as e:
        LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}")


class EventTargetPublisher:
    """
    Submits event deliveries to a thread pool instead of running them inline.

    The pool is exposed as ``self.executor``.
    """

    def __init__(self, thread_count: int = 10):
        """Create the pool with ``thread_count`` workers named with the ``eb_targets`` prefix."""
        self.executor = ThreadPoolExecutor(
            max_workers=thread_count,
            thread_name_prefix="eb_targets",
        )

    def shutdown(self):
        """Shut the executor down without blocking the caller (``wait=False``)."""
        self.executor.shutdown(wait=False)

    def send_event_to_target(
        self,
        target_arn: str,
        event: Dict,
        target_attributes: Dict = None,
        asynchronous: bool = True,
        target: Dict = None,
        role: str = None,
        source_arn: str = None,
        source_service: str = None,
    ):
        """
        Queue one delivery of ``event`` to ``target_arn`` on the executor.

        All arguments are passed through unchanged to the module-level
        ``send_event_to_target`` function. The future returned by ``submit`` is
        discarded, so this method returns ``None`` and the caller gets no handle
        on the outcome.
        """
        forwarded_options = {
            "target_attributes": target_attributes,
            "asynchronous": asynchronous,
            "target": target,
            "role": role,
            "source_arn": source_arn,
            "source_service": source_service,
        }
        # Inside a method body the bare name resolves to the module-level
        # function, not to this method of the same name.
        self.executor.submit(
            send_event_to_target,
            target_arn,
            event,
            **forwarded_options,
        )