
Commit 92c948f

aokolnychyi and gengliangwang authored and committed
[SPARK-54022][SQL] Make DSv2 table resolution aware of cached tables
### What changes were proposed in this pull request?

This PR makes DSv2 table resolution aware of tables cached via `CACHE TABLE t` or `spark.table("t").cache()` commands.

### Why are the changes needed?

These changes are needed to avoid silent cache misses for DSv2 tables. Cache lookups depend on DSv2 `Table` instance equality. If each query were allowed to load a new `Table` instance from the metastore, connectors could pick up external changes, leading to unexpected cache misses. That would contradict the behavior we have had for built-in tables and some DSv1 connectors such as Delta. Historically, the expected behavior of `CACHE TABLE t` and `spark.table("t").cache()` is to cache the table state.

### Does this PR introduce _any_ user-facing change?

Yes. The PR fixes the resolution for DSv2 so that `CACHE TABLE t` behaves correctly and reliably:

- caching a table via the Dataset API will now pin the table state
- caching a table via `CACHE TABLE` will now pin the table state
- caching a query via the Dataset API will continue to simply cache the query plan as before

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52764 from aokolnychyi/spark-54022.

Lead-authored-by: Anton Okolnychyi <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4d1c79f commit 92c948f
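A hedged sketch of the user-facing behavior described above (the table name `t` and the external-change step are hypothetical, and the target must be a DSv2 table):

```scala
// Illustration only, not code from this commit.
spark.sql("CACHE TABLE t")     // resolution now pins the DSv2 Table state at cache time
val before = spark.table("t")  // resolves against the cached relation (cache hit)
// ... an external writer may now commit changes to t in the metastore ...
val after = spark.table("t")   // still resolves to the pinned state instead of loading
                               // a fresh Table instance and silently missing the cache
```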

File tree

18 files changed: +337 −62 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 7 additions & 3 deletions

```diff
@@ -80,7 +80,8 @@ object SimpleAnalyzer extends Analyzer(
       FunctionRegistry.builtin,
       TableFunctionRegistry.builtin) {
       override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
-    })) {
+    }),
+  RelationCache.empty) {
   override def resolver: Resolver = caseSensitiveResolution
 }

@@ -285,11 +286,14 @@
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
  */
-class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor[LogicalPlan]
+class Analyzer(
+    override val catalogManager: CatalogManager,
+    private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
+  extends RuleExecutor[LogicalPlan]
   with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper {

   private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
-  private val relationResolution = new RelationResolution(catalogManager)
+  private val relationResolution = new RelationResolution(catalogManager, sharedRelationCache)
   private val functionResolution = new FunctionResolution(catalogManager, relationResolution)

   override protected def validatePlanChanges(
```
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationCache.scala

Lines changed: 28 additions & 0 deletions

```diff
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+private[sql] trait RelationCache {
+  def lookup(nameParts: Seq[String], resolver: Resolver): Option[LogicalPlan]
+}
+
+private[sql] object RelationCache {
+  val empty: RelationCache = (_, _) => None
+}
```
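Since `RelationCache.empty` is defined as a lambda, the trait is a single-abstract-method type. A minimal sketch of a conforming implementation (the map-backed class below is hypothetical, not part of this commit, and must live in the `org.apache.spark.sql` package tree because the trait is `private[sql]`):

```scala
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical map-backed cache illustrating the lookup contract.
class MapBackedRelationCache(entries: Map[Seq[String], LogicalPlan]) extends RelationCache {
  override def lookup(nameParts: Seq[String], resolver: Resolver): Option[LogicalPlan] = {
    // Compare name parts through the passed-in resolver so lookups honor the
    // session's case-sensitivity setting.
    entries.collectFirst {
      case (cachedName, plan)
          if cachedName.length == nameParts.length &&
            cachedName.zip(nameParts).forall(resolver.tupled) =>
        plan
    }
  }
}
```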

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 51 additions & 23 deletions

```diff
@@ -46,7 +46,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._

-class RelationResolution(override val catalogManager: CatalogManager)
+class RelationResolution(
+    override val catalogManager: CatalogManager,
+    sharedRelationCache: RelationCache)
   extends DataTypeErrorsBase
   with Logging
   with LookupCatalog
@@ -118,36 +120,62 @@ class RelationResolution(override val catalogManager: CatalogManager)
         val planId = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
         relationCache
           .get(key)
-          .map { cache =>
-            val cachedRelation = cache.transform {
-              case multi: MultiInstanceRelation =>
-                val newRelation = multi.newInstance()
-                newRelation.copyTagsFrom(multi)
-                newRelation
-            }
-            cloneWithPlanId(cachedRelation, planId)
-          }
+          .map(adaptCachedRelation(_, planId))
           .orElse {
-            val writePrivilegesString =
-              Option(u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES))
-            val table =
-              CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec, writePrivilegesString)
-            val loaded = createRelation(
+            val writePrivileges = u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES)
+            val finalOptions = u.clearWritePrivileges.options
+            val table = CatalogV2Util.loadTable(
               catalog,
               ident,
-              table,
-              u.clearWritePrivileges.options,
-              u.isStreaming,
-              finalTimeTravelSpec
-            )
-            loaded.foreach(relationCache.update(key, _))
-            loaded.map(cloneWithPlanId(_, planId))
-          }
+              finalTimeTravelSpec,
+              Option(writePrivileges))
+
+            val sharedRelationCacheMatch = for {
+              t <- table
+              if finalTimeTravelSpec.isEmpty && writePrivileges == null && !u.isStreaming
+              cached <- lookupSharedRelationCache(catalog, ident, t)
+            } yield {
+              val updatedRelation = cached.copy(options = finalOptions)
+              val nameParts = ident.toQualifiedNameParts(catalog)
+              val aliasedRelation = SubqueryAlias(nameParts, updatedRelation)
+              relationCache.update(key, aliasedRelation)
+              adaptCachedRelation(aliasedRelation, planId)
+            }
+
+            sharedRelationCacheMatch.orElse {
+              val loaded = createRelation(
+                catalog,
+                ident,
+                table,
+                finalOptions,
+                u.isStreaming,
+                finalTimeTravelSpec)
+              loaded.foreach(relationCache.update(key, _))
+              loaded.map(cloneWithPlanId(_, planId))
+            }
+          }
       case _ => None
     }
   }
 }

+  private def lookupSharedRelationCache(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      table: Table): Option[DataSourceV2Relation] = {
+    CatalogV2Util.lookupCachedRelation(sharedRelationCache, catalog, ident, table, conf)
+  }
+
+  private def adaptCachedRelation(cached: LogicalPlan, planId: Option[Long]): LogicalPlan = {
+    val plan = cached transform {
+      case multi: MultiInstanceRelation =>
+        val newRelation = multi.newInstance()
+        newRelation.copyTagsFrom(multi)
+        newRelation
+    }
+    cloneWithPlanId(plan, planId)
+  }
+
   private def createRelation(
       catalog: CatalogPlugin,
       ident: Identifier,
```
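The for-comprehension above consults the shared cache only for plain reads: a time-travel spec, required write privileges, or the streaming flag each disable the lookup. A self-contained toy of that control flow (all names below are stand-ins, not Spark API):

```scala
// Toy model of the branch above: a for-comprehension over Option with a guard
// desugars to withFilter/flatMap, so the shared-cache path yields None unless
// the table loaded, the read is "plain", and the cache held a matching entry.
def resolveViaSharedCache[T, R](
    loadedTable: Option[T],       // stand-in for the Table loaded from the catalog
    isPlainRead: Boolean,         // stand-in for: no time travel / write privileges / streaming
    cacheLookup: T => Option[R],  // stand-in for lookupSharedRelationCache
    loadFresh: => Option[R]): Option[R] = {
  val hit = for {
    t <- loadedTable
    if isPlainRead
    cached <- cacheLookup(t)
  } yield cached
  hit.orElse(loadFresh)           // fall through to creating a fresh relation
}
```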

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -302,6 +302,7 @@ object HybridAnalyzer {
       resolverGuard = new ResolverGuard(legacyAnalyzer.catalogManager),
       resolver = new Resolver(
         catalogManager = legacyAnalyzer.catalogManager,
+        sharedRelationCache = legacyAnalyzer.sharedRelationCache,
         extensions = legacyAnalyzer.singlePassResolverExtensions,
         metadataResolverExtensions = legacyAnalyzer.singlePassMetadataResolverExtensions,
         externalRelationResolution = Some(legacyAnalyzer.getRelationResolution)
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala

Lines changed: 9 additions & 4 deletions

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{
   AnalysisErrorAt,
   FunctionResolution,
   MultiInstanceRelation,
+  RelationCache,
   RelationResolution,
   ResolvedInlineTable,
   UnresolvedHaving,
@@ -71,6 +72,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  */
 class Resolver(
     catalogManager: CatalogManager,
+    sharedRelationCache: RelationCache = RelationCache.empty,
     override val extensions: Seq[ResolverExtension] = Seq.empty,
     metadataResolverExtensions: Seq[ResolverExtension] = Seq.empty,
     externalRelationResolution: Option[RelationResolution] = None)
@@ -81,8 +83,9 @@ class Resolver(
   private val cteRegistry = new CteRegistry
   private val subqueryRegistry = new SubqueryRegistry
   private val identifierAndCteSubstitutor = new IdentifierAndCteSubstitutor
-  private val relationResolution =
-    externalRelationResolution.getOrElse(Resolver.createRelationResolution(catalogManager))
+  private val relationResolution = externalRelationResolution.getOrElse {
+    Resolver.createRelationResolution(catalogManager, sharedRelationCache)
+  }
   private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
   private val expressionResolver = new ExpressionResolver(this, functionResolution, planLogger)
   private val aggregateResolver = new AggregateResolver(this, expressionResolver)
@@ -788,7 +791,9 @@ object Resolver {
   /**
    * Create a new instance of the [[RelationResolution]].
    */
-  def createRelationResolution(catalogManager: CatalogManager): RelationResolution = {
-    new RelationResolution(catalogManager)
+  def createRelationResolution(
+      catalogManager: CatalogManager,
+      sharedRelationCache: RelationCache = RelationCache.empty): RelationResolution = {
+    new RelationResolution(catalogManager, sharedRelationCache)
   }
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 1 addition & 7 deletions

```diff
@@ -40,7 +40,6 @@ import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, M
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
-import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
@@ -689,12 +688,7 @@ case class ReplaceTableAsSelect(
   extends V2CreateTableAsSelectPlan {

   override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = {
-    // RTAS may drop and recreate table before query execution, breaking self-references
-    // refresh and pin versions here to read from original table versions instead of
-    // newly created empty table that is meant to serve as target for append/overwrite
-    val refreshedQuery = V2TableRefreshUtil.refresh(query, versionedOnly = true)
-    val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
-    copy(query = pinnedQuery, isAnalyzed = true)
+    copy(isAnalyzed = true)
   }

   override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
```
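For context, the deleted comment concerns self-referencing RTAS statements; a hypothetical example of the shape it describes (table name and predicate are made up, and the target must be a DSv2 table in a catalog that supports `REPLACE TABLE`):

```scala
// Hypothetical self-referencing RTAS: the replaced target also feeds the query.
// The removed refresh/pin step made the SELECT read the original table version
// rather than the newly recreated, empty target.
spark.sql("REPLACE TABLE t AS SELECT * FROM t WHERE id > 0")
```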

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 23 additions & 1 deletion

```diff
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkException, SparkIllegalArgumentException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CurrentUserContext
-import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, RelationCache, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
@@ -497,6 +498,27 @@ private[sql] object CatalogV2Util {
     loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident)))
   }

+  def isSameTable(
+      rel: DataSourceV2Relation,
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      table: Table): Boolean = {
+    rel.catalog.contains(catalog) && rel.identifier.contains(ident) && rel.table.id == table.id
+  }
+
+  def lookupCachedRelation(
+      cache: RelationCache,
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      table: Table,
+      conf: SQLConf): Option[DataSourceV2Relation] = {
+    val nameParts = ident.toQualifiedNameParts(catalog)
+    val cached = cache.lookup(nameParts, conf.resolver)
+    cached.collect {
+      case r: DataSourceV2Relation if isSameTable(r, catalog, ident, table) => r
+    }
+  }
+
   def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
```
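A hedged usage sketch of the new helper (the wrapper function below is hypothetical, and like `CatalogV2Util` itself it only compiles inside the `org.apache.spark.sql` package tree):

```scala
package org.apache.spark.sql.connector.catalog

import org.apache.spark.sql.catalyst.analysis.RelationCache
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf

object PinnedRelationLookupSketch {
  // Some(...) only when the cached relation points at the same catalog and
  // identifier AND its Table carries the same id as the freshly loaded one,
  // i.e. the cache pins a specific table state. None means the table changed
  // externally, so resolution must fall back to the fresh instance.
  def pinnedRelation(
      cache: RelationCache,
      catalog: CatalogPlugin,
      ident: Identifier,
      freshTable: Table, // the Table just loaded from the catalog
      conf: SQLConf): Option[DataSourceV2Relation] = {
    CatalogV2Util.lookupCachedRelation(cache, catalog, ident, freshTable, conf)
  }
}
```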

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 63 additions & 12 deletions

```diff
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -30,13 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPla
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.classic.{Dataset, SparkSession}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper}
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
-import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier, ExtractV2Table, FileTable, V2TableRefreshUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -240,31 +242,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       name: Seq[String],
       conf: SQLConf,
       includeTimeTravel: Boolean): Boolean = {
-    def isSameName(nameInCache: Seq[String]): Boolean = {
-      nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
-    }
+    isMatchedTableOrView(plan, name, conf.resolver, includeTimeTravel)
+  }
+
+  private def isMatchedTableOrView(
+      plan: LogicalPlan,
+      name: Seq[String],
+      resolver: Resolver,
+      includeTimeTravel: Boolean): Boolean = {

     EliminateSubqueryAliases(plan) match {
       case LogicalRelationWithTable(_, Some(catalogTable)) =>
-        isSameName(catalogTable.identifier.nameParts)
+        isSameName(name, catalogTable.identifier.nameParts, resolver)

       case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _, timeTravelSpec) =>
         val nameInCache = v2Ident.toQualifiedNameParts(catalog)
-        isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
+        isSameName(name, nameInCache, resolver) && (includeTimeTravel || timeTravelSpec.isEmpty)

       case r: V2TableReference =>
-        isSameName(r.identifier.toQualifiedNameParts(r.catalog))
+        isSameName(name, r.identifier.toQualifiedNameParts(r.catalog), resolver)

       case v: View =>
-        isSameName(v.desc.identifier.nameParts)
+        isSameName(name, v.desc.identifier.nameParts, resolver)

       case HiveTableRelation(catalogTable, _, _, _, _) =>
-        isSameName(catalogTable.identifier.nameParts)
+        isSameName(name, catalogTable.identifier.nameParts, resolver)

       case _ => false
     }
   }

+  private def isSameName(
+      name: Seq[String],
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      resolver: Resolver): Boolean = {
+    isSameName(name, ident.toQualifiedNameParts(catalog), resolver)
+  }
+
+  private def isSameName(
+      name: Seq[String],
+      nameInCache: Seq[String],
+      resolver: Resolver): Boolean = {
+    nameInCache.length == name.length && nameInCache.zip(name).forall(resolver.tupled)
+  }
+
   private def uncacheByCondition(
       spark: SparkSession,
       isMatchedPlan: LogicalPlan => Boolean,
@@ -354,7 +376,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     cd.cachedRepresentation.cacheBuilder.clearCache()
     val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
     val (newKey, newCache) = sessionWithConfigsOff.withActive {
-      val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
+      val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan)
       val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
       qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
     }
@@ -371,6 +393,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
   }

+  private[sql] def lookupCachedTable(
+      name: Seq[String],
+      resolver: Resolver): Option[LogicalPlan] = {
+    val cachedRelations = findCachedRelations(name, resolver)
+    cachedRelations match {
+      case cachedRelation +: _ =>
+        CacheManager.logCacheOperation(
+          log"Relation cache hit for table ${MDC(TABLE_NAME, name.quoted)}")
+        Some(cachedRelation)
+      case _ =>
+        None
+    }
+  }
+
+  private def findCachedRelations(
+      name: Seq[String],
+      resolver: Resolver): Seq[LogicalPlan] = {
+    cachedData.flatMap { cd =>
+      val plan = EliminateSubqueryAliases(cd.plan)
+      plan match {
+        case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
+            if isSameName(name, catalog, ident, resolver) && r.timeTravelSpec.isEmpty =>
+          Some(r)
+        case _ =>
+          None
+      }
+    }
+  }
+
   /**
    * Optionally returns cached data for the given [[Dataset]]
    */
```
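The matching rules above reduce to a small, self-contained comparison; a runnable toy (the demo object and sample names are hypothetical):

```scala
// Toy of the resolver-based comparison isSameName performs: multipart names
// match only when lengths agree and every part matches under the resolver.
object IsSameNameDemo extends App {
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

  def isSameName(name: Seq[String], nameInCache: Seq[String], resolver: Resolver): Boolean =
    nameInCache.length == name.length && nameInCache.zip(name).forall(resolver.tupled)

  println(isSameName(Seq("spark_catalog", "default", "T"),
    Seq("spark_catalog", "default", "t"), caseInsensitive)) // true: parts match ignoring case
  println(isSameName(Seq("default", "t"),
    Seq("spark_catalog", "default", "t"), caseInsensitive)) // false: lengths differ
}
```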
