Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental event ruler #10615

Merged
merged 34 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0d89025
Add feature flag for Java event ruler implementation
joe4dev Apr 4, 2024
776c2c7
Add Java exception handling for event ruler
joe4dev Apr 5, 2024
a4ea75e
Add LS package for event ruler libs
joe4dev Apr 8, 2024
6ecd057
Add comment about machine-based matching
joe4dev Apr 8, 2024
138f6bb
Add JPype1 dependency for event ruler
joe4dev Apr 8, 2024
055ae46
Use java default for CI testing
joe4dev Apr 8, 2024
5fc3349
Fix malformed filter in unit test
joe4dev Apr 8, 2024
d2e748c
Try to fix ARM Linux build
joe4dev Apr 9, 2024
150af42
Fix S3 ARM build
joe4dev Apr 9, 2024
c373a7a
Fix ApproximateCreationDateTime type conversion
joe4dev Apr 9, 2024
dda765c
Remove duplicate negative numeric test and re-validate snapshots
joe4dev Apr 9, 2024
38ab093
Improve reliabilty of negative test
joe4dev Apr 9, 2024
35cfaea
Add note about test refactoring
joe4dev Apr 9, 2024
9e96cd2
Fix missing AND_IF in S3 Dockerfile
joe4dev Apr 9, 2024
db663b6
Add DynamoDB special case tests for EventBridge
joe4dev Apr 11, 2024
bcc2f13
Improve ESM DynamoDB test and fix the false negative
joe4dev Apr 11, 2024
87a6440
Skip failing tests for native Python implementation
joe4dev Apr 12, 2024
f330041
Switch default rule engine back to Python
joe4dev Apr 12, 2024
c2cbf23
Remove misleading comment
joe4dev Apr 12, 2024
4869491
Perform g++ install in single command
joe4dev Apr 12, 2024
0a77e95
Move Maven package installer to reusable core packages
joe4dev Apr 12, 2024
531839e
Skip another failing test in our own implementation
joe4dev Apr 12, 2024
b3b6b5b
Refactor Maven package installer
joe4dev Apr 15, 2024
d984c72
Switch default to Java for CI testing
joe4dev Apr 15, 2024
e267837
Raise exceptions from original exception
joe4dev Apr 15, 2024
9d142d0
Make JPype imports dynamic
joe4dev Apr 16, 2024
9a54628
Clarify JPype JVM shutdown comments
joe4dev Apr 16, 2024
1ccc192
Switch default back to Python implementation
joe4dev Apr 17, 2024
1c37ed8
Move JPype1 dependency from base-runtime into runtime
joe4dev Apr 18, 2024
4e7dbc5
Switch default to java for CI run
joe4dev Apr 18, 2024
29f660d
Fix S3 tests requiring native libs for runtime deps
joe4dev Apr 18, 2024
cd22f80
Install JPype runtime deps before installing test deps
joe4dev Apr 18, 2024
8f108eb
Remove unnecessary second g++ install
joe4dev Apr 18, 2024
dc265aa
Revert "Switch default to java for CI run"
joe4dev Apr 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ ARG TARGETARCH
RUN --mount=type=cache,target=/var/cache/apt \
apt-get update && \
# Install dependencies to add additional repos
apt-get install -y gcc
# g++ is a workaround to fix the JPype1 compile error on ARM Linux "gcc: fatal error: cannot execute ‘cc1plus’"
apt-get install -y gcc g++

# upgrade python build tools
RUN --mount=type=cache,target=/root/.cache \
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile.s3
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ ARG TARGETARCH
RUN --mount=type=cache,target=/var/cache/apt \
apt-get update && \
# Install dependencies to add additional repos
apt-get install -y gcc
# g++ is a workaround to fix the JPype1 compile error on ARM Linux "gcc: fatal error: cannot execute ‘cc1plus’"
apt-get install -y gcc g++

