Skip to content

Commit 887f803

Browse files
committed
address comment
1 parent b4fe10f commit 887f803

File tree

3 files changed

+11
-3
lines changed

3 files changed

+11
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ import org.apache.spark.sql.types.{NullType, StructField, StructType}
2929
import org.apache.spark.unsafe.types.UTF8String
3030
import org.apache.spark.util.{NextIterator, SerializableConfiguration}
3131

32+
/**
33+
* Information used specifically by StatePartitionAllColumnFamiliesReader
34+
* @param colFamilySchemas a set of ColFamilySchema for all column families in an operator.
35+
* The reader relies on this field to read data for all column family
36+
* @param stateVariableInfos a list of TransformWithStateVariableInfo for state variables
37+
* in TWS operator. The reader relies on this to check variable type
38+
*/
3239
case class AllColumnFamiliesReaderInfo(
3340
colFamilySchemas: Set[StateStoreColFamilySchema] = Set.empty,
3441
stateVariableInfos: List[TransformWithStateVariableInfo] = List.empty)
@@ -338,7 +345,7 @@ class StatePartitionAllColumnFamiliesReader(
338345
case StateStore.DEFAULT_COL_FAMILY_NAME => // createAndInit has registered default
339346
case _ =>
340347
val isInternal =
341-
StateStoreColumnFamilySchemaUtils.isInternalColumn(cfSchema.colFamilyName)
348+
StateStoreColumnFamilySchemaUtils.isInternalColFamily(cfSchema.colFamilyName)
342349
val useMultipleValuesPerKey = isListType(cfSchema.colFamilyName)
343350
require(cfSchema.keyStateEncoderSpec.isDefined,
344351
s"keyStateEncoderSpec must be defined for column family ${cfSchema.colFamilyName}")

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/StateStoreColumnFamilySchemaUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ object StateStoreColumnFamilySchemaUtils {
9999
def getStateNameFromCountIndexCFName(colFamilyName: String): String =
100100
getStateName(COUNT_INDEX_PREFIX, colFamilyName)
101101

102-
def isInternalColumn(colFamilyName: String): Boolean = {
102+
def isInternalColFamily(colFamilyName: String): Boolean = {
103103
colFamilyName.startsWith("$")
104104
}
105105

@@ -111,7 +111,7 @@ object StateStoreColumnFamilySchemaUtils {
111111
* @return true if this is an internal column family and Utils.isTesting is true
112112
*/
113113
def isTestingInternalColFamily(colFamilyName: String): Boolean = {
114-
org.apache.spark.util.Utils.isTesting && isInternalColumn(colFamilyName)
114+
org.apache.spark.util.Utils.isTesting && isInternalColFamily(colFamilyName)
115115
}
116116

117117
def getValueStateSchema[T](

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
13201320
// Create a test column family
13211321
val testCfName = "test_cf"
13221322
db.createColFamilyIfAbsent(testCfName, isInternal = false)
1323+
assert(db.allColumnFamilyNames == Set(StateStore.DEFAULT_COL_FAMILY_NAME, testCfName))
13231324

13241325
// Write initial data
13251326
db.load(0)

0 commit comments

Comments
 (0)