From 45d335635bacce756660664a677ba2908d7390a8 Mon Sep 17 00:00:00 2001 From: Will Morrison Date: Wed, 30 Oct 2024 13:55:55 -0400 Subject: [PATCH] Use MVEL for rule evaluation Co-Authored-By: Prakhar Sapre --- docs/routing-rules.md | 161 +++++++----------- gateway-ha/pom.xml | 25 +-- .../ha/config/RoutingRulesConfiguration.java | 16 ++ .../ha/module/HaGatewayProviderModule.java | 8 +- .../router/FileBasedRoutingGroupSelector.java | 107 ++++++++++++ .../gateway/ha/router/MVELRoutingRule.java | 139 +++++++++++++++ .../ha/router/RoutingGroupSelector.java | 5 +- .../trino/gateway/ha/router/RoutingRule.java | 26 +++ .../RuleReloadingRoutingGroupSelector.java | 125 -------------- .../ha/router/TestRoutingGroupSelector.java | 55 ++++-- .../rules/routing_rules_composite.yml | 24 --- .../resources/rules/routing_rules_state.yml | 31 ++++ 12 files changed, 435 insertions(+), 287 deletions(-) create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/router/FileBasedRoutingGroupSelector.java create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/router/MVELRoutingRule.java create mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRule.java delete mode 100644 gateway-ha/src/main/java/io/trino/gateway/ha/router/RuleReloadingRoutingGroupSelector.java delete mode 100644 gateway-ha/src/test/resources/rules/routing_rules_composite.yml create mode 100644 gateway-ha/src/test/resources/rules/routing_rules_state.yml diff --git a/docs/routing-rules.md b/docs/routing-rules.md index 78def563c..e70b68c3e 100644 --- a/docs/routing-rules.md +++ b/docs/routing-rules.md @@ -24,6 +24,8 @@ To enable the routing rules engine, find the following lines in * Set `rulesEngineEnabled` to `true`, then `rulesType` as `FILE` or `EXTERNAL`. * If you set `rulesType: FILE`, then set `rulesConfigPath` to the path to your rules config file. +* The rules file will be re-read every minute by default. 
You may change this by setting + `rulesRefreshPeriod: Duration`, where duration is an airlift style Duration such as `30s`. * If you set `rulesType: EXTERNAL`, set `rulesExternalConfiguration` to the URL of an external service for routing rules processing. * `rulesType` is by default `FILE` unless specified. @@ -92,11 +94,10 @@ return a result with the following criteria: ### Configure routing rules with a file -To express and fire routing rules, we use the -[easy-rules](https://github.com/j-easy/easy-rules) engine. These rules must be -stored in a YAML file. Rules consist of a name, description, condition, and list +Rules consist of a name, description, condition, and list of actions. If the condition of a particular rule evaluates to `true`, its -actions are fired. +actions are fired. Rules are stored as a +[multi-document](https://www.yaml.info/learn/document.html) YAML file. ```yaml --- @@ -113,20 +114,37 @@ actions: - 'result.put("routingGroup", "etl-special")' ``` -In the condition, you can access the methods of a -[HttpServletRequest](https://docs.oracle.com/javaee/6/api/javax/servlet/http/HttpServletRequest.html) -object called `request`. Rules may also utilize +Three objects are available by default. They are +* `request`, the incoming request as an [HttpServletRequest](https://docs.oracle.com/javaee/6/api/javax/servlet/http/HttpServletRequest.html) +* `state`, a `HashMap` that allows passing arbitrary state from one rule evaluation to the next +* `result`, a `HashMap` that is used to return the result of rule evaluation to the engine + +In addition to the default objects, rules may optionally utilize [trinoRequestUser](#trinorequestuser) and [trinoQueryProperties](#trinoqueryproperties) -objects, which provide information about the user and query respectively. +, which provide information about the user and query respectively. 
You must include an action of the form `result.put(\"routingGroup\", \"foo\")` to trigger routing of a request that satisfies the condition to the specific routing group. Without this action, the default adhoc group is used and the whole routing rule is redundant. The condition and actions are written in [MVEL](http://mvel.documentnode.com/), -an expression language with Java-like syntax. In most cases, you can write -conditions and actions in Java syntax and expect it to work. There are some +an expression language with Java-like syntax. Classes from `java.util`, data-type +classes from `java.lang` such as `Integer` and `String`, as well as `java.lang.Math` +and `java.lang.StrictMath` are available in rules. Rules may not use `java.lang.System` +and other classes that allow access to the Java runtime and operating system. +In most cases, you can write +conditions and actions in Java syntax and expect it to work. One exception is +parametrized types. Exclude type parameters; for example, to add a `HashSet` to the +`state` variable, use an action such as: +```yaml +actions: + - | + state.put("triggeredRules",new HashSet()) +``` +This is equivalent to `new HashSet<Object>()`. + +There are some MVEL-specific operators. For example, instead of doing a null-check before accessing the `String.contains` method like this: @@ -296,8 +314,8 @@ actions: ``` This can difficult to maintain with more rules. To have better control over the -execution of rules, we can use rule priorities and composite rules. Overall, -priorities, composite rules, and other constructs that MVEL support allows +execution of rules, we can use rule priorities. Overall, +priorities and other constructs that MVEL supports allow you to express your routing logic. #### Rule priority @@ -328,99 +346,52 @@ that the first rule (priority 0) is fired before the second rule (priority 1). Thus `routingGroup` is set to `etl` and then to `etl-special`, so the `routingGroup` is always `etl-special` in the end. 
-More specific rules must be set to a lesser priority so they are evaluated last -to set a `routingGroup`. To further control the execution of rules, for example -to have only one rule fire, you can use composite rules. +More specific rules must be set to a higher priority value so they are evaluated last +to set a `routingGroup`. -##### Composite rules +##### Passing State -First, please refer to the [easy-rule composite rules documentation](https://github.com/j-easy/easy-rules/wiki/defining-rules#composite-rules). - -The preceding section covers how to control the order of rule execution using -priorities. In addition, you can configure evaluation so that only the first -rule matched fires (the highest priority one) and the rest is ignored. You can -use `ActivationRuleGroup` to achieve this: +The `state` object may be used to pass information from one rule evaluation to +the next. This allows an author to avoid duplicating logic in multiple rules. +Priority should be used to ensure that `state` is updated before being used +in downstream rules. For example, the atomic rules may be re-implemented as: ```yaml --- -name: "airflow rule group" -description: "routing rules for query from airflow" -compositeRuleType: "ActivationRuleGroup" -composingRules: - - name: "airflow special" - description: "if query from airflow with special label, route to etl-special group" - priority: 0 - condition: 'request.getHeader("X-Trino-Source") == "airflow" && request.getHeader("X-Trino-Client-Tags") contains "label=special"' - actions: - - 'result.put("routingGroup", "etl-special")' - - name: "airflow" - description: "if query from airflow, route to etl group" - priority: 1 - condition: 'request.getHeader("X-Trino-Source") == "airflow"' - actions: - - 'result.put("routingGroup", "etl")' -``` - -Note that the priorities have switched. The more specific rule has a higher -priority, since it should fire first. 
A query coming from airflow with special -label is matched to the "airflow special" rule first, since it's higher -priority, and the second rule is ignored. A query coming from airflow with no -labels does not match the first rule, and is then tested and matched to the -second rule. - -You can also use `ConditionalRuleGroup` and `ActivationRuleGroup` to implement -an if/else workflow. The following logic in pseudocode: - -```text -if source == "airflow": - if clientTags["label"] == "foo": - return "etl-foo" - else if clientTags["label"] = "bar": - return "etl-bar" - else - return "etl" -``` - -This logic can be implemented with the following rules: +name: "initialize state" +description: "Add a set to the state map to track rules that have evaluated to true" +priority: 0 +condition: "true" +actions: + - | + state.put("triggeredRules",new HashSet()) + # MVEL does not support type parameters! HashSet<String>() would result in an error. +--- +name: "airflow" +description: "if query from airflow, route to etl group" +priority: 1 +condition: | + request.getHeader("X-Trino-Source") == "airflow" +actions: + - | + result.put("routingGroup", "etl") + - | + state.get("triggeredRules").add("airflow") +--- +name: "airflow special" +description: "if query from airflow with special label, route to etl-special group" +priority: 2 +condition: | + state.get("triggeredRules").contains("airflow") && request.getHeader("X-Trino-Client-Tags") contains "label=special" +actions: + - | + result.put("routingGroup", "etl-special") -```yaml -name: "airflow rule group" -description: "routing rules for query from airflow" -compositeRuleType: "ConditionalRuleGroup" -composingRules: - - name: "main condition" - description: "source is airflow" - priority: 0 # rule with the highest priority acts as main condition - condition: 'request.getHeader("X-Trino-Source") == "airflow"' - actions: - - "" - - name: "airflow subrules" - compositeRuleType: "ActivationRuleGroup" # use ActivationRuleGroup to simulate 
if/else - composingRules: - - name: "label foo" - description: "label client tag is foo" - priority: 0 - condition: 'request.getHeader("X-Trino-Client-Tags") contains "label=foo"' - actions: - - 'result.put("routingGroup", "etl-foo")' - - name: "label bar" - description: "label client tag is bar" - priority: 0 - condition: 'request.getHeader("X-Trino-Client-Tags") contains "label=bar"' - actions: - - 'result.put("routingGroup", "etl-bar")' - - name: "airflow default" - description: "airflow queries default to etl" - condition: "true" - actions: - - 'result.put("routingGroup", "etl")' ``` ##### If statements (MVEL Flow Control) -In the preceding section you see how `ConditionalRuleGroup` and -`ActivationRuleGroup` are used to implement an `if/else` workflow. You can -use MVEL support for `if` statements and other flow control. The following logic +You can use MVEL support for `if` statements and other flow control. The following logic in pseudocode: ```text diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index eabf9b63d..a6fce920a 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -260,21 +260,9 @@ - org.jeasy - easy-rules-core - ${dep.jeasy.version} - - - - org.jeasy - easy-rules-mvel - ${dep.jeasy.version} - - - - org.jeasy - easy-rules-support - ${dep.jeasy.version} + org.mvel + mvel2 + 2.5.2.Final @@ -311,13 +299,6 @@ runtime - - org.mvel - mvel2 - 2.5.2.Final - runtime - - org.postgresql postgresql diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingRulesConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingRulesConfiguration.java index d780e7823..63bf7c7d4 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingRulesConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/RoutingRulesConfiguration.java @@ -13,6 +13,10 @@ */ package io.trino.gateway.ha.config; +import io.airlift.units.Duration; + +import static java.util.concurrent.TimeUnit.MINUTES; + public class 
RoutingRulesConfiguration { private boolean rulesEngineEnabled; @@ -20,6 +24,8 @@ public class RoutingRulesConfiguration private String rulesConfigPath; private RulesExternalConfiguration rulesExternalConfiguration; + private Duration rulesRefreshPeriod = new Duration(1, MINUTES); + public RoutingRulesConfiguration() {} public boolean isRulesEngineEnabled() @@ -61,4 +67,14 @@ public void setRulesExternalConfiguration(RulesExternalConfiguration rulesExtern { this.rulesExternalConfiguration = rulesExternalConfiguration; } + + public Duration getRulesRefreshPeriod() + { + return rulesRefreshPeriod; + } + + public void setRulesRefreshPeriod(Duration rulesRefreshPeriod) + { + this.rulesRefreshPeriod = rulesRefreshPeriod; + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java index 550c98daf..dd1cf6649 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java @@ -182,10 +182,10 @@ public RoutingGroupSelector getRoutingGroupSelector() if (routingRulesConfig.isRulesEngineEnabled()) { try { return switch (routingRulesConfig.getRulesType()) { - case FILE -> { - String rulesConfigPath = routingRulesConfig.getRulesConfigPath(); - yield RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, configuration.getRequestAnalyzerConfig()); - } + case FILE -> RoutingGroupSelector.byRoutingRulesEngine( + routingRulesConfig.getRulesConfigPath(), + routingRulesConfig.getRulesRefreshPeriod(), + configuration.getRequestAnalyzerConfig()); case EXTERNAL -> { RulesExternalConfiguration rulesExternalConfiguration = routingRulesConfig.getRulesExternalConfiguration(); yield RoutingGroupSelector.byRoutingExternal(rulesExternalConfiguration, configuration.getRequestAnalyzerConfig()); diff --git 
a/gateway-ha/src/main/java/io/trino/gateway/ha/router/FileBasedRoutingGroupSelector.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/FileBasedRoutingGroupSelector.java new file mode 100644 index 000000000..1c4cbddfb --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/FileBasedRoutingGroupSelector.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLParser; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.gateway.ha.config.RequestAnalyzerConfig; +import jakarta.servlet.http.HttpServletRequest; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Suppliers.memoizeWithExpiration; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.sort; + +public class FileBasedRoutingGroupSelector + implements RoutingGroupSelector +{ + private static final Logger log = Logger.get(FileBasedRoutingGroupSelector.class); + public static final String RESULTS_ROUTING_GROUP_KEY = "routingGroup"; + 
+ private static final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); + + private final Supplier> rules; + private final boolean analyzeRequest; + private final boolean clientsUseV2Format; + private final int maxBodySize; + private final TrinoRequestUser.TrinoRequestUserProvider trinoRequestUserProvider; + + public FileBasedRoutingGroupSelector(String rulesPath, Duration rulesRefreshPeriod, RequestAnalyzerConfig requestAnalyzerConfig) + { + analyzeRequest = requestAnalyzerConfig.isAnalyzeRequest(); + clientsUseV2Format = requestAnalyzerConfig.isClientsUseV2Format(); + maxBodySize = requestAnalyzerConfig.getMaxBodySize(); + trinoRequestUserProvider = new TrinoRequestUser.TrinoRequestUserProvider(requestAnalyzerConfig); + + rules = memoizeWithExpiration(() -> readRulesFromPath(Path.of(rulesPath)), rulesRefreshPeriod.toJavaTime()); + } + + @Override + public String findRoutingGroup(HttpServletRequest request) + { + Map result = new HashMap<>(); + Map state = new HashMap<>(); + + Map data; + if (analyzeRequest) { + TrinoQueryProperties trinoQueryProperties = new TrinoQueryProperties( + request, + clientsUseV2Format, + maxBodySize); + TrinoRequestUser trinoRequestUser = trinoRequestUserProvider.getInstance(request); + data = ImmutableMap.of("request", request, "trinoQueryProperties", trinoQueryProperties, "trinoRequestUser", trinoRequestUser); + } + else { + data = ImmutableMap.of("request", request); + } + + rules.get().forEach(rule -> { + if (rule.evaluateCondition(data, state)) { + log.debug(rule + " evaluated to true on request: " + request); + rule.evaluateAction(result, data, state); + } + }); + return result.get(RESULTS_ROUTING_GROUP_KEY); + } + + public List readRulesFromPath(Path rulesPath) + { + try { + String content = Files.readString(rulesPath, UTF_8); + YAMLParser parser = new YAMLFactory().createParser(content); + List routingRulesList = new ArrayList<>(); + while (parser.nextToken() != null) { + MVELRoutingRule routingRules = 
yamlReader.readValue(parser, MVELRoutingRule.class); + routingRulesList.add(routingRules); + } + sort(routingRulesList); + return routingRulesList; + } + catch (IOException e) { + throw new RuntimeException("Failed to read or parse routing rules configuration from path: " + rulesPath, e); + } + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/MVELRoutingRule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/MVELRoutingRule.java new file mode 100644 index 000000000..bcb4b9ccc --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/MVELRoutingRule.java @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.gateway.ha.router; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import org.mvel2.ParserContext; +import org.mvel2.debug.DebugTools; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; +import static org.mvel2.MVEL.compileExpression; +import static org.mvel2.MVEL.executeExpression; +import static org.mvel2.debug.DebugTools.decompile; + +public class MVELRoutingRule + implements RoutingRule +{ + String name; + String description; + Integer priority; + Serializable condition; + List actions; + ParserContext parserContext = new ParserContext(); + + @JsonCreator + public MVELRoutingRule( + @JsonProperty("name") String name, + @JsonProperty("description") String description, + @JsonProperty("priority") Integer priority, + @JsonProperty("condition") Serializable condition, + @JsonProperty("actions") List actions) + { + initializeParserContext(parserContext); + + this.name = requireNonNull(name, "name is null"); + this.description = requireNonNullElse(description, ""); + this.priority = requireNonNullElse(priority, 0); + this.condition = requireNonNull( + condition instanceof String stringCondition ? 
compileExpression(stringCondition, parserContext) : condition, + "condition is null"); + this.actions = requireNonNull(actions.stream() + .map(this::compileExpressionIfNecessary).collect(toImmutableList()), "actions is null"); + } + + private Serializable compileExpressionIfNecessary(Serializable expression) + { + if (expression instanceof String stringExpression) { + return compileExpression(stringExpression, parserContext); + } + return expression; + } + + private void initializeParserContext(ParserContext parserContext) + { + parserContext.addPackageImport("java.util"); + + // Members of java.lang, excluding potential security hazards such as Process and Runtime + parserContext.addImport(Boolean.class); + parserContext.addImport(Byte.class); + parserContext.addImport(Character.class); + parserContext.addImport(Double.class); + parserContext.addImport(Enum.class); + parserContext.addImport(Exception.class); + parserContext.addImport(Float.class); + parserContext.addImport(Integer.class); + parserContext.addImport(Long.class); + parserContext.addImport(Math.class); + parserContext.addImport(Short.class); + parserContext.addImport(StrictMath.class); + parserContext.addImport(String.class); + parserContext.addImport(StringBuffer.class); + parserContext.addImport(StringBuilder.class); + } + + @Override + public Integer getPriority() + { + return priority; + } + + @Override + public int compareTo(RoutingRule o) + { + if (o == null) { + return 1; + } + return priority.compareTo(o.getPriority()); + } + + @Override + public boolean evaluateCondition(Map data, Map state) + { + ImmutableMap.Builder variablesBuilder = ImmutableMap.builder(); + variablesBuilder.putAll(data); + variablesBuilder.put("state", state); + return (boolean) executeExpression(condition, variablesBuilder.build()); + } + + @Override + public void evaluateAction(Map result, Map data, Map state) + { + ImmutableMap.Builder variablesBuilder = ImmutableMap.builder(); + variablesBuilder.putAll(data); + 
variablesBuilder.put("result", result); + variablesBuilder.put("state", state); + actions.forEach(action -> executeExpression(action, variablesBuilder.build())); + } + + @Override + public String toString() + { + return "MVELRoutingRule{" + + "name='" + name + '\'' + + ", description='" + description + '\'' + + ", priority=" + priority + + ", condition=" + decompile(condition) + + ", actions=" + String.join(",", actions.stream().map(DebugTools::decompile).toList()) + + ", parserContext=" + parserContext + + '}'; + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingGroupSelector.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingGroupSelector.java index ae6285e14..d912ec0fa 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingGroupSelector.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingGroupSelector.java @@ -13,6 +13,7 @@ */ package io.trino.gateway.ha.router; +import io.airlift.units.Duration; import io.trino.gateway.ha.config.RequestAnalyzerConfig; import io.trino.gateway.ha.config.RulesExternalConfiguration; import jakarta.servlet.http.HttpServletRequest; @@ -37,9 +38,9 @@ static RoutingGroupSelector byRoutingGroupHeader() * Routing group selector that uses routing engine rules * to determine the right routing group. 
*/ - static RoutingGroupSelector byRoutingRulesEngine(String rulesConfigPath, RequestAnalyzerConfig requestAnalyzerConfig) + static RoutingGroupSelector byRoutingRulesEngine(String rulesConfigPath, Duration rulesRefreshPeriod, RequestAnalyzerConfig requestAnalyzerConfig) { - return new RuleReloadingRoutingGroupSelector(rulesConfigPath, requestAnalyzerConfig); + return new FileBasedRoutingGroupSelector(rulesConfigPath, rulesRefreshPeriod, requestAnalyzerConfig); } /** diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRule.java new file mode 100644 index 000000000..d9110aeff --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRule.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.gateway.ha.router; + +import java.util.Map; + +public interface RoutingRule + extends Comparable +{ + boolean evaluateCondition(Map data, Map state); + + void evaluateAction(Map result, Map data, Map state); + + Integer getPriority(); +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RuleReloadingRoutingGroupSelector.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RuleReloadingRoutingGroupSelector.java deleted file mode 100644 index c6b763e79..000000000 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RuleReloadingRoutingGroupSelector.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.gateway.ha.router; - -import io.airlift.log.Logger; -import io.trino.gateway.ha.config.RequestAnalyzerConfig; -import jakarta.servlet.http.HttpServletRequest; -import org.jeasy.rules.api.Facts; -import org.jeasy.rules.api.Rules; -import org.jeasy.rules.api.RulesEngine; -import org.jeasy.rules.core.DefaultRulesEngine; -import org.jeasy.rules.mvel.MVELRuleFactory; -import org.jeasy.rules.support.reader.YamlRuleDefinitionReader; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.HashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.nio.charset.StandardCharsets.UTF_8; - -public class RuleReloadingRoutingGroupSelector - implements RoutingGroupSelector -{ - private static final Logger log = Logger.get(RuleReloadingRoutingGroupSelector.class); - private final RulesEngine rulesEngine = new DefaultRulesEngine(); - private final MVELRuleFactory ruleFactory = new MVELRuleFactory(new YamlRuleDefinitionReader()); - private final Path rulesConfigPath; - private volatile Rules rules = new Rules(); - private volatile long lastUpdatedTime; - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true); - private final RequestAnalyzerConfig requestAnalyzerConfig; - private final TrinoRequestUser.TrinoRequestUserProvider trinoRequestUserProvider; - - RuleReloadingRoutingGroupSelector(String rulesConfigPath, RequestAnalyzerConfig requestAnalyzerConfig) - { - this.rulesConfigPath = Path.of(rulesConfigPath); - this.requestAnalyzerConfig = requestAnalyzerConfig; - trinoRequestUserProvider = new TrinoRequestUser.TrinoRequestUserProvider(requestAnalyzerConfig); - try { - rules = ruleFactory.createRules( - Files.newBufferedReader(this.rulesConfigPath, UTF_8)); - BasicFileAttributes attr = Files.readAttributes(this.rulesConfigPath, - 
BasicFileAttributes.class);
-            lastUpdatedTime = attr.lastModifiedTime().toMillis();
-        }
-        catch (Exception e) {
-            throw new RuntimeException("Error opening rules configuration file at "
-                    + rulesConfigPath + "\n"
-                    + "Using routing group header as default.", e);
-        }
-    }
-
-    @Override
-    public String findRoutingGroup(HttpServletRequest request)
-    {
-        try {
-            BasicFileAttributes attr = Files.readAttributes(rulesConfigPath,
-                    BasicFileAttributes.class);
-            log.debug("File modified time: %s. lastUpdatedTime: %s", attr.lastModifiedTime(), lastUpdatedTime);
-            if (attr.lastModifiedTime().toMillis() > lastUpdatedTime) {
-                Lock writeLock = readWriteLock.writeLock();
-                writeLock.lock();
-                try {
-                    if (attr.lastModifiedTime().toMillis() > lastUpdatedTime) {
-                        // This check is performed again to prevent parsing the rules twice in case another
-                        // thread finds the condition true and acquires the lock before this one
-                        log.info("Updating rules to file modified at %s", attr.lastModifiedTime());
-                        rules = ruleFactory.createRules(
-                                Files.newBufferedReader(rulesConfigPath, UTF_8));
-                        lastUpdatedTime = attr.lastModifiedTime().toMillis();
-                    }
-                }
-                finally {
-                    writeLock.unlock();
-                }
-            }
-
-            Facts facts = new Facts();
-            HashMap result = new HashMap();
-
-            facts.put("request", request);
-            if (requestAnalyzerConfig.isAnalyzeRequest()) {
-                TrinoQueryProperties trinoQueryProperties = new TrinoQueryProperties(
-                        request,
-                        requestAnalyzerConfig.isClientsUseV2Format(),
-                        requestAnalyzerConfig.getMaxBodySize());
-                TrinoRequestUser trinoRequestUser = trinoRequestUserProvider.getInstance(request);
-                facts.put("trinoQueryProperties", trinoQueryProperties);
-                facts.put("trinoRequestUser", trinoRequestUser);
-            }
-            facts.put("result", result);
-            Lock readLock = readWriteLock.readLock();
-            readLock.lock();
-            try {
-                rulesEngine.fire(rules, facts);
-            }
-            finally {
-                readLock.unlock();
-            }
-            return result.get("routingGroup");
-        }
-        catch (Exception e) {
-            log.error(e, "Error opening rules configuration file, using "
-                    + "routing group header as default.");
-            // Invalid rules could lead to perf problems as every thread goes into the writeLock
-            // block until the issue is resolved
-        }
-        return request.getHeader(ROUTING_GROUP_HEADER);
-    }
-}
diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingGroupSelector.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingGroupSelector.java
index fa632fef4..c5823b88e 100644
--- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingGroupSelector.java
+++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingGroupSelector.java
@@ -14,6 +14,7 @@
 package io.trino.gateway.ha.router;
 
 import com.google.common.collect.ImmutableSet;
+import io.airlift.units.Duration;
 import io.trino.gateway.ha.config.RequestAnalyzerConfig;
 import io.trino.sql.tree.QualifiedName;
 import jakarta.servlet.http.HttpServletRequest;
@@ -42,6 +43,8 @@
 
 import static io.trino.gateway.ha.router.RoutingGroupSelector.ROUTING_GROUP_HEADER;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -54,6 +57,7 @@ final class TestRoutingGroupSelector
 
     private static final String DEFAULT_CATALOG = "default_catalog";
     private static final String DEFAULT_SCHEMA = "default_schema";
+    private final Duration oneHourRefreshPeriod = new Duration(1, HOURS);
 
     RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig();
 
@@ -68,9 +72,9 @@ static Stream provideRoutingRuleConfigFiles()
         String rulesDir = "src/test/resources/rules/";
         return Stream.of(
                 rulesDir + "routing_rules_atomic.yml",
-                rulesDir + "routing_rules_composite.yml",
                 rulesDir + "routing_rules_priorities.yml",
-                rulesDir + "routing_rules_if_statements.yml");
+                rulesDir + "routing_rules_if_statements.yml",
+                rulesDir + "routing_rules_state.yml");
     }
 
     @Test
@@ -93,7 +97,7 @@ void testByRoutingGroupHeader()
     void testByRoutingRulesEngine(String rulesConfigPath)
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, oneHourRefreshPeriod, requestAnalyzerConfig);
 
         HttpServletRequest mockRequest = prepareMockRequest();
 
@@ -106,7 +110,10 @@ void testByRoutingRulesEngine(String rulesConfigPath)
     void testGetUserFromBasicAuth()
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         String encodedUsernamePassword = Base64.getEncoder().encodeToString("will:supersecret".getBytes(UTF_8));
 
         HttpServletRequest mockRequest = prepareMockRequest();
@@ -121,7 +128,10 @@ void testTrinoQueryPropertiesQueryDetails()
             throws IOException
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         String query = "SELECT x.*, y.*, z.* FROM catx.schemx.tblx x, schemy.tbly y, tblz z";
         Reader reader = new StringReader(query);
         BufferedReader bufferedReader = new BufferedReader(reader);
@@ -138,7 +148,10 @@ void testTrinoQueryPropertiesCatalogSchemas()
             throws IOException
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         String query = "SELECT x.*, y.* FROM catx.nondefault.tblx x, caty.default.tbly y";
         Reader reader = new StringReader(query);
         BufferedReader bufferedReader = new BufferedReader(reader);
@@ -154,7 +167,10 @@ void testTrinoQueryPropertiesCatalogSchemas()
     void testTrinoQueryPropertiesSessionDefaults()
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
 
         HttpServletRequest mockRequest = prepareMockRequest();
         when(mockRequest.getHeader(TrinoQueryProperties.TRINO_CATALOG_HEADER_NAME)).thenReturn("other_catalog");
@@ -168,7 +184,10 @@ void testTrinoQueryPropertiesQueryType()
             throws IOException
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         String query = "INSERT INTO foo SELECT 1";
         Reader reader = new StringReader(query);
         BufferedReader bufferedReader = new BufferedReader(reader);
@@ -184,7 +203,10 @@ void testTrinoQueryPropertiesAlternateStatementFormat()
     {
         requestAnalyzerConfig.setClientsUseV2Format(true);
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         String body = "{\"preparedStatements\" : {\"statement1\":\"INSERT INTO foo SELECT 1\"}, \"query\": \"EXECUTE statement1\"}";
         Reader reader = new StringReader(body);
         BufferedReader bufferedReader = new BufferedReader(reader);
@@ -202,7 +224,10 @@ void testTrinoQueryPropertiesPreparedStatementInHeader()
         String body = "EXECUTE statement4";
 
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine("src/test/resources/rules/routing_rules_trino_query_properties.yml", requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(
+                        "src/test/resources/rules/routing_rules_trino_query_properties.yml",
+                        oneHourRefreshPeriod,
+                        requestAnalyzerConfig);
         Reader reader = new StringReader(body);
         BufferedReader bufferedReader = new BufferedReader(reader);
         HttpServletRequest mockRequest = prepareMockRequest();
@@ -220,7 +245,7 @@ void testTrinoQueryPropertiesPreparedStatementInHeader()
     void testByRoutingRulesEngineSpecialLabel(String rulesConfigPath)
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, oneHourRefreshPeriod, requestAnalyzerConfig);
 
         HttpServletRequest mockRequest = prepareMockRequest();
 
@@ -236,7 +261,7 @@ void testByRoutingRulesEngineSpecialLabel(String rulesConfigPath)
     void testByRoutingRulesEngineNoMatch(String rulesConfigPath)
     {
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(rulesConfigPath, oneHourRefreshPeriod, requestAnalyzerConfig);
 
         HttpServletRequest mockRequest = prepareMockRequest();
         // even though special label is present, query is not from airflow.
@@ -261,10 +286,10 @@ void testByRoutingRulesEngineFileChange()
                             + "actions:\n"
                             + " - \"result.put(\\\"routingGroup\\\", \\\"etl\\\")\"");
         }
-        long lastModified = file.lastModified();
+        Duration refreshPeriod = new Duration(1, MILLISECONDS);
 
         RoutingGroupSelector routingGroupSelector =
-                RoutingGroupSelector.byRoutingRulesEngine(file.getPath(), requestAnalyzerConfig);
+                RoutingGroupSelector.byRoutingRulesEngine(file.getPath(), refreshPeriod, requestAnalyzerConfig);
 
         HttpServletRequest mockRequest = prepareMockRequest();
 
@@ -281,7 +306,7 @@ void testByRoutingRulesEngineFileChange()
                             + "actions:\n"
                             + " - \"result.put(\\\"routingGroup\\\", \\\"etl2\\\")\""); // change from etl to etl2
         }
-        assertThat(file.setLastModified(lastModified + 1000)).isTrue();
+        Thread.sleep(2 * refreshPeriod.toMillis());
         when(mockRequest.getHeader(TRINO_SOURCE_HEADER)).thenReturn("airflow");
 
         assertThat(routingGroupSelector.findRoutingGroup(mockRequest))
diff --git a/gateway-ha/src/test/resources/rules/routing_rules_composite.yml b/gateway-ha/src/test/resources/rules/routing_rules_composite.yml
deleted file mode 100644
index 2ddee5d9c..000000000
--- a/gateway-ha/src/test/resources/rules/routing_rules_composite.yml
+++ /dev/null
@@ -1,24 +0,0 @@
-name: "airflow rule group"
-description: "routing rules for query from airflow"
-compositeRuleType: "ConditionalRuleGroup"
-composingRules:
-  - name: "main condition"
-    description: "source is airflow"
-    priority: 0
-    condition: "request.getHeader(\"X-Trino-Source\") == \"airflow\""
-    actions:
-      - ""
-  - name: "airflow subrules"
-    compositeRuleType: "ActivationRuleGroup"
-    composingRules:
-      - name: "airflow special"
-        description: "special label"
-        priority: 0
-        condition: "request.getHeader(\"X-Trino-Client-Tags\") contains \"label=special\""
-        actions:
-          - "result.put(\"routingGroup\", \"etl-special\")"
-      - name: "airflow default"
-        description: "airflow queries default to etl"
-        condition: "true"
-        actions:
-          - "result.put(\"routingGroup\", \"etl\")"
diff --git a/gateway-ha/src/test/resources/rules/routing_rules_state.yml b/gateway-ha/src/test/resources/rules/routing_rules_state.yml
new file mode 100644
index 000000000..463ebd84f
--- /dev/null
+++ b/gateway-ha/src/test/resources/rules/routing_rules_state.yml
@@ -0,0 +1,31 @@
+---
+name: "initialize state"
+description: "Add a set to the state map to track rules that have evaluated to true"
+priority: 0
+condition: "true"
+actions:
+  - |
+    state.put("triggeredRules",new HashSet())
+  # MVEL does not support type parameters! Using one will result in a syntax error.
+  # Effectively this results in all objects of classes that support
+  # parametrization being declared as ParametrizedClass
+---
+name: "airflow"
+description: "if query from airflow, route to etl group"
+priority: 1
+condition: |
+  request.getHeader("X-Trino-Source") == "airflow"
+actions:
+  - |
+    result.put("routingGroup", "etl")
+  - |
+    state.get("triggeredRules").add("airflow")
+---
+name: "airflow special"
+description: "if query from airflow with special label, route to etl-special group"
+priority: 2
+condition: |
+  state.get("triggeredRules").contains("airflow") && request.getHeader("X-Trino-Client-Tags") contains "label=special"
+actions:
+  - |
+    result.put("routingGroup", "etl-special")