diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 400461d2d497..9b4748d5dee1 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -16,6 +16,11 @@
 
 lexer grammar SqlBaseLexer;
 
+@header {
+import java.util.ArrayDeque;
+import java.util.Deque;
+}
+
 @members {
   /**
    * When true, parser should throw ParseException for unclosed bracketed comment.
@@ -70,6 +75,11 @@ lexer grammar SqlBaseLexer;
     has_unclosed_bracketed_comment = true;
   }
 
+  /**
+   * This field stores the tags which are used to detect the end of a dollar quoted string literal.
+   */
+  private final Deque<String> tags = new ArrayDeque<String>();
+
   /**
    * When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type.
    */
@@ -325,6 +335,7 @@ MATCHED: 'MATCHED';
 MATERIALIZED: 'MATERIALIZED';
 MAX: 'MAX';
 MERGE: 'MERGE';
+METRICS: 'METRICS';
 MICROSECOND: 'MICROSECOND';
 MICROSECONDS: 'MICROSECONDS';
 MILLISECOND: 'MILLISECOND';
@@ -557,6 +568,10 @@ STRING_LITERAL
     | 'R"'(~'"')* '"'
     ;
 
+BEGIN_DOLLAR_QUOTED_STRING
+    : DOLLAR_QUOTED_TAG {tags.push(getText());} -> pushMode(DOLLAR_QUOTED_STRING_MODE)
+    ;
+
 DOUBLEQUOTED_STRING
     :'"' ( ~('"'|'\\') | '""' | ('\\' .) )* '"'
     ;
@@ -634,6 +649,10 @@ fragment LETTER
    : [A-Z]
    ;

+fragment DOLLAR_QUOTED_TAG
+    : '$' LETTER* '$'
+    ;
+
 fragment UNICODE_LETTER
    : [\p{L}]
    ;
@@ -656,3 +675,14 @@ WS
 UNRECOGNIZED
    : .
    ;
+
+mode DOLLAR_QUOTED_STRING_MODE;
+
+DOLLAR_QUOTED_STRING_BODY
+    : ~'$'+
+    | '$' ~'$'*
+    ;
+
+END_DOLLAR_QUOTED_STRING
+    : DOLLAR_QUOTED_TAG {getText().equals(tags.peek())}? {tags.pop();} -> popMode
+    ;
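For illustration, here is a minimal Scala sketch (not part of the patch; the helper names are invented) of the tag-matching discipline these lexer rules encode: BEGIN_DOLLAR_QUOTED_STRING pushes its tag, and the END_DOLLAR_QUOTED_STRING predicate only fires when the candidate tag equals the innermost open tag, so `$TAG$ ... $$ ... $TAG$` lexes as a single literal.

    import java.util.ArrayDeque

    val tags = new ArrayDeque[String]()
    def begin(tag: String): Unit = tags.push(tag)      // BEGIN_DOLLAR_QUOTED_STRING action
    def isEnd(candidate: String): Boolean =            // END_DOLLAR_QUOTED_STRING predicate + action
      candidate == tags.peek() && { tags.pop(); true }

    begin("$TAG$")
    assert(!isEnd("$$"))    // a bare '$$' inside a $TAG$ literal is just body text
    assert(isEnd("$TAG$"))  // the literal only ends at the matching tag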
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 132ced820e9a..4f1b5620569d 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -323,6 +323,14 @@ statement
         (PARTITIONED ON identifierList) |
         (TBLPROPERTIES propertyList))*
         AS query                                                         #createView
+    | CREATE (OR REPLACE)?
+        VIEW (IF errorCapturingNot EXISTS)? identifierReference
+        identifierCommentList?
+        ((WITH METRICS) |
+         routineLanguage |
+         commentSpec |
+         (TBLPROPERTIES propertyList))*
+        AS codeLiteral                                                   #createMetricView
     | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
         tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
         (OPTIONS propertyList)?                                          #createTempViewUsing
@@ -1523,6 +1531,17 @@ complexColType
    : errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec?
    ;

+// The code literal is defined as a dollar quoted string.
+// A dollar quoted string consists of
+// - a begin tag which contains a dollar sign, an optional tag, and another dollar sign,
+// - a string literal that is made up of an arbitrary sequence of characters, and
+// - an end tag which has to be exactly the same as the begin tag.
+// As the string literal can contain dollar signs, we add + to DOLLAR_QUOTED_STRING_BODY to avoid
+// the parser eagerly matching END_DOLLAR_QUOTED_STRING when seeing a dollar sign.
+codeLiteral
+    : BEGIN_DOLLAR_QUOTED_STRING DOLLAR_QUOTED_STRING_BODY+ END_DOLLAR_QUOTED_STRING
+    ;
+
 routineCharacteristics
    : (routineLanguage
    |  specificName
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 553161ea2db0..f468cc1cc72d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -618,7 +618,7 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
       ctx)
   }
 
-  def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = {
+  def createViewWithBothIfNotExistsAndReplaceError(ctx: ParserRuleContext): Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
   }
 
@@ -774,6 +774,18 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
       ctx)
   }
 
+  def missingClausesForOperation(
+      ctx: ParserRuleContext,
+      clauses: String,
+      operation: String): Throwable =
+    new ParseException(
+      errorClass = "MISSING_CLAUSES_FOR_OPERATION",
+      messageParameters = Map(
+        "clauses" -> clauses,
+        "operation" -> operation),
+      ctx
+    )
+
   def invalidDatetimeUnitError(
       ctx: ParserRuleContext,
       functionName: String,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6b0665c1b7f3..134cba088062 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2155,6 +2155,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case f @ UnresolvedFunction(nameParts, _, _, _, _, _, _) =>
         if (functionResolution.lookupBuiltinOrTempFunction(nameParts, Some(f)).isDefined) {
           f
+        } else if (nameParts.length == 1 && nameParts.head.equalsIgnoreCase("measure")) {
+          f
         } else {
           val CatalogAndIdentifier(catalog, ident) =
             relationResolution.expandIdentifier(nameParts)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 36e40306be7d..e9e7a1b38db1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -819,6 +819,7 @@ object FunctionRegistry {
     expression[ThetaDifference]("theta_difference"),
     expression[ThetaIntersection]("theta_intersection"),
     expression[ApproxTopKEstimate]("approx_top_k_estimate"),
+    expression[Measure]("measure"),
 
     // grouping sets
     expression[Grouping]("grouping"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index ada47bd3a40c..25273c73eb7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.metricview.logical.ResolvedMetricView
 
 /**
  * This file defines view types and analysis rules 
related to views. @@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule object EliminateView extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case v: View => v.child + case rmv: ResolvedMetricView => rmv.child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index be90c7ad3656..191e2091c40d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE +import org.apache.spark.sql.metricview.util.MetricViewPlanner import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils} import org.apache.spark.util.ArrayImplicits._ @@ -943,7 +944,9 @@ class SessionCatalog( val table = qualifiedIdent.table val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table) - if (metadata.tableType == CatalogTableType.VIEW) { + if (CatalogTable.isMetricView(metadata)) { + parseMetricViewDefinition(metadata) + } else if (metadata.tableType == CatalogTableType.VIEW) { // The relation is a view, so we wrap the relation by: // 1. Add a [[View]] operator over the relation to keep track of the view desc; // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. @@ -953,6 +956,27 @@ class SessionCatalog( } } + private def parseMetricViewDefinition(metadata: CatalogTable): LogicalPlan = { + val viewDefinition = metadata.viewText.getOrElse { + throw SparkException.internalError("Invalid view without text.") + } + val viewConfigs = metadata.viewSQLConfigs + val origin = CurrentOrigin.get.copy( + objectType = Some("METRIC VIEW"), + objectName = Some(metadata.qualifiedName) + ) + SQLConf.withExistingConf( + View.effectiveSQLConf( + configs = viewConfigs, + isTempView = false + ) + ) { + CurrentOrigin.withOrigin(origin) { + MetricViewPlanner.planRead(metadata, viewDefinition, parser, metadata.schema) + } + } + } + private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = { if (isTempView) { None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index eab99a96f4c3..01153d516e5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -722,6 +722,18 @@ object CatalogTable { val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts" val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part." + + // Property to indicate that a VIEW is actually a METRIC VIEW + val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics" + + /** + * Check if a CatalogTable is a metric view by looking at its properties. 
+   */
+  def isMetricView(table: CatalogTable): Boolean = {
+    table.tableType == CatalogTableType.VIEW &&
+      table.properties.get(VIEW_WITH_METRICS).contains("true")
+  }
+
   // Convert the current catalog and namespace to properties.
   def catalogAndNamespaceToProps(
       currentCatalog: String,
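The `Measure` expression added in the next file is purely an analysis-time marker. As a usage sketch (reusing the names from the built-in example below, and assuming a `SparkSession` named `spark`):

    // measure(...) flags which measure to compute; it is never executed directly,
    // but substituted by the analyzer with the view's defining aggregate.
    spark.sql(
      """SELECT dimension_col, measure(measure_col)
        |FROM test_metric_view
        |GROUP BY dimension_col""".stripMargin)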
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
new file mode 100644
index 000000000000..344ed80fc36c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, UnevaluableAggregateFunc}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{MEASURE, TreePattern}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}
+
+// This function serves as an annotation that tells the analyzer to calculate
+// the measures defined in metric views. It cannot be evaluated in the execution
+// phase; instead, it is replaced with the actual aggregate functions defined by
+// the measure (its input argument).
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr) - this function can only be used to calculate a measure defined in a metric view.",
+  examples = """
+    Examples:
+      > SELECT dimension_col, _FUNC_(measure_col)
+          FROM test_metric_view
+          GROUP BY dimension_col;
+       dim_1, 100
+       dim_2, 200
+  """,
+  group = "agg_funcs",
+  since = "4.0.0")
+// scalastyle:on line.size.limit
+case class Measure(child: Expression)
+  extends UnevaluableAggregateFunc with ExpectsInputTypes
+  with UnaryLike[Expression] {
+
+  override protected def withNewChildInternal(newChild: Expression): Measure =
+    copy(child = newChild)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
+
+  override def dataType: DataType = child.dataType
+
+  override def prettyName: String = getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure")
+
+  override def nullable: Boolean = true
+
+  override val nodePatterns: Seq[TreePattern] = Seq(MEASURE)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index 336db1382f89..0f7b6be765ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -24,7 +24,7 @@ import scala.util.matching.Regex
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.misc.Interval
-import org.antlr.v4.runtime.tree.{ParseTree, TerminalNodeImpl}
+import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode, TerminalNodeImpl}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
@@ -88,6 +88,18 @@ object ParserUtils extends SparkParserUtils {
     node.getText.slice(1, node.getText.length - 1)
   }
 
+  /**
+   * Obtain the string literal provided as a dollar quoted string.
+   * A dollar quoted string has the shape {{{$[tag]$ ... $[tag]$}}},
+   * where the enclosed string literal is parsed as a list of body sections.
+   * This helper method concatenates all body sections to restore the string literal.
+   */
+  def dollarQuotedString(sections: util.List[TerminalNode]): String = {
+    val sb = new StringBuilder()
+    sections forEach (body => sb.append(body.getText))
+    sb.toString()
+  }
+
   /** Collect the entries if any. 
*/ def entry(key: String, value: Token): Seq[(String, String)] = { Option(value).toSeq.map(x => key -> string(x)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 5ea93e74c5d7..a650eb8ed536 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -70,6 +70,7 @@ object TreePattern extends Enumeration { val MAP_FROM_ARRAYS: Value = Value val MAP_FROM_ENTRIES: Value = Value val MAP_OBJECTS: Value = Value + val MEASURE: Value = Value val MULTI_ALIAS: Value = Value val NEW_INSTANCE: Value = Value val NOT: Value = Value @@ -149,6 +150,7 @@ object TreePattern extends Enumeration { val LIMIT: Value = Value val LOCAL_RELATION: Value = Value val LOGICAL_QUERY_STAGE: Value = Value + val METRIC_VIEW_PLACEHOLDER: Value = Value val NATURAL_LIKE_JOIN: Value = Value val NO_GROUPING_AGGREGATE_REFERENCE: Value = Value val OFFSET: Value = Value @@ -162,6 +164,7 @@ object TreePattern extends Enumeration { val RELATION_TIME_TRAVEL: Value = Value val REPARTITION_OPERATION: Value = Value val REBALANCE_PARTITIONS: Value = Value + val RESOLVED_METRIC_VIEW: Value = Value val SERIALIZE_FROM_OBJECT: Value = Value val SORT: Value = Value val SQL_TABLE_FUNCTION: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala new file mode 100644 index 000000000000..a7fa037a4b33 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.metricview.logical + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} +import org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, RESOLVED_METRIC_VIEW, TreePattern} +import org.apache.spark.sql.metricview.serde.MetricView + +case class MetricViewPlaceholder( + metadata: CatalogTable, + desc: MetricView, + outputMetrics: Seq[Attribute], + child: LogicalPlan, + isCreate: Boolean = false) extends UnaryNode { + final override val nodePatterns: Seq[TreePattern] = Seq(METRIC_VIEW_PLACEHOLDER) + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { + copy(child = newChild) + } + override def output: Seq[Attribute] = outputMetrics + override lazy val resolved: Boolean = child.resolved + override def simpleString(maxFields: Int): String = + s"$nodeName ${output.mkString("[", ", ", "]")}".trim + + override def producedAttributes: AttributeSet = AttributeSet(outputMetrics) +} + +case class ResolvedMetricView( + identifier: TableIdentifier, + child: LogicalPlan) extends UnaryNode { + override def output: scala.Seq[Attribute] = child.output + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) + override lazy val resolved: Boolean = child.resolved + final override val nodePatterns: Seq[TreePattern] = Seq(RESOLVED_METRIC_VIEW) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala new file mode 100644 index 000000000000..0d9c0ed25839 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.metricview.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder
+import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, MetricViewFactory, MetricViewValidationException, MetricViewYAMLParsingException, SQLSource}
+import org.apache.spark.sql.types.StructType
+
+object MetricViewPlanner {
+
+  def planWrite(
+      metadata: CatalogTable,
+      yaml: String,
+      sqlParser: ParserInterface): MetricViewPlaceholder = {
+    val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+    MetricViewPlaceholder(
+      metadata,
+      metricView,
+      Seq.empty,
+      dataModelPlan,
+      isCreate = true
+    )
+  }
+
+  def planRead(
+      metadata: CatalogTable,
+      yaml: String,
+      sqlParser: ParserInterface,
+      expectedSchema: StructType): MetricViewPlaceholder = {
+    val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+    MetricViewPlaceholder(
+      metadata,
+      metricView,
+      DataTypeUtils.toAttributes(expectedSchema),
+      dataModelPlan
+    )
+  }
+
+  private def parseYAML(
+      yaml: String,
+      sqlParser: ParserInterface): (MetricView, LogicalPlan) = {
+    val metricView = try {
+      MetricViewFactory.fromYAML(yaml)
+    } catch {
+      case _: MetricViewValidationException =>
+        throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+      case _: MetricViewYAMLParsingException =>
+        throw QueryCompilationErrors.invalidLiteralForWindowDurationError()
+    }
+    val source = metricView.from match {
+      case asset: AssetSource => UnresolvedRelation(sqlParser.parseTableIdentifier(asset.name))
+      case sqlSource: SQLSource => sqlParser.parsePlan(sqlSource.sql)
+      case _ => throw SparkException.internalError("Expected either a SQLSource or an AssetSource.")
+    }
+    // Compute the filter here because all necessary information is available.
+    val parsedPlan = metricView.where.map { cond =>
+      Filter(sqlParser.parseExpression(cond), source)
+    }.getOrElse(source)
+    (metricView, parsedPlan)
+  }
+}
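For context, a sketch of the description MetricViewPlanner consumes, written with the serde constructors that the test suite later in this patch uses (the YAML on disk is just `MetricViewFactory.toYAML` of such a value; the exact YAML keys are not shown in this patch):

    import org.apache.spark.sql.metricview.serde._

    val definition = MetricView(
      "0.1",                                  // format version
      AssetSource("test_table"),              // or SQLSource("SELECT * FROM test_table")
      Some("product = 'product_1'"),          // optional predicate, planned as a Filter
      Seq(
        Column("region", DimensionExpression("region"), 0),
        Column("count_sum", MeasureExpression("sum(count)"), 1)))
    // planWrite/planRead wrap the (optionally filtered) source plan
    // in a MetricViewPlaceholder carrying this description.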
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
new file mode 100644
index 000000000000..7522efb59890
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Measure}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER
+import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, ResolvedMetricView}
+import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, Constants => MetricViewConstants, Expression => CanonicalExpression, JsonUtils, MetricView => CanonicalMetricView, _}
+import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
+
+case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
+  private def parser: ParserInterface = session.sessionState.sqlParser
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+      return plan
+    }
+    plan.resolveOperatorsUp {
+      // CREATE PATH: to create a metric view, we need to analyze the metric view
+      // definition and get the output schema (with column metadata). Since the measures
+      // are aggregate functions, we need to use an Aggregate node and group by all
+      // dimensions to get the output schema.
+      case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved =>
+        val (dimensions, measures) = buildMetricViewOutput(mvp.desc)
+        Aggregate(
+          // group by all dimensions
+          dimensions.map(_.toAttribute).toSeq,
+          // select all dimensions and measures to get the final output (mostly data types)
+          (dimensions ++ measures).toSeq,
+          mvp.child
+        )
+
+      // SELECT PATH: to read a metric view, the user wraps measures in the `MEASURE`
+      // aggregate function, which leads to an Aggregate node. This way, we only need to
+      // resolve the Aggregate node based on the metric view output and then replace
+      // the AttributeReferences of the metric view output with the actual expressions.
+      case node @ MetricViewReadOperation(metricView) =>
+        // step 1: parse the metric view definition
+        val (dimensions, measures) =
+          parseMetricViewColumns(metricView.outputMetrics, metricView.desc.select)
+
+        // step 2: build the Project node containing the dimensions
+        val dimensionExprs = dimensions.map(_.namedExpr)
+        // Drop the source columns that conflict with dimensions. Concretely:
+        val sourceOutput = metricView.child.output
+        // 1. hide the source columns that conflict with dimensions
+        // 2. add aliases to the source columns so they are stable with DeduplicateRelation
+        // 3. the metric view output should keep the same exprIds
+        val sourceProjList = sourceOutput.filterNot { attr =>
+          // conflict with dimensions
+          metricView.outputMetrics
+            .resolve(Seq(attr.name), session.sessionState.conf.resolver)
+            .exists(a => dimensions.exists(_.exprId == a.exprId))
+        }.map { attr =>
+          if (attr.metadata.contains(MetricViewConstants.COLUMN_TYPE_PROPERTY_KEY)) {
+            // no alias for a metric view column, since the measure reference needs to use the
+            // measure column in MetricViewPlaceholder, and an alias would change the exprId
+            attr
+          } else {
+            // add an alias to the source column so it is stable with DeduplicateRelation
+            Alias(attr, attr.name)()
+          }
+        }
+        val withDimensions = node.transformDownWithPruning(
+          _.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
+          case mv: MetricViewPlaceholder
+              if mv.metadata.identifier == metricView.metadata.identifier =>
+            ResolvedMetricView(
+              mv.metadata.identifier,
+              Project(sourceProjList ++ dimensionExprs, mv.child)
+            )
+        }
+
+        // step 3: resolve the measure references in the Aggregate node
+        val res = withDimensions match {
+          case aggregate: Aggregate => transformAggregateWithMeasures(
+            aggregate,
+            measures
+          )
+          case other =>
+            throw SparkException.internalError("Ran into an unexpected node: " + other)
+        }
+        res
+    }
+  }
+
+  private def buildMetricViewOutput(metricView: CanonicalMetricView)
+      : (Seq[NamedExpression], Seq[NamedExpression]) = {
+    val dimensions = new mutable.ArrayBuffer[NamedExpression]()
+    val measures = new mutable.ArrayBuffer[NamedExpression]()
+    metricView.select.foreach { col =>
+      val metadata = new MetadataBuilder()
+        .withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata)))
+        .build()
+      col.expression match {
+        case DimensionExpression(expr) =>
+          dimensions.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
+        case MeasureExpression(expr) =>
+          measures.append(
+            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
+      }
+    }
+    (dimensions.toSeq, measures.toSeq)
+  }
+
+  private def parseMetricViewColumns(
+      metricViewOutput: Seq[Attribute],
+      columns: Seq[CanonicalColumn[_ <: CanonicalExpression]]
+  ): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = {
+    val dimensions = new mutable.ArrayBuffer[MetricViewDimension]()
+    val measures = new mutable.ArrayBuffer[MetricViewMeasure]()
+    metricViewOutput.zip(columns).foreach { case (attr, column) =>
+      column.expression match {
+        case DimensionExpression(expr) =>
+          dimensions.append(
+            MetricViewDimension(
+              attr.name,
+              parser.parseExpression(expr),
+              attr.exprId,
+              attr.dataType)
+          )
+        case MeasureExpression(expr) =>
+          measures.append(
+            MetricViewMeasure(
+              attr.name,
+              parser.parseExpression(expr),
+              attr.exprId,
+              attr.dataType)
+          )
+      }
+    }
+    (dimensions.toSeq, measures.toSeq)
+  }
+
+  private def transformAggregateWithMeasures(
+      aggregate: Aggregate,
+      measures: Seq[MetricViewMeasure]): LogicalPlan = {
+    val measuresMap = measures.map(m => m.exprId -> m).toMap
+    val newAggExprs = aggregate.aggregateExpressions.map { expr =>
+      expr.transform {
+        case AggregateExpression(Measure(a: AttributeReference), _, _, _, _) =>
+          measuresMap(a.exprId).expr
+      }.asInstanceOf[NamedExpression]
+    }
+    aggregate.copy(aggregateExpressions = newAggExprs)
+  }
+}
+
+object MetricViewReadOperation {
+  def unapply(plan: LogicalPlan): Option[MetricViewPlaceholder] = {
+    plan match {
+      case a: Aggregate if a.resolved && a.containsPattern(METRIC_VIEW_PLACEHOLDER) =>
+        collectMetricViewNode(a.child)
+      case _ =>
+        None
+    }
+  } 
+ + @scala.annotation.tailrec + private def collectMetricViewNode(plan: LogicalPlan): Option[MetricViewPlaceholder] = { + plan match { + case f: Filter => collectMetricViewNode(f.child) + case s: Expand => collectMetricViewNode(s.child) + case s: Project => collectMetricViewNode(s.child) + case s: SubqueryAlias => collectMetricViewNode(s.child) + case m: MetricViewPlaceholder => Some(m) + case _ => None + } + } +} + +sealed trait MetricViewColumn { + def name: String + def expr: Expression + def exprId: ExprId + def dataType: DataType + def namedExpr: NamedExpression = { + Alias(UpCast(expr, dataType), name)(exprId = exprId) + } +} + +case class MetricViewDimension( + name: String, + expr: Expression, + exprId: ExprId, + dataType: DataType) extends MetricViewColumn + +case class MetricViewMeasure( + name: String, + expr: Expression, + exprId: ExprId, + dataType: DataType) extends MetricViewColumn diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f8f6e31be1bc..927b81d2d510 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -724,6 +724,65 @@ class SparkSqlAstBuilder extends AstBuilder { } } + override def visitCodeLiteral(ctx: CodeLiteralContext): String = { + assert(ctx != null) + dollarQuotedString(ctx.DOLLAR_QUOTED_STRING_BODY()) + } + + override def visitCreateMetricView(ctx: CreateMetricViewContext): LogicalPlan = withOrigin(ctx) { + checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.routineLanguage(), "LANGUAGE", ctx) + checkDuplicateClauses(ctx.METRICS(), "WITH METRICS", ctx) + val userSpecifiedColumns = Option(ctx.identifierCommentList).toSeq.flatMap { icl => + icl.identifierComment.asScala.map { ic => + ic.identifier.getText -> Option(ic.commentSpec()).map(visitCommentSpec) + } + } + + if (ctx.EXISTS != null && ctx.REPLACE != null) { + throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx) + } + + if (ctx.METRICS(0) == null) { + throw QueryParsingErrors.missingClausesForOperation( + ctx, "WITH METRICS", "CREATE METRIC VIEW") + } + + if (ctx.routineLanguage(0) == null) { + throw QueryParsingErrors.missingClausesForOperation( + ctx, "LANGUAGE", "CREATE METRIC VIEW") + } + + val languageCtx = ctx.routineLanguage(0) + withOrigin(languageCtx) { + if (languageCtx.SQL() != null) { + operationNotAllowed("Unsupported language for metric view: SQL", ctx) + } + val name: String = languageCtx.IDENTIFIER().getText + if (!name.equalsIgnoreCase("YAML")) { + operationNotAllowed(s"Unsupported language for metric view: $name", ctx) + } + } + + val properties = ctx.propertyList.asScala.headOption + .map(visitPropertyKeyValues) + .getOrElse(Map.empty) + val codeLiteral = visitCodeLiteral(ctx.codeLiteral()) + + withIdentClause(ctx.identifierReference(), ident => { + CreateMetricViewCommand( + UnresolvedIdentifier(ident), + userSpecifiedColumns, + visitCommentSpecList(ctx.commentSpec()), + properties, + codeLiteral, + allowExisting = ctx.EXISTS != null, + replace = ctx.REPLACE != null + ) + }) + } + /** * Create a [[CreateFunctionCommand]]. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala new file mode 100644 index 000000000000..c1a71fc4c69a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, SchemaUnsupported} +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.metricview.util.MetricViewPlanner +import org.apache.spark.sql.types.{StringType, StructType} + +case class CreateMetricViewCommand( + child: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: String, + allowExisting: Boolean, + replace: Boolean) extends UnaryRunnableCommand with IgnoreCachedData { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override val output: Seq[Attribute] = Seq( + AttributeReference("result", StringType, nullable = false)() + ) + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val name = child match { + case v: ResolvedIdentifier => + v.identifier.asTableIdentifier + case _ => throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError() + } + val analyzed = MetricViewHelper.analyzeMetricViewText(sparkSession, name, originalText) + + if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzed.output.length) { + throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + name, userSpecifiedColumns.map(_._1), analyzed) + } else if (userSpecifiedColumns.length < analyzed.output.length) { + throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + name, userSpecifiedColumns.map(_._1), analyzed) + } + } + catalog.createTable( + ViewHelper.prepareTable( + sparkSession, name, Some(originalText), analyzed, userSpecifiedColumns, + properties, SchemaUnsupported, comment, + None, isMetricView = true), + ignoreIfExists = allowExisting) + Seq.empty + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { + copy(child = newChild) + } +} + +case 
class AlterMetricViewCommand(child: LogicalPlan, originalText: String)
+
+object MetricViewHelper {
+  def analyzeMetricViewText(
+      session: SparkSession,
+      name: TableIdentifier,
+      viewText: String): LogicalPlan = {
+    val analyzer = session.sessionState.analyzer
+    // This metadata is only used for the analysis check; it will be replaced during
+    // create/update with more accurate information.
+    val tableMeta = CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType(),
+      viewOriginalText = Some(viewText),
+      viewText = Some(viewText))
+    val metricViewNode = MetricViewPlanner.planWrite(
+      tableMeta, viewText, session.sessionState.sqlParser)
+    val analyzed = analyzer.executeAndCheck(metricViewNode, new QueryPlanningTracker)
+    ViewHelper.verifyTemporaryObjectsNotExists(isTemporary = false, name, analyzed, Seq.empty)
+    analyzed
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 11ec17ca57fd..95d76c72d295 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -133,7 +133,7 @@ case class CreateViewCommand(
     SchemaUtils.checkIndeterminateCollationInSchema(plan.schema)
 
     if (viewType == LocalTempView) {
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns)
       val tableDefinition = createTemporaryViewRelation(
         name,
         sparkSession,
@@ -148,7 +148,7 @@ case class CreateViewCommand(
     } else if (viewType == GlobalTempView) {
       val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
-      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val aliasedPlan = aliasPlan(sparkSession, analyzedPlan, userSpecifiedColumns)
       val tableDefinition = createTemporaryViewRelation(
         viewIdent,
         sparkSession,
@@ -178,7 +178,10 @@ case class CreateViewCommand(
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
         // Nothing we need to retain from the old view, so just drop and create a new one
         catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false)
-        catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
+        catalog.createTable(
+          prepareTable(
+            sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties,
+            viewSchemaMode, comment, collation), ignoreIfExists = false)
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -186,56 +189,14 @@ case class CreateViewCommand(
       }
     } else {
       // Create the view if it doesn't exist.
-      catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = allowExisting)
+      catalog.createTable(
+        prepareTable(
+          sparkSession, name, originalText, analyzedPlan, userSpecifiedColumns, properties,
+          viewSchemaMode, comment, collation), ignoreIfExists = allowExisting)
     }
     Seq.empty[Row]
   }
 
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly. 
-   */
-  private def aliasPlan(session: SparkSession, analyzedPlan: LogicalPlan): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-  }
-
-  /**
-   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generate the view-specific
-   * properties(e.g. view default database, view query output column names) and store them as
-   * properties in the CatalogTable, and also creates the proper schema for the view.
-   */
-  private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    if (originalText.isEmpty) {
-      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
-    }
-    val aliasedSchema = CharVarcharUtils.getRawSchema(
-      aliasPlan(session, analyzedPlan).schema, session.sessionState.conf)
-    val newProperties = generateViewProperties(
-      properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode)
-
-    CatalogTable(
-      identifier = name,
-      tableType = CatalogTableType.VIEW,
-      storage = CatalogStorageFormat.empty,
-      schema = aliasedSchema,
-      properties = newProperties,
-      viewOriginalText = originalText,
-      viewText = originalText,
-      comment = comment,
-      collation = collation
-    )
-  }
-
   override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
     copy(plan = WithCTE(plan, cteDefs))
   }
@@ -812,4 +773,70 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
       properties = Map((VIEW_STORING_ANALYZED_PLAN, "true")),
       collation = collation)
   }
+
+
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. Generates the
+   * view-specific properties (e.g. view default database, view query output column names),
+   * stores them as properties in the CatalogTable, and also creates the proper schema for the view. 
+ */ + def prepareTable( + session: SparkSession, + name: TableIdentifier, + originalText: Option[String], + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + properties: Map[String, String], + viewSchemaMode: ViewSchemaMode, + comment: Option[String], + collation: Option[String], + isMetricView: Boolean = false): CatalogTable = { + if (originalText.isEmpty) { + throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() + } + val aliasedSchema = CharVarcharUtils.getRawSchema( + aliasPlan(session, analyzedPlan, userSpecifiedColumns).schema, session.sessionState.conf) + val newProperties = generateViewProperties( + properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode) + + // Add property to indicate if this is a metric view + val finalProperties = if (isMetricView) { + newProperties + (CatalogTable.VIEW_WITH_METRICS -> "true") + } else { + newProperties + } + + CatalogTable( + identifier = name, + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = aliasedSchema, + properties = finalProperties, + viewOriginalText = originalText, + viewText = originalText, + comment = comment, + collation = collation + ) + } + + /** + * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, + * else return the analyzed plan directly. + */ + def aliasPlan( + session: SparkSession, + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, None)) => Alias(attr, colName)() + case (attr, (colName, Some(colComment))) => + val meta = new MetadataBuilder().putString("comment", colComment).build() + Alias(attr, colName)(explicitMetadata = Some(meta)) + } + session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index ff6e58c2b2a2..995999196cfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.internal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -243,6 +243,7 @@ abstract 
class BaseSessionStateBuilder( ResolveWriteToStream +: new EvalSubqueriesForTimeTravel +: new ResolveTranspose(session) +: + ResolveMetricView(session) +: new InvokeProcedures(session) +: ResolveExecuteImmediate(session, this.catalogManager) +: ExtractSemiStructuredFields +: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala new file mode 100644 index 000000000000..042de7565823 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewSuite.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.metricview.serde.{AssetSource, Column, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource} +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} + +class SimpleMetricViewSuite extends MetricViewSuite with SharedSparkSession + +/** + * A suite for testing metric view related functionality. 
+ */ +abstract class MetricViewSuite extends QueryTest with SQLTestUtils { + import testImplicits._ + + protected val testMetricViewName = "test_metric_view" + protected val testTableName = "test_table" + protected val testTableData = Seq( + ("region_1", "product_1", 80, 5.0), + ("region_1", "product_2", 70, 10.0), + ("REGION_1", "product_3", 60, 15.0), + ("REGION_1", "product_4", 50, 20.0), + ("region_2", "product_1", 40, 25.0), + ("region_2", "product_2", 30, 30.0), + ("REGION_2", "product_3", 20, 35.0), + ("REGION_2", "product_4", 10, 40.0) + ) + protected val testMetricViewColumns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("product", DimensionExpression("product"), 1), + Column("region_upper", DimensionExpression("upper(region)"), 2), + Column("count_sum", MeasureExpression("sum(count)"), 3), + Column("price_avg", MeasureExpression("avg(price)"), 4) + ) + + override protected def beforeAll(): Unit = { + super.beforeAll() + testTableData + .toDF("region", "product", "count", "price") + .write + .saveAsTable(testTableName) + } + + protected def createMetricView( + metricViewName: String, + metricViewDefinition: MetricView): Unit = { + val yaml = MetricViewFactory.toYAML(metricViewDefinition) + sql(s""" + |CREATE VIEW $metricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$ + |""".stripMargin) + } + + protected def withMetricView( + viewName: String, + metricViewDefinition: MetricView)(body: => Unit): Unit = { + createMetricView(viewName, metricViewDefinition) + withView(viewName) { + body + } + } + + test("test source type") { + val sources = Seq( + AssetSource(testTableName), + SQLSource("SELECT * FROM test_table") + ) + sources.foreach { source => + val metricView = MetricView("0.1", source, None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1'"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE upper(region) = 'REGION_1'") + ) + } + } + } + + test("test where clause") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), + Some("product = 'product_1'"), testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) FROM test_metric_view"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE product = 'product_1'") + ) + checkAnswer( + sql("SELECT measure(count_sum), measure(price_avg) " + + "FROM test_metric_view WHERE region_upper = 'REGION_1'"), + sql("SELECT sum(count), avg(price) FROM test_table WHERE " + + "product = 'product_1' AND upper(region) = 'REGION_1'") + ) + } + } + + test("test dimensions and measures") { + val metricView = MetricView( + "0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + // dimension and measure + checkAnswer( + sql("SELECT region, product, measure(count_sum), measure(price_avg) " + + "FROM test_metric_view GROUP BY region, product"), + sql("SELECT region, product, sum(count), avg(price) " + + "FROM test_table GROUP BY region, product") + ) + // dimension only + checkAnswer( + sql("SELECT region_upper FROM test_metric_view GROUP BY 1"), + sql("SELECT upper(region) FROM test_table GROUP BY 1") + ) + // measure only + checkAnswer( 
+ sql("SELECT measure(count_sum) FROM test_metric_view"), + sql("SELECT sum(count) FROM test_table") + ) + } + } + + test("column from source cannot be used when query metric view") { + val metricView = MetricView("0.1", AssetSource(testTableName), None, testMetricViewColumns) + withMetricView(testMetricViewName, metricView) { + checkError( + exception = intercept[AnalysisException] { + sql("SELECT sum(count) FROM test_metric_view").collect() + }, + condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map( + "objectName" -> "`count`", + "proposal" -> "`count_sum`, `product`, `region`, `price_avg`, `region_upper`" + ), + queryContext = Array(ExpectedContext( + fragment = "count", + start = 11, + stop = 15 + )) + ) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index aa801b6e2f68..be74d236cd21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveMetricView, ResolveSessionCatalog, ResolveTranspose} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -132,6 +132,7 @@ class HiveSessionStateBuilder( new EvalSubqueriesForTimeTravel +: new DetermineTableStats(session) +: new ResolveTranspose(session) +: + ResolveMetricView(session) +: new InvokeProcedures(session) +: ResolveExecuteImmediate(session, catalogManager) +: ExtractSemiStructuredFields +: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala new file mode 100644 index 000000000000..bccc2f629a15 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveMetricViewSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.execution.MetricViewSuite +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.tags.SlowHiveTest + +/** + * A test suite for Hive metric view related functionality. + */ +@SlowHiveTest +class HiveMetricViewSuite extends MetricViewSuite with TestHiveSingleton
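Taken together, the read-path rewrite that both suites exercise can be summarized with the shared fixtures, in the suites' own checkAnswer style (a conceptual sketch, not an additional test in the patch):

    // count_sum is defined as sum(count) in the fixture's metric view, so
    // ResolveMetricView rewrites the first query into the shape of the second.
    checkAnswer(
      sql("SELECT region, measure(count_sum) FROM test_metric_view GROUP BY region"),
      sql("SELECT region, sum(count) FROM test_table GROUP BY region"))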