@@ -21,8 +21,10 @@ import org.apache.spark.sql.catalyst.InternalRow
2121import org .apache .spark .sql .catalyst .expressions .{GenericInternalRow , UnsafeRow }
2222import org .apache .spark .sql .connector .read .{InputPartition , PartitionReader , PartitionReaderFactory }
2323import org .apache .spark .sql .execution .datasources .v2 .state .utils .SchemaUtil
24+ import org .apache .spark .sql .execution .streaming .operators .stateful .StatefulOperatorsUtils
2425import org .apache .spark .sql .execution .streaming .operators .stateful .join .SymmetricHashJoinStateManager
25- import org .apache .spark .sql .execution .streaming .operators .stateful .transformwithstate .{StateStoreColumnFamilySchemaUtils , StateVariableType , TransformWithStateVariableInfo }
26+ import org .apache .spark .sql .execution .streaming .operators .stateful .transformwithstate .{StateStoreColumnFamilySchemaUtils , StateVariableType , TransformWithStateVariableInfo , TransformWithStateVariableUtils }
27+ import org .apache .spark .sql .execution .streaming .operators .stateful .transformwithstate .timers .TimerStateUtils
2628import org .apache .spark .sql .execution .streaming .state ._
2729import org .apache .spark .sql .execution .streaming .state .RecordType .{getRecordTypeAsString , RecordType }
2830import org .apache .spark .sql .types .{NullType , StructField , StructType }
@@ -32,7 +34,8 @@ import org.apache.spark.util.{NextIterator, SerializableConfiguration}
/**
 * Bundles the inputs used when reading all column families of a state store.
 *
 * @param colFamilySchemas   schemas of the column families; empty list by default
 * @param stateVariableInfos state variable descriptors; empty list by default
 * @param operatorName       name of the stateful operator
 * @param stateFormatVersion optional state format version; `None` by default
 */
case class AllColumnFamiliesReaderInfo(
    colFamilySchemas: List[StateStoreColFamilySchema] = Nil,
    stateVariableInfos: List[TransformWithStateVariableInfo] = Nil,
    // NOTE(review): default literal reproduced exactly as it appears here - confirm the intent.
    operatorName: String = " ",
    stateFormatVersion: Option[Int] = None)
3639
3740/**
3841 * An implementation of [[PartitionReaderFactory ]] for State data source. This is used to support
@@ -50,8 +53,7 @@ class StatePartitionReaderFactory(
5053 stateStoreColFamilySchemaOpt : Option [StateStoreColFamilySchema ],
5154 stateSchemaProviderOpt : Option [StateSchemaProvider ],
5255 joinColFamilyOpt : Option [String ],
53- allColumnFamiliesReaderInfo : Option [AllColumnFamiliesReaderInfo ],
54- stateFormatVersion : Option [Int ])
56+ allColumnFamiliesReaderInfo : Option [AllColumnFamiliesReaderInfo ])
5557 extends PartitionReaderFactory {
5658
5759 override def createReader (partition : InputPartition ): PartitionReader [InternalRow ] = {
@@ -60,7 +62,7 @@ class StatePartitionReaderFactory(
6062 require(allColumnFamiliesReaderInfo.isDefined)
6163 new StatePartitionAllColumnFamiliesReader (storeConf, hadoopConf,
6264 stateStoreInputPartition, schema, keyStateEncoderSpec, stateStoreColFamilySchemaOpt,
63- stateSchemaProviderOpt, allColumnFamiliesReaderInfo.get, stateFormatVersion )
65+ stateSchemaProviderOpt, allColumnFamiliesReaderInfo.get)
6466 } else if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
6567 new StateStoreChangeDataPartitionReader (storeConf, hadoopConf,
6668 stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
@@ -270,8 +272,7 @@ class StatePartitionAllColumnFamiliesReader(
270272 keyStateEncoderSpec : KeyStateEncoderSpec ,
271273 defaultStateStoreColFamilySchemaOpt : Option [StateStoreColFamilySchema ],
272274 stateSchemaProviderOpt : Option [StateSchemaProvider ],
273- allColumnFamiliesReaderInfo : AllColumnFamiliesReaderInfo ,
274- stateFormatVersion : Option [Int ])
275+ allColumnFamiliesReaderInfo : AllColumnFamiliesReaderInfo )
275276 extends StatePartitionReaderBase (
276277 storeConf,
277278 hadoopConf, partition, schema,
@@ -282,14 +283,55 @@ class StatePartitionAllColumnFamiliesReader(
282283 private val stateStoreColFamilySchemas = allColumnFamiliesReaderInfo.colFamilySchemas
283284 private val stateVariableInfos = allColumnFamiliesReaderInfo.stateVariableInfos
284285 private val operatorName = allColumnFamiliesReaderInfo.operatorName
286+ private val stateFormatVersion = allColumnFamiliesReaderInfo.stateFormatVersion
285287
286- // Create the extractor for partition key extraction
287- private lazy val partitionKeyExtractor = SchemaUtil .getExtractor(
288- operatorName,
289- keySchema,
290- partition.sourceOptions.storeName,
291- stateVariableInfos.headOption,
292- stateFormatVersion)
/**
 * Returns true only when `operatorName` is one of
 * `StatefulOperatorsUtils.TRANSFORM_WITH_STATE_OP_NAMES` and `colFamilyName` equals
 * `StateStore.DEFAULT_COL_FAMILY_NAME`.
 */
