@@ -29,6 +29,7 @@ import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.avro.{functions => Fns}
import org.apache.spark.sql.avro.functions.{from_avro, to_avro}
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.internal.SQLConf
@@ -665,4 +666,81 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.select(functions.schema_of_avro(avroMultiType)),
Row("STRUCT<u: STRUCT<member0: INT, member1: STRING> NOT NULL>"))
}

test("roundtrip in to_avro and from_avro - TIME type with different precisions") {
val df = spark.sql("""
SELECT
TIME'12:34:56' as time_p0,
TIME'12:34:56.1' as time_p1,
TIME'12:34:56.12' as time_p2,
TIME'12:34:56.123' as time_p3,
TIME'12:34:56.1234' as time_p4,
TIME'12:34:56.12345' as time_p5,
TIME'12:34:56.123456' as time_p6
""")

val precisions = Seq(0, 1, 2, 3, 4, 5, 6)
precisions.foreach { p =>
val fieldName = s"time_p$p"
// Build an Avro schema that carries this precision via the Catalyst type property
val avroTimeSchema = s"""
|{
| "type": "long",
| "logicalType": "time-micros",
| "spark.sql.catalyst.type": "time($p)"
|}
""".stripMargin

val avroDF = df.select(to_avro(col(fieldName)).as("avro"))
val readBack = avroDF.select(from_avro($"avro", avroTimeSchema).as(fieldName))

checkAnswer(readBack, df.select(col(fieldName)))
}
}

test("roundtrip in to_avro and from_avro - TIME type in struct") {
val df = spark.sql("""
SELECT
struct(
TIME'09:00:00.123' as start,
TIME'17:30:45.987654' as end,
'Morning Shift' as description
) as schedule
""")

val avroStructDF = df.select(to_avro($"schedule").as("avro"))

val avroStructSchema = """
|{
| "type": "record",
| "name": "schedule",
| "fields": [
| {
| "name": "start",
| "type": {
| "type": "long",
| "logicalType": "time-micros",
| "spark.sql.catalyst.type": "time(3)"
| }
| },
| {
| "name": "end",
| "type": {
| "type": "long",
| "logicalType": "time-micros",
| "spark.sql.catalyst.type": "time(6)"
| }
| },
| {
| "name": "description",
| "type": "string"
| }
| ]
|}
""".stripMargin

val readBack = avroStructDF.select(
from_avro($"avro", avroStructSchema).as("schedule"))
checkAnswer(readBack, df)
}
}
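
The `spark.sql.catalyst.type` property in these schemas is what carries the precision through the round trip. When it is absent — the usual case for Avro data produced outside Spark — `from_avro` falls back to the default `TIME(6)` (see the `SchemaConverters` change below). A minimal sketch of that fallback, reusing this suite's helpers plus `org.apache.spark.sql.types.TimeType`:

    val externalSchema = """{"type": "long", "logicalType": "time-micros"}"""
    val readBack = spark.sql("SELECT TIME'12:34:56.123456' AS t")
      .select(to_avro($"t").as("avro"))
      .select(from_avro($"avro", externalSchema).as("t"))
    // No Catalyst type property, so the schema resolves to the default precision.
    assert(readBack.schema("t").dataType == TimeType(TimeType.MICROS_PRECISION))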
@@ -3192,21 +3192,6 @@ abstract class AvroSuite
}
}

test("SPARK-51590: unsupported the TIME data types in Avro") {
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath
checkError(
exception = intercept[AnalysisException] {
sql("select time'12:01:02' as t")
.write.format("avro").mode("overwrite").save(tempDir)
},
condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
parameters = Map(
"columnName" -> "`t`",
"columnType" -> s"\"${TimeType().sql}\"",
"format" -> "Avro"))
}
}
}

class AvroV1Suite extends AvroSuite {
@@ -3407,4 +3392,94 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
}
}
}

test("TIME type read/write with Avro format") {
withTempPath { dir =>
// Test boundary values and NULL handling
val df = spark.sql("""
SELECT
TIME'00:00:00.123456' as midnight,
TIME'12:34:56.789012' as noon,
TIME'23:59:59.999999' as max_time,
CAST(NULL AS TIME) as null_time
""")

df.write.format("avro").save(dir.toString)
val readDf = spark.read.format("avro").load(dir.toString)

checkAnswer(readDf, df)

// Verify schema - all should be default TimeType(6)
readDf.schema.fields.foreach { field =>
assert(field.dataType == TimeType(), s"Field ${field.name} should be TimeType")
}

// Verify boundary values
val row = readDf.collect()(0)
assert(row.getAs[java.time.LocalTime]("midnight") ==
java.time.LocalTime.of(0, 0, 0, 123456000))
assert(row.getAs[java.time.LocalTime]("noon") ==
java.time.LocalTime.of(12, 34, 56, 789012000))
assert(row.getAs[java.time.LocalTime]("max_time") ==
java.time.LocalTime.of(23, 59, 59, 999999000))
assert(row.get(3) == null, "NULL time should be preserved")
}
}

test("TIME type in nested structures in Avro") {
withTempPath { dir =>
// Test TIME type in arrays and structs with different precisions
val df = spark.sql("""
SELECT
named_struct('start', CAST(TIME'09:00:00.123' AS TIME(3)),
'end', CAST(TIME'17:30:45.654321' AS TIME(6))) as schedule,
array(TIME'08:15:30.111222', TIME'12:45:15.333444', TIME'16:20:50.555666') as checkpoints
""")

df.write.format("avro").save(dir.toString)
val readDf = spark.read.format("avro").load(dir.toString)

checkAnswer(readDf, df)
}
}

test("TIME type precision metadata is preserved in Avro") {
withTempPath { dir =>
// Test all TIME precisions (0-6) with multiple columns
val df = spark.sql("""
SELECT
id,
CAST(TIME '12:34:56' AS TIME(0)) as time_p0,
CAST(TIME '12:34:56.1' AS TIME(1)) as time_p1,
CAST(TIME '12:34:56.12' AS TIME(2)) as time_p2,
CAST(TIME '12:34:56.123' AS TIME(3)) as time_p3,
CAST(TIME '12:34:56.1234' AS TIME(4)) as time_p4,
CAST(TIME '12:34:56.12345' AS TIME(5)) as time_p5,
CAST(TIME '12:34:56.123456' AS TIME(6)) as time_p6,
description
FROM VALUES
(1, 'Morning'),
(2, 'Evening')
AS t(id, description)
""")

// Verify original schema has all precisions
(0 to 6).foreach { p =>
assert(df.schema(s"time_p$p").dataType == TimeType(p))
}

// Write to Avro and read back
df.write.format("avro").save(dir.toString)
val readDf = spark.read.format("avro").load(dir.toString)

// Verify ALL precisions are preserved after round-trip
(0 to 6).foreach { p =>
assert(readDf.schema(s"time_p$p").dataType == TimeType(p),
s"Precision $p should be preserved")
}

// Verify data integrity
checkAnswer(readDf, df)
}
}
}
@@ -203,6 +203,14 @@ private[sql] class AvroDeserializer(
s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
}

case (LONG, _: TimeType) => avroType.getLogicalType match {
  case _: LogicalTypes.TimeMicros => (updater, ordinal, value) =>
    // Avro's time-micros counts microseconds of the day, while Catalyst stores
    // TIME values as nanoseconds of the day, so scale up when reading.
    val micros = value.asInstanceOf[Long]
    updater.setLong(ordinal, micros * 1000L)
  case other => throw new IncompatibleSchemaException(errorPrefix +
    s"Avro logical type $other cannot be converted to SQL type ${TimeType().sql}.")
}

// Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
// For backward compatibility, we still keep this conversion.
case (LONG, DateType) => (updater, ordinal, value) =>
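For a concrete sense of the scaling in the branch above — assuming, as the conversion does, that Catalyst's internal TIME representation is nanoseconds of the day — a single value round-trips like this (plain Scala, no Spark required):

    import java.time.LocalTime

    val t = LocalTime.parse("12:34:56.123456")
    val internalNanos = t.toNanoOfDay      // 45296123456000, the Catalyst value
    val avroMicros = internalNanos / 1000L // 45296123456, written as time-micros
    val readBack = avroMicros * 1000L      // what the deserializer stores back
    assert(LocalTime.ofNanoOfDay(readBack) == t)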
@@ -191,6 +191,12 @@ private[sql] class AvroSerializer(
s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
}

case (_: TimeType, LONG) => avroType.getLogicalType match {
  case _: LogicalTypes.TimeMicros => (getter, ordinal) =>
    // Catalyst stores TIME values as nanoseconds of the day; Avro's time-micros
    // expects microseconds, so scale down when writing.
    getter.getLong(ordinal) / 1000L
  case other => throw new IncompatibleSchemaException(errorPrefix +
    s"SQL type ${TimeType().sql} cannot be converted to Avro logical type $other")
}

case (ArrayType(et, containsNull), ARRAY) =>
val elementConverter = newConverter(
et, resolveNullableType(avroType.getElementType, containsNull),
@@ -83,7 +83,6 @@ private[sql] object AvroUtils extends Logging {
def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: VariantType => false

case _: TimeType => false
case _: AtomicType => true

case st: StructType => st.forall { f => supportsDataType(f.dataType) }
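With the explicit `TimeType` case removed, TIME values now fall through to the `AtomicType` branch (`TimeType` is an atomic type), so the datasource reports support. A one-line sanity check (a sketch; `AvroUtils` is `private[sql]`, so this only compiles inside that package):

    // TimeType now matches `case _: AtomicType => true` in supportsDataType.
    assert(AvroUtils.supportsDataType(org.apache.spark.sql.types.TimeType(3)))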
@@ -131,6 +131,16 @@ object SchemaConverters extends Logging {
case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false)
case _: LocalTimestampMillis | _: LocalTimestampMicros =>
SchemaType(TimestampNTZType, nullable = false)
case _: LogicalTypes.TimeMicros =>
// Avro files written by external tools carry no Catalyst type property, so
// fall back to the default microsecond precision for backward compatibility.
val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
val timeType = if (catalystTypeAttrValue == null) {
TimeType(TimeType.MICROS_PRECISION)
} else {
CatalystSqlParser.parseDataType(catalystTypeAttrValue).asInstanceOf[TimeType]
}
SchemaType(timeType, nullable = false)
case _ =>
val catalystTypeAttrValue = avroSchema.getProp(CATALYST_TYPE_PROP_NAME)
val catalystType = if (catalystTypeAttrValue == null) {
@@ -324,6 +334,10 @@
LogicalTypes.timestampMicros().addToSchema(builder.longType())
case TimestampNTZType =>
LogicalTypes.localTimestampMicros().addToSchema(builder.longType())
case t: TimeType =>
val timeSchema = LogicalTypes.timeMicros().addToSchema(builder.longType())
timeSchema.addProp(CATALYST_TYPE_PROP_NAME, t.typeName)
timeSchema

case FloatType => builder.floatType()
case DoubleType => builder.doubleType()
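Putting the two `SchemaConverters` changes together: the writer path stamps the Catalyst type onto the Avro schema, and the reader path recovers the precision from it. A sketch of what `toAvroType` should now produce for a `TIME(3)` column (the JSON matches the expectations in `AvroFunctionsSuite` above):

    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.TimeType

    val schema = SchemaConverters.toAvroType(TimeType(3), nullable = false)
    // {"type": "long", "logicalType": "time-micros", "spark.sql.catalyst.type": "time(3)"}
    println(schema.toString)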