diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 2f5b9475a750..5652545ea567 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._
 import scala.math.BigDecimal.RoundingMode
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.{DeserializationContext, DeserializationFeature, JsonDeserializer, JsonNode, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.json4s._
@@ -191,7 +192,19 @@ class StreamingQueryProgress private[spark] (
     ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
     ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
     ("sink" -> sink.jsonValue) ~
-    ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue))
+    ("observedMetrics" -> {
+      // TODO: SPARK-54391
+      // In Spark Connect, observedMetrics is serialized but not deserialized properly when it is
+      // sent back to the client, so the Row's schema is null and calling row.jsonValue throws an
+      // exception. We catch that exception and return JNothing instead.
+      // This is because Row.jsonValue is a one-way method: there is no reverse method to convert
+      // the JSON back to a Row.
+      try {
+        safeMapToJValue[Row](observedMetrics, (_, row) => row.jsonValue)
+      } catch {
+        case NonFatal(e) => JNothing
+      }
+    })
   }
 }
 
@@ -210,6 +223,19 @@ private[spark] object StreamingQueryProgress {
     mapper.readValue[StreamingQueryProgress](json)
   }
 }
+// SPARK-54390: Custom deserializer that converts JSON objects to strings for offset fields.
+private class ObjectToStringDeserializer extends JsonDeserializer[String] {
+  override def deserialize(parser: JsonParser, context: DeserializationContext): String = {
+    val node: JsonNode = parser.readValueAsTree()
+    if (node.isTextual) {
+      node.asText()
+    } else {
+      // Convert a JSON object/array to its string representation.
+      node.toString
+    }
+  }
+}
+
 /**
  * Information about progress made for a source in the execution of a [[StreamingQuery]] during a
  * trigger. See [[StreamingQueryProgress]] for more information.
@@ -233,12 +259,19 @@ private[spark] object StreamingQueryProgress {
 @Evolving
 class SourceProgress protected[spark] (
     val description: String,
+    // SPARK-54390: Use a custom deserializer to convert a JSON object offset to a string.
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val startOffset: String,
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val endOffset: String,
+    @JsonDeserialize(using = classOf[ObjectToStringDeserializer])
     val latestOffset: String,
     val numInputRows: Long,
-    val inputRowsPerSecond: Double,
-    val processedRowsPerSecond: Double,
+    // NaN marks a value that was not set during deserialization; NaN fields are then omitted
+    // when the progress is serialized back to JSON. In Spark Connect we therefore need the
+    // default value to be Double.NaN rather than 0.0.
+    val inputRowsPerSecond: Double = Double.NaN,
+    val processedRowsPerSecond: Double = Double.NaN,
     val metrics: ju.Map[String, String] = Map[String, String]().asJava)
   extends Serializable {
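The offset fields above can arrive from Spark Connect either as plain JSON strings or as nested JSON objects/arrays, so they are routed through a field-level Jackson deserializer that always yields a `String`; the two rate fields now default to `Double.NaN`, which marks "not set" and is dropped again on re-serialization. Below is a minimal standalone sketch of the same deserializer pattern; `AnyToStringDeserializer` and `OffsetHolder` are illustrative names, not part of the patch, and it assumes jackson-module-scala honors the constructor-parameter annotation as it does for `SourceProgress`.

```scala
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Accepts either a JSON string or a JSON object/array and always yields a String,
// mirroring what the ObjectToStringDeserializer above does for offset fields.
class AnyToStringDeserializer extends JsonDeserializer[String] {
  override def deserialize(parser: JsonParser, context: DeserializationContext): String = {
    val node: JsonNode = parser.readValueAsTree()
    if (node.isTextual) node.asText() else node.toString
  }
}

// Hypothetical holder class, used only for this example.
case class OffsetHolder(
    @JsonDeserialize(using = classOf[AnyToStringDeserializer]) offset: String)

object OffsetDeserializationDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // A plain string offset and a structured offset both land in the same String field.
    val fromString = mapper.readValue("""{"offset":"1000"}""", classOf[OffsetHolder])
    val fromObject = mapper.readValue("""{"offset":{"topic":{"0":123}}}""", classOf[OffsetHolder])
    println(fromString.offset) // 1000
    println(fromObject.offset) // {"topic":{"0":123}}
  }
}
```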
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4eabc82281e1..645dbef7abfd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.collection.mutable
 
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
 import org.scalactic.{Equality, TolerantNumerics}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -286,6 +287,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     )
   }
 
+  private def removeFieldFromJson(jsonString: String, fieldName: String): String = {
+    val jv = parse(jsonString, useBigDecimalForDouble = true)
+    val removed = jv.removeField { case (name, _) => name == fieldName }
+    compact(render(removed))
+  }
+
   test("QueryProgressEvent serialization") {
     def testSerialization(event: QueryProgressEvent): Unit = {
       import scala.jdk.CollectionConverters._
@@ -294,9 +301,24 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
       assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
       assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala)
+
+      // Verify we can get the event back from the JSON string; this is important for Spark
+      // Connect, where StreamingQueryListenerBus.queryEventHandler uses exactly this method
+      // to deserialize the event.
+      val eventFromNewEvent = QueryProgressEvent.fromJson(newEvent.json)
+      // TODO: Remove after SC-206585 is fixed
+      // The observedMetrics field is not serialized properly when the event comes back from the
+      // listener bus, so we drop it here and verify that everything except observedMetrics is
+      // equal in the JSON string.
+      val eventWithoutObservedMetrics = removeFieldFromJson(event.progress.json, "observedMetrics")
+      assert(eventFromNewEvent.progress.json === eventWithoutObservedMetrics)
     }
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
     testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress3))
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress4))
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress5))
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress6))
   }
 
   test("QueryTerminatedEvent serialization") {
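The `removeFieldFromJson` helper added above relies on json4s's `removeField`, which drops every `(name, value)` pair matching the predicate anywhere in the parsed tree. A small self-contained sketch of that behaviour, using a made-up input document:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

object RemoveFieldDemo {
  def main(args: Array[String]): Unit = {
    val json = """{"batchId":97,"observedMetrics":{"rows":{"count":1}},"numInputRows":201}"""
    // removeField walks the whole JValue tree and removes every field whose name matches.
    val cleaned = parse(json).removeField { case (name, _) => name == "observedMetrics" }
    println(compact(render(cleaned))) // {"batchId":97,"numInputRows":201}
  }
}
```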
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 9c1e16460879..5b692e4c42c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -169,21 +169,148 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
         |}
       """.stripMargin.trim)
     assert(compact(parse(json2)) === testProgress2.json)
+
+    val json5 = testProgress5.prettyJson
+    assertJson(
+      json5,
+      s"""
+        |{
+        |  "id" : "${testProgress5.id.toString}",
+        |  "runId" : "${testProgress5.runId.toString}",
+        |  "name" : null,
+        |  "timestamp" : "2025-08-22T00:00:00.111Z",
+        |  "batchId" : 97,
+        |  "batchDuration" : 12,
+        |  "numInputRows" : 201,
+        |  "inputRowsPerSecond" : 20.1,
+        |  "processedRowsPerSecond" : 20.1,
+        |  "stateOperators" : [ ],
+        |  "sources" : [ {
+        |    "description" : "kafka",
+        |    "startOffset" : {
+        |      "topic" : {
+        |        "0" : 123
+        |      }
+        |    },
+        |    "endOffset" : {
+        |      "topic" : {
+        |        "0" : 456
+        |      }
+        |    },
+        |    "latestOffset" : {
+        |      "topic" : {
+        |        "0" : 789
+        |      }
+        |    },
+        |    "numInputRows" : 100,
+        |    "inputRowsPerSecond" : 10.0,
+        |    "processedRowsPerSecond" : 10.0
+        |  }, {
+        |    "description" : "kinesis",
+        |    "startOffset" : [ {
+        |      "shard" : {
+        |        "stream" : "stream1",
+        |        "shardId" : "shard1"
+        |      },
+        |      "firstSeqNum" : null,
+        |      "lastSeqNum" : "123",
+        |      "closed" : false,
+        |      "msBehindLatest" : null,
+        |      "lastRecordSeqNum" : null
+        |    } ],
+        |    "endOffset" : [ {
+        |      "shard" : {
+        |        "stream" : "stream1",
+        |        "shardId" : "shard1"
+        |      },
+        |      "firstSeqNum" : null,
+        |      "lastSeqNum" : "456",
+        |      "closed" : false,
+        |      "msBehindLatest" : null,
+        |      "lastRecordSeqNum" : null
+        |    } ],
+        |    "latestOffset" : [ {
+        |      "shard" : {
+        |        "stream" : "stream1",
+        |        "shardId" : "shard1"
+        |      },
+        |      "firstSeqNum" : null,
+        |      "lastSeqNum" : "789",
+        |      "closed" : false,
+        |      "msBehindLatest" : null,
+        |      "lastRecordSeqNum" : null
+        |    } ],
+        |    "numInputRows" : 101,
+        |    "inputRowsPerSecond" : 10.1,
+        |    "processedRowsPerSecond" : 10.1
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json5)) === testProgress5.json)
+
+    val json6 = testProgress6.prettyJson
+    assertJson(
+      json6,
+      s"""
+        |{
+        |  "id" : "${testProgress6.id.toString}",
+        |  "runId" : "${testProgress6.runId.toString}",
+        |  "name" : "myName",
+        |  "timestamp" : "2025-09-19T00:00:00.111Z",
+        |  "batchId" : 97,
+        |  "batchDuration" : 12,
+        |  "numInputRows" : 1001,
+        |  "stateOperators" : [ ],
+        |  "sources" : [ {
+        |    "description" : "kafka",
+        |    "startOffset" : 1000,
+        |    "endOffset" : 2000,
+        |    "latestOffset" : 3000,
+        |    "numInputRows" : 1001
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json6)) === testProgress6.json)
   }
 
   test("StreamingQueryProgress - json") {
     assert(compact(parse(testProgress1.json)) === testProgress1.json)
     assert(compact(parse(testProgress2.json)) === testProgress2.json)
     assert(compact(parse(testProgress3.json)) === testProgress3.json)
+    assert(compact(parse(testProgress4.json, useBigDecimalForDouble = true)) === testProgress4.json)
+    assert(compact(parse(testProgress5.json)) === testProgress5.json)
+    assert(compact(parse(testProgress6.json)) === testProgress6.json)
+    assert(compact(parse(testProgress7.json)) === testProgress7.json)
   }
 
   test("StreamingQueryProgress - toString") {
     assert(testProgress1.toString === testProgress1.prettyJson)
     assert(testProgress2.toString === testProgress2.prettyJson)
+    assert(testProgress3.toString === testProgress3.prettyJson)
+    assert(testProgress4.toString === testProgress4.prettyJson)
+    assert(testProgress5.toString === testProgress5.prettyJson)
+    assert(testProgress6.toString === testProgress6.prettyJson)
+    assert(testProgress7.toString === testProgress7.prettyJson)
   }
 
   test("StreamingQueryProgress - jsonString and fromJson") {
-    Seq(testProgress1, testProgress2).foreach { input =>
+    Seq(
+      testProgress1,
+      testProgress2,
+      testProgress3,
+      testProgress4,
+      testProgress5,
+      testProgress6,
+      testProgress7
+    ).foreach { input =>
       val jsonString = StreamingQueryProgress.jsonString(input)
       val result = StreamingQueryProgress.fromJson(jsonString)
       assert(input.id == result.id)
@@ -221,7 +348,11 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
       } else {
         assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
       }
-      assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+      if (s1.processedRowsPerSecond.isNaN) {
+        assert(s2.processedRowsPerSecond.isNaN)
+      } else {
+        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+      }
       assert(s1.metrics == s2.metrics)
     }
 
@@ -232,10 +363,14 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
       }
 
       val resultObservedMetrics = result.observedMetrics
-      assert(input.observedMetrics.size() == resultObservedMetrics.size())
-      assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
-      input.observedMetrics.entrySet().forEach { e =>
-        assert(e.getValue == resultObservedMetrics.get(e.getKey))
+      if (resultObservedMetrics != null) {
+        assert(input.observedMetrics.size() == resultObservedMetrics.size())
+        assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
+        input.observedMetrics.entrySet().forEach { e =>
+          assert(e.getValue == resultObservedMetrics.get(e.getKey))
+        }
+      } else {
+        assert(input.observedMetrics == null)
       }
     }
   }
@@ -437,8 +572,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi
   }
 
notation") { - val progress = testProgress5.jsonValue - val progressPretty = testProgress5.prettyJson + val progress = testProgress7.jsonValue + val progressPretty = testProgress7.prettyJson // Actual values val avgOffsetsBehindLatest: Double = 2.70941269E8 @@ -465,8 +600,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually wi progressPretty shouldBe s""" |{ - | "id" : "${testProgress5.id.toString}", - | "runId" : "${testProgress5.runId.toString}", + | "id" : "${testProgress7.id.toString}", + | "runId" : "${testProgress7.runId.toString}", | "name" : "KafkaMetricsTest", | "timestamp" : "2025-09-23T06:00:00.000Z", | "batchId" : 1250, @@ -680,6 +815,104 @@ object StreamingQueryStatusAndProgressSuite { ) val testProgress5 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = null, // should not be present in the json + timestamp = "2025-08-22T00:00:00.111Z", + batchId = 97L, + batchDuration = 12L, + durationMs = null, + // empty maps should be handled correctly + eventTime = null, + stateOperators = Array(), + sources = Array( + new SourceProgress( + description = "kafka", + startOffset = """{"topic":{"0":123}}""", + endOffset = """{"topic":{"0":456}}""", + latestOffset = """{"topic":{"0":789}}""", + numInputRows = 100, + inputRowsPerSecond = 10.0, + processedRowsPerSecond = 10.0 + ), + new SourceProgress( + description = "kinesis", + startOffset = + """ + |[{ + | "shard": { + | "stream": "stream1", + | "shardId": "shard1" + | }, + | "firstSeqNum": null, + | "lastSeqNum": "123", + | "closed": false, + | "msBehindLatest": null, + | "lastRecordSeqNum": null + |}] + """.stripMargin, + endOffset = + """ + |[{ + | "shard": { + | "stream": "stream1", + | "shardId": "shard1" + | }, + | "firstSeqNum": null, + | "lastSeqNum": "456", + | "closed": false, + | "msBehindLatest": null, + | "lastRecordSeqNum": null + |}] + """.stripMargin, + latestOffset = + """ + |[{ + | "shard": { + | "stream": "stream1", + | "shardId": "shard1" + | }, + | "firstSeqNum": null, + | "lastSeqNum": "789", + | "closed": false, + | "msBehindLatest": null, + | "lastRecordSeqNum": null + |}] + """.stripMargin, + numInputRows = 101, + inputRowsPerSecond = 10.1, + processedRowsPerSecond = 10.1 + ) + ), + sink = SinkProgress("sink", None), + observedMetrics = new java.util.HashMap(Map().asJava) + ) + + val testProgress6 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = "myName", + timestamp = "2025-09-19T00:00:00.111Z", + batchId = 97L, + batchDuration = 12L, + durationMs = null, + eventTime = null, + stateOperators = Array(), + sources = Array(new SourceProgress( + description = "kafka", + startOffset = "1000", + endOffset = "2000", + latestOffset = "3000", + numInputRows = 1001 + // inputRowsPerSecond and processedRowsPerSecond should be Double.NaN + // and not present in the json + ) + ), + sink = SinkProgress("sink", None), + observedMetrics = new java.util.HashMap(Map().asJava) + ) + + val testProgress7 = new StreamingQueryProgress( id = UUID.randomUUID, runId = UUID.randomUUID, name = "KafkaMetricsTest",