@@ -30,8 +30,8 @@ import org.apache.spark.unsafe.types.UTF8String
30 30  import org.apache.spark.util.{NextIterator, SerializableConfiguration}
3131
32 32  case class AllColumnFamiliesReaderInfo(
33    -     colFamilySchemas: List[StateStoreColFamilySchema] = List.empty,
34    -     stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)
   33 +     colFamilySchemas: List[StateStoreColFamilySchema] = List.empty,
   34 +     stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)
3535
3636/**
3737 * An implementation of [[PartitionReaderFactory ]] for State data source. This is used to support
@@ -55,9 +55,10 @@ class StatePartitionReaderFactory(
55 55    override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
56 56      val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
57 57      if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
   58 +      require(allColumnFamiliesReaderInfo.isDefined)
58 59        new StatePartitionAllColumnFamiliesReader(storeConf, hadoopConf,
59    -        stateStoreInputPartition, schema, keyStateEncoderSpec,
60    -        allColumnFamiliesReaderInfo.getOrElse(AllColumnFamiliesReaderInfo()))
   60 +        stateStoreInputPartition, schema, keyStateEncoderSpec, stateStoreColFamilySchemaOpt,
   61 +        stateSchemaProviderOpt, allColumnFamiliesReaderInfo.get)
6162 } else if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
6263 new StateStoreChangeDataPartitionReader (storeConf, hadoopConf,
6364 stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
@@ -87,23 +88,25 @@ abstract class StatePartitionReaderBase(
8788 extends PartitionReader [InternalRow ] with Logging {
8889 // Used primarily as a placeholder for the value schema in the context of
8990 // state variables used within the transformWithState operator.
90    -   private val dummySchema: StructType =
   91 +   private val schemaForValueRow: StructType =
91 92      StructType(Array(StructField("__dummy__", NullType)))
9293
9394 protected val keySchema : StructType = {
9495 if (SchemaUtil .checkVariableType(stateVariableInfoOpt, StateVariableType .MapState )) {
9596 SchemaUtil .getCompositeKeySchema(schema, partition.sourceOptions)
9697 } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
97- stateStoreColFamilySchemaOpt.map(_.keySchema).getOrElse(dummySchema)
98+ require(stateStoreColFamilySchemaOpt.isDefined)
99+ stateStoreColFamilySchemaOpt.get.keySchema
98100 } else {
99101 SchemaUtil .getSchemaAsDataType(schema, " key" ).asInstanceOf [StructType ]
100102 }
101103 }
102104
103 105    protected val valueSchema: StructType = if (stateVariableInfoOpt.isDefined) {
104     -     dummySchema
    106 +     schemaForValueRow
105 107    } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
106     -     stateStoreColFamilySchemaOpt.map(_.valueSchema).getOrElse(dummySchema)
    108 +     require(stateStoreColFamilySchemaOpt.isDefined)
    109 +     stateStoreColFamilySchemaOpt.get.valueSchema
107 110    } else {
108 111      SchemaUtil.getSchemaAsDataType(
109 112        schema, "value").asInstanceOf[StructType]
@@ -262,14 +265,15 @@ class StatePartitionAllColumnFamiliesReader(
262265 partition : StateStoreInputPartition ,
263266 schema : StructType ,
264267 keyStateEncoderSpec : KeyStateEncoderSpec ,
268+ defaultStateStoreColFamilySchemaOpt : Option [StateStoreColFamilySchema ],
269+ stateSchemaProviderOpt : Option [StateSchemaProvider ],
265270 allColumnFamiliesReaderInfo : AllColumnFamiliesReaderInfo )
266271 extends StatePartitionReaderBase (
267272 storeConf,
268273 hadoopConf, partition, schema,
269274 keyStateEncoderSpec, None ,
270- allColumnFamiliesReaderInfo.colFamilySchemas.find(
271- _.colFamilyName == StateStore .DEFAULT_COL_FAMILY_NAME ),
272- None , None ) {
275+ defaultStateStoreColFamilySchemaOpt,
276+ stateSchemaProviderOpt, None ) {
273277
274278 private val stateStoreColFamilySchemas = allColumnFamiliesReaderInfo.colFamilySchemas
275279 private val stateVariableInfos = allColumnFamiliesReaderInfo.stateVariableInfos
@@ -280,7 +284,6 @@ class StatePartitionAllColumnFamiliesReader(
280284 StateVariableType .ListState )
281285 }
282286
283- // Override provider to register ALL column families
284287 override protected lazy val provider : StateStoreProvider = {
285288 val stateStoreId = StateStoreId (partition.sourceOptions.stateCheckpointLocation.toString,
286289 partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
@@ -289,7 +292,7 @@ class StatePartitionAllColumnFamiliesReader(
289292 StateStoreProvider .createAndInit(
290293 stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
291294 useColumnFamilies, storeConf, hadoopConf.value,
292- useMultipleValuesPerKey = false , stateSchemaProvider = None )
295+ useMultipleValuesPerKey = false , stateSchemaProviderOpt )
293296 }
294297
295298 // Use a single store instance for both registering column families and iteration.
@@ -303,8 +306,29 @@ class StatePartitionAllColumnFamiliesReader(
303306 getStartStoreUniqueId
304307 )
305308
309+ def checkAllColFamiliesExist (colFamilyNames : List [String ]) = {
310+ // Filter out DEFAULT column family from validation for two reasons:
311+ // 1. Some operators (e.g., stream-stream join v3) don't include DEFAULT in their schema
312+ // because the underlying RocksDB creates "default" column family automatically
313+ // 2. The default column family schema is handled separately via
314+ // defaultStateStoreColFamilySchemaOpt, so no need to verify it here
315+ val actualCFs = colFamilyNames.toSet.filter(_ != StateStore .DEFAULT_COL_FAMILY_NAME )
316+ val expectedCFs = stateStore.allColumnFamilyNames
317+ .filter(_ != StateStore .DEFAULT_COL_FAMILY_NAME )
318+
319+ // Validation: All column families found in the checkpoint must be declared in the schema.
320+ // It's acceptable if some schema CFs are not in expectedCFs - this just means those
321+ // column families have no data yet in the checkpoint
322+ // (they'll be created during registration).
323+ // However, if the checkpoint contains CFs not in the schema, it indicates a mismatch.
    324 +     require(expectedCFs.subsetOf(actualCFs),
    325 +       s"Checkpoint contains unexpected column families. " +
    326 +       s"Column families in checkpoint but not in schema: ${expectedCFs.diff(actualCFs)}")
327+ }
328+
306329 // Register all column families from the schema
307330 if (stateStoreColFamilySchemas.length > 1 ) {
331+ checkAllColFamiliesExist(stateStoreColFamilySchemas.map(_.colFamilyName))
308332 stateStoreColFamilySchemas.foreach { cfSchema =>
309333 cfSchema.colFamilyName match {
310334 case StateStore .DEFAULT_COL_FAMILY_NAME => // createAndInit has registered default
0 commit comments