diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index 8c128d4c7ea6..a0b254e3153c 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -29,6 +29,7 @@ import org.apache.avro.io.{DecoderFactory, EncoderFactory} import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.avro.{functions => Fns} +import org.apache.spark.sql.avro.functions.{from_avro, to_avro} import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions.{col, lit, struct} import org.apache.spark.sql.internal.SQLConf @@ -665,4 +666,81 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(df.select(functions.schema_of_avro(avroMultiType)), Row("STRUCT NOT NULL>")) } + + test("roundtrip in to_avro and from_avro - TIME type with different precisions") { + val df = spark.sql(""" + SELECT + TIME'12:34:56' as time_p0, + TIME'12:34:56.1' as time_p1, + TIME'12:34:56.12' as time_p2, + TIME'12:34:56.123' as time_p3, + TIME'12:34:56.1234' as time_p4, + TIME'12:34:56.12345' as time_p5, + TIME'12:34:56.123456' as time_p6 + """) + + val precisions = Seq(0, 1, 2, 3, 4, 5, 6) + precisions.foreach { p => + val fieldName = s"time_p$p" + // Generate correct schema for each precision + val avroTimeSchema = s""" + |{ + | "type": "long", + | "logicalType": "time-micros", + | "spark.sql.catalyst.type": "time($p)" + |} + """.stripMargin + + val avroDF = df.select(to_avro(col(fieldName)).as("avro")) + val readBack = avroDF.select(from_avro($"avro", avroTimeSchema).as(fieldName)) + + checkAnswer(readBack, df.select(col(fieldName))) + } + } + + test("roundtrip in to_avro and from_avro - TIME type in struct") { + val df = spark.sql(""" + SELECT + struct( + TIME'09:00:00.123' as start, + TIME'17:30:45.987654' as end, + 'Morning Shift' as description + ) as schedule + """) + + val avroStructDF = df.select(to_avro($"schedule").as("avro")) + + val avroStructSchema = """ + |{ + | "type": "record", + | "name": "schedule", + | "fields": [ + | { + | "name": "start", + | "type": { + | "type": "long", + | "logicalType": "time-micros", + | "spark.sql.catalyst.type": "time(3)" + | } + | }, + | { + | "name": "end", + | "type": { + | "type": "long", + | "logicalType": "time-micros", + | "spark.sql.catalyst.type": "time(6)" + | } + | }, + | { + | "name": "description", + | "type": "string" + | } + | ] + |} + """.stripMargin + + val readBack = avroStructDF.select( + from_avro($"avro", avroStructSchema).as("schedule")) + checkAnswer(readBack, df) + } } diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index b0f510f3257e..cd881ec5b394 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -3192,21 +3192,6 @@ abstract class AvroSuite } } - test("SPARK-51590: unsupported the TIME data types in Avro") { - withTempDir { dir => - val tempDir = new File(dir, "files").getCanonicalPath - checkError( - exception = intercept[AnalysisException] { - sql("select time'12:01:02' as t") - .write.format("avro").mode("overwrite").save(tempDir) - }, - condition = 
"UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE", - parameters = Map( - "columnName" -> "`t`", - "columnType" -> s"\"${TimeType().sql}\"", - "format" -> "Avro")) - } - } } class AvroV1Suite extends AvroSuite { @@ -3407,4 +3392,94 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { } } } + + test("TIME type read/write with Avro format") { + withTempPath { dir => + // Test boundary values and NULL handling + val df = spark.sql(""" + SELECT + TIME'00:00:00.123456' as midnight, + TIME'12:34:56.789012' as noon, + TIME'23:59:59.999999' as max_time, + CAST(NULL AS TIME) as null_time + """) + + df.write.format("avro").save(dir.toString) + val readDf = spark.read.format("avro").load(dir.toString) + + checkAnswer(readDf, df) + + // Verify schema - all should be default TimeType(6) + readDf.schema.fields.foreach { field => + assert(field.dataType == TimeType(), s"Field ${field.name} should be TimeType") + } + + // Verify boundary values + val row = readDf.collect()(0) + assert(row.getAs[java.time.LocalTime]("midnight") == + java.time.LocalTime.of(0, 0, 0, 123456000)) + assert(row.getAs[java.time.LocalTime]("noon") == + java.time.LocalTime.of(12, 34, 56, 789012000)) + assert(row.getAs[java.time.LocalTime]("max_time") == + java.time.LocalTime.of(23, 59, 59, 999999000)) + assert(row.get(3) == null, "NULL time should be preserved") + } + } + + test("TIME type in nested structures in Avro") { + withTempPath { dir => + // Test TIME type in arrays and structs with different precisions + val df = spark.sql(""" + SELECT + named_struct('start', CAST(TIME'09:00:00.123' AS TIME(3)), + 'end', CAST(TIME'17:30:45.654321' AS TIME(6))) as schedule, + array(TIME'08:15:30.111222', TIME'12:45:15.333444', TIME'16:20:50.555666') as checkpoints + """) + + df.write.format("avro").save(dir.toString) + val readDf = spark.read.format("avro").load(dir.toString) + + checkAnswer(readDf, df) + } + } + + test("TIME type precision metadata is preserved in Avro") { + withTempPath { dir => + // Test all TIME precisions (0-6) with multiple columns + val df = spark.sql(""" + SELECT + id, + CAST(TIME '12:34:56' AS TIME(0)) as time_p0, + CAST(TIME '12:34:56.1' AS TIME(1)) as time_p1, + CAST(TIME '12:34:56.12' AS TIME(2)) as time_p2, + CAST(TIME '12:34:56.123' AS TIME(3)) as time_p3, + CAST(TIME '12:34:56.1234' AS TIME(4)) as time_p4, + CAST(TIME '12:34:56.12345' AS TIME(5)) as time_p5, + CAST(TIME '12:34:56.123456' AS TIME(6)) as time_p6, + description + FROM VALUES + (1, 'Morning'), + (2, 'Evening') + AS t(id, description) + """) + + // Verify original schema has all precisions + (0 to 6).foreach { p => + assert(df.schema(s"time_p$p").dataType == TimeType(p)) + } + + // Write to Avro and read back + df.write.format("avro").save(dir.toString) + val readDf = spark.read.format("avro").load(dir.toString) + + // Verify ALL precisions are preserved after round-trip + (0 to 6).foreach { p => + assert(readDf.schema(s"time_p$p").dataType == TimeType(p), + s"Precision $p should be preserved") + } + + // Verify data integrity + checkAnswer(readDf, df) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index f66b5bd988c2..f61e6da8583e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -203,6 +203,14 @@ private[sql] class AvroDeserializer( s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
       }
 
+      case (LONG, _: TimeType) => avroType.getLogicalType match {
+        case _: LogicalTypes.TimeMicros => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          updater.setLong(ordinal, DateTimeUtils.microsToNanos(micros))
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"Avro logical type $other cannot be converted to SQL type ${TimeType().sql}.")
+      }
+
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
       // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index f3c6e168d2f0..533aa6ee09af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -191,6 +191,13 @@ private[sql] class AvroSerializer(
         s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
     }
 
+      case (_: TimeType, LONG) => avroType.getLogicalType match {
+        case _: LogicalTypes.TimeMicros => (getter, ordinal) =>
+          DateTimeUtils.nanosToMicros(getter.getLong(ordinal))
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"SQL type ${TimeType().sql} cannot be converted to Avro logical type $other")
+      }
+
       case (ArrayType(et, containsNull), ARRAY) =>
         val elementConverter = newConverter(
           et, resolveNullableType(avroType.getElementType, containsNull),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index c338bd428bbe..2f19a507527b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -83,7 +83,6 @@ private[sql] object AvroUtils extends Logging {
 
   def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: VariantType => false
-    case _: TimeType => false
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index b425f63d6a7e..590eaeac6008 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -131,6 +131,16 @@ object SchemaConverters extends Logging {
         case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
         case _: LocalTimestampMillis | _: LocalTimestampMicros =>
           SchemaType(TimestampNTZType, nullable = false)
+        case _: LogicalTypes.TimeMicros =>
+          // Use the precision recorded in the Catalyst type property when present; otherwise
+          // fall back to the default precision for Avro files written by external tools.
+          val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
+          val timeType = if (catalystTypeAttrValue == null) {
+            TimeType(TimeType.MICROS_PRECISION)
+          } else {
+            CatalystSqlParser.parseDataType(catalystTypeAttrValue).asInstanceOf[TimeType]
+          }
+          SchemaType(timeType, nullable = false)
         case _ =>
           val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
           val catalystType = if (catalystTypeAttrValue == null) {
@@ -324,6 +334,12 @@ object SchemaConverters extends Logging {
         LogicalTypes.timestampMicros().addToSchema(builder.longType())
       case TimestampNTZType =>
         LogicalTypes.localTimestampMicros().addToSchema(builder.longType())
+      case t: TimeType =>
+        // Keep the standard time-micros representation, and record the Catalyst type
+        // name (e.g. "time(3)") as a schema property so readers can restore the precision.
+        val timeSchema = LogicalTypes.timeMicros().addToSchema(builder.longType())
+        timeSchema.addProp(CATALYST_TYPE_PROP_NAME, t.typeName)
+        timeSchema
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
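
Taken together, the changes above enable the following end-to-end flow. This is a minimal sketch, assuming a build that includes this patch with spark-avro on the classpath; the local session, the object name, and the /tmp output path are illustrative, while the API calls and the schema annotation mirror the tests in this diff:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.TimeType

object TimeAvroRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("time-avro").getOrCreate()

    // A TIME(3) column; the Avro writer stores it as a time-micros long and
    // records "time(3)" under the "spark.sql.catalyst.type" schema property.
    val df = spark.sql("SELECT CAST(TIME'09:30:00.123' AS TIME(3)) AS t")

    // File round trip: the reader parses the property back into TimeType(3)
    // instead of falling back to the default TimeType(6).
    val path = "/tmp/time-avro-sketch" // illustrative path
    df.write.format("avro").mode("overwrite").save(path)
    val back = spark.read.format("avro").load(path)
    assert(back.schema("t").dataType == TimeType(3))

    // Function round trip: from_avro needs the same annotation in its reader
    // schema to recover a non-default precision.
    val readerSchema =
      """{"type": "long", "logicalType": "time-micros",
        | "spark.sql.catalyst.type": "time(3)"}""".stripMargin
    val bytes = df.select(to_avro(col("t")).as("avro"))
    bytes.select(from_avro(col("avro"), readerSchema).as("t")).show(truncate = false)

    spark.stop()
  }
}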
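At the schema level, nothing non-standard is written: the converter emits an ordinary long with the time-micros logical type plus one extra string property, so external Avro tooling keeps reading such files and simply ignores the annotation (which is why files from other writers come back as the default TIME(6)). A small sketch against the Avro Java API of the shape SchemaConverters now produces for TimeType(3); the literal property key is the value of CATALYST_TYPE_PROP_NAME, as spelled out in the tests above:

import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}

object TimeSchemaShapeSketch {
  def main(args: Array[String]): Unit = {
    // Standard Avro long with the time-micros logical type...
    val timeSchema: Schema =
      LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())
    // ...plus the precision annotation a patched reader turns back into TIME(3).
    timeSchema.addProp("spark.sql.catalyst.type", "time(3)")

    assert(timeSchema.getLogicalType.isInstanceOf[LogicalTypes.TimeMicros])
    assert(timeSchema.getProp("spark.sql.catalyst.type") == "time(3)")
    // Pretty-printed JSON shows the extra property next to the logical type.
    println(timeSchema.toString(true))
  }
}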