Skip to content

Commit 88671ca

Browse files
liviazhu authored and anishshri-db committed
[SPARK-54307][SS] Throw an error if streaming query is restarted with stateful op but there is empty state dir
### What changes were proposed in this pull request?
Add an error if stateful operators are in the query plan but the state directory is empty.

### Why are the changes needed?
Without this explicit error, users will see `CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE`, which is confusing and could be mistaken for an internal error.

### Does this PR introduce _any_ user-facing change?
Yes, the error message will change from `CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE` to `STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY`.

### How was this patch tested?
New unit tests.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.5

Closes #53007 from liviazhu/liviazhu-db/empty-state-dir-error.

Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent ca9c256 commit 88671ca

File tree

4 files changed

+109
-2
lines changed

4 files changed

+109
-2
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5771,6 +5771,15 @@
57715771
},
57725772
"sqlState" : "0A000"
57735773
},
5774+
"STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY" : {
5775+
"message" : [
5776+
"Cannot restart streaming query with stateful operators because the state directory is empty or missing.",
5777+
"Stateful operators in current batch: [<OpsInCurBatchSeq>].",
5778+
"This typically occurs when state files have been deleted or the streaming query was previously run without stateful operators but restarted with stateful operators.",
5779+
"Please remove the stateful operators, use a new checkpoint location, or restore the missing state files."
5780+
],
5781+
"sqlState" : "42K03"
5782+
},
57745783
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
57755784
"message" : [
57765785
"Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.sql.execution.streaming
1818

19-
import org.apache.spark.SparkException
19+
import org.apache.spark.{SparkException, SparkRuntimeException}
2020

2121
/**
2222
* Object for grouping error messages from streaming query exceptions
@@ -39,4 +39,17 @@ object StreamingErrors {
3939
cause = err
4040
)
4141
}
42+
43+
/**
 * Builds the error raised when a streaming query containing stateful operators is restarted
 * but its state directory is empty or missing.
 *
 * @param opsInCurBatch map of (operatorId -> operatorName) for the stateful operators found
 *                      in the current batch's physical plan
 * @return a [[SparkRuntimeException]] with error class
 *         STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY whose `OpsInCurBatchSeq`
 *         parameter lists the operators in ascending operator-id order
 */
def statefulOperatorMissingStateDirectory(
    opsInCurBatch: Map[Long, String]): Throwable = {
  // Map iteration order is not guaranteed to follow operator ids (larger immutable maps are
  // hash-ordered), so sort explicitly to keep the rendered message stable and readable.
  val renderedOps = opsInCurBatch.toSeq
    .sortBy { case (operatorId, _) => operatorId }
    .map { case (operatorId, operatorName) =>
      s"(OperatorId: $operatorId -> OperatorName: $operatorName)"
    }
    .mkString(", ")

  new SparkRuntimeException(
    errorClass = "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
    messageParameters = Map(
      "OpsInCurBatchSeq" -> renderedOps
    )
  )
}
4255
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
3838
import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
3939
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
4040
import org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec, TransformWithStateInPySparkExec}
41-
import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
41+
import org.apache.spark.sql.execution.streaming.{StreamingErrors, StreamingQueryPlanTraverseHelper}
4242
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
4343
import org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec, SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo, StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec}
4444
import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec
@@ -563,6 +563,18 @@ class IncrementalExecution(
563563
stateStoreWriter.getStateInfo.operatorId -> stateStoreWriter.shortName
564564
}.toMap
565565

566+
// Check if state directory is empty when we have stateful operators
567+
if (opMapInPhysicalPlan.nonEmpty) {
568+
val stateDirPath = new Path(new Path(checkpointLocation).getParent, "state")
569+
val fileManager = CheckpointFileManager.create(stateDirPath, hadoopConf)
570+
571+
val stateDirectoryEmpty = !fileManager.exists(stateDirPath) ||
572+
fileManager.list(stateDirPath).isEmpty
573+
if (stateDirectoryEmpty) {
574+
throw StreamingErrors.statefulOperatorMissingStateDirectory(opMapInPhysicalPlan)
575+
}
576+
}
577+
566578
// A map of all (operatorId -> operatorName) in the state metadata
567579
val opMapInMetadata: Map[Long, String] = {
568580
var ret = Map.empty[Long, String]

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,4 +472,77 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
472472
}
473473
}
474474
}
475+
476+
// Scenario: a stateful (dropDuplicates) query runs once, its state directory is then deleted,
// and the restart is expected to fail with the dedicated missing-state-directory error.
test("Restart with stateful operator but empty state directory triggers error") {
  withTempDir { checkpointDir =>
    val source = MemoryStream[Int]
    val sourceDf = source.toDF()
    val checkpointPath = checkpointDir.toString

    // First run: populate the checkpoint with a stateful operator in the plan.
    testStream(sourceDf.dropDuplicates())(
      StartStream(checkpointLocation = checkpointPath),
      AddData(source, 1, 2, 3),
      ProcessAllAvailable(),
      StopStream)

    // Remove the state directory to mimic state files being deleted out from under the query.
    val statePath = new Path(checkpointPath, "state")
    CheckpointFileManager.create(statePath, hadoopConf).delete(statePath)

    // The only stateful operator in the plan is the dedupe operator with id 0.
    val expectedParams =
      Map("OpsInCurBatchSeq" -> "(OperatorId: 0 -> OperatorName: dedupe)")

    // Second run: restarting against the emptied checkpoint must raise the new error.
    testStream(sourceDf.dropDuplicates())(
      StartStream(checkpointLocation = checkpointPath),
      AddData(source, 4),
      ExpectFailure[SparkRuntimeException] { failure =>
        checkError(
          failure.asInstanceOf[SparkRuntimeException],
          "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
          "42K03",
          expectedParams)
      }
    )
  }
}
510+
511+
// Scenario: a stateless query writes a checkpoint, then the query is restarted from the same
// checkpoint with a stateful (dropDuplicates) operator added; the restart is expected to fail
// with the dedicated missing-state-directory error.
test("Restart with stateful operator added to previously stateless query triggers error") {
  withTempDir { checkpointDir =>
    val source = MemoryStream[Int]
    val checkpointPath = checkpointDir.toString

    // First run: stateless projection only, processed over two rounds of input.
    testStream(source.toDF().select($"value" * 2 as "doubled"))(
      StartStream(checkpointLocation = checkpointPath),
      AddData(source, 1, 2, 3),
      ProcessAllAvailable(),
      AddData(source, 1, 2, 3),
      ProcessAllAvailable(),
      StopStream)

    // Make sure no state directory is left behind (none is expected for a stateless query).
    val statePath = new Path(checkpointPath, "state")
    val stateFs = CheckpointFileManager.create(statePath, hadoopConf)
    if (stateFs.exists(statePath)) {
      stateFs.delete(statePath)
    }

    // The only stateful operator in the restarted plan is the dedupe operator with id 0.
    val expectedParams =
      Map("OpsInCurBatchSeq" -> "(OperatorId: 0 -> OperatorName: dedupe)")

    // Second run: same checkpoint, now with a stateful operator in the plan.
    testStream(source.toDF().dropDuplicates())(
      StartStream(checkpointLocation = checkpointPath),
      AddData(source, 4),
      ExpectFailure[SparkRuntimeException] { failure =>
        checkError(
          failure.asInstanceOf[SparkRuntimeException],
          "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
          "42K03",
          expectedParams)
      }
    )
  }
}
475548
}

0 commit comments

Comments
 (0)