@@ -30,8 +30,8 @@ import org.apache.spark.unsafe.types.UTF8String
3030import org .apache .spark .util .{NextIterator , SerializableConfiguration }
3131
/**
 * Bundles the metadata needed by [[StatePartitionAllColumnFamiliesReader]] to decode
 * every column family in a state store.
 *
 * @param colFamilySchemas schemas of all column families to be read
 * @param stateVariableInfos per-variable metadata for transformWithState state
 *                           variables, if any (empty for other operators)
 */
case class AllColumnFamiliesReaderInfo(
    colFamilySchemas: List[StateStoreColFamilySchema] = List.empty,
    stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)
3535
3636/**
3737 * An implementation of [[PartitionReaderFactory ]] for State data source. This is used to support
@@ -55,9 +55,10 @@ class StatePartitionReaderFactory(
5555 override def createReader (partition : InputPartition ): PartitionReader [InternalRow ] = {
5656 val stateStoreInputPartition = partition.asInstanceOf [StateStoreInputPartition ]
5757 if (stateStoreInputPartition.sourceOptions.internalOnlyReadAllColumnFamilies) {
58+ require(allColumnFamiliesReaderInfo.isDefined)
5859 new StatePartitionAllColumnFamiliesReader (storeConf, hadoopConf,
59- stateStoreInputPartition, schema, keyStateEncoderSpec,
60- allColumnFamiliesReaderInfo.getOrElse( AllColumnFamiliesReaderInfo ()) )
60+ stateStoreInputPartition, schema, keyStateEncoderSpec, stateStoreColFamilySchemaOpt,
61+ stateSchemaProviderOpt, allColumnFamiliesReaderInfo.get )
6162 } else if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
6263 new StateStoreChangeDataPartitionReader (storeConf, hadoopConf,
6364 stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
@@ -87,23 +88,25 @@ abstract class StatePartitionReaderBase(
8788 extends PartitionReader [InternalRow ] with Logging {
// Placeholder value schema: transformWithState state variables encode their
// values themselves, so the reader only needs a single-field dummy schema here.
private val schemaForValueRow: StructType =
  StructType(Array(StructField("__dummy__", NullType)))
9293
// Key schema used to decode state rows.
// - MapState: composite key (grouping key + user map key).
// - read-all-column-families: taken from the default column family schema,
//   which the caller must supply.
// - otherwise: the "key" field of the provided source schema.
protected val keySchema: StructType = {
  if (SchemaUtil.checkVariableType(stateVariableInfoOpt, StateVariableType.MapState)) {
    SchemaUtil.getCompositeKeySchema(schema, partition.sourceOptions)
  } else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
    // Fail fast with a clear message instead of a bare NoSuchElementException.
    require(stateStoreColFamilySchemaOpt.isDefined,
      "Default column family schema must be provided when reading all column families")
    stateStoreColFamilySchemaOpt.get.keySchema
  } else {
    SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
  }
}
102104
// Value schema used to decode state rows.
// - transformWithState variables: values are self-encoded, use the placeholder.
// - read-all-column-families: taken from the default column family schema,
//   which the caller must supply.
// - otherwise: the "value" field of the provided source schema.
protected val valueSchema: StructType = if (stateVariableInfoOpt.isDefined) {
  schemaForValueRow
} else if (partition.sourceOptions.internalOnlyReadAllColumnFamilies) {
  // Fail fast with a clear message instead of a bare NoSuchElementException.
  require(stateStoreColFamilySchemaOpt.isDefined,
    "Default column family schema must be provided when reading all column families")
  stateStoreColFamilySchemaOpt.get.valueSchema
} else {
  SchemaUtil.getSchemaAsDataType(
    schema, "value").asInstanceOf[StructType]
}
@@ -262,14 +265,15 @@ class StatePartitionAllColumnFamiliesReader(
262265 partition : StateStoreInputPartition ,
263266 schema : StructType ,
264267 keyStateEncoderSpec : KeyStateEncoderSpec ,
268+ defaultStateStoreColFamilySchemaOpt : Option [StateStoreColFamilySchema ],
269+ stateSchemaProviderOpt : Option [StateSchemaProvider ],
265270 allColumnFamiliesReaderInfo : AllColumnFamiliesReaderInfo )
266271 extends StatePartitionReaderBase (
267272 storeConf,
268273 hadoopConf, partition, schema,
269274 keyStateEncoderSpec, None ,
270- allColumnFamiliesReaderInfo.colFamilySchemas.find(
271- _.colFamilyName == StateStore .DEFAULT_COL_FAMILY_NAME ),
272- None , None ) {
275+ defaultStateStoreColFamilySchemaOpt,
276+ stateSchemaProviderOpt, None ) {
273277
274278 private val stateStoreColFamilySchemas = allColumnFamiliesReaderInfo.colFamilySchemas
275279 private val stateVariableInfos = allColumnFamiliesReaderInfo.stateVariableInfos
@@ -280,7 +284,6 @@ class StatePartitionAllColumnFamiliesReader(
280284 StateVariableType .ListState )
281285 }
282286
283- // Override provider to register ALL column families
284287 override protected lazy val provider : StateStoreProvider = {
285288 val stateStoreId = StateStoreId (partition.sourceOptions.stateCheckpointLocation.toString,
286289 partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
@@ -289,7 +292,30 @@ class StatePartitionAllColumnFamiliesReader(
289292 StateStoreProvider .createAndInit(
290293 stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
291294 useColumnFamilies, storeConf, hadoopConf.value,
292- useMultipleValuesPerKey = false , stateSchemaProvider = None )
295+ useMultipleValuesPerKey = false , stateSchemaProviderOpt)
296+ }
297+
298+
/**
 * Validates that every column family physically present in the checkpoint is
 * declared in the provided schema, so the reader can decode all of its data.
 *
 * @param colFamilyNames column family names declared in the schema
 * @param stateStore the opened store backing the checkpoint being read
 * @throws IllegalArgumentException if the checkpoint contains a column family
 *                                  not declared in the schema
 */
