sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.{Logging, MessageWithContext}
@@ -35,8 +37,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable, V2TableRefreshUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -352,22 +353,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper
    }
    needToRecache.foreach { cd =>
      cd.cachedRepresentation.cacheBuilder.clearCache()
      tryRebuildCacheEntry(spark, cd).foreach { entry =>
        this.synchronized {
          if (lookupCachedDataInternal(entry.plan).nonEmpty) {
            logWarning("While recaching, data was already added to cache.")
          } else {
            cachedData = entry +: cachedData
            CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
              log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
          }
        }
      }
    }
  }

  private def tryRebuildCacheEntry(
      spark: SparkSession,
      cd: CachedData): Option[CachedData] = {
    try {
      val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
      val (newKey, newCache) = sessionWithConfigsOff.withActive {
        val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
@aokolnychyi (Contributor, Author) commented on Nov 20, 2025:

In 4.1, we added this line to refresh versions. This refresh MUST NOT fail writes.

val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)

I am not sure how we want to treat failures from the line below. Previously, this threw an exception and potentially marked writes as failed if we couldn't refresh.

val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)

The current implementation will NOT throw an exception in either case. Other options (see the sketch after this list):

  • Don't fail only if the refresh fails; continue to fail if QE construction fails.
  • Don't fail in either case, but only for DSv2 operations.
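
For illustration, a minimal sketch of the first option, assuming the same helpers that appear in the diff above (this is a sketch, not code from the PR):

  // Sketch: swallow only refresh failures; let QE construction keep throwing,
  // which preserves the pre-4.1 behavior of failing the write.
  private def tryRebuildCacheEntry(
      spark: SparkSession,
      cd: CachedData): Option[CachedData] = {
    val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
    val (newKey, newCache) = sessionWithConfigsOff.withActive {
      // Best-effort refresh: fall back to the original cached plan on failure.
      val refreshedPlan = try {
        V2TableRefreshUtil.refresh(cd.plan)
      } catch {
        case NonFatal(e) =>
          logWarning(log"Failed to refresh plan, reusing existing plan", e)
          cd.plan
      }
      // Not wrapped: a failure here still propagates and fails the write.
      val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
      qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
    }
    Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
  }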

@aokolnychyi (Contributor, Author):

It does seem like all our current invocations treat cache refresh as opportunistic. In other words, it is usually the last step, where removing the old entry is critical but recaching may or may not succeed. Is that the same understanding that everyone has? Any cases I missed?
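
A hedged illustration of that pattern (uncacheQuery and recacheByPlan are existing CacheManager methods; the wrapper itself is hypothetical):

  // Typical write-side flow as described above: dropping the stale entry is
  // mandatory, while re-populating the cache is opportunistic.
  def finishWrite(spark: SparkSession, table: LogicalPlan): Unit = {
    val cacheManager = spark.sharedState.cacheManager
    cacheManager.uncacheQuery(spark, table, cascade = true) // must succeed
    cacheManager.recacheByPlan(spark, table) // best-effort with this change
  }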

@aokolnychyi (Contributor, Author):

@dongjoon-hyun, following up on the question here. I see only one place that potentially calls recache on read, and it is related to AQE. Usually, only write or REFRESH operations trigger recaching.

  private def buildBuffers(): RDD[CachedBatch] = {
    val cb = try {
      if (supportsColumnarInput) {
        serializer.convertColumnarBatchToCachedBatch(
          cachedPlan.executeColumnar(),
          cachedPlan.output,
          storageLevel,
          cachedPlan.conf)
      } else {
        serializer.convertInternalRowToCachedBatch(
          cachedPlan.execute(),
          cachedPlan.output,
          storageLevel,
          cachedPlan.conf)
      }
    } catch {
      case e: Throwable if cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] =>
        // SPARK-49982: during RDD execution, AQE will execute all stages except ResultStage. If any
        // failure happen, the failure will be cached and the next SQL cache caller will hit the
        // negative cache. Therefore we need to recache the plan.
        val session = cachedPlan.session
        session.sharedState.cacheManager.recacheByPlan(session, logicalPlan)
        throw e
    }

Member:

If we want to avoid failure here, how about skipping query plan validation in the method call here instead?

val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan, validation = false)

and skip the following in V2TableRefreshUtil.refresh:

        validateTableIdentity(currentTable, r)
        validateDataColumns(currentTable, r)
        validateMetadataColumns(currentTable, r)
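
A rough sketch of how such a flag might gate those checks inside V2TableRefreshUtil (the parameter name and surrounding structure are assumptions for illustration, not the actual implementation):

  object V2TableRefreshUtil {
    // Hypothetical shape: `validation = false` lets best-effort callers,
    // such as recache, skip the identity/schema checks.
    def refresh(plan: LogicalPlan, validation: Boolean = true): LogicalPlan = {
      plan transform {
        case r: DataSourceV2Relation =>
          val currentTable = loadCurrentTable(r) // placeholder for the real lookup
          if (validation) {
            validateTableIdentity(currentTable, r)
            validateDataColumns(currentTable, r)
            validateMetadataColumns(currentTable, r)
          }
          r.copy(table = currentTable)
      }
    }
  }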

        val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
        qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
      }
      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
      this.synchronized {
        if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
          logWarning("While recaching, data was already added to cache.")
        } else {
          cachedData = recomputedPlan +: cachedData
          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
        }
      }
      Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
    } catch {
      case NonFatal(e) =>
        logWarning(log"Failed to recache query", e)
Member:

I'm worried about the side effects of this part.

@aokolnychyi (Contributor, Author):

I worry too; that's why I want everyone to take a look.

        None
    }
  }

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, TableInfo}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.expressions.{ApplyTransform, GeneralScalarExpression, LiteralValue, Transform}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
@@ -1640,6 +1641,38 @@ class DataSourceV2DataFrameSuite
}
}

test("SPARK-54424: inability to refresh cache shouldn't fail writes") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")

// cache table
spark.table(t).cache()

// verify caching works as expected
assertCached(spark.table(t))
checkAnswer(
spark.table(t),
Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// evolve table directly to mimic external changes
// these external changes make cached plan invalid (column is no longer there)
val change = TableChange.deleteColumn(Array("category"), false)
catalog("testcat").alterTable(ident, change)

// refresh table is supposed to trigger recaching
spark.sql(s"REFRESH TABLE $t")

// recaching is expected to fail but there should be no stale entries
assert(spark.sharedState.cacheManager.isEmpty)

// verify latest schema and data are propagated
checkAnswer(spark.table(t), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
}
}

  private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
    catalog(catalogName) match {
      case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)