
Commit 2329fec

Add Avro read and write support for TIME type
1 parent 021f89a commit 2329fec

6 files changed: +196 -16 lines changed

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala

Lines changed: 78 additions & 0 deletions
@@ -29,6 +29,7 @@ import org.apache.avro.io.{DecoderFactory, EncoderFactory}
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.avro.{functions => Fns}
+import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions.{col, lit, struct}
 import org.apache.spark.sql.internal.SQLConf
@@ -665,4 +666,81 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
       Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
   }
+
+  test("roundtrip in to_avro and from_avro - TIME type with different precisions") {
+    val df = spark.sql("""
+      SELECT
+        TIME'12:34:56' as time_p0,
+        TIME'12:34:56.1' as time_p1,
+        TIME'12:34:56.12' as time_p2,
+        TIME'12:34:56.123' as time_p3,
+        TIME'12:34:56.1234' as time_p4,
+        TIME'12:34:56.12345' as time_p5,
+        TIME'12:34:56.123456' as time_p6
+    """)
+
+    val precisions = Seq(0, 1, 2, 3, 4, 5, 6)
+    precisions.foreach { p =>
+      val fieldName = s"time_p$p"
+      // Generate correct schema for each precision
+      val avroTimeSchema = s"""
+        |{
+        |  "type": "long",
+        |  "logicalType": "time-micros",
+        |  "spark.sql.catalyst.type": "time($p)"
+        |}
+      """.stripMargin
+
+      val avroDF = df.select(to_avro(col(fieldName)).as("avro"))
+      val readBack = avroDF.select(from_avro($"avro", avroTimeSchema).as(fieldName))
+
+      checkAnswer(readBack, df.select(col(fieldName)))
+    }
+  }
+
+  test("roundtrip in to_avro and from_avro - TIME type in struct") {
+    val df = spark.sql("""
+      SELECT
+        struct(
+          TIME'09:00:00.123' as start,
+          TIME'17:30:45.987654' as end,
+          'Morning Shift' as description
+        ) as schedule
+    """)
+
+    val avroStructDF = df.select(to_avro($"schedule").as("avro"))
+
+    val avroStructSchema = """
+      |{
+      |  "type": "record",
+      |  "name": "schedule",
+      |  "fields": [
+      |    {
+      |      "name": "start",
+      |      "type": {
+      |        "type": "long",
+      |        "logicalType": "time-micros",
+      |        "spark.sql.catalyst.type": "time(3)"
+      |      }
+      |    },
+      |    {
+      |      "name": "end",
+      |      "type": {
+      |        "type": "long",
+      |        "logicalType": "time-micros",
+      |        "spark.sql.catalyst.type": "time(6)"
+      |      }
+      |    },
+      |    {
+      |      "name": "description",
+      |      "type": "string"
+      |    }
+      |  ]
+      |}
    """.stripMargin
+
+    val readBack = avroStructDF.select(
+      from_avro($"avro", avroStructSchema).as("schedule"))
+    checkAnswer(readBack, df)
+  }
 }
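For context, a minimal standalone sketch of the round trip these tests exercise, assuming a running SparkSession named `spark`; the schema string mirrors the ones used in the tests above:

    import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
    import org.apache.spark.sql.functions.col

    // Avro schema carrying the precision in the "spark.sql.catalyst.type" property.
    val avroTimeSchema =
      """{"type": "long", "logicalType": "time-micros", "spark.sql.catalyst.type": "time(6)"}"""

    val df = spark.sql("SELECT TIME'12:34:56.123456' AS t")
    val encoded = df.select(to_avro(col("t")).as("avro"))                        // TIME -> Avro binary
    val decoded = encoded.select(from_avro(col("avro"), avroTimeSchema).as("t")) // binary -> TIME(6)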

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 90 additions & 15 deletions
@@ -3192,21 +3192,6 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-51590: unsupported the TIME data types in Avro") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("select time'12:01:02' as t")
-            .write.format("avro").mode("overwrite").save(tempDir)
-        },
-        condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-        parameters = Map(
-          "columnName" -> "`t`",
-          "columnType" -> s"\"${TimeType().sql}\"",
-          "format" -> "Avro"))
-    }
-  }
 }
 
 class AvroV1Suite extends AvroSuite {
@@ -3407,4 +3392,94 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
       }
     }
   }
+
+  test("TIME type read/write with Avro format") {
+    withTempPath { dir =>
+      // Test boundary values and NULL handling
+      val df = spark.sql("""
+        SELECT
+          TIME'00:00:00.123456' as midnight,
+          TIME'12:34:56.789012' as noon,
+          TIME'23:59:59.999999' as max_time,
+          CAST(NULL AS TIME) as null_time
+      """)
+
+      df.write.format("avro").save(dir.toString)
+      val readDf = spark.read.format("avro").load(dir.toString)
+
+      checkAnswer(readDf, df)
+
+      // Verify schema - all should be default TimeType(6)
+      readDf.schema.fields.foreach { field =>
+        assert(field.dataType == TimeType(), s"Field ${field.name} should be TimeType")
+      }
+
+      // Verify boundary values
+      val row = readDf.collect()(0)
+      assert(row.getAs[java.time.LocalTime]("midnight") ==
+        java.time.LocalTime.of(0, 0, 0, 123456000))
+      assert(row.getAs[java.time.LocalTime]("noon") ==
+        java.time.LocalTime.of(12, 34, 56, 789012000))
+      assert(row.getAs[java.time.LocalTime]("max_time") ==
+        java.time.LocalTime.of(23, 59, 59, 999999000))
+      assert(row.get(3) == null, "NULL time should be preserved")
+    }
+  }
+
+  test("TIME type in nested structures in Avro") {
+    withTempPath { dir =>
+      // Test TIME type in arrays and structs with different precisions
+      val df = spark.sql("""
+        SELECT
+          named_struct('start', CAST(TIME'09:00:00.123' AS TIME(3)),
+            'end', CAST(TIME'17:30:45.654321' AS TIME(6))) as schedule,
+          array(TIME'08:15:30.111222', TIME'12:45:15.333444', TIME'16:20:50.555666') as checkpoints
+      """)
+
+      df.write.format("avro").save(dir.toString)
+      val readDf = spark.read.format("avro").load(dir.toString)
+
+      checkAnswer(readDf, df)
+    }
+  }
+
+  test("TIME type precision metadata is preserved in Avro") {
+    withTempPath { dir =>
+      // Test all TIME precisions (0-6) with multiple columns
+      val df = spark.sql("""
+        SELECT
+          id,
+          CAST(TIME '12:34:56' AS TIME(0)) as time_p0,
+          CAST(TIME '12:34:56.1' AS TIME(1)) as time_p1,
+          CAST(TIME '12:34:56.12' AS TIME(2)) as time_p2,
+          CAST(TIME '12:34:56.123' AS TIME(3)) as time_p3,
+          CAST(TIME '12:34:56.1234' AS TIME(4)) as time_p4,
+          CAST(TIME '12:34:56.12345' AS TIME(5)) as time_p5,
+          CAST(TIME '12:34:56.123456' AS TIME(6)) as time_p6,
+          description
+        FROM VALUES
+          (1, 'Morning'),
+          (2, 'Evening')
+        AS t(id, description)
+      """)
+
+      // Verify original schema has all precisions
+      (0 to 6).foreach { p =>
+        assert(df.schema(s"time_p$p").dataType == TimeType(p))
+      }
+
+      // Write to Avro and read back
+      df.write.format("avro").save(dir.toString)
+      val readDf = spark.read.format("avro").load(dir.toString)
+
+      // Verify ALL precisions are preserved after round-trip
+      (0 to 6).foreach { p =>
+        assert(readDf.schema(s"time_p$p").dataType == TimeType(p),
+          s"Precision $p should be preserved")
+      }
+
+      // Verify data integrity
+      checkAnswer(readDf, df)
+    }
+  }
 }
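The schema assertions in the first test compare against `TimeType()` with no argument; per the test's own comment ("all should be default TimeType(6)"), the default precision is microseconds. A quick sanity sketch of that equivalence, assuming the default equals `TimeType.MICROS_PRECISION` (the constant used in SchemaConverters below):

    import org.apache.spark.sql.types.TimeType

    // Default TIME precision is microseconds (6 fractional digits).
    assert(TimeType() == TimeType(TimeType.MICROS_PRECISION))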

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 8 additions & 0 deletions
@@ -203,6 +203,14 @@ private[sql] class AvroDeserializer(
         s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
     }
 
+    case (LONG, _: TimeType) => avroType.getLogicalType match {
+      case _: LogicalTypes.TimeMicros => (updater, ordinal, value) =>
+        val micros = value.asInstanceOf[Long]
+        updater.setLong(ordinal, micros)
+      case other => throw new IncompatibleSchemaException(errorPrefix +
+        s"Avro logical type $other cannot be converted to SQL type ${TimeType().sql}.")
+    }
+
     // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
     // For backward compatibility, we still keep this conversion.
     case (LONG, DateType) => (updater, ordinal, value) =>
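The new branch is a pass-through: Avro's time-micros logical type encodes microseconds since midnight, which matches TIME's internal long representation, so the value is stored unchanged. A sketch of the mapping to wall-clock time (the helper name is illustrative, not part of the commit):

    import java.time.LocalTime

    // Avro "time-micros" value: microseconds since midnight.
    def microsToLocalTime(micros: Long): LocalTime =
      LocalTime.ofNanoOfDay(micros * 1000L) // 1 microsecond = 1,000 nanoseconds

    // e.g. microsToLocalTime(45296123456L) == LocalTime.parse("12:34:56.123456")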

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 6 additions & 0 deletions
@@ -191,6 +191,12 @@ private[sql] class AvroSerializer(
         s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
     }
 
+    case (_: TimeType, LONG) => avroType.getLogicalType match {
+      case _: LogicalTypes.TimeMicros => (getter, ordinal) => getter.getLong(ordinal)
+      case other => throw new IncompatibleSchemaException(errorPrefix +
+        s"SQL type ${TimeType().sql} cannot be converted to Avro logical type $other")
+    }
+
     case (ArrayType(et, containsNull), ARRAY) =>
       val elementConverter = newConverter(
         et, resolveNullableType(avroType.getElementType, containsNull),
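The serializer side is the same identity mapping in reverse: the internal TIME value is already microseconds since midnight, so writing it as an Avro time-micros long is a plain getLong. For reference, the conversion from java.time.LocalTime (helper name illustrative):

    import java.time.LocalTime

    def localTimeToMicros(t: LocalTime): Long =
      t.toNanoOfDay / 1000L // truncate nanoseconds to microsecond precision

    // localTimeToMicros(LocalTime.parse("23:59:59.999999")) == 86399999999L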

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala

Lines changed: 0 additions & 1 deletion
@@ -83,7 +83,6 @@ private[sql] object AvroUtils extends Logging {
   def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: VariantType => false
 
-    case _: TimeType => false
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
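With the TimeType guard removed, TIME now falls through to the `case _: AtomicType => true` branch, so the datasource reports it as supported and the write that the deleted AvroSuite test expected to fail with UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE now succeeds. A minimal sketch (the output path is hypothetical):

    val path = "/tmp/avro-time-demo" // hypothetical location
    spark.sql("SELECT TIME'12:01:02' AS t")
      .write.format("avro").mode("overwrite").save(path)
    spark.read.format("avro").load(path).printSchema() // expected field type: time(6)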

sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala

Lines changed: 14 additions & 0 deletions
@@ -131,6 +131,16 @@ object SchemaConverters extends Logging {
       case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
       case _: LocalTimestampMillis | _: LocalTimestampMicros =>
         SchemaType(TimestampNTZType, nullable = false)
+      case _: LogicalTypes.TimeMicros =>
+        // Falls back to default precision for backward compatibility with
+        // Avro files written by external tools.
+        val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
+        val timeType = if (catalystTypeAttrValue == null) {
+          TimeType(TimeType.MICROS_PRECISION)
+        } else {
+          CatalystSqlParser.parseDataType(catalystTypeAttrValue).asInstanceOf[TimeType]
+        }
+        SchemaType(timeType, nullable = false)
       case _ =>
         val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
         val catalystType = if (catalystTypeAttrValue == null) {
@@ -324,6 +334,10 @@ object SchemaConverters extends Logging {
         LogicalTypes.timestampMicros().addToSchema(builder.longType())
       case TimestampNTZType =>
         LogicalTypes.localTimestampMicros().addToSchema(builder.longType())
+      case t: TimeType =>
+        val timeSchema = LogicalTypes.timeMicros().addToSchema(builder.longType())
+        timeSchema.addProp(CATALYST_TYPE_PROP_NAME, t.typeName)
+        timeSchema
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
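Putting the two halves together: the writer path emits a long/time-micros schema tagged with the Catalyst type name, and the reader path parses that property back into the exact precision instead of defaulting to TIME(6). A sketch of the emitted schema for a TIME(3) column, built with the same Avro API calls; the property key literal matches CATALYST_TYPE_PROP_NAME as seen in the test schemas above:

    import org.apache.avro.{LogicalTypes, SchemaBuilder}

    val timeSchema = LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())
    timeSchema.addProp("spark.sql.catalyst.type", "time(3)") // CATALYST_TYPE_PROP_NAME
    // timeSchema.toString ==
    //   {"type":"long","logicalType":"time-micros","spark.sql.catalyst.type":"time(3)"}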