# upgrade python build tools
RUN --mount=type=cache,target=/root/.cache \
Expand Down
5 changes: 5 additions & 0 deletions localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,11 @@ def populate_edge_configuration(
# get-function call.
INTERNAL_RESOURCE_ACCOUNT = os.environ.get("INTERNAL_RESOURCE_ACCOUNT") or "949334387222"

# Determine which implementation to use for the event rule / event filtering engine used by multiple services:
# EventBridge, EventBridge Pipes, Lambda Event Source Mapping, SNS
# Options: provider (default) | java
EVENT_RULE_ENGINE = os.environ.get("EVENT_RULE_ENGINE", "").strip()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ CI run with JPype enabled using java here in 017bae5


# -----
# SERVICE-SPECIFIC CONFIGS BELOW
# -----
Expand Down
78 changes: 76 additions & 2 deletions localstack/packages/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from abc import ABC
from functools import lru_cache
from sys import version_info
from typing import Optional
from typing import Optional, Tuple

import requests

from localstack import config

from ..constants import LOCALSTACK_VENV_FOLDER
from ..constants import LOCALSTACK_VENV_FOLDER, MAVEN_REPO_URL
from ..utils.archives import download_and_extract
from ..utils.files import chmod_r, chown_r, mkdir, rm_rf
from ..utils.http import download
Expand Down Expand Up @@ -295,3 +295,77 @@ def _install(self, target: InstallTarget) -> None:
def _setup_existing_installation(self, target: InstallTarget) -> None:
"""If the venv is already present, it just needs to be initialized once."""
self._prepare_installation(target)


class MavenDownloadInstaller(DownloadInstaller):
"""The packageURL is easy copy/pastable from the Maven central repository and the first package URL
defines the package name and version.
Example package_url: pkg:maven/software.amazon.event.ruler/[email protected]
=> name: event-ruler
=> version: 1.7.3
"""

# Example: software.amazon.event.ruler
group_id: str
# Example: event-ruler
artifact_id: str

# Custom installation directory
install_dir_suffix: str | None

def __init__(self, package_url: str, install_dir_suffix: str | None = None):
self.group_id, self.artifact_id, version = parse_maven_package_url(package_url)
super().__init__(self.artifact_id, version)
self.install_dir_suffix = install_dir_suffix

def _get_download_url(self) -> str:
group_id_path = self.group_id.replace(".", "/")
return f"{MAVEN_REPO_URL}/{group_id_path}/{self.artifact_id}/{self.version}/{self.artifact_id}-{self.version}.jar"

def _get_install_dir(self, target: InstallTarget) -> str:
"""Allow to overwrite the default installation directory.
This enables downloading transitive dependencies into the same directory.
"""
if self.install_dir_suffix:
return os.path.join(target.value, self.install_dir_suffix)
else:
return super()._get_install_dir(target)


class MavenPackageInstaller(MavenDownloadInstaller):
"""Package installer for downloading Maven JARs, including optional dependencies.
The first Maven package is used as main LPM package and other dependencies are installed additionally.
Follows the Maven naming conventions: https://maven.apache.org/guides/mini/guide-naming-conventions.html
"""

# Installers for Maven dependencies
dependencies: list[MavenDownloadInstaller]

def __init__(self, *package_urls: str):
super().__init__(package_urls[0])
self.dependencies = []

# Create installers for dependencies
for package_url in package_urls[1:]:
install_dir_suffix = os.path.join(self.name, self.version)
self.dependencies.append(MavenDownloadInstaller(package_url, install_dir_suffix))

def _install(self, target: InstallTarget) -> None:
# Install all dependencies first
for dependency in self.dependencies:
dependency._install(target)
# Install the main Maven package once all dependencies are installed.
# This main package indicates whether all dependencies are installed.
super()._install(target)


def parse_maven_package_url(package_url: str) -> Tuple[str, str, str]:
"""Example: parse_maven_package_url("pkg:maven/software.amazon.event.ruler/[email protected]")
-> software.amazon.event.ruler, event-ruler, 1.7.3
"""
parts = package_url.split("/")
group_id = parts[1]
sub_parts = parts[2].split("@")
artifact_id = sub_parts[0]
version = sub_parts[1]
return group_id, artifact_id, version
63 changes: 63 additions & 0 deletions localstack/services/events/event_ruler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
import os
from functools import cache
from pathlib import Path

from localstack import config
from localstack.services.events.packages import event_ruler_package
from localstack.services.events.utils import InvalidEventPatternException
from localstack.utils.objects import singleton_factory

THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))