private def checkAllColFamiliesExist(
    colFamilyNames: List[String], stateStore: StateStore): Unit = {
  // Filter out DEFAULT column family from validation for two reasons:
  // 1. Some operators (e.g., stream-stream join v3) don't include DEFAULT in their schema
  //    because the underlying RocksDB creates "default" column family automatically
  // 2. The default column family schema is handled separately via
  //    defaultStateStoreColFamilySchemaOpt, so no need to verify it here
  // NOTE: names chosen to match direction of the check — the previous
  // actual/expected naming was inverted relative to the comments below.
  val schemaCFs = colFamilyNames.toSet.filter(_ != StateStore.DEFAULT_COL_FAMILY_NAME)
  val checkpointCFs = stateStore.allColumnFamilyNames
    .filter(_ != StateStore.DEFAULT_COL_FAMILY_NAME)

  // Validation: All column families found in the checkpoint must be declared in the schema.
  // It's acceptable if some schema CFs are not in the checkpoint - this just means those
  // column families have no data yet in the checkpoint
  // (they'll be created during registration).
  // However, if the checkpoint contains CFs not in the schema, it indicates a mismatch.
  require(checkpointCFs.subsetOf(schemaCFs),
    "Checkpoint contains unexpected column families. " +
      s"Column families in checkpoint but not in schema: ${checkpointCFs.diff(schemaCFs)}")
}
294320
295321 // Use a single store instance for both registering column families and iteration.
@@ -305,6 +331,7 @@ class StatePartitionAllColumnFamiliesReader(
305331
306332 // Register all column families from the schema
307333 if (stateStoreColFamilySchemas.length > 1 ) {
334+ checkAllColFamiliesExist(stateStoreColFamilySchemas.map(_.colFamilyName), stateStore)
308335 stateStoreColFamilySchemas.foreach { cfSchema =>
309336 cfSchema.colFamilyName match {
310337 case StateStore .DEFAULT_COL_FAMILY_NAME => // createAndInit has registered default
0 commit comments