[SPARK-54424][SQL] Failures during recaching must not fail operations #53143
Conversation
try {
  val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
  val (newKey, newCache) = sessionWithConfigsOff.withActive {
    val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
In 4.1, we added this line to refresh versions. This refresh MUST NOT fail writes.
val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
I am not sure how we want to treat failures from the line below. Previously, this threw an exception and potentially marked writes as failed if we couldn't refresh.
val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
The current implementation will NOT throw an exception in either case. Other options (the first one is sketched below):
- Don't fail only if the refresh fails; continue to fail if QE construction fails.
- Don't fail in either case, but only for DSv2 operations.
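For discussion, a minimal sketch of the first option, reusing the names from the diff above; the InMemoryRelation construction at the end is my assumption about the surrounding code, not the actual patch:
import scala.util.control.NonFatal

val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val (newKey, newCache) = sessionWithConfigsOff.withActive {
  // Tolerate refresh failures by falling back to the stale plan, so a flaky
  // catalog lookup cannot fail the write that triggered recaching.
  val refreshedPlan = try {
    V2TableRefreshUtil.refresh(cd.plan)
  } catch {
    case NonFatal(e) =>
      logWarning(log"Failed to refresh plan during recache", e)
      cd.plan
  }
  // QE construction stays outside the fallback, so planner bugs still fail loudly.
  val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
  (refreshedPlan, InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe))
}
Some(cd.copy(plan = newKey, cachedRepresentation = newCache))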
@cloud-fan @gengliangwang @dongjoon-hyun @viirya @szehon-ho, thoughts?
It does seem like all our current invocations treat cache refresh as opportunistic. In other words, it is usually the last step, where it is critical to remove the old entry but recaching may or may not succeed. Is that the same understanding that everyone has? Any cases I missed?
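To spell out the pattern, a minimal sketch of what "opportunistic" means here; uncacheQuery/cacheQuery follow the shape of CacheManager's API, but treat the exact signatures (and the logWarning mixin) as assumptions:
import scala.util.control.NonFatal

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def refreshCacheAfterWrite(spark: SparkSession, plan: LogicalPlan): Unit = {
  val cacheManager = spark.sharedState.cacheManager
  // Critical step: the stale entry must be removed, or readers see outdated data.
  cacheManager.uncacheQuery(spark, plan, cascade = true)
  // Opportunistic step: failing to rebuild only costs performance, not correctness.
  try {
    cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
  } catch {
    case NonFatal(e) => logWarning(log"Opportunistic recache failed", e)
  }
}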
@dongjoon-hyun, following up on the question here. I see only one place that potentially calls recache on read, and it is related to AQE. Usually, only write or REFRESH operations trigger recaching.
private def buildBuffers(): RDD[CachedBatch] = {
  val cb = try {
    if (supportsColumnarInput) {
      serializer.convertColumnarBatchToCachedBatch(
        cachedPlan.executeColumnar(),
        cachedPlan.output,
        storageLevel,
        cachedPlan.conf)
    } else {
      serializer.convertInternalRowToCachedBatch(
        cachedPlan.execute(),
        cachedPlan.output,
        storageLevel,
        cachedPlan.conf)
    }
  } catch {
    case e: Throwable if cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] =>
      // SPARK-49982: during RDD execution, AQE will execute all stages except ResultStage.
      // If any failure happens, the failure will be cached and the next SQL cache caller
      // will hit the negative cache. Therefore we need to recache the plan.
      val session = cachedPlan.session
      session.sharedState.cacheManager.recacheByPlan(session, logicalPlan)
      throw e
  }
dongjoon-hyun left a comment:
For write operations, I got it. But for read operations, is it valid to swallow the error, @aokolnychyi?
  Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
} catch {
  case NonFatal(e) =>
    logWarning(log"Failed to recache query", e)
I'm worried about the side effects of this part.
I worry too, that's why I want everyone to take a look.
This is a tricky case and I'd like to understand more context:
try {
  val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
  val (newKey, newCache) = sessionWithConfigsOff.withActive {
    val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
If we want to avoid failure here, how about skipping query plan validation in the method call instead?
val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan, validation = false)
and skipping the following in V2TableRefreshUtil.refresh:
validateTableIdentity(currentTable, r)
validateDataColumns(currentTable, r)
validateMetadataColumns(currentTable, r)
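For illustration, a rough sketch of what such a flag could look like inside V2TableRefreshUtil; the relation matching and helper signatures below are assumptions, not the actual implementation:
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

def refresh(plan: LogicalPlan, validation: Boolean = true): LogicalPlan = {
  plan transform {
    case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined =>
      // Always reload the latest table version from the catalog.
      val currentTable = r.catalog.get.asTableCatalog.loadTable(r.identifier.get)
      // Validate only when the caller can afford to fail (e.g. reads); recaching
      // after a committed write would pass validation = false.
      if (validation) {
        validateTableIdentity(currentTable, r)
        validateDataColumns(currentTable, r)
        validateMetadataColumns(currentTable, r)
      }
      r.copy(table = currentTable)
  }
}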
Is this a release blocker / regression?
Gentle ping, @aokolnychyi.
What changes were proposed in this pull request?
This PR prevents failures during recaching from failing write/refresh operations.
Why are the changes needed?
After recent changes in SPARK-54387, we may now mark write operations as failed even though they successfully committed to the table, just because the cache refresh was unsuccessful.
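As a concrete, hypothetical illustration of that failure mode (the catalog and table names are made up):
// Hypothetical repro: the write commits, then the post-commit cache refresh
// throws, and the INSERT is reported as failed.
spark.sql("CACHE TABLE cat.db.target")
spark.sql("INSERT INTO cat.db.target SELECT * FROM src")
// 1. The INSERT commits to cat.db.target.
// 2. CacheManager rebuilds the cached entry for the table as the last step.
// 3. Before this PR, an exception from V2TableRefreshUtil.refresh in step 2
//    propagated to the caller, marking the already-committed write as failed.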
Does this PR introduce any user-facing change?
Yes, recacheByXXX will no longer throw an exception if recaching fails.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.