@@ -16,6 +16,11 @@

lexer grammar SqlBaseLexer;

@header {
import java.util.ArrayDeque;
import java.util.Deque;
}

@members {
/**
* When true, parser should throw ParseException for unclosed bracketed comment.
@@ -70,6 +75,11 @@ lexer grammar SqlBaseLexer;
has_unclosed_bracketed_comment = true;
}

/**
* This field stores the tags which are used to detect the end of a dollar quoted string literal.
*/
private final Deque<String> tags = new ArrayDeque<String>();

/**
* When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type.
*/
@@ -325,6 +335,7 @@ MATCHED: 'MATCHED';
MATERIALIZED: 'MATERIALIZED';
MAX: 'MAX';
MERGE: 'MERGE';
METRICS: 'METRICS';
MICROSECOND: 'MICROSECOND';
MICROSECONDS: 'MICROSECONDS';
MILLISECOND: 'MILLISECOND';
@@ -557,6 +568,10 @@ STRING_LITERAL
| 'R"'(~'"')* '"'
;

BEGIN_DOLLAR_QUOTED_STRING
: DOLLAR_QUOTED_TAG {tags.push(getText());} -> pushMode(DOLLAR_QUOTED_STRING_MODE)
;

DOUBLEQUOTED_STRING
:'"' ( ~('"'|'\\') | '""' | ('\\' .) )* '"'
;
@@ -634,6 +649,10 @@ fragment LETTER
: [A-Z]
;

fragment DOLLAR_QUOTED_TAG
: '$' LETTER* '$'
;

fragment UNICODE_LETTER
: [\p{L}]
;
@@ -656,3 +675,14 @@ WS
UNRECOGNIZED
: .
;

mode DOLLAR_QUOTED_STRING_MODE;

DOLLAR_QUOTED_STRING_BODY
: ~'$'+
| '$' ~'$'*
;

END_DOLLAR_QUOTED_STRING
: DOLLAR_QUOTED_TAG {getText().equals(tags.peek())}? {tags.pop();} -> popMode
;
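
The begin/end rules above cooperate through the tag deque declared in @members: BEGIN_DOLLAR_QUOTED_STRING pushes the tag it just matched, and END_DOLLAR_QUOTED_STRING fires only when the candidate tag equals the top of the stack. A minimal standalone Scala sketch of that matching discipline (illustrative only; the authoritative logic is the ANTLR actions above):

import scala.collection.mutable

object DollarTagMatch {
  private val tags = mutable.Stack[String]()

  // Mirrors the BEGIN_DOLLAR_QUOTED_STRING action: remember the opening tag.
  def begin(tag: String): Unit = tags.push(tag)

  // Mirrors the END_DOLLAR_QUOTED_STRING predicate: only an identical tag
  // closes the literal; anything else is ordinary string body.
  def tryEnd(candidate: String): Boolean =
    if (tags.headOption.contains(candidate)) { tags.pop(); true } else false

  def main(args: Array[String]): Unit = {
    begin("$tag$")
    assert(!tryEnd("$other$")) // mismatched tag stays inside the literal
    assert(tryEnd("$tag$"))    // exact match terminates the string
  }
}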
@@ -323,6 +323,14 @@ statement
(PARTITIONED ON identifierList) |
(TBLPROPERTIES propertyList))*
AS query #createView
| CREATE (OR REPLACE)?
VIEW (IF errorCapturingNot EXISTS)? identifierReference
identifierCommentList?
((WITH METRICS) |
routineLanguage |
commentSpec |
(TBLPROPERTIES propertyList))*
AS codeLiteral #createMetricView
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
(OPTIONS propertyList)? #createTempViewUsing
@@ -1523,6 +1531,17 @@ complexColType
: errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec?
;

// The code literal is defined as a dollar quoted string.
// A dollar quoted string consists of
// - a begin tag which contains a dollar sign, an optional tag, and another dollar sign,
// - a string literal that is made up of an arbitrary sequence of characters, and
// - an end tag which has to be exactly the same as the begin tag.
// As the string literal can contain dollar signs, we add + to DOLLAR_QUOTED_STRING_BODY to avoid
// the parser eagerly matching END_DOLLAR_QUOTED_STRING when seeing a dollar sign.
codeLiteral
: BEGIN_DOLLAR_QUOTED_STRING DOLLAR_QUOTED_STRING_BODY+ END_DOLLAR_QUOTED_STRING
;
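
For illustration, a statement exercising the createMetricView alternative and this codeLiteral rule might look like the sketch below. It assumes a Spark build containing this change; the view name and the body text are hypothetical, since this diff does not define the metric definition format itself.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// The body may freely contain '$'; only the exact opening tag ($mv$ here)
// terminates the dollar quoted string.
spark.sql(
  """CREATE VIEW sales_metrics WITH METRICS
    |AS $mv$
    |-- illustrative metric definition text, free to contain $ signs
    |$mv$""".stripMargin)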

routineCharacteristics
: (routineLanguage
| specificName
@@ -618,7 +618,7 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = {
def createViewWithBothIfNotExistsAndReplaceError(ctx: ParserRuleContext): Throwable = {
new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
}

@@ -774,6 +774,18 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

def missingClausesForOperation(
ctx: ParserRuleContext,
clauses: String,
operation: String): Throwable =
new ParseException(
errorClass = "MISSING_CLAUSES_FOR_OPERATION",
messageParameters = Map(
"clauses" -> clauses,
"operation" -> operation),
ctx
)
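
The call sites for this new helper are outside this excerpt; a hedged sketch of how a parser visitor might use it (the clause and operation strings below are illustrative):

import org.antlr.v4.runtime.ParserRuleContext
import org.apache.spark.sql.errors.QueryParsingErrors

// Hypothetical validation: reject a metric view declaration that omits its
// mandatory clause.
def assertHasMetricsClause(hasWithMetrics: Boolean, ctx: ParserRuleContext): Unit = {
  if (!hasWithMetrics) {
    throw QueryParsingErrors.missingClausesForOperation(
      ctx, clauses = "WITH METRICS", operation = "CREATE METRIC VIEW")
  }
}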

def invalidDatetimeUnitError(
ctx: ParserRuleContext,
functionName: String,
@@ -2155,6 +2155,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case f @ UnresolvedFunction(nameParts, _, _, _, _, _, _) =>
if (functionResolution.lookupBuiltinOrTempFunction(nameParts, Some(f)).isDefined) {
f
} else if (nameParts.length == 1 && nameParts.head.equalsIgnoreCase("measure")) {
f
} else {
val CatalogAndIdentifier(catalog, ident) =
relationResolution.expandIdentifier(nameParts)
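
With the new branch above, a bare single-part measure call is left unresolved here and picked up later by the builtin registry (where this PR registers it), instead of being expanded into a catalog-qualified identifier. A hypothetical query relying on this, reusing the session from the earlier sketch (names illustrative):

// `measure` must bypass the catalog lookup path, or resolution would fail.
spark.sql("SELECT dim, measure(total_sales) FROM sales_metrics GROUP BY dim")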
@@ -819,6 +819,7 @@ object FunctionRegistry {
expression[ThetaDifference]("theta_difference"),
expression[ThetaIntersection]("theta_intersection"),
expression[ApproxTopKEstimate]("approx_top_k_estimate"),
expression[Measure]("measure"),

// grouping sets
expression[Grouping]("grouping"),
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.metricview.logical.ResolvedMetricView

/**
* This file defines view types and analysis rules related to views.
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case v: View => v.child
case rmv: ResolvedMetricView => rmv.child
}
}

@@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.metricview.util.MetricViewPlanner
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
import org.apache.spark.util.ArrayImplicits._
@@ -943,7 +944,9 @@ class SessionCatalog(
val table = qualifiedIdent.table
val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)

if (metadata.tableType == CatalogTableType.VIEW) {
if (CatalogTable.isMetricView(metadata)) {
parseMetricViewDefinition(metadata)
} else if (metadata.tableType == CatalogTableType.VIEW) {
// The relation is a view, so we wrap the relation by:
// 1. Add a [[View]] operator over the relation to keep track of the view desc;
// 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
@@ -953,6 +956,27 @@
}
}

private def parseMetricViewDefinition(metadata: CatalogTable): LogicalPlan = {
val viewDefinition = metadata.viewText.getOrElse {
throw SparkException.internalError("Invalid view without text.")
}
val viewConfigs = metadata.viewSQLConfigs
val origin = CurrentOrigin.get.copy(
objectType = Some("METRIC VIEW"),
objectName = Some(metadata.qualifiedName)
)
SQLConf.withExistingConf(
View.effectiveSQLConf(
configs = viewConfigs,
isTempView = false
)
) {
CurrentOrigin.withOrigin(origin) {
MetricViewPlanner.planRead(metadata, viewDefinition, parser, metadata.schema)
}
}
}

private def buildViewDDL(metadata: CatalogTable, isTempView: Boolean): Option[String] = {
if (isTempView) {
None
@@ -722,6 +722,18 @@ object CatalogTable {

val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts"
val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part."

// Property to indicate that a VIEW is actually a METRIC VIEW
val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics"

/**
* Check if a CatalogTable is a metric view by looking at its properties.
*/
def isMetricView(table: CatalogTable): Boolean = {
table.tableType == CatalogTableType.VIEW &&
table.properties.get(VIEW_WITH_METRICS).contains("true")
}
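
Given that VIEW_PREFIX is "view." elsewhere in this object, the marker boils down to a single string property; a minimal sketch of the detection on a bare property map (assuming the prefix value is unchanged by this PR):

// Metric views are ordinary VIEW catalog entries distinguished only by this
// property; everything else about their storage is shared with plain views.
val props = Map("view.viewWithMetrics" -> "true")
assert(props.get("view.viewWithMetrics").contains("true"))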

// Convert the current catalog and namespace to properties.
def catalogAndNamespaceToProps(
currentCatalog: String,
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions.aggregate

import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, UnevaluableAggregateFunc}
import org.apache.spark.sql.catalyst.trees.TreePattern.{MEASURE, TreePattern}
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, DataType}

// This function serves as an annotation that tells the analyzer to calculate
// the measures defined in metric views. It cannot be evaluated in the execution
// phase; instead, it is replaced by the actual aggregate functions defined by
// the measure (its input argument).
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(expr) - can only be used to calculate a measure defined in a metric view.",
examples = """
Examples:
> SELECT dimension_col, _FUNC_(measure_col)
FROM test_metric_view
GROUP BY dimension_col;
dim_1, 100
dim_2, 200
""",
group = "agg_funcs",
since = "4.0.0")
// scalastyle:on line.size.limit
case class Measure(child: Expression)
extends UnevaluableAggregateFunc with ExpectsInputTypes
with UnaryLike[Expression] {

override protected def withNewChildInternal(newChild: Expression): Measure =
copy(child = newChild)

override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)

override def dataType: DataType = child.dataType

override def prettyName: String = getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure")

override def nullable: Boolean = true

override val nodePatterns: Seq[TreePattern] = Seq(MEASURE)
}
@@ -24,7 +24,7 @@ import scala.util.matching.Regex

import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.tree.{ParseTree, TerminalNodeImpl}
import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode, TerminalNodeImpl}

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
@@ -88,6 +88,18 @@ object ParserUtils extends SparkParserUtils {
node.getText.slice(1, node.getText.length - 1)
}

/**
* Obtain the string literal provided as a dollar quoted string.
* A dollar quoted string is defined as {{{$[tag]$<string literal>$[tag]$}}},
* where the string literal is parsed as a list of body sections.
* This helper method concatenates all body sections to restore the original string literal.
*/
def dollarQuotedString(sections: util.List[TerminalNode]): String = {
val sb = new StringBuilder()
sections.forEach(body => sb.append(body.getText))
sb.toString()
}
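
Because DOLLAR_QUOTED_STRING_BODY splits the body at '$' boundaries, the literal reaches the parser as several terminal nodes, and simple concatenation restores it. A standalone sketch of the reassembly (the section texts are illustrative):

// The lexer may emit "price: ", "$", "amount" for the body "price: $amount";
// joining the section texts yields the original literal back.
val sections = Seq("price: ", "$", "amount")
assert(sections.mkString == "price: $amount")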

/** Collect the entries if any. */
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).toSeq.map(x => key -> string(x))
@@ -70,6 +70,7 @@ object TreePattern extends Enumeration {
val MAP_FROM_ARRAYS: Value = Value
val MAP_FROM_ENTRIES: Value = Value
val MAP_OBJECTS: Value = Value
val MEASURE: Value = Value
val MULTI_ALIAS: Value = Value
val NEW_INSTANCE: Value = Value
val NOT: Value = Value
@@ -149,6 +150,7 @@ object TreePattern extends Enumeration {
val LIMIT: Value = Value
val LOCAL_RELATION: Value = Value
val LOGICAL_QUERY_STAGE: Value = Value
val METRIC_VIEW_PLACEHOLDER: Value = Value
val NATURAL_LIKE_JOIN: Value = Value
val NO_GROUPING_AGGREGATE_REFERENCE: Value = Value
val OFFSET: Value = Value
@@ -162,6 +164,7 @@
val RELATION_TIME_TRAVEL: Value = Value
val REPARTITION_OPERATION: Value = Value
val REBALANCE_PARTITIONS: Value = Value
val RESOLVED_METRIC_VIEW: Value = Value
val SERIALIZE_FROM_OBJECT: Value = Value
val SORT: Value = Value
val SQL_TABLE_FUNCTION: Value = Value
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.metricview.logical

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, RESOLVED_METRIC_VIEW, TreePattern}
import org.apache.spark.sql.metricview.serde.MetricView

case class MetricViewPlaceholder(
metadata: CatalogTable,
desc: MetricView,
outputMetrics: Seq[Attribute],
child: LogicalPlan,
isCreate: Boolean = false) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(METRIC_VIEW_PLACEHOLDER)
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
copy(child = newChild)
}
override def output: Seq[Attribute] = outputMetrics
override lazy val resolved: Boolean = child.resolved
override def simpleString(maxFields: Int): String =
s"$nodeName ${output.mkString("[", ", ", "]")}".trim

override def producedAttributes: AttributeSet = AttributeSet(outputMetrics)
}

case class ResolvedMetricView(
identifier: TableIdentifier,
child: LogicalPlan) extends UnaryNode {
override def output: scala.Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
override lazy val resolved: Boolean = child.resolved
final override val nodePatterns: Seq[TreePattern] = Seq(RESOLVED_METRIC_VIEW)
}