78
78
import org.apache.parquet.io.LocalOutputFile;
79
79
import org.apache.parquet.io.MessageColumnIO;
80
80
import org.apache.parquet.io.RecordReader;
81
+ import org.apache.parquet.schema.LogicalTypeAnnotation;
82
+ import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
81
83
import org.apache.parquet.schema.MessageType;
82
- import org.apache.parquet.schema.MessageTypeParser ;
84
+ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName ;
83
85
import org.apache.parquet.schema.Type;
86
+ import org.apache.parquet.schema.Types.MessageTypeBuilder;
84
87
import org.xml.sax.InputSource;
85
88
import org.xml.sax.XMLReader;
86
89
import ucar.ma2.*;
@@ -16089,52 +16092,71 @@ public void readParquet(
16089
16092
}
16090
16093
}
16091
16094
16095
+ private boolean isTimeColumn(int col) {
16096
+ return "time".equalsIgnoreCase(getColumnName(col))
16097
+ && Calendar2.SECONDS_SINCE_1970.equals(columnAttributes.get(col).getString("units"));
16098
+ }
16099
+
16092
16100
private MessageType getParquetSchemaForTable() {
16093
- String schemaProto = "message m {" ;
16101
+ MessageTypeBuilder schemaBuilder = org.apache.parquet.schema.Types.buildMessage() ;
16094
16102
for (int j = 0; j < nColumns(); j++) {
16095
- String schemaType = "String";
16103
+ String columnName = getColumnName(j);
16104
+ if (isTimeColumn(j)) {
16105
+ schemaBuilder
16106
+ .optional(PrimitiveTypeName.INT64)
16107
+ .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
16108
+ .named(columnName);
16109
+ continue;
16110
+ }
16096
16111
switch (getColumn(j).elementType()) {
16097
16112
case BYTE:
16098
- schemaType = " INT32" ;
16113
+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
16099
16114
break;
16100
16115
case SHORT:
16101
- schemaType = " INT32" ;
16116
+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
16102
16117
break;
16103
16118
case CHAR:
16104
- schemaType = "BINARY";
16119
+ schemaBuilder
16120
+ .optional(PrimitiveTypeName.BINARY)
16121
+ .as(LogicalTypeAnnotation.stringType())
16122
+ .named(columnName);
16105
16123
break;
16106
16124
case INT:
16107
- schemaType = " INT32" ;
16125
+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
16108
16126
break;
16109
16127
case LONG:
16110
- schemaType = " INT64" ;
16128
+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
16111
16129
break;
16112
16130
case FLOAT:
16113
- schemaType = " FLOAT" ;
16131
+ schemaBuilder.optional(PrimitiveTypeName. FLOAT).named(columnName) ;
16114
16132
break;
16115
16133
case DOUBLE:
16116
- schemaType = " DOUBLE" ;
16134
+ schemaBuilder.optional(PrimitiveTypeName. DOUBLE).named(columnName) ;
16117
16135
break;
16118
16136
case STRING:
16119
- schemaType = "BINARY";
16137
+ schemaBuilder
16138
+ .optional(PrimitiveTypeName.BINARY)
16139
+ .as(LogicalTypeAnnotation.stringType())
16140
+ .named(columnName);
16120
16141
break;
16121
16142
case UBYTE:
16122
- schemaType = " INT32" ;
16143
+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
16123
16144
break;
16124
16145
case USHORT:
16125
- schemaType = " INT32" ;
16146
+ schemaBuilder.optional(PrimitiveTypeName. INT32).named(columnName) ;
16126
16147
break;
16127
16148
case UINT:
16128
- schemaType = " INT64" ;
16149
+ schemaBuilder.optional(PrimitiveTypeName. INT64).named(columnName) ;
16129
16150
break;
16130
16151
case ULONG:
16131
- schemaType = "DOUBLE";
16152
+ schemaBuilder.optional(PrimitiveTypeName.DOUBLE).named(columnName);
16153
+ break;
16154
+ case BOOLEAN:
16155
+ schemaBuilder.optional(PrimitiveTypeName.BOOLEAN).named(columnName);
16132
16156
break;
16133
16157
}
16134
- schemaProto += " optional " + schemaType + " " + getColumnName(j) + ";\n";
16135
16158
}
16136
- schemaProto += "}";
16137
- return MessageTypeParser.parseMessageType(schemaProto);
16159
+ return schemaBuilder.named("m");
16138
16160
}
16139
16161
16140
16162
private void addMetadata(Map<String, String> metadata, Attributes attributes, String prefix) {
@@ -16148,7 +16170,12 @@ private void addMetadata(Map<String, String> metadata, Attributes attributes, St
16148
16170
if (tValue == null || tValue.size() == 0 || tValue.toString().length() == 0) {
16149
16171
continue; // do nothing
16150
16172
}
16151
- metadata.put(prefix + tName, tValue.toCSVString());
16173
+ if ("time_".equalsIgnoreCase(prefix)
16174
+ && Calendar2.SECONDS_SINCE_1970.equals(attributes.getString(tName))) {
16175
+ metadata.put(prefix + tName, Calendar2.MILLISECONDS_SINCE_1970);
16176
+ } else {
16177
+ metadata.put(prefix + tName, tValue.toCSVString());
16178
+ }
16152
16179
}
16153
16180
}
16154
16181
@@ -16188,7 +16215,11 @@ public void writeParquet(String fullFileName, boolean fullMetadata) throws Excep
16188
16215
columnUnits += ",";
16189
16216
}
16190
16217
columnNames += getColumnName(col);
16191
- columnUnits += colAttributes.getString("units");
16218
+ if (isTimeColumn(col)) {
16219
+ columnUnits += Calendar2.MILLISECONDS_SINCE_1970;
16220
+ } else {
16221
+ columnUnits += colAttributes.getString("units");
16222
+ }
16192
16223
}
16193
16224
metadata.put("column_names", columnNames);
16194
16225
metadata.put("column_units", columnUnits);
@@ -16208,7 +16239,12 @@ public void writeParquet(String fullFileName, boolean fullMetadata) throws Excep
16208
16239
for (int row = 0; row < nRows(); row++) {
16209
16240
ArrayList<PAOne> record = new ArrayList<>();
16210
16241
for (int j = 0; j < nColumns(); j++) {
16211
- record.add(getPAOneData(j, row));
16242
+ if (isTimeColumn(j)) {
16243
+ // Convert from seconds since epoch to millis since epoch.
16244
+ record.add(getPAOneData(j, row).multiply(PAOne.fromInt(1000)));
16245
+ } else {
16246
+ record.add(getPAOneData(j, row));
16247
+ }
16212
16248
}
16213
16249
writer.write(record);
16214
16250
}
0 commit comments