private def isDefaultColFamilyInTWS(operatorName: String, colFamilyName: String): Boolean = {
  val isTwsOp = StatefulOperatorsUtils.TRANSFORM_WITH_STATE_OP_NAMES.contains(operatorName)
  isTwsOp && colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME
}
292+
/**
 * Maps a column family name to the name of the state variable it belongs to.
 *
 * The name is tested, in order, against the TTL, min-expiry-index, count-index and
 * row-counter column family naming checks; the first check that matches decides which
 * helper extracts the state name. A name that matches none of them is returned unchanged.
 */
private def getBaseStateName(colFamilyName: String): String = colFamilyName match {
  case name if StateStoreColumnFamilySchemaUtils.isTtlColFamilyName(name) =>
    StateStoreColumnFamilySchemaUtils.getStateNameFromTtlColFamily(name)
  case name if StateStoreColumnFamilySchemaUtils.isMinExpiryIndexCFName(name) =>
    StateStoreColumnFamilySchemaUtils.getStateNameFromMinExpiryIndexCFName(name)
  case name if StateStoreColumnFamilySchemaUtils.isCountIndexCFName(name) =>
    StateStoreColumnFamilySchemaUtils.getStateNameFromCountIndexCFName(name)
  case name if TransformWithStateVariableUtils.isRowCounterCFName(name) =>
    TransformWithStateVariableUtils.getStateNameFromRowCounterCFName(name)
  case name =>
    name
}
309+
310+
/**
 * Looks up the state variable info that corresponds to a column family.
 *
 * @param colFamilyName name of the column family to resolve
 * @return the timer state built by `TransformWithStateVariableUtils.getTimerState` when
 *         `TimerStateUtils.isTimerSecondaryIndexCF` accepts the name; otherwise the first
 *         entry of `stateVariableInfos` whose `stateName` equals the base state name of the
 *         column family, or `None` when no entry matches
 */
private def getStateVarInfo(
    colFamilyName: String): Option[TransformWithStateVariableInfo] = {
  if (TimerStateUtils.isTimerSecondaryIndexCF(colFamilyName)) {
    Some(TransformWithStateVariableUtils.getTimerState(colFamilyName))
  } else {
    // The base name does not depend on the candidate being tested, so resolve it once
    // instead of re-deriving it inside the predicate for every state variable.
    val baseStateName = getBaseStateName(colFamilyName)
    stateVariableInfos.find(_.stateName == baseStateName)
  }
}
319+
// Partition key extractors keyed by column family name. One extractor is built per column
// family because key schemas may differ between column families. Column families rejected
// by isDefaultColFamilyInTWS get no entry.
private lazy val partitionKeyExtractors: Map[String, StatePartitionKeyExtractor] = {
  val readableSchemas = stateStoreColFamilySchemas
    .filterNot(cf => isDefaultColFamilyInTWS(operatorName, cf.colFamilyName))

  readableSchemas.map { cfSchema =>
    val cfName = cfSchema.colFamilyName
    cfName -> SchemaUtil.getPartitionKeyExtractor(
      operatorName,
      cfSchema.keySchema,
      partition.sourceOptions.storeName,
      cfName,
      getStateVarInfo(cfName),
      stateFormatVersion)
  }.toMap
}
293335
294336 private def isListType (colFamilyName : String ): Boolean = {
295337 SchemaUtil .checkVariableType(
@@ -368,22 +410,25 @@ class StatePartitionAllColumnFamiliesReader(
368410
// Rows of every column family, concatenated in the order of stateStoreColFamilySchemas.
// Column families rejected by isDefaultColFamilyInTWS are skipped. For list-type column
// families one row is produced per value returned by valuesIterator for each key; for all
// other column families one row is produced per key/value pair.
override lazy val iter: Iterator[InternalRow] = {
  val readableSchemas = stateStoreColFamilySchemas.iterator
    .filterNot(cf => isDefaultColFamilyInTWS(operatorName, cf.colFamilyName))

  readableSchemas.flatMap { cfSchema =>
    val cfName = cfSchema.colFamilyName
    // Each column family has its own extractor, looked up by column family name.
    val extractor = partitionKeyExtractors(cfName)
    if (isListType(cfName)) {
      for {
        pair <- store.iterator(cfName)
        value <- store.valuesIterator(pair.key, cfName)
      } yield SchemaUtil.unifyStateRowPairAsRawBytes((pair.key, value), cfName, extractor)
    } else {
      store.iterator(cfName).map { pair =>
        SchemaUtil.unifyStateRowPairAsRawBytes((pair.key, pair.value), cfName, extractor)
      }
    }
  }
}
389434
0 commit comments