LOG = logging.getLogger(__name__)


@singleton_factory
def start_jvm() -> None:
import jpype
from jpype import config as jpype_config

# Workaround to unblock LocalStack shutdown. By default, JPype waits until all daemon threads are terminated,
# which blocks the LocalStack shutdown during testing because pytest runs LocalStack in a separate thread and
# `jpype.shutdownJVM()` only works from the main Python thread.
# Shutting down the JVM: https://jpype.readthedocs.io/en/latest/userguide.html#shutting-down-the-jvm
# JPype shutdown discussion: https://github.com/MPh-py/MPh/issues/15#issuecomment-778486669
jpype_config.destroy_jvm = False
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's unfortunately needed. Could this cause any side-effects?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The container shutdown will probably not work, as we have an orphan process which did not get terminated by its parent on shutdown. The supervisor will probably wait forever, until docker kills the container.


if not jpype.isJVMStarted():
event_ruler_libs_path = get_event_ruler_libs_path()
event_ruler_libs_pattern = event_ruler_libs_path.joinpath("*")
jpype.startJVM(classpath=[event_ruler_libs_pattern])


@cache
def get_event_ruler_libs_path() -> Path:
installer = event_ruler_package.get_installer()
installer.install()
return Path(installer.get_installed_dir())


def matches_rule(event: str, rule: str) -> bool:
"""Invokes the AWS Event Ruler Java library: https://github.com/aws/event-ruler
There is a single static boolean method Ruler.matchesRule(event, rule) -
both arguments are provided as JSON strings.
"""
if config.EVENT_RULE_ENGINE != "java":
raise NotImplementedError("Set EVENT_RULE_ENGINE=java to enable the Java Event Ruler.")

start_jvm()
import jpype.imports # noqa F401: required for importing Java modules
from jpype import java

# Import of the Java class "Ruler" needs to happen after the JVM start
from software.amazon.event.ruler import Ruler

try:
# "Static rule matching" is the easiest implementation to get started.
# "Matching with a machine" using a compiled machine is faster and enables rule validation before matching.
# https://github.com/aws/event-ruler?tab=readme-ov-file#matching-with-a-machine
return Ruler.matchesRule(event, rule)
except java.lang.Exception as e:
reason = e.args[0]
raise InvalidEventPatternException(reason=reason) from e
26 changes: 26 additions & 0 deletions localstack/services/events/packages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from localstack.packages import Package, PackageInstaller
from localstack.packages.core import MavenPackageInstaller

# https://central.sonatype.com/artifact/software.amazon.event.ruler/event-ruler
EVENT_RULER_VERSION = "1.7.3"
# The dependent jackson.version is defined in the Maven POM File of event-ruler
JACKSON_VERSION = "2.16.2"


class EventRulerPackage(Package):
def __init__(self):
super().__init__("EventRulerLibs", EVENT_RULER_VERSION)

def get_versions(self) -> list[str]:
return [EVENT_RULER_VERSION]

