Commit b4fe10f

address comments

1 parent c474783 commit b4fe10f
10 files changed: +540 -456 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 19 additions & 7 deletions
```diff
@@ -193,8 +193,12 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging

     val stateVars = twsOperatorProperties.stateVariables
     val stateVarInfo = stateVars.filter(stateVar => stateVar.stateName == stateVarName)
+    // This check makes sure exactly one stateVarInfo exists in stateVars.
+    // We skip this check when testing internal column correctness by querying through Spark,
+    // because internal columns (e.g., $ttl_, $min_, $count_) are not part of the user-defined
+    // state variables and therefore not registered in stateVars.
     if (stateVarInfo.size != 1 &&
-      !StateStoreColumnFamilySchemaUtils.isInternalColFamilyTestOnly(stateVarName)) {
+      !StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(stateVarName)) {
       throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
         s"State variable $stateVarName is not defined for the transformWithState operator.")
     }
@@ -260,8 +264,10 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
       StateStoreReaderInfo = {
     val storeMetadata = StateDataSource.getStateStoreMetadata(sourceOptions, hadoopConf)
     if (!sourceOptions.internalOnlyReadAllColumnFamilies) {
-      // skipping runStateVarChecks for StatePartitionAllColumnFamiliesReader because
-      // we won't specify any stateVars when querying a TWS operator
+      // Skip runStateVarChecks when reading all column families (for repartitioning) because:
+      // 1. We're not targeting a specific state variable, so stateVarName won't be specified
+      // 2. The validation logic assumes a single state variable is being queried
+      // 3. For repartitioning, we need to read all column families without these constraints
       runStateVarChecks(sourceOptions, storeMetadata)
     }

@@ -271,7 +277,7 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     var stateSchemaProvider: Option[StateSchemaProvider] = None
     var joinColFamilyOpt: Option[String] = None
     var timeMode: String = TimeMode.None.toString
-    var stateStoreColFamilySchemas: List[StateStoreColFamilySchema] = List.empty
+    var stateStoreColFamilySchemas: Set[StateStoreColFamilySchema] = Set.empty
     var stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty

     if (sourceOptions.joinSide == JoinSideValues.none) {
@@ -291,14 +297,19 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
       if (sourceOptions.readRegisteredTimers) {
         stateVarName = TimerStateUtils.getTimerStateVarNames(timeMode)._1
       }
+      // When reading all column families (for repartitioning), we collect all state variable
+      // infos instead of validating a specific stateVarName. This skips the normal validation
+      // logic because we're not reading a specific state variable - we're reading all of them.
       if (sourceOptions.internalOnlyReadAllColumnFamilies) {
         stateVariableInfos = operatorProperties.stateVariables
       } else {
         var stateVarInfoList = operatorProperties.stateVariables
           .filter(stateVar => stateVar.stateName == stateVarName)
         if (stateVarInfoList.isEmpty &&
-          StateStoreColumnFamilySchemaUtils.isInternalColFamilyTestOnly(stateVarName)) {
+          StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(stateVarName)) {
           // pass this dummy TWSStateVariableInfo for TWS internal column family during testing,
+          // because internal columns are not registered in operatorProperties.stateVariables,
+          // thus stateVarInfoList will be empty.
           stateVarInfoList = List(TransformWithStateVariableInfo(
             stateVarName, StateVariableType.ValueState, false
           ))
@@ -339,8 +350,9 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     val stateSchema = manager.readSchemaFile()

     if (sourceOptions.internalOnlyReadAllColumnFamilies) {
-      // Store all column family schemas for multi-CF reading
-      stateStoreColFamilySchemas = stateSchema
+      // Store all column family schemas for multi-CF reading.
+      // Convert to Set to ensure no duplicates and avoid processing the same CF twice.
+      stateStoreColFamilySchemas = stateSchema.toSet
     }
     // When reading all column families for Join V3, no specific state variable is targeted,
     // so stateVarName defaults to DEFAULT_COL_FAMILY_NAME.
```
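The switch from `List` to `Set` for `stateStoreColFamilySchemas` leans on structural equality of case classes: `toSet` drops duplicate entries automatically. A minimal standalone sketch of that behavior, using a made-up `ColFamilySchema` case class rather than Spark's `StateStoreColFamilySchema`:

```scala
// Hypothetical stand-in for Spark's StateStoreColFamilySchema, only to show
// the deduplication semantics of toSet on case classes.
case class ColFamilySchema(colFamilyName: String, keySchema: String)

object SetDedupSketch extends App {
  val fromSchemaFile = List(
    ColFamilySchema("default", "key STRING"),
    ColFamilySchema("$ttl_countState", "key STRING"),
    ColFamilySchema("default", "key STRING") // duplicate entry
  )
  // Case classes compare by field values, so toSet drops the duplicate
  // and each column family is processed at most once.
  val deduped: Set[ColFamilySchema] = fromSchemaFile.toSet
  assert(deduped.size == 2)
}
```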

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 9 additions & 8 deletions
```diff
@@ -30,7 +30,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{NextIterator, SerializableConfiguration}

 case class AllColumnFamiliesReaderInfo(
-    colFamilySchemas: List[StateStoreColFamilySchema] = List.empty,
+    colFamilySchemas: Set[StateStoreColFamilySchema] = Set.empty,
     stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)

 /**
@@ -150,7 +150,7 @@ abstract class StatePartitionReaderBase(
     require(stateStoreColFamilySchemaOpt.isDefined)
     val stateStoreColFamilySchema = stateStoreColFamilySchemaOpt.get
     val isInternal = partition.sourceOptions.readRegisteredTimers ||
-      StateStoreColumnFamilySchemaUtils.isInternalColFamilyTestOnly(
+      StateStoreColumnFamilySchemaUtils.isTestingInternalColFamily(
         stateStoreColFamilySchema.colFamilyName)
     require(stateStoreColFamilySchema.keyStateEncoderSpec.isDefined)
     store.createColFamilyIfAbsent(
@@ -289,7 +289,7 @@ class StatePartitionAllColumnFamiliesReader(
     val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
-    val useColumnFamilies = stateStoreColFamilySchemas.length > 1
+    val useColumnFamilies = stateStoreColFamilySchemas.size > 1
     StateStoreProvider.createAndInit(
       stateStoreProviderId, keySchema, valueSchema, keyStateEncoderSpec,
       useColumnFamilies, storeConf, hadoopConf.value,
@@ -315,8 +315,8 @@ class StatePartitionAllColumnFamiliesReader(
     // (they'll be created during registration).
     // However, if the checkpoint contains CFs not in the schema, it indicates a mismatch.
     require(expectedCFs.subsetOf(actualCFs),
-      s"Checkpoint contains unexpected column families. " +
-      s"Column families in checkpoint but not in schema: ${expectedCFs.diff(actualCFs)}")
+      s"Some column families are present in the state store but missing in the metadata. " +
+      s"Column families in state store but not in metadata: ${expectedCFs.diff(actualCFs)}")
   }

   // Use a single store instance for both registering column families and iteration.
@@ -331,13 +331,14 @@ class StatePartitionAllColumnFamiliesReader(
     )

     // Register all column families from the schema
-    if (stateStoreColFamilySchemas.length > 1) {
-      checkAllColFamiliesExist(stateStoreColFamilySchemas.map(_.colFamilyName), stateStore)
+    if (stateStoreColFamilySchemas.size > 1) {
+      checkAllColFamiliesExist(stateStoreColFamilySchemas.map(_.colFamilyName).toList, stateStore)
       stateStoreColFamilySchemas.foreach { cfSchema =>
         cfSchema.colFamilyName match {
           case StateStore.DEFAULT_COL_FAMILY_NAME => // createAndInit has registered default
           case _ =>
-            val isInternal = cfSchema.colFamilyName.startsWith("$")
+            val isInternal =
+              StateStoreColumnFamilySchemaUtils.isInternalColumn(cfSchema.colFamilyName)
             val useMultipleValuesPerKey = isListType(cfSchema.colFamilyName)
             require(cfSchema.keyStateEncoderSpec.isDefined,
               s"keyStateEncoderSpec must be defined for column family ${cfSchema.colFamilyName}")
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateStoreColumnFamilySchemaUtils.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -99,15 +99,19 @@ object StateStoreColumnFamilySchemaUtils {
   def getStateNameFromCountIndexCFName(colFamilyName: String): String =
     getStateName(COUNT_INDEX_PREFIX, colFamilyName)

+  def isInternalColumn(colFamilyName: String): Boolean = {
+    colFamilyName.startsWith("$")
+  }
+
   /**
    * Returns true if the column family is internal (starts with "$") and we are in testing mode.
    * This is used to allow internal column families to be read during tests.
    *
    * @param colFamilyName The name of the column family to check
    * @return true if this is an internal column family and Utils.isTesting is true
    */
-  def isInternalColFamilyTestOnly(colFamilyName: String): Boolean = {
-    org.apache.spark.util.Utils.isTesting && colFamilyName.startsWith("$")
+  def isTestingInternalColFamily(colFamilyName: String): Boolean = {
+    org.apache.spark.util.Utils.isTesting && isInternalColumn(colFamilyName)
   }

   def getValueStateSchema[T](
```
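The rename plus the extracted `isInternalColumn` keeps the `"$"` prefix convention in one place instead of duplicating `startsWith("$")`. A self-contained sketch of the same shape (the `isTesting` flag here is a stand-in for `org.apache.spark.util.Utils.isTesting`; the property name is an assumption for the sketch):

```scala
object ColFamilyNamingSketch {
  // Stand-in for org.apache.spark.util.Utils.isTesting; the real flag is set
  // by Spark's test harness. This is an assumption for the sketch only.
  private def isTesting: Boolean = sys.props.contains("spark.testing")

  // Single source of truth for the internal-column-family naming convention.
  def isInternalColumn(colFamilyName: String): Boolean =
    colFamilyName.startsWith("$")

  // The test-only variant composes the naming check with the testing flag.
  def isTestingInternalColFamily(colFamilyName: String): Boolean =
    isTesting && isInternalColumn(colFamilyName)
}
```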

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -104,6 +104,9 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = {
       throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", "HDFSStateStore")
     }
+
+    override def allColumnFamilyNames: Set[String] =
+      Set[String](StateStore.DEFAULT_COL_FAMILY_NAME)
   }

   /** Implementation of [[StateStore]] API which is backed by an HDFS-compatible file system */
@@ -146,8 +149,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       throw StateStoreErrors.multipleColumnFamiliesNotSupported(providerName)
     }

-    override def allColumnFamilyNames: collection.Set[String] =
-      collection.Set[String](StateStore.DEFAULT_COL_FAMILY_NAME)
+    override def allColumnFamilyNames: Set[String] =
+      Set[String](StateStore.DEFAULT_COL_FAMILY_NAME)

     // Multiple col families are not supported with HDFSBackedStateStoreProvider. Throw an exception
     // if the user tries to use a non-default col family.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -340,7 +340,7 @@ class RocksDB(
    * Returns all column family names currently registered in RocksDB.
    * This includes column families loaded from checkpoint metadata.
    */
-  def allColumnFamilyNames: collection.Set[String] = {
+  def allColumnFamilyNames: scala.collection.immutable.Set[String] = {
     colFamilyNameToInfoMap.asScala.keySet.toSet
   }

```
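Pinning the return type to `scala.collection.immutable.Set` matters because `asScala.keySet` on a `ConcurrentHashMap` is a live view of the map, while `.toSet` takes a point-in-time snapshot. A minimal sketch of that distinction (map contents are made up):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object SnapshotSketch extends App {
  val colFamilyNameToInfoMap = new ConcurrentHashMap[String, String]()
  colFamilyNameToInfoMap.put("default", "info")

  // toSet copies the keys into an immutable Set at this point in time.
  val snapshot: Set[String] = colFamilyNameToInfoMap.asScala.keySet.toSet

  // A column family registered later does not leak into the snapshot.
  colFamilyNameToInfoMap.put("$ttl_countState", "info")
  assert(snapshot == Set("default"))
}
```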

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -610,7 +610,7 @@ private[sql] class RocksDBStateStoreProvider

   override def hasCommitted: Boolean = state == COMMITTED

-  override def allColumnFamilyNames: collection.Set[String] = {
+  override def allColumnFamilyNames: Set[String] = {
     rocksDB.allColumnFamilyNames
   }

```
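Narrowing the return type from `collection.Set` to the immutable `Set` is safe because Scala permits covariant return types on overrides, and `scala.collection.immutable.Set` is a subtype of `scala.collection.Set`. A tiny sketch with made-up names:

```scala
// Hypothetical trait/impl pair, only to show the variance rule.
trait Names { def all: scala.collection.Set[String] }

class ImmutableNames extends Names {
  // immutable.Set[String] <: collection.Set[String], so this override compiles.
  override def all: Set[String] = Set("default")
}
```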

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 9 additions & 7 deletions
```diff
@@ -184,6 +184,13 @@ trait ReadStateStore {
    * This method is idempotent and safe to call multiple times.
    */
   def release(): Unit
+
+  /**
+   * Returns all column family names in this state store.
+   *
+   * @return Set of all column family names
+   */
+  def allColumnFamilyNames: Set[String]
 }

 /**
@@ -310,13 +317,6 @@ trait StateStore extends ReadStateStore {
    * Whether all updates have been committed
    */
   def hasCommitted: Boolean
-
-  /**
-   * Returns all column family names in this state store.
-   *
-   * @return Set of all column family names
-   */
-  def allColumnFamilyNames: collection.Set[String]
 }

 /** Wraps the instance of StateStore to make the instance read-only. */
@@ -349,6 +349,8 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
   override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = {
     store.valuesIterator(key, colFamilyName)
   }
+
+  override def allColumnFamilyNames: Set[String] = store.allColumnFamilyNames
 }

 /**
```
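Moving `allColumnFamilyNames` from `StateStore` up to `ReadStateStore` means any code holding only a read handle can enumerate column families. A stripped-down sketch of the hierarchy (members are placeholders, not the real Spark traits):

```scala
trait ReadStateStore {
  // Now available on read-only handles as well as writable stores.
  def allColumnFamilyNames: Set[String]
}

trait StateStore extends ReadStateStore {
  def hasCommitted: Boolean
}

// A read-only wrapper satisfies the contract by delegation, as
// WrappedReadStateStore does in the diff above.
class ReadOnlyView(store: StateStore) extends ReadStateStore {
  override def allColumnFamilyNames: Set[String] = store.allColumnFamilyNames
}
```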
