@vinodkc (Contributor) commented on Nov 23, 2025

What changes were proposed in this pull request?

This PR adds Avro serialization and deserialization support for Spark's TIME type.
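
For reference, TIME values travel through Avro as the time-micros logical type (a long counting microseconds since midnight), with a spark.sql.catalyst.type schema property carrying the Spark-side precision, as the example schemas below show. A minimal, self-contained sketch of that schema shape (illustrative only, not code from this PR; the time(6) precision is a placeholder):

// Parse the Avro schema shape used for a Spark TIME column and sanity-check it.
import org.apache.avro.{LogicalTypes, Schema}
import java.time.LocalTime

val timeSchemaJson =
  """{
    |  "type": "long",
    |  "logicalType": "time-micros",
    |  "spark.sql.catalyst.type": "time(6)"
    |}""".stripMargin

val avroSchema = new Schema.Parser().parse(timeSchemaJson)
assert(avroSchema.getType == Schema.Type.LONG)
assert(LogicalTypes.fromSchema(avroSchema).getName == "time-micros")

// time-micros values count microseconds since midnight, e.g. TIME'14:30:45.123456':
val micros = LocalTime.parse("14:30:45.123456").toNanoOfDay / 1000L  // 52245123456L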

Why are the changes needed?

The TIME type currently lacks Avro support, preventing users from:

  • Reading/writing Avro files with TIME columns
  • Using TIME with data exchange pipelines (Kafka, streaming)
  • Integrating TIME data with Avro-based systems
  • Preserving TIME precision in schema evolution

Does this PR introduce any user-facing change?

Yes. Users can now:

  1. Read Avro with TIME columns
spark.read.format("avro").load("data.avro")
// Returns DataFrame with TIME columns preserved
  2. Write DataFrames with TIME to Avro
val df = spark.sql("SELECT TIME'14:30:45.123456' as shift_start")
df.write.format("avro").save("output.avro")
  3. Use to_avro/from_avro functions with TIME
import org.apache.spark.sql.avro.functions.{to_avro, from_avro}

// Serialize TIME to Avro binary
val avroDF = df.select(to_avro($"shift_start").as("avro"))

// Deserialize Avro binary back to TIME (with precision metadata)
val schema = """
  {
    "type": "long",
    "logicalType": "time-micros",
    "spark.sql.catalyst.type": "time(3)"
  }
"""
val timeDF = avroDF.select(from_avro($"avro", schema).as("shift_start"))
  4. Use TIME in Avro-based streaming (a read-side sketch follows this list)
// Kafka sink with Avro-serialized payload; bootstrap servers and topic are placeholders
df.selectExpr("to_avro(struct(shift_start)) as value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "shifts")
  .save()
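
For completeness, a read-side sketch of the same Kafka flow, assuming a placeholder topic name ("shifts"), placeholder bootstrap servers, and the record schema produced by to_avro(struct(...)); this is an illustration under those assumptions, not code from this PR:

// Batch-read the Avro-encoded messages back from Kafka and decode the TIME column.
// Topic, servers, and the record/field names here are illustrative placeholders.
import org.apache.spark.sql.avro.functions.from_avro

val readerSchema =
  """{
    |  "type": "record",
    |  "name": "topLevelRecord",
    |  "fields": [
    |    {"name": "shift_start",
    |     "type": {"type": "long",
    |              "logicalType": "time-micros",
    |              "spark.sql.catalyst.type": "time(6)"}}
    |  ]
    |}""".stripMargin

val decoded = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "shifts")
  .load()
  .select(from_avro($"value", readerSchema).as("data"))
  .select("data.shift_start")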

How was this patch tested?

Added tests in AvroSuite.scala and AvroFunctionsSuite.scala.

Also manually tested with:

spark-shell --packages org.apache.spark:spark-avro_2.13:4.0.0

val df = spark.sql("SELECT TIME'14:30:45.123456' as shift_start")
import org.apache.spark.sql.avro.functions.{to_avro, from_avro}
val avroDF = df.select(to_avro($"shift_start").as("avro"))

// Deserialize Avro binary back to TIME (with precision metadata)
val schema = """
  {
    "type": "long",
    "logicalType": "time-micros",
    "spark.sql.catalyst.type": "time(3)"
  }
"""
val timeDF = avroDF.select(from_avro($"avro", schema).as("shift_start"))
timeDF.show
+---------------+
|    shift_start|
+---------------+
|14:30:45.123456|
+---------------+
timeDF.printSchema
root
 |-- shift_start: time(3) (nullable = true)

Was this patch authored or co-authored using generative AI tooling?

Yes.
Generated-by: Claude 3.5 Sonnet

AI assistance was used for:

  • Code pattern analysis and design discussions
  • Implementation guidance following Spark conventions
  • Test case generation and organization
  • Documentation and examples


@dongjoon-hyun (Member) left a comment:

Thank you, @vinodkc.
