From e52ceafa913d3f12f2a5db5235f84278cb996f50 Mon Sep 17 00:00:00 2001 From: Josef Lange Date: Thu, 15 Feb 2024 20:25:06 -0600 Subject: [PATCH 1/3] Implement ThreadPoolExecutor publisher pattern for EventBridge so that PutEvents and scheduled runs return immediately. --- localstack/services/events/provider.py | 333 ++++++++++++------------ localstack/services/events/publisher.py | 57 ++++ 2 files changed, 222 insertions(+), 168 deletions(-) create mode 100644 localstack/services/events/publisher.py diff --git a/localstack/services/events/provider.py b/localstack/services/events/provider.py index 4bbb68b3ae2a3..54f5e1c505558 100644 --- a/localstack/services/events/provider.py +++ b/localstack/services/events/provider.py @@ -41,6 +41,7 @@ from localstack.http import route from localstack.services.edge import ROUTER from localstack.services.events.models import EventsStore, events_stores +from localstack.services.events.publisher import EventTargetPublisher from localstack.services.events.scheduler import JobScheduler from localstack.services.moto import call_moto from localstack.services.plugins import ServiceLifecycleHook @@ -71,7 +72,8 @@ class ValidationException(ServiceException): class EventsProvider(EventsApi, ServiceLifecycleHook): def __init__(self): - apply_patches() + self._publisher = EventTargetPublisher() + apply_patches(self._publisher) def on_after_init(self): ROUTER.add(self.trigger_scheduled_rule) @@ -81,6 +83,7 @@ def on_before_start(self): def on_before_stop(self): JobScheduler.shutdown() + self._publisher.shutdown() @route("/_aws/events/rules//trigger") def trigger_scheduled_rule(self, request: Request, rule_arn: str): @@ -106,7 +109,7 @@ def get_store(context: RequestContext) -> EventsStore: return events_stores[context.account_id][context.region] def test_event_pattern( - self, context: RequestContext, event_pattern: EventPattern, event: String, **kwargs + self, context: RequestContext, event_pattern: EventPattern, event: String, **kwargs ) -> TestEventPatternResponse: # https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_TestEventPattern.html # Test event pattern uses event pattern to match against event. @@ -119,11 +122,11 @@ def test_event_pattern( return TestEventPatternResponse(Result=False) return TestEventPatternResponse(Result=True) - @staticmethod def get_scheduled_rule_func( - store: EventsStore, - rule_name: RuleName, - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, + self, + store: EventsStore, + rule_name: RuleName, + event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, ): def func(*args, **kwargs): account_id = store._account_id @@ -168,23 +171,15 @@ def func(*args, **kwargs): attr = pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"]) - try: - send_event_to_target( - arn, - event, - target_attributes=attr, - role=target.get("RoleArn"), - target=target, - source_arn=rule.arn, - source_service=ServicePrincipal.events, - ) - except Exception as e: - LOG.info( - "Unable to send event notification %s to target %s: %s", - truncate(event), - target, - e, - ) + self._publisher.send_event_to_target( + arn, + event, + target_attributes=attr, + role=target.get("RoleArn"), + target=target, + source_arn=rule.arn, + source_service=ServicePrincipal.events, + ) return func @@ -218,13 +213,13 @@ def convert_schedule_to_cron(schedule): raise ValueError("Unable to parse events schedule expression: %s" % schedule) return schedule - @staticmethod def put_rule_job_scheduler( - store: EventsStore, - name: Optional[RuleName], - state: Optional[RuleState], - schedule_expression: Optional[ScheduleExpression], - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, + self, + store: EventsStore, + name: Optional[RuleName], + state: Optional[RuleState], + schedule_expression: Optional[ScheduleExpression], + event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, ): if not schedule_expression: return @@ -235,7 +230,7 @@ def put_rule_job_scheduler( LOG.error("Error parsing schedule expression: %s", e) raise ValidationException("Parameter ScheduleExpression is not valid.") from e - job_func = EventsProvider.get_scheduled_rule_func( + job_func = self.get_scheduled_rule_func( store, name, event_bus_name_or_arn=event_bus_name_or_arn ) LOG.debug("Adding new scheduled Events rule with cron schedule %s", cron) @@ -246,17 +241,17 @@ def put_rule_job_scheduler( rule_scheduled_jobs[name] = job_id def put_rule( - self, - context: RequestContext, - name: RuleName, - schedule_expression: ScheduleExpression = None, - event_pattern: EventPattern = None, - state: RuleState = None, - description: RuleDescription = None, - role_arn: RoleArn = None, - tags: TagList = None, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + schedule_expression: ScheduleExpression = None, + event_pattern: EventPattern = None, + state: RuleState = None, + description: RuleDescription = None, + role_arn: RoleArn = None, + tags: TagList = None, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> PutRuleResponse: store = self.get_store(context) self.put_rule_job_scheduler( @@ -265,12 +260,12 @@ def put_rule( return call_moto(context) def delete_rule( - self, - context: RequestContext, - name: RuleName, - event_bus_name: EventBusNameOrArn = None, - force: Boolean = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + event_bus_name: EventBusNameOrArn = None, + force: Boolean = None, + **kwargs, ) -> None: rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs job_id = rule_scheduled_jobs.get(name) @@ -280,11 +275,11 @@ def delete_rule( call_moto(context) def disable_rule( - self, - context: RequestContext, - name: RuleName, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> None: rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs job_id = rule_scheduled_jobs.get(name) @@ -294,13 +289,13 @@ def disable_rule( call_moto(context) def create_connection( - self, - context: RequestContext, - name: ConnectionName, - authorization_type: ConnectionAuthorizationType, - auth_parameters: CreateConnectionAuthRequestParameters, - description: ConnectionDescription = None, - **kwargs, + self, + context: RequestContext, + name: ConnectionName, + authorization_type: ConnectionAuthorizationType, + auth_parameters: CreateConnectionAuthRequestParameters, + description: ConnectionDescription = None, + **kwargs, ) -> CreateConnectionResponse: errors = [] @@ -326,12 +321,12 @@ def create_connection( return call_moto(context) def put_targets( - self, - context: RequestContext, - rule: RuleName, - targets: TargetList, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + rule: RuleName, + targets: TargetList, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> PutTargetsResponse: validation_errors = [] @@ -441,27 +436,27 @@ def filter_event_with_content_base_parameter(pattern_value: list, event_value: s if isinstance(element_value[index], int): continue if ( - element_value[index] == ">" - and isinstance(element_value[index + 1], int) - and event_value <= element_value[index + 1] + element_value[index] == ">" + and isinstance(element_value[index + 1], int) + and event_value <= element_value[index + 1] ): break elif ( - element_value[index] == ">=" - and isinstance(element_value[index + 1], int) - and event_value < element_value[index + 1] + element_value[index] == ">=" + and isinstance(element_value[index + 1], int) + and event_value < element_value[index + 1] ): break elif ( - element_value[index] == "<" - and isinstance(element_value[index + 1], int) - and event_value >= element_value[index + 1] + element_value[index] == "<" + and isinstance(element_value[index + 1], int) + and event_value >= element_value[index + 1] ): break elif ( - element_value[index] == "<=" - and isinstance(element_value[index + 1], int) - and event_value > element_value[index + 1] + element_value[index] == "<=" + and isinstance(element_value[index + 1], int) + and event_value > element_value[index + 1] ): break else: @@ -475,7 +470,7 @@ def filter_event_with_content_base_parameter(pattern_value: list, event_value: s elif isinstance(element_value, dict): nested_key = list(element_value)[0] if nested_key == "prefix" and not re.match( - r"^{}".format(element_value.get(nested_key)), event_value + r"^{}".format(element_value.get(nested_key)), event_value ): return True return False @@ -529,7 +524,7 @@ def event_pattern_prefix_bool_filter(event_pattern_filter_value_list: list[dict[ # TODO: refactor/simplify! def filter_event_based_on_event_format( - self, rule_name: str, event_bus_name: str, event: dict[str, Any] + self, rule_name: str, event_bus_name: str, event: dict[str, Any] ): def filter_event(event_pattern_filter: dict[str, Any], event: dict[str, Any]): for key, value in event_pattern_filter.items(): @@ -558,14 +553,14 @@ def filter_event(event_pattern_filter: dict[str, Any], event: dict[str, Any]): return False else: if ( - isinstance(event_value, list) - and get_two_lists_intersection(value, event_value) == [] + isinstance(event_value, list) + and get_two_lists_intersection(value, event_value) == [] ): return False if ( - not isinstance(event_value, list) - and isinstance(event_value, (str, int)) - and event_value not in value + not isinstance(event_value, list) + and isinstance(event_value, (str, int)) + and event_value not in value ): return False @@ -622,16 +617,25 @@ def process_event_with_input_transformer(input_transformer: Dict, event: Dict) - return templated_event -def process_events(event: Dict, targets: list[Dict]): - for target in targets: - arn = target["Arn"] - changed_event = filter_event_with_target_input_path(target, event) - if input_transformer := target.get("InputTransformer"): - changed_event = process_event_with_input_transformer(input_transformer, changed_event) - if target.get("Input"): - changed_event = json.loads(target.get("Input")) - try: - send_event_to_target( +def get_event_bus_name(event_bus_name_or_arn: Optional[EventBusNameOrArn] = None) -> str: + event_bus_name_or_arn = event_bus_name_or_arn or DEFAULT_EVENT_BUS_NAME + return event_bus_name_or_arn.split("/")[-1] + + +class PutEventsHandler: + def __init__(self, publisher: EventTargetPublisher): + self.publisher = publisher + + def process_events(self, event: Dict, targets: list[Dict]): + for target in targets: + arn = target["Arn"] + changed_event = filter_event_with_target_input_path(target, event) + if input_transformer := target.get("InputTransformer"): + changed_event = process_event_with_input_transformer(input_transformer, changed_event) + if target.get("Input"): + changed_event = json.loads(target.get("Input")) + + self.publisher.send_event_to_target( arn, changed_event, pick_attributes(target, ["$.SqsParameters", "$.KinesisParameters"]), @@ -640,89 +644,82 @@ def process_events(event: Dict, targets: list[Dict]): source_service=ServicePrincipal.events, source_arn=target.get("RuleArn"), ) - except Exception as e: - LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}") + # specific logic for put_events which forwards matching events to target listeners + def __call__(handler_self, self): + entries = self._get_param("Entries") -def get_event_bus_name(event_bus_name_or_arn: Optional[EventBusNameOrArn] = None) -> str: - event_bus_name_or_arn = event_bus_name_or_arn or DEFAULT_EVENT_BUS_NAME - return event_bus_name_or_arn.split("/")[-1] + # keep track of events for local integration testing + if config.is_local_test_mode(): + TEST_EVENTS_CACHE.extend(entries) + events = list(map(lambda event: {"event": event, "uuid": str(long_uid())}, entries)) -# specific logic for put_events which forwards matching events to target listeners -def events_handler_put_events(self): - entries = self._get_param("Entries") - - # keep track of events for local integration testing - if config.is_local_test_mode(): - TEST_EVENTS_CACHE.extend(entries) - - events = list(map(lambda event: {"event": event, "uuid": str(long_uid())}, entries)) - - _dump_events_to_files(events) - - for event_envelope in events: - event = event_envelope["event"] - event_bus_name = get_event_bus_name(event.get("EventBusName")) - event_bus = self.events_backend.event_buses.get(event_bus_name) - if not event_bus: - continue - - matching_rules = [ - r - for r in event_bus.rules.values() - if r.event_bus_name == event_bus_name and not r.scheduled_expression - ] - if not matching_rules: - continue - - event_time = datetime.datetime.utcnow() - if event_timestamp := event.get("Time"): - try: - # if provided, use the time from event - event_time = datetime.datetime.utcfromtimestamp(event_timestamp) - except ValueError: - # if we can't parse it, pass and keep using `utcnow` - LOG.debug( - "Could not parse the `Time` parameter, falling back to `utcnow` for the following Event: '%s'", - event, - ) + _dump_events_to_files(events) - # See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html - formatted_event = { - "version": "0", - "id": event_envelope["uuid"], - "detail-type": event.get("DetailType"), - "source": event.get("Source"), - "account": self.current_account, - "time": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"), - "region": self.region, - "resources": event.get("Resources", []), - "detail": json.loads(event.get("Detail", "{}")), - } + for event_envelope in events: + event = event_envelope["event"] + event_bus_name = get_event_bus_name(event.get("EventBusName")) + event_bus = self.events_backend.event_buses.get(event_bus_name) + if not event_bus: + continue - targets = [] - for rule in matching_rules: - if filter_event_based_on_event_format(self, rule.name, event_bus_name, formatted_event): - rule_targets = self.events_backend.list_targets_by_rule( - rule.name, event_bus_arn(event_bus_name, self.current_account, self.region) - ).get("Targets", []) + matching_rules = [ + r + for r in event_bus.rules.values() + if r.event_bus_name == event_bus_name and not r.scheduled_expression + ] + if not matching_rules: + continue - targets.extend([{"RuleArn": rule.arn} | target for target in rule_targets]) - # process event - process_events(formatted_event, targets) + event_time = datetime.datetime.utcnow() + if event_timestamp := event.get("Time"): + try: + # if provided, use the time from event + event_time = datetime.datetime.utcfromtimestamp(event_timestamp) + except ValueError: + # if we can't parse it, pass and keep using `utcnow` + LOG.debug( + "Could not parse the `Time` parameter, falling back to `utcnow` for the following Event: '%s'", + event, + ) - content = { - "FailedEntryCount": 0, # TODO: dynamically set proper value when refactoring - "Entries": list(map(lambda event: {"EventId": event["uuid"]}, events)), - } + # See https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html + formatted_event = { + "version": "0", + "id": event_envelope["uuid"], + "detail-type": event.get("DetailType"), + "source": event.get("Source"), + "account": self.current_account, + "time": event_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + "region": self.region, + "resources": event.get("Resources", []), + "detail": json.loads(event.get("Detail", "{}")), + } - self.response_headers.update( - {"Content-Type": APPLICATION_AMZ_JSON_1_1, "x-amzn-RequestId": short_uid()} - ) + targets = [] + for rule in matching_rules: + if filter_event_based_on_event_format(self, rule.name, event_bus_name, formatted_event): + rule_targets = self.events_backend.list_targets_by_rule( + rule.name, event_bus_arn(event_bus_name, self.current_account, self.region) + ).get("Targets", []) + + targets.extend([{"RuleArn": rule.arn} | target for target in rule_targets]) + + # process event + handler_self.process_events(formatted_event, targets) + + content = { + "FailedEntryCount": 0, # TODO: dynamically set proper value when refactoring + "Entries": list(map(lambda event: {"EventId": event["uuid"]}, events)), + } + + self.response_headers.update( + {"Content-Type": APPLICATION_AMZ_JSON_1_1, "x-amzn-RequestId": short_uid()} + ) - return json.dumps(content), self.response_headers + return json.dumps(content), self.response_headers -def apply_patches(): - MotoEventsHandler.put_events = events_handler_put_events +def apply_patches(publisher: EventTargetPublisher): + MotoEventsHandler.put_events = PutEventsHandler(publisher) diff --git a/localstack/services/events/publisher.py b/localstack/services/events/publisher.py new file mode 100644 index 0000000000000..52b07cd428452 --- /dev/null +++ b/localstack/services/events/publisher.py @@ -0,0 +1,57 @@ +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Dict + +import logging + +from localstack.utils.aws.message_forwarding import send_event_to_target as send_event_to_target_ +from localstack.utils.common import truncate + +LOG = logging.getLogger(__name__) + + +def send_event_to_target( + target_arn: str, + event: Dict, + target_attributes: Dict = None, + asynchronous: bool = True, + target: Dict = None, + role: str = None, + source_arn: str = None, + source_service: str = None, +): + try: + send_event_to_target_( + target_arn, + event, + target_attributes=target_attributes, + asynchronous=asynchronous, + target=target, + role=role, + source_arn=source_arn, + source_service=source_service + ) + except Exception as e: + LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}") + + +class EventTargetPublisher: + def __init__(self, thread_count: int = 10): + self.executor = ThreadPoolExecutor(thread_count, thread_name_prefix="eb_targets") + + def shutdown(self): + self.executor.shutdown(wait=False) + + def send_event_to_target( + self, + target_arn: str, + event: Dict, + target_attributes: Dict = None, + asynchronous: bool = True, + target: Dict = None, + role: str = None, + source_arn: str = None, + source_service: str = None, + ): + self.executor.submit(send_event_to_target, target_arn, event, target_attributes=target_attributes, + asynchronous=asynchronous, target=target, role=role, source_arn=source_arn, + source_service=source_service) From 8a50ec4bfe00d437cb3bc4c627b3c3f86ff3687e Mon Sep 17 00:00:00 2001 From: Josef Lange Date: Thu, 15 Feb 2024 20:26:28 -0600 Subject: [PATCH 2/3] format/lint --- localstack/services/events/provider.py | 141 ++++++++++++------------ localstack/services/events/publisher.py | 37 ++++--- 2 files changed, 94 insertions(+), 84 deletions(-) diff --git a/localstack/services/events/provider.py b/localstack/services/events/provider.py index 54f5e1c505558..e4b597a6f0ffd 100644 --- a/localstack/services/events/provider.py +++ b/localstack/services/events/provider.py @@ -47,9 +47,8 @@ from localstack.services.plugins import ServiceLifecycleHook from localstack.utils.aws.arns import event_bus_arn, parse_arn from localstack.utils.aws.client_types import ServicePrincipal -from localstack.utils.aws.message_forwarding import send_event_to_target from localstack.utils.collections import pick_attributes -from localstack.utils.common import TMP_FILES, mkdir, save_file, truncate +from localstack.utils.common import TMP_FILES, mkdir, save_file from localstack.utils.json import extract_jsonpath from localstack.utils.strings import long_uid, short_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp @@ -109,7 +108,7 @@ def get_store(context: RequestContext) -> EventsStore: return events_stores[context.account_id][context.region] def test_event_pattern( - self, context: RequestContext, event_pattern: EventPattern, event: String, **kwargs + self, context: RequestContext, event_pattern: EventPattern, event: String, **kwargs ) -> TestEventPatternResponse: # https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_TestEventPattern.html # Test event pattern uses event pattern to match against event. @@ -123,10 +122,10 @@ def test_event_pattern( return TestEventPatternResponse(Result=True) def get_scheduled_rule_func( - self, - store: EventsStore, - rule_name: RuleName, - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, + self, + store: EventsStore, + rule_name: RuleName, + event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, ): def func(*args, **kwargs): account_id = store._account_id @@ -214,12 +213,12 @@ def convert_schedule_to_cron(schedule): return schedule def put_rule_job_scheduler( - self, - store: EventsStore, - name: Optional[RuleName], - state: Optional[RuleState], - schedule_expression: Optional[ScheduleExpression], - event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, + self, + store: EventsStore, + name: Optional[RuleName], + state: Optional[RuleState], + schedule_expression: Optional[ScheduleExpression], + event_bus_name_or_arn: Optional[EventBusNameOrArn] = None, ): if not schedule_expression: return @@ -241,17 +240,17 @@ def put_rule_job_scheduler( rule_scheduled_jobs[name] = job_id def put_rule( - self, - context: RequestContext, - name: RuleName, - schedule_expression: ScheduleExpression = None, - event_pattern: EventPattern = None, - state: RuleState = None, - description: RuleDescription = None, - role_arn: RoleArn = None, - tags: TagList = None, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + schedule_expression: ScheduleExpression = None, + event_pattern: EventPattern = None, + state: RuleState = None, + description: RuleDescription = None, + role_arn: RoleArn = None, + tags: TagList = None, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> PutRuleResponse: store = self.get_store(context) self.put_rule_job_scheduler( @@ -260,12 +259,12 @@ def put_rule( return call_moto(context) def delete_rule( - self, - context: RequestContext, - name: RuleName, - event_bus_name: EventBusNameOrArn = None, - force: Boolean = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + event_bus_name: EventBusNameOrArn = None, + force: Boolean = None, + **kwargs, ) -> None: rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs job_id = rule_scheduled_jobs.get(name) @@ -275,11 +274,11 @@ def delete_rule( call_moto(context) def disable_rule( - self, - context: RequestContext, - name: RuleName, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + name: RuleName, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> None: rule_scheduled_jobs = self.get_store(context).rule_scheduled_jobs job_id = rule_scheduled_jobs.get(name) @@ -289,13 +288,13 @@ def disable_rule( call_moto(context) def create_connection( - self, - context: RequestContext, - name: ConnectionName, - authorization_type: ConnectionAuthorizationType, - auth_parameters: CreateConnectionAuthRequestParameters, - description: ConnectionDescription = None, - **kwargs, + self, + context: RequestContext, + name: ConnectionName, + authorization_type: ConnectionAuthorizationType, + auth_parameters: CreateConnectionAuthRequestParameters, + description: ConnectionDescription = None, + **kwargs, ) -> CreateConnectionResponse: errors = [] @@ -321,12 +320,12 @@ def create_connection( return call_moto(context) def put_targets( - self, - context: RequestContext, - rule: RuleName, - targets: TargetList, - event_bus_name: EventBusNameOrArn = None, - **kwargs, + self, + context: RequestContext, + rule: RuleName, + targets: TargetList, + event_bus_name: EventBusNameOrArn = None, + **kwargs, ) -> PutTargetsResponse: validation_errors = [] @@ -436,27 +435,27 @@ def filter_event_with_content_base_parameter(pattern_value: list, event_value: s if isinstance(element_value[index], int): continue if ( - element_value[index] == ">" - and isinstance(element_value[index + 1], int) - and event_value <= element_value[index + 1] + element_value[index] == ">" + and isinstance(element_value[index + 1], int) + and event_value <= element_value[index + 1] ): break elif ( - element_value[index] == ">=" - and isinstance(element_value[index + 1], int) - and event_value < element_value[index + 1] + element_value[index] == ">=" + and isinstance(element_value[index + 1], int) + and event_value < element_value[index + 1] ): break elif ( - element_value[index] == "<" - and isinstance(element_value[index + 1], int) - and event_value >= element_value[index + 1] + element_value[index] == "<" + and isinstance(element_value[index + 1], int) + and event_value >= element_value[index + 1] ): break elif ( - element_value[index] == "<=" - and isinstance(element_value[index + 1], int) - and event_value > element_value[index + 1] + element_value[index] == "<=" + and isinstance(element_value[index + 1], int) + and event_value > element_value[index + 1] ): break else: @@ -470,7 +469,7 @@ def filter_event_with_content_base_parameter(pattern_value: list, event_value: s elif isinstance(element_value, dict): nested_key = list(element_value)[0] if nested_key == "prefix" and not re.match( - r"^{}".format(element_value.get(nested_key)), event_value + r"^{}".format(element_value.get(nested_key)), event_value ): return True return False @@ -524,7 +523,7 @@ def event_pattern_prefix_bool_filter(event_pattern_filter_value_list: list[dict[ # TODO: refactor/simplify! def filter_event_based_on_event_format( - self, rule_name: str, event_bus_name: str, event: dict[str, Any] + self, rule_name: str, event_bus_name: str, event: dict[str, Any] ): def filter_event(event_pattern_filter: dict[str, Any], event: dict[str, Any]): for key, value in event_pattern_filter.items(): @@ -553,14 +552,14 @@ def filter_event(event_pattern_filter: dict[str, Any], event: dict[str, Any]): return False else: if ( - isinstance(event_value, list) - and get_two_lists_intersection(value, event_value) == [] + isinstance(event_value, list) + and get_two_lists_intersection(value, event_value) == [] ): return False if ( - not isinstance(event_value, list) - and isinstance(event_value, (str, int)) - and event_value not in value + not isinstance(event_value, list) + and isinstance(event_value, (str, int)) + and event_value not in value ): return False @@ -631,7 +630,9 @@ def process_events(self, event: Dict, targets: list[Dict]): arn = target["Arn"] changed_event = filter_event_with_target_input_path(target, event) if input_transformer := target.get("InputTransformer"): - changed_event = process_event_with_input_transformer(input_transformer, changed_event) + changed_event = process_event_with_input_transformer( + input_transformer, changed_event + ) if target.get("Input"): changed_event = json.loads(target.get("Input")) @@ -699,7 +700,9 @@ def __call__(handler_self, self): targets = [] for rule in matching_rules: - if filter_event_based_on_event_format(self, rule.name, event_bus_name, formatted_event): + if filter_event_based_on_event_format( + self, rule.name, event_bus_name, formatted_event + ): rule_targets = self.events_backend.list_targets_by_rule( rule.name, event_bus_arn(event_bus_name, self.current_account, self.region) ).get("Targets", []) diff --git a/localstack/services/events/publisher.py b/localstack/services/events/publisher.py index 52b07cd428452..e77d459ccc5c8 100644 --- a/localstack/services/events/publisher.py +++ b/localstack/services/events/publisher.py @@ -1,8 +1,7 @@ +import logging from concurrent.futures.thread import ThreadPoolExecutor from typing import Dict -import logging - from localstack.utils.aws.message_forwarding import send_event_to_target as send_event_to_target_ from localstack.utils.common import truncate @@ -28,7 +27,7 @@ def send_event_to_target( target=target, role=role, source_arn=source_arn, - source_service=source_service + source_service=source_service, ) except Exception as e: LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}") @@ -42,16 +41,24 @@ def shutdown(self): self.executor.shutdown(wait=False) def send_event_to_target( - self, - target_arn: str, - event: Dict, - target_attributes: Dict = None, - asynchronous: bool = True, - target: Dict = None, - role: str = None, - source_arn: str = None, - source_service: str = None, + self, + target_arn: str, + event: Dict, + target_attributes: Dict = None, + asynchronous: bool = True, + target: Dict = None, + role: str = None, + source_arn: str = None, + source_service: str = None, ): - self.executor.submit(send_event_to_target, target_arn, event, target_attributes=target_attributes, - asynchronous=asynchronous, target=target, role=role, source_arn=source_arn, - source_service=source_service) + self.executor.submit( + send_event_to_target, + target_arn, + event, + target_attributes=target_attributes, + asynchronous=asynchronous, + target=target, + role=role, + source_arn=source_arn, + source_service=source_service, + ) From 74ca3caca90be63c8c450fd470288d436d32a9f6 Mon Sep 17 00:00:00 2001 From: Josef Lange Date: Wed, 21 Feb 2024 11:18:28 -0600 Subject: [PATCH 3/3] Wrap publisher usage in a function to appease moto --- localstack/services/events/provider.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/localstack/services/events/provider.py b/localstack/services/events/provider.py index e4b597a6f0ffd..8f8483b60e19e 100644 --- a/localstack/services/events/provider.py +++ b/localstack/services/events/provider.py @@ -647,7 +647,7 @@ def process_events(self, event: Dict, targets: list[Dict]): ) # specific logic for put_events which forwards matching events to target listeners - def __call__(handler_self, self): + def __call__(handler_self, self) -> tuple: entries = self._get_param("Entries") # keep track of events for local integration testing @@ -724,5 +724,12 @@ def __call__(handler_self, self): return json.dumps(content), self.response_headers +def get_handler_function(handler: PutEventsHandler): + def handle(self) -> str: + return handler(self)[0] + + return handle + + def apply_patches(publisher: EventTargetPublisher): - MotoEventsHandler.put_events = PutEventsHandler(publisher) + MotoEventsHandler.put_events = get_handler_function(PutEventsHandler(publisher))