Skip to content

Commit 82c86fe

Browse files
committed
fix rebase error
1 parent 275130b commit 82c86fe

File tree

3 files changed

+18
-13
lines changed

3 files changed

+18
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ object SchemaUtil {
4444
}
4545
}
4646

47+
def getScanAllColumnFamiliesSchema(keySchema: StructType): StructType = {
48+
new StructType()
49+
// TODO [SPARK-54443]: change keySchema to a more specific type after we
50+
// can extract partition key from keySchema
51+
.add("partition_key", keySchema)
52+
.add("key_bytes", BinaryType)
53+
.add("value_bytes", BinaryType)
54+
.add("column_family_name", StringType)
55+
}
56+
4757
def getSourceSchema(
4858
sourceOptions: StateSourceOptions,
4959
keySchema: StructType,
@@ -62,13 +72,7 @@ object SchemaUtil {
6272
.add("value", valueSchema)
6373
.add("partition_id", IntegerType)
6474
} else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
65-
new StructType()
66-
// TODO [SPARK-54443]: change keySchema to a more specific type after we
67-
// can extract partition key from keySchema
68-
.add("partition_key", keySchema)
69-
.add("key_bytes", BinaryType)
70-
.add("value_bytes", BinaryType)
71-
.add("column_family_name", StringType)
75+
getScanAllColumnFamiliesSchema(keySchema)
7276
} else {
7377
new StructType()
7478
.add("key", keySchema)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionWriter.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,19 @@ class StatePartitionAllColumnFamiliesWriter(
5353
storeName: String,
5454
currentBatchId: Long,
5555
columnFamilyToSchemaMap: HashMap[String, StatePartitionWriterColumnFamilyInfo]) {
56-
private val dummySchema: StructType =
57-
StructType(Array(StructField("__dummy__", NullType)))
5856
private val defaultSchema = {
5957
columnFamilyToSchemaMap.get(StateStore.DEFAULT_COL_FAMILY_NAME) match {
6058
case Some(info) => info.schema
6159
case None =>
6260
// Return a dummy StateStoreColFamilySchema if not found
61+
val placeholderSchema = columnFamilyToSchemaMap.head._2.schema
6362
StateStoreColFamilySchema(
6463
colFamilyName = "__dummy__",
6564
keySchemaId = 0,
66-
keySchema = dummySchema,
65+
keySchema = placeholderSchema.keySchema,
6766
valueSchemaId = 0,
68-
valueSchema = dummySchema,
69-
keyStateEncoderSpec = Option(NoPrefixKeyStateEncoderSpec(dummySchema)))
67+
valueSchema = placeholderSchema.valueSchema,
68+
keyStateEncoderSpec = Option(NoPrefixKeyStateEncoderSpec(placeholderSchema.keySchema)))
7069
}
7170
}
7271

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import scala.collection.immutable.HashMap
2424
import org.apache.spark.TaskContext
2525
import org.apache.spark.sql.Row
2626
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
27-
import org.apache.spark.sql.execution.datasources.v2.state.{StateDataSourceTestBase, StateSourceOptions}
27+
import org.apache.spark.sql.execution.datasources.v2.state.{EventTimeTimerProcessor, ListStateTTLProcessor, MultiStateVarProcessor, RunningCountStatefulProcessorWithProcTimeTimer, StateDataSourceTestBase, StateSourceOptions}
28+
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
29+
import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.timers.TimerStateUtils
2830
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryCheckpointMetadata}
2931
import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
3032
import org.apache.spark.sql.functions.{col, timestamp_seconds}

0 commit comments

Comments
 (0)