Commit d6a5413

address comment
address comment 2
1 parent 29827e7 commit d6a5413

File tree

7 files changed: +1699 -974 lines changed

7 files changed

+1699
-974
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionWriter.scala

Lines changed: 22 additions & 7 deletions
@@ -19,20 +19,22 @@ package org.apache.spark.sql.execution.streaming.state
 import java.util.UUID
 
 import scala.collection.MapView
-import scala.collection.immutable.HashMap
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperatorsUtils
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.StateStoreColumnFamilySchemaUtils
 import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.DIR_NAME_STATE
+import org.apache.spark.sql.internal.SQLConf
 
 case class StatePartitionWriterColumnFamilyInfo(
     schema: StateStoreColFamilySchema,
     // set this to true if state variable is ListType in TransformWithState
     useMultipleValuesPerKey: Boolean = false)
+
 /**
  * A writer that can directly write binary data to the streaming state store.
  *
@@ -52,11 +54,24 @@ class StatePartitionAllColumnFamiliesWriter(
     operatorId: Int,
     storeName: String,
     currentBatchId: Long,
-    columnFamilyToSchemaMap: HashMap[String, StatePartitionWriterColumnFamilyInfo]) {
+    columnFamilyToSchemaMap: Map[String, StatePartitionWriterColumnFamilyInfo],
+    operatorName: String,
+    schemaProviderOpt: Option[StateSchemaProvider],
+    sqlConf: Map[String, String]) {
+
+  private def isJoinV3Operator(
+      operatorName: String, sqlConf: Map[String, String]): Boolean = {
+    operatorName ==
+      StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME &&
+      sqlConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key) == "3"
+  }
   private val defaultSchema = {
     columnFamilyToSchemaMap.get(StateStore.DEFAULT_COL_FAMILY_NAME) match {
       case Some(info) => info.schema
       case None =>
+        assert(isJoinV3Operator(operatorName, sqlConf),
+          s"Please provide the schema of 'default' column family in StateStoreColFamilySchema" +
+            s"for operator $operatorName")
         // Return a dummy StateStoreColFamilySchema if not found
         val placeholderSchema = columnFamilyToSchemaMap.head._2.schema
         StateStoreColFamilySchema(
@@ -68,7 +83,7 @@ class StatePartitionAllColumnFamiliesWriter(
           keyStateEncoderSpec = Option(NoPrefixKeyStateEncoderSpec(placeholderSchema.keySchema)))
     }
   }
-
+  private val useColumnFamilies = columnFamilyToSchemaMap.size > 1
   private val columnFamilyToKeySchemaLenMap: MapView[String, Int] =
     columnFamilyToSchemaMap.view.mapValues(_.schema.keySchema.length)
   private val columnFamilyToValueSchemaLenMap: MapView[String, Int] =
@@ -80,12 +95,12 @@ class StatePartitionAllColumnFamiliesWriter(
       operatorId, partitionId, storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, UUID.randomUUID())
 
-    val useColumnFamilies = columnFamilyToSchemaMap.size > 1
+
     val provider = StateStoreProvider.createAndInit(
       stateStoreProviderId, defaultSchema.keySchema, defaultSchema.valueSchema,
       defaultSchema.keyStateEncoderSpec.get,
       useColumnFamilies = useColumnFamilies, storeConf, hadoopConf,
-      useMultipleValuesPerKey = false, stateSchemaProvider = None)
+      useMultipleValuesPerKey = false, stateSchemaProvider = schemaProviderOpt)
     provider
   }
 
@@ -101,7 +116,7 @@ class StatePartitionAllColumnFamiliesWriter(
       stateStoreCkptId = None,
       loadEmpty = true
     )
-    if (columnFamilyToSchemaMap.size > 1) {
+    if (useColumnFamilies) {
       columnFamilyToSchemaMap.foreach { pair =>
         val colFamilyName = pair._1
         val cfSchema = pair._2.schema
@@ -117,7 +132,7 @@ class StatePartitionAllColumnFamiliesWriter(
           cfSchema.keySchema,
           cfSchema.valueSchema,
           cfSchema.keyStateEncoderSpec.get,
-          columnFamilyToSchemaMap(colFamilyName).useMultipleValuesPerKey,
+          pair._2.useMultipleValuesPerKey,
           isInternal)
       }
     }
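The main change in this file is the widened constructor: StatePartitionAllColumnFamiliesWriter now takes a plain Map of column-family schemas rather than an immutable.HashMap, plus the operator name, an optional StateSchemaProvider (forwarded to StateStoreProvider.createAndInit in place of the previously hard-coded None), and a String-to-String view of the SQL conf so that isJoinV3Operator can decide when a missing 'default' column family is acceptable. Below is a minimal caller-side sketch of the new arguments, assuming the signature shown in this diff; the val names and the defaultColFamilySchema placeholder are illustrative and not part of the commit.

// Illustrative only: `defaultColFamilySchema` stands in for however the caller
// builds its StateStoreColFamilySchema and is not defined here.
val columnFamilyToSchemaMap: Map[String, StatePartitionWriterColumnFamilyInfo] =
  Map(StateStore.DEFAULT_COL_FAMILY_NAME ->
    StatePartitionWriterColumnFamilyInfo(defaultColFamilySchema))

// Values that satisfy the new isJoinV3Operator check: only the symmetric hash
// join operator with state format version 3 may omit the 'default' column
// family from columnFamilyToSchemaMap; every other operator must include it.
val operatorName: String = StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME
val schemaProviderOpt: Option[StateSchemaProvider] = None
val sqlConf: Map[String, String] =
  Map(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3")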