def _get_installer(self, version: str) -> PackageInstaller:
return MavenPackageInstaller(
f"pkg:maven/software.amazon.event.ruler/event-ruler@{EVENT_RULER_VERSION}",
f"pkg:maven/com.fasterxml.jackson.core/jackson-annotations@{JACKSON_VERSION}",
f"pkg:maven/com.fasterxml.jackson.core/jackson-core@{JACKSON_VERSION}",
f"pkg:maven/com.fasterxml.jackson.core/jackson-databind@{JACKSON_VERSION}",
)


event_ruler_package = EventRulerPackage()
51 changes: 39 additions & 12 deletions localstack/services/events/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
EventBusNameOrArn,
EventPattern,
EventsApi,
InvalidEventPatternException,
PutRuleResponse,
PutTargetsResponse,
RoleArn,
Expand All @@ -39,8 +40,12 @@
from localstack.constants import APPLICATION_AMZ_JSON_1_1
from localstack.http import route
from localstack.services.edge import ROUTER
from localstack.services.events.event_ruler import matches_rule
from localstack.services.events.models import EventsStore, events_stores
from localstack.services.events.scheduler import JobScheduler
from localstack.services.events.utils import (
InvalidEventPatternException as InternalInvalidEventPatternException,
)
from localstack.services.events.utils import matches_event
from localstack.services.moto import call_moto
from localstack.services.plugins import ServiceLifecycleHook
Expand Down Expand Up @@ -110,28 +115,44 @@ def test_event_pattern(
"""Test event pattern uses EventBridge event pattern matching:
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html
"""
event_pattern_dict = json.loads(event_pattern)
event_dict = json.loads(event)

# TODO: unify all these different implementation below ;)

# EventBridge implementation:
result = matches_event(event_pattern_dict, event_dict)

# EventSourceMapping implementation:
if config.EVENT_RULE_ENGINE == "java":
try:
result = matches_rule(event, event_pattern)
except InternalInvalidEventPatternException as e:
raise InvalidEventPatternException(e.message) from e
else:
event_pattern_dict = json.loads(event_pattern)
event_dict = json.loads(event)
result = matches_event(event_pattern_dict, event_dict)

# TODO: unify the different implementations below:
# event_pattern_dict = json.loads(event_pattern)
# event_dict = json.loads(event)

# EventBridge:
# result = matches_event(event_pattern_dict, event_dict)

# Lambda EventSourceMapping:
# from localstack.services.lambda_.event_source_listeners.utils import does_match_event
#
# result = does_match_event(event_pattern_dict, event_dict)

# moto implementation:
# moto-ext EventBridge:
# from moto.events.models import EventPattern as EventPatternMoto
#
# event_pattern = EventPatternMoto.load(event_pattern)
# result = event_pattern.matches_event(event_dict)

# SNS:
# SNS: The SNS rule engine seems to differ slightly, for example not allowing the wildcard pattern.
# from localstack.services.sns.publisher import SubscriptionFilter
# subscription_filter = SubscriptionFilter()
# result = subscription_filter._evaluate_nested_filter_policy_on_dict(event_pattern_dict, event_dict)

# moto-ext SNS:
# from moto.sns.utils import FilterPolicyMatcher
# filter_policy_matcher = FilterPolicyMatcher(event_pattern_dict, "MessageBody")
# result = filter_policy_matcher._body_based_match(event_dict)

return TestEventPatternResponse(Result=result)

@staticmethod
Expand Down Expand Up @@ -409,7 +430,13 @@ def filter_event_based_on_event_format(
return False
if rule_information.event_pattern._pattern:
event_pattern = rule_information.event_pattern._pattern
if not matches_event(event_pattern, event):
if config.EVENT_RULE_ENGINE == "java":
event_str = json.dumps(event)
event_pattern_str = json.dumps(event_pattern)
match_result = matches_rule(event_str, event_pattern_str)
else:
match_result = matches_event(event_pattern, event)
if not match_result:
return False
return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ def _filter_records(
def _create_lambda_event_payload(self, stream_arn, records, shard_id=None):
record_payloads = []
for record in records:
creation_time = record.get("dynamodb", {}).get("ApproximateCreationDateTime", None)
if creation_time is not None:
record["dynamodb"]["ApproximateCreationDateTime"] = creation_time.timestamp()
record_payloads.append(
{
"eventID": record["eventID"],
Expand All @@ -76,3 +73,14 @@ def _get_first_and_last_arrival_time(self, first_record, last_record):
last_record.get("ApproximateArrivalTimestamp", datetime.datetime.utcnow()).isoformat()
+ "Z",
)

def _transform_records(self, raw_records: list[dict]) -> list[dict]:
"""Convert dynamodb.ApproximateCreationDateTime datetime to float"""
records_new = []
for record in raw_records:
record_new = record.copy()
if creation_time := record.get("dynamodb", {}).get("ApproximateCreationDateTime"):
# convert datetime object to float timestamp
record_new["dynamodb"]["ApproximateCreationDateTime"] = creation_time.timestamp()
records_new.append(record_new)
return records_new
joe4dev marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 10 additions & 2 deletions localstack/services/lambda_/event_source_listeners/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import logging
import re

from localstack import config
from localstack.aws.api.lambda_ import FilterCriteria
from localstack.services.events.event_ruler import matches_rule
from localstack.utils.strings import first_char_to_lower

LOG = logging.getLogger(__name__)
Expand All @@ -21,8 +23,14 @@ def filter_stream_records(records, filters: list[FilterCriteria]):
for record in records:
for filter in filters:
for rule in filter["Filters"]:
filter_pattern: dict[str, any] = json.loads(rule["Pattern"])
if does_match_event(filter_pattern, record):
if config.EVENT_RULE_ENGINE == "java":
event_str = json.dumps(record)
event_pattern_str = rule["Pattern"]
match_result = matches_rule(event_str, event_pattern_str)
else:
filter_pattern: dict[str, any] = json.loads(rule["Pattern"])
match_result = does_match_event(filter_pattern, record)
if match_result:
joe4dev marked this conversation as resolved.
Show resolved Hide resolved
filtered_records.append(record)
break
return filtered_records
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ base-runtime = [
"Werkzeug>=3.0.0",
"xmltodict>=0.13.0",
"rolo>=0.4",
# allow Python programs full access to Java class libraries. Used for opt-in event ruler.
"JPype1>=1.5.0",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

431KB without any extra dependencies (see their pyproject.toml), just packaging and typing_extensions, which we have as well

Copy link
Contributor

@bentsku bentsku Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we move it to runtime instead? base-runtime is the base one used by the S3 image, which has no dependency basically. Or was there a specific reason to put into base-runtime?

For a bit of context, basically, base-runtime is the minimal set of dependencies needed to run a service in LocalStack which would be entirely written in Python with no external dependencies (like S3).
I believe this change, which is for a provider with external dependencies and only opt-in, would fit better in runtime.

This would also have the side-effect of not having to change the S3 image Dockerfile, which does not need this and g++.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes a lot of sense @bentsku 💯 Thank you for this valuable input 👀 👏
I fixed everything (including the follow-up S3 tests because the test deps include the runtime deps 😬 ; that's a bit of an ugly hack though)

I didn't understand the full implications of base-runtime vs runtime dependencies.

With the fixed solution, the S3 image does not get larger 👌

# TODO: remove those 2 dependencies
"flask>=3.0.0",
"Quart>=0.19.2",
Expand Down
3 changes: 3 additions & 0 deletions requirements-base-runtime.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ jmespath==1.0.1
# via
# boto3
# botocore
jpype1==1.5.0
# via localstack-core (pyproject.toml)
jsonpatch==1.33
# via localstack-core (pyproject.toml)
jsonpointer==2.4
Expand All @@ -112,6 +114,7 @@ packaging==24.0
# via
# build
# docker
# jpype1
pbr==6.0.0
# via stevedore
plux==1.9.0
Expand Down