Commit 6db405e

Fix compatibility with Hive 4 timestamps
1 parent a27ceea

61 files changed: +283 -182 lines


presto-hive-function-namespace/src/main/java/com/facebook/presto/hive/functions/type/ObjectEncoders.java

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ else if (Decimals.isLongDecimal(type)) {
                 return compose(primitive(inspector), o -> (Double) o);
             case TIMESTAMP:
                 checkArgument(inspector instanceof PrimitiveObjectInspector);
-                return compose(primitive(inspector), o -> ((Timestamp) o).toSqlTimestamp().getTime());
+                return compose(primitive(inspector), o -> ((Timestamp) o).toEpochMilli());
             case VARBINARY:
                 if (inspector instanceof BinaryObjectInspector) {
                     return compose(primitive(inspector), o -> Slices.wrappedBuffer(((byte[]) o)));
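For context: Hive 3.1+/4 replaced java.sql.Timestamp in its object-inspector APIs with the time-zone-agnostic org.apache.hadoop.hive.common.type.Timestamp, which exposes epoch accessors directly. A minimal sketch of the two call paths, assuming the storage-api class (the sample value is illustrative):

    import org.apache.hadoop.hive.common.type.Timestamp;

    public class HiveTimestampDemo
    {
        public static void main(String[] args)
        {
            // Hive's Timestamp carries no zone; it is a wall-clock value.
            Timestamp ts = Timestamp.valueOf("2020-06-01 12:00:00.123");

            // Old path: materialize an intermediate java.sql.Timestamp,
            // then take its millisecond value.
            long viaSqlTimestamp = ts.toSqlTimestamp().getTime();

            // New path: read epoch millis directly, skipping the
            // intermediate object.
            long direct = ts.toEpochMilli();

            System.out.printf("via sql: %d, direct: %d%n", viaSqlTimestamp, direct);
        }
    }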

presto-hive/src/main/java/com/facebook/presto/hive/GenericHiveRecordCursor.java

Lines changed: 1 addition & 1 deletion
@@ -321,7 +321,7 @@ private static long getLongExpressedValue(Object value, DateTimeZone hiveTimeZon
         // time zone. We need to convert it to the configured time zone.

         // the timestamp that Hive parsed using the JVM time zone
-        long parsedJvmMillis = ((Timestamp) value).toSqlTimestamp().getTime();
+        long parsedJvmMillis = ((Timestamp) value).toEpochMilli();

         // remove the JVM time zone correction from the timestamp
         long hiveMillis = JVM_TIME_ZONE.convertUTCToLocal(parsedJvmMillis);
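The surrounding conversion chain re-expresses a wall-clock millisecond value from the JVM zone into the table's configured zone via Joda-Time. A standalone sketch of that idiom; both zones and the input value are illustrative, and the final convertLocalToUTC step mirrors what the method's comment says happens next:

    import org.joda.time.DateTimeZone;

    public class ZoneShiftDemo
    {
        public static void main(String[] args)
        {
            DateTimeZone jvmZone = DateTimeZone.getDefault();
            DateTimeZone hiveZone = DateTimeZone.forID("America/Los_Angeles"); // illustrative

            long parsedJvmMillis = 1_590_000_000_000L; // stand-in for toEpochMilli()

            // remove the JVM time zone correction from the timestamp ...
            long localMillis = jvmZone.convertUTCToLocal(parsedJvmMillis);

            // ... then re-apply the correction for the storage zone
            long utcMillis = hiveZone.convertLocalToUTC(localMillis, false);

            System.out.println(utcMillis);
        }
    }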

presto-hive/src/main/java/com/facebook/presto/hive/util/SerDeUtils.java

Lines changed: 1 addition & 1 deletion
@@ -294,7 +294,7 @@ private static long formatDateAsLong(Object object, DateObjectInspector inspecto
     private static long formatTimestampAsLong(Object object, TimestampObjectInspector inspector, DateTimeZone hiveStorageTimeZone)
     {
         Timestamp timestamp = getTimestamp(object, inspector);
-        long parsedJvmMillis = timestamp.toSqlTimestamp().getTime();
+        long parsedJvmMillis = timestamp.toEpochMilli();

         // remove the JVM time zone correction from the timestamp
         long hiveMillis = JVM_TIME_ZONE.convertUTCToLocal(parsedJvmMillis);

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketing.java

Lines changed: 2 additions & 4 deletions
@@ -38,7 +38,6 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.testng.annotations.Test;

-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
@@ -291,9 +290,8 @@ private static Object toNativeContainerValue(Type type, Object hiveValue)
                 assertEquals(daysSinceEpochInLocalZone, DateWritableV2.dateToDays((Date) hiveValue));
                 return daysSinceEpochInLocalZone;
             case StandardTypes.TIMESTAMP:
-                Instant instant = ((Timestamp) hiveValue).toSqlTimestamp().toInstant();
-                long epochSecond = instant.getEpochSecond();
-                int nano = instant.getNano();
+                long epochSecond = ((Timestamp) hiveValue).toEpochSecond();
+                int nano = ((Timestamp) hiveValue).getNanos();
                 assertEquals(nano % 1_000_000, 0);
                 return epochSecond * 1000 + nano / 1_000_000;
             default:
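The test's return value composes epoch milliseconds from whole seconds plus a millisecond-aligned nanos field. A self-contained check of that arithmetic against java.time (values illustrative):

    import java.time.Instant;

    public class EpochMillisDemo
    {
        public static void main(String[] args)
        {
            long epochSecond = 1_590_000_000L;
            int nano = 123_000_000; // whole milliseconds only, as the test asserts

            long millis = epochSecond * 1000 + nano / 1_000_000;

            // Agrees with java.time's own composition.
            assert millis == Instant.ofEpochSecond(epochSecond, nano).toEpochMilli();
            System.out.println(millis);
        }
    }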

presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java

Lines changed: 5 additions & 2 deletions
@@ -43,6 +43,7 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -264,7 +265,8 @@ public AbstractOrcRecordReader(
                 this.dwrfEncryptionGroupMap,
                 runtimeStats,
                 fileIntrospector,
-                fileModificationTime);
+                fileModificationTime,
+                hiveStorageTimeZone.toTimeZone().toZoneId());

         this.streamReaders = requireNonNull(streamReaders, "streamReaders is null");
         for (int columnId = 0; columnId < root.getFieldCount(); columnId++) {
@@ -671,9 +673,10 @@ private void advanceToNextStripe()
         SharedBuffer sharedDecompressionBuffer = new SharedBuffer(currentStripeSystemMemoryContext.newOrcLocalMemoryContext("sharedDecompressionBuffer"));
         Stripe stripe = stripeReader.readStripe(stripeInformation, currentStripeSystemMemoryContext, dwrfEncryptionInfo, sharedDecompressionBuffer);
         if (stripe != null) {
+            ZoneId timezone = stripe.getTimezone();
             for (StreamReader column : streamReaders) {
                 if (column != null) {
-                    column.startStripe(stripe);
+                    column.startStripe(timezone, stripe);
                 }
             }
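The reader now resolves the zone per stripe (stripe.getTimezone()) instead of once per file, and the Joda-to-java.time bridge used in the constructor is the standard two-hop conversion through java.util.TimeZone. A minimal sketch of that bridge:

    import java.time.ZoneId;

    import org.joda.time.DateTimeZone;

    public class ZoneBridgeDemo
    {
        public static void main(String[] args)
        {
            DateTimeZone jodaZone = DateTimeZone.forID("America/New_York"); // illustrative
            // Joda -> java.util.TimeZone -> java.time.ZoneId
            ZoneId zoneId = jodaZone.toTimeZone().toZoneId();
            System.out.println(zoneId); // America/New_York
        }
    }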

presto-orc/src/main/java/com/facebook/presto/orc/DecodeTimestampOptions.java

Lines changed: 6 additions & 7 deletions
@@ -13,9 +13,8 @@
  */
 package com.facebook.presto.orc;

-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.concurrent.TimeUnit;

 import static java.util.Objects.requireNonNull;
@@ -29,17 +28,17 @@ public class DecodeTimestampOptions
     private final long nanosecondsPerUnit;
     private final long baseSeconds;

-    public DecodeTimestampOptions(DateTimeZone hiveStorageTimeZone, boolean enableMicroPrecision)
+    public DecodeTimestampOptions(ZoneId timezone, boolean enableMicroPrecision)
     {
         this.enableMicroPrecision = enableMicroPrecision;
         TimeUnit timeUnit = enableMicroPrecision ? MICROSECONDS : MILLISECONDS;

-        requireNonNull(hiveStorageTimeZone, "hiveStorageTimeZone is null");
+        requireNonNull(timezone, "timezone is null");

         this.unitsPerSecond = timeUnit.convert(1, TimeUnit.SECONDS);
         this.nanosecondsPerUnit = TimeUnit.NANOSECONDS.convert(1, timeUnit);

-        this.baseSeconds = MILLISECONDS.toSeconds(new DateTime(2015, 1, 1, 0, 0, hiveStorageTimeZone).getMillis());
+        this.baseSeconds = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, timezone).toEpochSecond();
     }

     public boolean enableMicroPrecision()
@@ -58,7 +57,7 @@ public long getNanosPerUnit()
     }

     /**
-     * @return Seconds since 01/01/2015 (see https://orc.apache.org/specification/ORCv1/) in hive storage timezone (see {@link DecodeTimestampOptions#DecodeTimestampOptions(DateTimeZone, boolean)} })
+     * @return Seconds since 01/01/2015 (see https://orc.apache.org/specification/ORCv1/) in hive storage timezone (see {@link DecodeTimestampOptions#DecodeTimestampOptions(ZoneId, boolean)} })
      */
     public long getBaseSeconds()
     {
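Per the ORCv1 specification, timestamp seconds are stored relative to 2015-01-01 00:00:00 in the writer's time zone, so the base itself shifts with the zone. A standalone sketch of the computation above and how a decoder applies it (stored delta illustrative):

    import java.time.ZoneId;
    import java.time.ZonedDateTime;

    public class OrcBaseSecondsDemo
    {
        public static void main(String[] args)
        {
            ZoneId zone = ZoneId.of("America/Los_Angeles"); // illustrative

            // 2015-01-01 00:00:00 in the writer's zone, as epoch seconds.
            long baseSeconds = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, zone).toEpochSecond();

            // Decoding adds the delta read from the data stream to this base.
            long storedSeconds = 12_345L; // illustrative
            long epochSeconds = baseSeconds + storedSeconds;

            System.out.println(epochSeconds);
        }
    }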

presto-orc/src/main/java/com/facebook/presto/orc/OrcBatchRecordReader.java

Lines changed: 2 additions & 3 deletions
@@ -80,7 +80,7 @@ public OrcBatchRecordReader(
                 // doesn't have a local buffer. All non-leaf level StreamReaders' (e.g. MapStreamReader, LongStreamReader,
                 // ListStreamReader and StructStreamReader) instance sizes were not counted, because calling setBytes() in
                 // their constructors is confusing.
-                createStreamReaders(orcDataSource, types, hiveStorageTimeZone, options, includedColumns, systemMemoryUsage.newOrcAggregatedMemoryContext()),
+                createStreamReaders(orcDataSource, types, options, includedColumns, systemMemoryUsage.newOrcAggregatedMemoryContext()),
                 predicate,
                 numberOfRows,
                 fileStripes,
@@ -155,7 +155,6 @@ private void validateWritePageChecksum(int batchSize)
     private static BatchStreamReader[] createStreamReaders(
             OrcDataSource orcDataSource,
             List<OrcType> types,
-            DateTimeZone hiveStorageTimeZone,
             OrcRecordReaderOptions options,
             Map<Integer, Type> includedColumns,
             OrcAggregatedMemoryContext systemMemoryContext)
@@ -170,7 +169,7 @@ private static BatchStreamReader[] createStreamReaders(
                 Type type = includedColumns.get(columnId);
                 if (type != null) {
                     StreamDescriptor streamDescriptor = streamDescriptors.get(columnId);
-                    streamReaders[columnId] = BatchStreamReaders.createStreamReader(type, streamDescriptor, hiveStorageTimeZone, options, systemMemoryContext);
+                    streamReaders[columnId] = BatchStreamReaders.createStreamReader(type, streamDescriptor, options, systemMemoryContext);
                 }
             }
         }

presto-orc/src/main/java/com/facebook/presto/orc/OrcSelectiveRecordReader.java

Lines changed: 0 additions & 3 deletions
@@ -196,7 +196,6 @@ public OrcSelectiveRecordReader(
                 createStreamReaders(
                         orcDataSource,
                         types,
-                        hiveStorageTimeZone,
                         options,
                         includedColumns,
                         outputColumns,
@@ -583,7 +582,6 @@ private static int[] orderStreamReaders(
     private static SelectiveStreamReader[] createStreamReaders(
             OrcDataSource orcDataSource,
             List<OrcType> types,
-            DateTimeZone hiveStorageTimeZone,
             OrcRecordReaderOptions options,
             Map<Integer, Type> includedColumns,
             List<Integer> outputColumns,
@@ -615,7 +613,6 @@ private static SelectiveStreamReader[] createStreamReaders(
                     Optional.ofNullable(filters.get(columnId)).orElse(ImmutableMap.of()),
                     outputRequired ? Optional.of(includedColumns.get(columnId)) : Optional.empty(),
                     Optional.ofNullable(requiredSubfields.get(columnId)).orElse(ImmutableList.of()),
-                    hiveStorageTimeZone,
                     options,
                     systemMemoryContext,
                     false);

presto-orc/src/main/java/com/facebook/presto/orc/OrcWriteValidation.java

Lines changed: 54 additions & 9 deletions
@@ -54,6 +54,7 @@
 import io.airlift.slice.XxHash64;
 import org.openjdk.jol.info.ClassLayout;

+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,6 +84,7 @@
 import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS;
 import static com.facebook.presto.common.type.TinyintType.TINYINT;
 import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
+import static com.facebook.presto.orc.OrcEncoding.DWRF;
 import static com.facebook.presto.orc.OrcWriteValidation.OrcWriteValidationMode.BOTH;
 import static com.facebook.presto.orc.OrcWriteValidation.OrcWriteValidationMode.DETAILED;
 import static com.facebook.presto.orc.OrcWriteValidation.OrcWriteValidationMode.HASHED;
@@ -124,6 +126,9 @@ public enum OrcWriteValidationMode
     // keeps all flat map value nodes
     private final Set<Integer> flattenedValueNodes;

+    private final OrcEncoding orcEncoding;
+    private final ZoneId timezone;
+
     // all values passed into this constructor are collected by the writer
     private OrcWriteValidation(
             List<Integer> version,
@@ -137,7 +142,9 @@ private OrcWriteValidation(
             List<ColumnStatistics> fileStatistics,
             int stringStatisticsLimitInBytes,
             Set<Integer> flattenedNodes,
-            List<OrcType> orcTypes)
+            List<OrcType> orcTypes,
+            OrcEncoding orcEncoding,
+            ZoneId timezone)
     {
         this.version = version;
         this.compression = compression;
@@ -152,6 +159,8 @@ private OrcWriteValidation(
         this.flattenedKeyToMapNodes = getFlattenedKeyToMapNodes(flattenedNodes, orcTypes);
         this.flattenedValueNodes = getFlattenedValueNodes(flattenedNodes, orcTypes);
         this.flattenedMapToValueNodes = getFlattenedMapToValueNodes(flattenedNodes, orcTypes);
+        this.orcEncoding = orcEncoding;
+        this.timezone = timezone;
     }

     public List<Integer> getVersion()
@@ -182,16 +191,42 @@ public Map<String, Slice> getMetadata()
     public void validateMetadata(OrcDataSourceId orcDataSourceId, Map<String, Slice> actualMetadata)
             throws OrcCorruptionException
     {
-        // Filter out metadata value statically added by the DWRF writer
-        Map<String, Slice> filteredMetadata = actualMetadata.entrySet().stream()
-                .filter(entry -> !STATIC_METADATA.containsKey(entry.getKey()))
-                .collect(toImmutableMap(Entry::getKey, Entry::getValue));
+        if (isDwrf()) {
+            // Filter out metadata value statically added by the DWRF writer
+            actualMetadata = actualMetadata.entrySet().stream()
+                    .filter(entry -> !STATIC_METADATA.containsKey(entry.getKey()))
+                    .collect(toImmutableMap(Entry::getKey, Entry::getValue));
+        }

-        if (!metadata.equals(filteredMetadata)) {
+        if (!metadata.equals(actualMetadata)) {
             throw new OrcCorruptionException(orcDataSourceId, "Unexpected metadata");
         }
     }

+    public OrcEncoding getOrcEncoding()
+    {
+        return orcEncoding;
+    }
+
+    public boolean isDwrf()
+    {
+        return orcEncoding == DWRF;
+    }
+
+    public ZoneId getTimezone()
+    {
+        return timezone;
+    }
+
+    public void validateTimeZone(OrcDataSourceId orcDataSourceId, ZoneId actualTimezone)
+            throws OrcCorruptionException
+    {
+        // DWRF does not store the writer timezone
+        if (!isDwrf() && !timezone.equals(actualTimezone)) {
+            throw new OrcCorruptionException(orcDataSourceId, "Unexpected timezone");
+        }
+    }
+
     public WriteChecksum getChecksum()
     {
         return checksum;
@@ -209,7 +244,7 @@ public void validateStripeStatistics(OrcDataSourceId orcDataSourceId, List<Strip
         requireNonNull(actualStripes, "actualStripes is null");
         requireNonNull(actualStripeStatistics, "actualStripeStatistics is null");

-        if (actualStripeStatistics.isEmpty()) {
+        if (isDwrf()) {
             // DWRF does not have stripe statistics
             return;
         }
@@ -974,11 +1009,14 @@ public static class OrcWriteValidationBuilder
         private long retainedSize = INSTANCE_SIZE;
         private Set<Integer> flattenedNodes;
         private List<OrcType> orcTypes;
+        private final OrcEncoding orcEncoding;
+        private ZoneId timezone;

-        public OrcWriteValidationBuilder(OrcWriteValidationMode validationMode, List<Type> types)
+        public OrcWriteValidationBuilder(OrcWriteValidationMode validationMode, List<Type> types, OrcEncoding orcEncoding)
         {
             this.validationMode = validationMode;
             this.checksum = new WriteChecksumBuilder(types);
+            this.orcEncoding = orcEncoding;
         }

         public long getRetainedSize()
@@ -1067,6 +1105,11 @@ public void setOrcTypes(List<OrcType> orcTypes)
             this.orcTypes = orcTypes;
         }

+        public void setTimezone(ZoneId timezone)
+        {
+            this.timezone = timezone;
+        }
+
         public OrcWriteValidation build()
         {
             return new OrcWriteValidation(
@@ -1081,7 +1124,9 @@ public OrcWriteValidation build()
                     fileStatistics,
                     stringStatisticsLimitInBytes,
                     flattenedNodes,
-                    orcTypes);
+                    orcTypes,
+                    orcEncoding,
+                    timezone);
         }
     }
 }
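Note that validateTimeZone is deliberately a no-op for DWRF, which records neither stripe statistics nor a writer timezone; only plain ORC files can round-trip the zone. A standalone restatement of the rule (names hypothetical):

    import java.time.ZoneId;

    public class TimezoneValidationDemo
    {
        // Mirrors validateTimeZone: ORC must read back the zone it wrote;
        // DWRF stores no zone, so there is nothing to compare.
        static void validate(boolean isDwrf, ZoneId written, ZoneId readBack)
        {
            if (!isDwrf && !written.equals(readBack)) {
                throw new IllegalStateException("Unexpected timezone: " + readBack);
            }
        }

        public static void main(String[] args)
        {
            validate(true, ZoneId.of("UTC"), ZoneId.of("America/New_York")); // DWRF: skipped
            validate(false, ZoneId.of("UTC"), ZoneId.of("UTC")); // ORC: passes
        }
    }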

presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java

Lines changed: 5 additions & 2 deletions
@@ -63,6 +63,7 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -206,7 +207,7 @@ public OrcWriter(
             OrcWriteValidationMode validationMode,
             WriterStats stats)
     {
-        this.validationBuilder = validate ? new OrcWriteValidation.OrcWriteValidationBuilder(validationMode, types).setStringStatisticsLimitInBytes(toIntExact(options.getMaxStringStatisticsLimit().toBytes())) : null;
+        this.validationBuilder = validate ? new OrcWriteValidation.OrcWriteValidationBuilder(validationMode, types, orcEncoding).setStringStatisticsLimitInBytes(toIntExact(options.getMaxStringStatisticsLimit().toBytes())) : null;

         this.dataSink = requireNonNull(dataSink, "dataSink is null");
         this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
@@ -240,6 +241,7 @@ public OrcWriter(
         recordValidation(validation -> validation.setCompression(compressionKind));
         recordValidation(validation -> validation.setFlattenedNodes(flattenedNodes));
         recordValidation(validation -> validation.setOrcTypes(orcTypes));
+        recordValidation(validation -> validation.setTimezone(hiveStorageTimeZone.toTimeZone().toZoneId()));

         requireNonNull(options, "options is null");
         this.flushPolicy = requireNonNull(options.getFlushPolicy(), "flushPolicy is null");
@@ -629,7 +631,8 @@ private List<DataOutput> bufferStripeData(long stripeStartOffset, FlushReason fl
                 .collect(toImmutableMap(Entry::getKey, Entry::getValue));
         List<Slice> encryptedGroups = createEncryptedGroups(encryptedStreams, encryptedColumnEncodings);

-        StripeFooter stripeFooter = new StripeFooter(unencryptedStreams, unencryptedColumnEncodings, encryptedGroups);
+        Optional<ZoneId> timezone = Optional.of(hiveStorageTimeZone.toTimeZone().toZoneId());
+        StripeFooter stripeFooter = new StripeFooter(unencryptedStreams, unencryptedColumnEncodings, encryptedGroups, timezone);
         Slice footer = metadataWriter.writeStripeFooter(stripeFooter);
         DataOutput footerDataOutput = createDataOutput(footer);
         dwrfStripeCacheWriter.ifPresent(stripeCacheWriter -> stripeCacheWriter.addStripeFooter(createDataOutput(footer)));
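On the write side the zone is now recorded twice: once in the validation builder and once per stripe footer, where it travels as an Optional<ZoneId> because DWRF footers carry no zone. A small sketch of how a reader might default an absent per-stripe zone (helper name hypothetical):

    import java.time.ZoneId;
    import java.util.Optional;

    public class StripeZoneDemo
    {
        // Hypothetical helper: fall back to a file-level zone when the
        // stripe footer (e.g. DWRF) does not record one.
        static ZoneId stripeZone(Optional<ZoneId> footerZone, ZoneId fileDefault)
        {
            return footerZone.orElse(fileDefault);
        }

        public static void main(String[] args)
        {
            System.out.println(stripeZone(Optional.empty(), ZoneId.of("UTC")));
            System.out.println(stripeZone(Optional.of(ZoneId.of("Asia/Tokyo")), ZoneId.of("UTC")));
        }
    }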
