Skip to content

Commit 4d1c79f

Browse files
vinodkc authored and dongjoon-hyun committed
[SPARK-54472][SQL] Add ORC read and write support for TIME type
### What changes were proposed in this pull request?

This PR adds ORC serialization and deserialization support for Spark's TIME type.

### Why are the changes needed?

The TIME type currently lacks ORC support, preventing users from:
- Reading/writing ORC files with TIME columns
- Integrating TIME data with existing ORC-based data lakes
- Preserving TIME precision in columnar storage

### Does this PR introduce _any_ user-facing change?

Yes. Users can now:

1. Read ORC with TIME columns
```scala
spark.read.format("orc").load("data.orc")
// Returns DataFrame with TIME columns preserved
```

2. Write DataFrames with TIME to ORC
```scala
val df = spark.sql("SELECT TIME'14:30:45.123456' as shift_start")
df.write.format("orc").save("output.orc")
```

### Technical Details

#### Storage Format

```
ORC Column:
  Physical Type: LONG (nanoseconds since midnight)
  Custom Attribute: spark.sql.catalyst.type = "time(<precision>)"
  Value Range: 0 to 86,399,999,999,999
```

#### Precision Handling

| Precision | Catalyst Type | ORC Attribute | Example Value |
|-----------|---------------|---------------|---------------|
| 0 (seconds) | `TimeType(0)` | `"time(0)"` | `12:34:56` |
| 3 (millis) | `TimeType(3)` | `"time(3)"` | `12:34:56.123` |
| 6 (micros) | `TimeType(6)` | `"time(6)"` | `12:34:56.123456` |

***Future Compatibility***
- Versioned via file metadata: Uses existing `org.apache.spark.version` for compatibility
- Forward-compatible: If ORC adds native TIME type, can migrate based on version

### How was this patch tested?

Added tests in `OrcQuerySuite`

### Was this patch authored or co-authored using generative AI tooling?

Yes. Generated-by: Claude 3.5 Sonnet

AI assistance was used for:
- Code pattern analysis and design discussions
- Implementation guidance following Spark conventions
- Test case generation and organization
- Documentation and examples

Closes #53185 from vinodkc/br_time_orc_read_write.

Authored-by: vinodkc <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 0fa72f7 commit 4d1c79f

File tree

6 files changed

+57
-6
lines changed

6 files changed

+57
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -126,8 +126,8 @@ class OrcDeserializer(
126126
case IntegerType | _: YearMonthIntervalType => (ordinal, value) =>
127127
updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)
128128

129-
case LongType | _: DayTimeIntervalType | _: TimestampNTZType => (ordinal, value) =>
130-
updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
129+
case LongType | _: DayTimeIntervalType | _: TimestampNTZType | _: TimeType =>
130+
(ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
131131

132132
case FloatType => (ordinal, value) =>
133133
updater.setFloat(ordinal, value.asInstanceOf[FloatWritable].get)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -249,7 +249,6 @@ class OrcFileFormat
249249
override def supportDataType(dataType: DataType): Boolean = dataType match {
250250
case _: VariantType => false
251251

252-
case _: TimeType => false
253252
case _: AtomicType => true
254253

255254
case st: StructType => st.forall { f => supportDataType(f.dataType) }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -98,7 +98,7 @@ class OrcSerializer(dataSchema: StructType) {
9898
}
9999

100100

101-
case LongType | _: DayTimeIntervalType | _: TimestampNTZType =>
101+
case LongType | _: DayTimeIntervalType | _: TimestampNTZType | _: TimeType =>
102102
if (reuseObj) {
103103
val result = new LongWritable()
104104
(getter, ordinal) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -282,7 +282,7 @@ object OrcUtils extends Logging {
282282
s"array<${getOrcSchemaString(a.elementType)}>"
283283
case m: MapType =>
284284
s"map<${getOrcSchemaString(m.keyType)},${getOrcSchemaString(m.valueType)}>"
285-
case _: DayTimeIntervalType | _: TimestampNTZType => LongType.catalogString
285+
case _: DayTimeIntervalType | _: TimestampNTZType | _: TimeType => LongType.catalogString
286286
case _: YearMonthIntervalType => IntegerType.catalogString
287287
case _ => dt.catalogString
288288
}
@@ -302,6 +302,10 @@ object OrcUtils extends Logging {
302302
val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
303303
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
304304
Some(typeDesc)
305+
case tm: TimeType =>
306+
val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
307+
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, tm.typeName)
308+
Some(typeDesc)
305309
case t: TimestampType =>
306310
val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
307311
typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName)

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1247,7 +1247,7 @@ class FileBasedDataSourceSuite extends QueryTest
12471247
}
12481248

12491249
test("SPARK-51590: unsupported the TIME data types in data sources") {
1250-
val datasources = Seq("orc", "text")
1250+
val datasources = Seq("text")
12511251
Seq(true, false).foreach { useV1 =>
12521252
val useV1List = if (useV1) {
12531253
datasources.mkString(",")

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala

Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -899,6 +899,54 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
899899
}
900900
}
901901
}
902+
903+
test("TIME type support for ORC format") {
904+
withTempPath { dir =>
905+
val path = dir.getCanonicalPath
906+
val df = spark.sql("""
907+
SELECT
908+
id,
909+
TIME'09:30:00' as morning,
910+
TIME'14:45:30.123456' as afternoon,
911+
TIME'23:59:59.999999' as end_of_day,
912+
TIME'00:00:00' as midnight,
913+
CASE WHEN id % 2 = 0 THEN TIME'12:30:00' ELSE NULL END as nullable_time
914+
FROM VALUES (1), (2), (3) AS t(id)
915+
""")
916+
917+
df.write.mode("overwrite").orc(path)
918+
val result = spark.read.orc(path)
919+
920+
Seq("morning", "afternoon", "end_of_day", "midnight", "nullable_time").foreach { col =>
921+
assert(result.schema(col).dataType == TimeType(6))
922+
}
923+
checkAnswer(result, df)
924+
}
925+
}
926+
927+
test("TIME type with different precisions in ORC") {
928+
withTempPath { dir =>
929+
val path = dir.getCanonicalPath
930+
val df = spark.sql("""
931+
SELECT
932+
CAST(TIME'12:34:56' AS TIME(0)) as time_p0,
933+
CAST(TIME'12:34:56.1' AS TIME(1)) as time_p1,
934+
CAST(TIME'12:34:56.12' AS TIME(2)) as time_p2,
935+
CAST(TIME'12:34:56.123' AS TIME(3)) as time_p3,
936+
CAST(TIME'12:34:56.1234' AS TIME(4)) as time_p4,
937+
CAST(TIME'12:34:56.12345' AS TIME(5)) as time_p5,
938+
CAST(TIME'12:34:56.123456' AS TIME(6)) as time_p6
939+
""")
940+
941+
df.write.mode("overwrite").orc(path)
942+
val result = spark.read.orc(path)
943+
944+
(0 to 6).foreach { p =>
945+
assert(result.schema(s"time_p$p").dataType == TimeType(p))
946+
}
947+
checkAnswer(result, df)
948+
}
949+
}
902950
}
903951

904952
class OrcV1QuerySuite extends OrcQuerySuite {

0 commit comments

Comments
 (0)