[SPARK-54424][SQL] Failures during recaching must not fail operations #53143
```diff
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.{Logging, MessageWithContext}
@@ -35,8 +37,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
-import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable, V2TableRefreshUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
```
```diff
@@ -352,22 +353,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper
     }
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
+      tryRebuildCacheEntry(spark, cd).foreach { entry =>
+        this.synchronized {
+          if (lookupCachedDataInternal(entry.plan).nonEmpty) {
+            logWarning("While recaching, data was already added to cache.")
+          } else {
+            cachedData = entry +: cachedData
+            CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+              log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
+          }
+        }
+      }
     }
   }
 
+  private def tryRebuildCacheEntry(
+      spark: SparkSession,
+      cd: CachedData): Option[CachedData] = {
+    try {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val (newKey, newCache) = sessionWithConfigsOff.withActive {
         val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
```
Member: If we want to avoid failure here, how about skipping query plan validation in the method call here instead, and skipping the following in …
```diff
         val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
         qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
       }
-      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
-      this.synchronized {
-        if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
-          logWarning("While recaching, data was already added to cache.")
-        } else {
-          cachedData = recomputedPlan +: cachedData
-          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
-            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
-        }
-      }
+      Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
+    } catch {
+      case NonFatal(e) =>
+        logWarning(log"Failed to recache query", e)
```
Member: I'm worried about the side effects of this part.

Contributor (author): I worry too; that's why I want everyone to take a look.
```diff
+        None
+    }
+  }
```
In 4.1, we added this line to refresh versions. That refresh MUST NOT fail writes.
I am not sure how we want to treat failures from the line below. Previously, this threw an exception and potentially marked writes as failed if we couldn't refresh.
The current implementation will NOT throw an exception in either case. Other options: …
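A minimal sketch of the two policies under discussion, using hypothetical names rather than Spark APIs (`RebuildPolicies`, `rebuildStrict`, `rebuildBestEffort`, and the `rebuild` thunk are illustrative only):

```scala
import scala.util.control.NonFatal

object RebuildPolicies {
  // Previous behavior: any failure during the rebuild escapes to the
  // caller and can mark the surrounding write as failed.
  def rebuildStrict[A](rebuild: () => A): A = rebuild()

  // Behavior proposed here: non-fatal failures are logged and converted
  // into an empty result, so the caller proceeds without a cache entry.
  def rebuildBestEffort[A](rebuild: () => A): Option[A] =
    try {
      Some(rebuild())
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"recache failed, continuing: ${e.getMessage}")
        None
    }
}
```

The `Option` return mirrors the PR's `tryRebuildCacheEntry`: a `None` simply means the entry is not re-added, which later reads observe as a cache miss.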
@cloud-fan @gengliangwang @dongjoon-hyun @viirya @szehon-ho, thoughts?
It does seem like all our current invocations treat cache refresh as opportunistic. In other words, it is usually the last step, where it is critical to remove the old entry, but recaching may or may not succeed. Is that the same understanding that everyone has? Any cases I missed? A sketch of that invariant follows.
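For concreteness, a minimal sketch of the invariant with hypothetical names (`Entry`, `OpportunisticCache`, and `rebuild` are illustrative, not the CacheManager API): dropping the stale entry always happens, while re-populating is best-effort.

```scala
import scala.util.control.NonFatal

final case class Entry(key: String, value: String)

class OpportunisticCache(private var entries: List[Entry] = Nil) {
  def refresh(key: String)(rebuild: String => Entry): Unit = {
    // Step 1 (critical): never serve stale data.
    entries = entries.filterNot(_.key == key)
    // Step 2 (opportunistic): try to re-add; a failure just leaves a miss.
    try {
      entries = rebuild(key) :: entries
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"recache skipped: ${e.getMessage}")
    }
  }
}
```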
@dongjoon-hyun, following up on the question here. I see only one place that potentially calls recache on read, and it is related to AQE. Usually, only write or REFRESH operations trigger recaching.
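As a hedged illustration of those trigger points (a local session and a placeholder table `t`, none of which come from the PR):

```scala
import org.apache.spark.sql.SparkSession

object RecacheTriggers {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.sql("CREATE TABLE t (id INT) USING parquet")
    spark.sql("CACHE TABLE t")
    spark.sql("INSERT INTO t VALUES (1)") // write path: stale entry dropped, then best-effort recache
    spark.sql("REFRESH TABLE t")          // explicit refresh: same drop-then-recache flow
    spark.sql("SELECT * FROM t").show()   // plain read: served from cache, no recache triggered
    spark.stop()
  }
}
```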