Timestamp millis repair #14120
Conversation
case LONG:
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (skipLogicalTimestampEvolution || oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
I didn't get why we need this skipLogicalTimestampEvolution flag; shouldn't we always rewrite the field if the logical types mismatch?
Based on my understanding, AvroSchemaCompatibility#calculateCompatibility did not previously validate logical timestamp evolution, so evolving timestamp-micros to timestamp-millis could happen, which leads to precision loss; such schema evolution should not be allowed.
However, for handling the timestamp issue this PR addresses, the ingestion writer needs to rewrite the schema from timestamp-micros to timestamp-millis.
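To make the precision-loss concern concrete, here is a minimal self-contained sketch (not part of the PR) showing that the micros to millis conversion discards the sub-millisecond digits and that a round trip cannot recover them:

```java
public class TimestampPrecisionLossDemo {
  public static void main(String[] args) {
    // Epoch value in microseconds with non-zero sub-millisecond digits.
    long micros = 1_700_000_000_123_456L;

    // Evolving timestamp-micros -> timestamp-millis is an integer division by 1000,
    // which silently drops the last three digits.
    long millis = micros / 1000L;

    // Converting back to micros cannot recover what was lost.
    long roundTripped = millis * 1000L;

    System.out.println(micros);        // 1700000000123456
    System.out.println(millis);        // 1700000000123
    System.out.println(roundTripped);  // 1700000000123000 (456 microseconds lost)
  }
}
```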
.withDocumentation("Enables support for Schema Evolution feature");

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ALLOW_LOGICAL_EVOLUTION = ConfigProperty
.key("hoodie.schema.evolution.allow.logical.evolution")
Why do we need this flag? Evolving between timestamp-millis and timestamp-micros should always be feasible in schema evolution.
As mentioned in the other thread, timestamp-micros to timestamp-millis should not be allowed as it loses precision.
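For context, a simplified sketch of the kind of LONG-field check being discussed, using the Avro LogicalTypes API. This is not the PR's exact code, and allowLogicalEvolution is only an illustrative stand-in for the config flag above:

```java
import java.util.Objects;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class LogicalTimestampEvolutionCheck {

  // Decide whether evolving a LONG field from oldSchema to newSchema is acceptable.
  // A millis <-> micros change is rejected unless explicitly allowed, because the
  // micros -> millis direction loses precision.
  static boolean isCompatibleLongEvolution(Schema oldSchema, Schema newSchema, boolean allowLogicalEvolution) {
    LogicalType oldType = oldSchema.getLogicalType();
    LogicalType newType = newSchema.getLogicalType();

    boolean oldIsTimestamp = oldType instanceof LogicalTypes.TimestampMillis
        || oldType instanceof LogicalTypes.TimestampMicros;
    boolean newIsTimestamp = newType instanceof LogicalTypes.TimestampMillis
        || newType instanceof LogicalTypes.TimestampMicros;

    if (oldIsTimestamp && newIsTimestamp && !Objects.equals(oldType, newType)) {
      // timestamp-millis <-> timestamp-micros: only allow when explicitly opted in.
      return allowLogicalEvolution;
    }
    // Otherwise require the logical types to match (both null counts as a match).
    return Objects.equals(oldType, newType);
  }
}
```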
this.readRecords++;
if (this.promotedSchema.isPresent()) {
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get(), skipLogicalTimestampEvolution);
I thought we only had this problem for Parquet files; do Avro logs also have mismatched precision between the timestamp type and its values? The Avro schema in the log block header comes from the table schema, which should be correct, right?
if (isTimestampMicros(fileType) && isTimestampMillis(tableType)) {
columnsToMultiply.add(path);
} else if (isLong(fileType) && isLocalTimestampMillis(tableType)) {
columnsToMultiply.add(path);
Is this a new breaking case to handle?
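The isTimestampMicros / isTimestampMillis / isLocalTimestampMillis helpers are not shown in this excerpt; below is a minimal sketch, assuming they are thin wrappers over the Parquet schema API. The names and signatures here are illustrative, not necessarily the PR's:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ParquetTimestampChecks {

  // True if the type is an INT64 annotated as a timestamp in the given unit
  // with the given UTC-adjustment flag.
  static boolean isTimestamp(Type type, TimeUnit unit, boolean adjustedToUtc) {
    if (!type.isPrimitive()
        || type.asPrimitiveType().getPrimitiveTypeName() != PrimitiveType.PrimitiveTypeName.INT64) {
      return false;
    }
    LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
    if (!(annotation instanceof TimestampLogicalTypeAnnotation)) {
      return false;
    }
    TimestampLogicalTypeAnnotation ts = (TimestampLogicalTypeAnnotation) annotation;
    return ts.getUnit() == unit && ts.isAdjustedToUTC() == adjustedToUtc;
  }

  static boolean isTimestampMillis(Type type) {
    return isTimestamp(type, TimeUnit.MILLIS, true);
  }

  static boolean isTimestampMicros(Type type) {
    return isTimestamp(type, TimeUnit.MICROS, true);
  }

  // Local (not UTC-adjusted) timestamps, e.g. Spark's TimestampNTZ, map to isAdjustedToUTC = false.
  static boolean isLocalTimestampMillis(Type type) {
    return isTimestamp(type, TimeUnit.MILLIS, false);
  }
}
```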
 * @param tableSchema The Parquet schema from the table (target)
 * @return Set of column paths (e.g., "timestamp", "metadata.created_at") that need multiplication
 */
public static Set<String> findColumnsToMultiply(MessageType fileSchema, MessageType tableSchema) {
Could the Avro table schema be passed in for comparison instead of being converted to a Parquet MessageType first? The additional Avro-to-Parquet conversion introduces another layer of processing that can be error-prone (e.g., any change in that conversion logic could affect the mitigation in this PR).
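To illustrate the suggestion, here is a rough sketch of doing the same discovery directly against two Avro schemas, without the Avro-to-Parquet conversion step. This is not the PR's code; it only covers the timestamp-micros-in-file vs. timestamp-millis-in-table case shown above plus nullable unions, not arrays or maps:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class AvroColumnsToMultiply {

  // Collect dotted paths of fields where the file schema says timestamp-micros
  // but the table schema says timestamp-millis.
  static Set<String> findColumnsToMultiply(Schema fileSchema, Schema tableSchema) {
    Set<String> result = new HashSet<>();
    visit(fileSchema, tableSchema, "", result);
    return result;
  }

  private static void visit(Schema file, Schema table, String path, Set<String> result) {
    file = unwrapNullable(file);
    table = unwrapNullable(table);
    if (file.getType() == Schema.Type.RECORD && table.getType() == Schema.Type.RECORD) {
      for (Schema.Field tableField : table.getFields()) {
        Schema.Field fileField = file.getField(tableField.name());
        if (fileField != null) {
          String childPath = path.isEmpty() ? tableField.name() : path + "." + tableField.name();
          visit(fileField.schema(), tableField.schema(), childPath, result);
        }
      }
    } else if (file.getType() == Schema.Type.LONG && table.getType() == Schema.Type.LONG) {
      LogicalType fileType = file.getLogicalType();
      LogicalType tableType = table.getLogicalType();
      if (fileType instanceof LogicalTypes.TimestampMicros
          && tableType instanceof LogicalTypes.TimestampMillis) {
        result.add(path);
      }
    }
  }

  // Treat ["null", X] unions as X for the purpose of this comparison.
  private static Schema unwrapNullable(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      for (Schema branch : schema.getTypes()) {
        if (branch.getType() != Schema.Type.NULL) {
          return branch;
        }
      }
    }
    return schema;
  }
}
```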
Cast(expr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, DateType) =>
Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
case (LongType, TimestampNTZType) => expr // @ethan I think we just want a no-op here?
Now I kind of get it. Is this because the local timestamp (TimestampNTZType) was written as a plain Long type in Parquet before? Also, there is no micros-in-schema vs. millis-in-values regression for TimestampNTZType in published Hudi releases, correct? If so, there is no need for conversion.
}

if (typeChangeInfos.isEmpty) {
if (typeChangeInfos.isEmpty && columnsToMultiply.isEmpty) {
Does this mean that the record-level projection overhead is only incurred if there are columns that need the multiplication applied?
})
}

def recursivelyApplyMultiplication(expr: Expression, columnPath: String, dataType: DataType): Expression = {
I'm wondering if we could change HoodieParquetReadSupport and add a read-support implementation for the Avro Parquet reader to handle the millis interpretation, which is one layer below the current approach. Would that incur less overhead than the projection?
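To make the read-support idea concrete, here is a rough sketch, not an existing Hudi class, of a delegating Parquet PrimitiveConverter that rescales INT64 values while the record is being materialized. Wiring it into HoodieParquetReadSupport and choosing which columns to wrap (e.g. from the columnsToMultiply set above) is not shown:

```java
import org.apache.parquet.io.api.PrimitiveConverter;

// Sketch only: wraps the converter for a single INT64 column and multiplies each
// value by a fixed factor as it is read, so no extra projection pass over the
// materialized rows is needed. Only addLong is overridden because this wrapper
// is meant for INT64 timestamp columns.
public class RescalingLongConverter extends PrimitiveConverter {

  private final PrimitiveConverter delegate;
  private final long factor;

  public RescalingLongConverter(PrimitiveConverter delegate, long factor) {
    this.delegate = delegate;
    this.factor = factor;
  }

  @Override
  public void addLong(long value) {
    // e.g. factor = 1000L for the columnsToMultiply case discussed above.
    delegate.addLong(value * factor);
  }
}
```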
Describe the issue this Pull Request addresses
Summary and Changelog
Impact
Risk Level
Documentation Update
Contributor's checklist