import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
+ import org.apache.parquet.schema.LogicalTypeAnnotation;
+ import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.MessageType;
- import org.apache.parquet.schema.MessageTypeParser;
+ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
+ import org.apache.parquet.schema.Types.MessageTypeBuilder;
import org.xml.sax.InputSource;
import org.xml.sax.XMLReader;
import ucar.ma2.*;
@@ -588,7 +591,7 @@ private static interface WithColumnNames {
  public static BitSet ncCFcc = null; // null=inactive, new BitSet() = active

  /** An arrayList to hold 0 or more PrimitiveArray's with data. */
-   protected ArrayList<PrimitiveArray> columns = new ArrayList();
+   protected ArrayList<PrimitiveArray> columns = new ArrayList<>();

  /** An arrayList to hold the column names. */
  protected StringArray columnNames = new StringArray();
@@ -605,7 +608,7 @@ private static interface WithColumnNames {
   * Although a HashTable is more appropriate for name=value pairs, this uses ArrayList to preserve
   * the order of the attributes. This may be null if not in use.
   */
-   protected ArrayList<Attributes> columnAttributes = new ArrayList();
+   protected ArrayList<Attributes> columnAttributes = new ArrayList<>();

  /** The one known valid url for readIobis. */
  public static final String IOBIS_URL = "http://www.iobis.org/OBISWEB/ObisControllerServlet";
@@ -16089,52 +16092,91 @@ public void readParquet(
    }
  }

-   private MessageType getParquetSchemaForTable(String name) {
-     String schemaProto = "message m {";
+   private boolean isTimeColumn(int col) {
+     return "time".equalsIgnoreCase(getColumnName(col))
+         && Calendar2.SECONDS_SINCE_1970.equals(columnAttributes.get(col).getString("units"));
+   }
+
+   private MessageType getParquetSchemaForTable() {
+     MessageTypeBuilder schemaBuilder = org.apache.parquet.schema.Types.buildMessage();
    for (int j = 0; j < nColumns(); j++) {
-       String schemaType = "String";
+       String columnName = getColumnName(j);
+       if (isTimeColumn(j)) {
+         schemaBuilder
+             .optional(PrimitiveTypeName.INT64)
+             .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
+             .named(columnName);
+         continue;
+       }
      switch (getColumn(j).elementType()) {
        case BYTE:
-           schemaType = "INT32";
+           schemaBuilder.optional(PrimitiveTypeName.INT32).named(columnName);
          break;
        case SHORT:
-           schemaType = "INT32";
+           schemaBuilder.optional(PrimitiveTypeName.INT32).named(columnName);
          break;
        case CHAR:
-           schemaType = "BINARY";
+           schemaBuilder
+               .optional(PrimitiveTypeName.BINARY)
+               .as(LogicalTypeAnnotation.stringType())
+               .named(columnName);
          break;
        case INT:
-           schemaType = "INT32";
+           schemaBuilder.optional(PrimitiveTypeName.INT32).named(columnName);
          break;
        case LONG:
-           schemaType = "INT64";
+           schemaBuilder.optional(PrimitiveTypeName.INT64).named(columnName);
          break;
        case FLOAT:
-           schemaType = "FLOAT";
+           schemaBuilder.optional(PrimitiveTypeName.FLOAT).named(columnName);
          break;
        case DOUBLE:
-           schemaType = "DOUBLE";
+           schemaBuilder.optional(PrimitiveTypeName.DOUBLE).named(columnName);
          break;
        case STRING:
-           schemaType = "BINARY";
+           schemaBuilder
+               .optional(PrimitiveTypeName.BINARY)
+               .as(LogicalTypeAnnotation.stringType())
+               .named(columnName);
          break;
        case UBYTE:
-           schemaType = "INT32";
+           schemaBuilder.optional(PrimitiveTypeName.INT32).named(columnName);
          break;
        case USHORT:
-           schemaType = "INT32";
+           schemaBuilder.optional(PrimitiveTypeName.INT32).named(columnName);
          break;
        case UINT:
-           schemaType = "INT64";
+           schemaBuilder.optional(PrimitiveTypeName.INT64).named(columnName);
          break;
        case ULONG:
-           schemaType = "DOUBLE";
+           schemaBuilder.optional(PrimitiveTypeName.DOUBLE).named(columnName);
          break;
+         case BOOLEAN:
+           schemaBuilder.optional(PrimitiveTypeName.BOOLEAN).named(columnName);
+           break;
+       }
+     }
+     return schemaBuilder.named("m");
+   }
+
+   private void addMetadata(Map<String, String> metadata, Attributes attributes, String prefix) {
+     String names[] = attributes.getNames();
+     for (int ni = 0; ni < names.length; ni++) {
+       String tName = names[ni];
+       if (!String2.isSomething(tName)) {
+         continue;
+       }
+       PrimitiveArray tValue = attributes.get(tName);
+       if (tValue == null || tValue.size() == 0 || tValue.toString().length() == 0) {
+         continue; // do nothing
+       }
+       if ("time_".equalsIgnoreCase(prefix)
+           && Calendar2.SECONDS_SINCE_1970.equals(attributes.getString(tName))) {
+         metadata.put(prefix + tName, Calendar2.MILLISECONDS_SINCE_1970);
+       } else {
+         metadata.put(prefix + tName, tValue.toCSVString());
      }
-       schemaProto += " optional " + schemaType + " " + getColumnName(j) + ";\n";
    }
-     schemaProto += "}";
-     return MessageTypeParser.parseMessageType(schemaProto);
  }

  /**
@@ -16143,23 +16185,49 @@ private MessageType getParquetSchemaForTable(String name) {
   * @param fullFileName This is just used for error messages.
   * @throws Exception if trouble, including observed nItems != expected nItems.
   */
-   public void writeParquet(String fullFileName) throws Exception {
+   public void writeParquet(String fullFileName, boolean fullMetadata) throws Exception {
    String msg = " Table.writeParquet " + fullFileName;
    long time = System.currentTimeMillis();

    int randomInt = Math2.random(Integer.MAX_VALUE);
-
-     int nameStart = fullFileName.lastIndexOf('/');
-     if (nameStart == -1) {
-       nameStart = fullFileName.lastIndexOf('\\');
+     MessageType schema = getParquetSchemaForTable();
+
+     Map<String, String> metadata = new HashMap<>();
+     if (fullMetadata) {
+       addMetadata(metadata, globalAttributes, "");
+       for (int col = 0; col < nColumns(); col++) {
+         Attributes colAttributes = columnAttributes.get(col);
+         if (colAttributes == null) {
+           continue;
+         }
+         addMetadata(metadata, colAttributes, getColumnName(col) + "_");
+       }
    }
-     int nameEnd = fullFileName.lastIndexOf('.');
-     String name = fullFileName.substring(nameStart + 1, nameEnd);
-     MessageType schema = getParquetSchemaForTable(name);
-
+     String columnNames = "";
+     String columnUnits = "";
+     for (int col = 0; col < nColumns(); col++) {
+       Attributes colAttributes = columnAttributes.get(col);
+       if (colAttributes == null) {
+         continue;
+       }
+       if (columnNames.length() > 0) {
+         columnNames += ",";
+         columnUnits += ",";
+       }
+       columnNames += getColumnName(col);
+       if (isTimeColumn(col)) {
+         columnUnits += Calendar2.MILLISECONDS_SINCE_1970;
+       } else {
+         columnUnits += colAttributes.getString("units");
+       }
+     }
+     metadata.put("column_names", columnNames);
+     metadata.put("column_units", columnUnits);
    try (ParquetWriter<List<PAOne>> writer =
        new ParquetWriterBuilder(
-                 schema, new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)))
+                 schema,
+                 new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)),
+                 metadata)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
            .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
@@ -16171,7 +16239,12 @@ schema, new LocalOutputFile(java.nio.file.Path.of(fullFileName + randomInt)))
      for (int row = 0; row < nRows(); row++) {
        ArrayList<PAOne> record = new ArrayList<>();
        for (int j = 0; j < nColumns(); j++) {
-           record.add(getPAOneData(j, row));
+           if (isTimeColumn(j)) {
+             // Convert from seconds since epoch to millis since epoch.
+             record.add(getPAOneData(j, row).multiply(PAOne.fromInt(1000)));
+           } else {
+             record.add(getPAOneData(j, row));
+           }
        }
        writer.write(record);
      }
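
For reviewers unfamiliar with the parquet-java fluent schema API that replaces the MessageTypeParser string prototype above, here is a minimal standalone sketch of the Types builder pattern. The class name and column names are made up for illustration; only the builder calls mirror what getParquetSchemaForTable() now does.

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class SchemaBuilderSketch {
  public static void main(String[] args) {
    // Same pattern as getParquetSchemaForTable(): an optional INT64 "time" column
    // annotated as a UTC-adjusted millisecond timestamp, plus an optional BINARY
    // column annotated as a UTF-8 string.
    MessageType schema =
        Types.buildMessage()
            .optional(PrimitiveTypeName.INT64)
            .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
            .named("time")
            .optional(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("station_id") // illustrative column name
            .named("m"); // root message name, matching the old "message m { ... }" prototype

    // toString() prints roughly:
    // message m {
    //   optional int64 time (TIMESTAMP(MILLIS,true));
    //   optional binary station_id (STRING);
    // }
    System.out.println(schema);
  }
}
```

Building the schema programmatically avoids the string concatenation and lets the time column carry a TIMESTAMP(MILLIS) logical type instead of a bare INT64.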
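The metadata map passed to the new ParquetWriterBuilder constructor ends up in the file footer's key/value metadata, alongside the column_names and column_units keys that are always written. A hedged sketch of reading it back, assuming a parquet-java version that ships org.apache.parquet.io.LocalInputFile (the reading counterpart of the LocalOutputFile used above) and a hypothetical file path:

```java
import java.nio.file.Path;
import java.util.Map;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.LocalInputFile;

public class FooterMetadataSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to a file produced by Table.writeParquet(fullFileName, true).
    Path file = Path.of("/tmp/erddapExample.parquet");

    try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(file))) {
      // The entries added via addMetadata() and the two fixed keys live here.
      Map<String, String> keyValues =
          reader.getFooter().getFileMetaData().getKeyValueMetaData();

      System.out.println("column_names = " + keyValues.get("column_names"));
      System.out.println("column_units = " + keyValues.get("column_units"));
      // With fullMetadata=true, column attributes appear as "<columnName>_<attributeName>".
    }
  }
}
```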