Conversation

@aokolnychyi (Contributor)

What changes were proposed in this pull request?

This PR prevents failures during recaching from failing write/refresh operations.

Why are the changes needed?

After the recent changes in SPARK-54387, we may mark a write operation as failed even though it committed to the table successfully and only the cache refresh was unsuccessful.

Does this PR introduce any user-facing change?

Yes, recacheByXXX will no longer throw an exception if recaching fails.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

No.

try {
  val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
  val (newKey, newCache) = sessionWithConfigsOff.withActive {
    val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
@aokolnychyi (Contributor Author) commented on Nov 20, 2025

In 4.1, we added this line to refresh versions. This refresh MUST NOT fail writes.

val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)

I am not sure how we want to treat failures from the line below. Previously, this threw an exception and potentially marked writes as failed if we couldn't refresh.

val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)

The current implementation will NOT throw an exception in either case. Other options:

  • Don't fail only if the refresh fails, but continue to fail if QE construction fails (see the sketch below).
  • Don't fail in either case, but only for DSv2 operations.
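A rough sketch of the first option, reusing the identifiers and imports from the hunk above (cd, sessionWithConfigsOff, logWarning, NonFatal); this is illustrative only, not the final implementation:

  // Best-effort version refresh: a failure here must not fail the write.
  val refreshedPlan =
    try {
      V2TableRefreshUtil.refresh(cd.plan)
    } catch {
      case NonFatal(e) =>
        // Keep the existing cached plan if the version refresh fails.
        logWarning(log"Failed to refresh table versions for cached plan", e)
        cd.plan
    }

  // Exceptions from here on still propagate and mark the write as failed, as before.
  val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)

(Skipping the recache for that entry entirely would be another way to realize the same option.)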

@aokolnychyi (Contributor Author)

It does seem like all our current invocations treat cache refresh as opportunistic. In other words, it is usually the last step, where it is critical to remove the old entry but recaching may or may not succeed. Is that the same understanding that everyone has? Any cases I missed?
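A rough sketch of that understanding (not actual Spark code; the exact CacheManager signatures and logging calls may differ):

  // Cache refresh after a write: dropping the stale entry is critical,
  // rebuilding it is best-effort.
  def refreshCacheAfterWrite(spark: SparkSession, plan: LogicalPlan): Unit = {
    val cacheManager = spark.sharedState.cacheManager

    // Critical: the stale entry must go away so readers never see old data.
    cacheManager.uncacheQuery(spark, plan, cascade = true)

    // Opportunistic: failures while rebuilding the entry are only logged.
    try {
      cacheManager.recacheByPlan(spark, plan)
    } catch {
      case NonFatal(e) => logWarning(log"Failed to recache query", e)
    }
  }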

@aokolnychyi (Contributor Author)

@dongjoon-hyun, following up on the question here. I see only one place that potentially calls recache on the read path, and it is related to AQE. Usually, only write or REFRESH operations trigger recaching.

  private def buildBuffers(): RDD[CachedBatch] = {
    val cb = try {
      if (supportsColumnarInput) {
        serializer.convertColumnarBatchToCachedBatch(
          cachedPlan.executeColumnar(),
          cachedPlan.output,
          storageLevel,
          cachedPlan.conf)
      } else {
        serializer.convertInternalRowToCachedBatch(
          cachedPlan.execute(),
          cachedPlan.output,
          storageLevel,
          cachedPlan.conf)
      }
    } catch {
      case e: Throwable if cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] =>
        // SPARK-49982: during RDD execution, AQE will execute all stages except ResultStage. If any
        // failure happens, the failure will be cached and the next SQL cache caller will hit the
        // negative cache. Therefore we need to recache the plan.
        val session = cachedPlan.session
        session.sharedState.cacheManager.recacheByPlan(session, logicalPlan)
        throw e
    }

@dongjoon-hyun (Member) left a comment

For write operations, I got it. But for read operations, is it valid to swallow the error, @aokolnychyi?

  Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
} catch {
  case NonFatal(e) =>
    logWarning(log"Failed to recache query", e)
Member

I'm worried about the side effects of this part.

@aokolnychyi (Contributor Author)

I worry too; that's why I want everyone to take a look.

@cloud-fan (Contributor)

This is a tricky case and I'd like to understand more context:

  • What's the behavior of DML cache refreshing for v1 tables today? Do we allow schema changes and always rebuild the query plan with the latest table version?
  • I think it makes sense to fail if a read query detects an incompatible table change after the plan is analyzed, but for DML cache refreshing it doesn't matter, and it seems OK to always use the latest table version.

try {
  val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
  val (newKey, newCache) = sessionWithConfigsOff.withActive {
    val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
Member

If we want to avoid a failure here, how about skipping query plan validation in the method call instead?

val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan, validation = false)

and skip the following in V2TableRefreshUtil.refresh

        validateTableIdentity(currentTable, r)
        validateDataColumns(currentTable, r)
        validateMetadataColumns(currentTable, r)
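
A rough sketch of how that flag could gate validation inside V2TableRefreshUtil.refresh; everything other than the three validate calls is a placeholder, not the actual implementation:

  def refresh(plan: LogicalPlan, validation: Boolean = true): LogicalPlan =
    plan transform {
      case r: DataSourceV2Relation =>
        // Placeholder for the existing lookup of the table's current version.
        val currentTable = reloadTable(r)
        if (validation) {
          validateTableIdentity(currentTable, r)
          validateDataColumns(currentTable, r)
          validateMetadataColumns(currentTable, r)
        }
        r.copy(table = currentTable)
    }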

@holdenk (Contributor) commented on Nov 21, 2025

Is this a release blocker / regression?

@dongjoon-hyun (Member)

Gentle ping, @aokolnychyi.

