Conversation

@yihua (Contributor) commented Oct 18, 2025

Describe the issue this Pull Request addresses

Summary and Changelog

Impact

Risk Level

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions bot added the size:L label (PR with lines of changes in (300, 1000]) on Oct 18, 2025
@hudi-bot commented

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

case LONG:
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
if (skipLogicalTimestampEvolution || oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
Contributor:

I didn't get why we need this skipLogicalTimestampEvolution flag; shouldn't we always rewrite the field if the logical types mismatch?

Contributor Author (yihua):

Based on my understanding, AvroSchemaCompatibility#calculateCompatibility did not validate logical timestamp evolution before, so an evolution from timestamp-micros to timestamp-millis could happen, which leads to precision loss; such schema evolution should not be allowed.
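
To make the precision-loss point concrete, here is a minimal, self-contained sketch; the values are illustrative and not taken from the PR:

```java
public class TimestampPrecisionLossSketch {
  public static void main(String[] args) {
    // A timestamp-micros value: 2023-10-18T00:00:00.123456Z as microseconds since the epoch.
    long micros = 1_697_587_200_123_456L;
    // Evolving to timestamp-millis truncates the sub-millisecond component.
    long millis = micros / 1_000L;            // 1_697_587_200_123
    long roundTripped = millis * 1_000L;      // 1_697_587_200_123_000
    System.out.println(roundTripped == micros); // false: the trailing 456 microseconds are lost
  }
}
```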

Contributor Author (yihua):

However, to handle the timestamp issue this PR addresses, the ingestion writer needs to rewrite the schema from timestamp-micros to timestamp-millis.
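
At the Avro schema level, such a rewrite boils down to swapping the logical type annotation on the long field. A standalone sketch using the stock Avro API (not the PR's code path):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampSchemaRewriteSketch {
  public static void main(String[] args) {
    // Schema as declared by the incoming data: long with timestamp-micros.
    Schema micros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
    // Schema the ingestion writer would rewrite to: long with timestamp-millis.
    Schema millis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    System.out.println(micros.getLogicalType().getName() + " -> " + millis.getLogicalType().getName());
  }
}
```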

.withDocumentation("Enables support for Schema Evolution feature");

public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ALLOW_LOGICAL_EVOLUTION = ConfigProperty
.key("hoodie.schema.evolution.allow.logical.evolution")
Contributor:

Why do we need this flag? Evolving between timestamp-millis and timestamp-micros should always be feasible in schema evolution.

Contributor Author (yihua):

As mentioned in the other thread, timestamp-micros to timestamp-millis should not be allowed as it loses precision.

this.readRecords++;
if (this.promotedSchema.isPresent()) {
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get(), skipLogicalTimestampEvolution);
Contributor:

I thought we only had problems with Parquet files; do Avro logs also have a precision mismatch between the timestamp type and its values? The Avro schema in the log block header comes from the table schema, which should be correct, right?

if (isTimestampMicros(fileType) && isTimestampMillis(tableType)) {
columnsToMultiply.add(path);
} else if (isLong(fileType) && isLocalTimestampMillis(tableType)) {
columnsToMultiply.add(path);
Contributor Author (yihua):

Is this a new breaking case to handle?

* @param tableSchema The Parquet schema from the table (target)
* @return Set of column paths (e.g., "timestamp", "metadata.created_at") that need multiplication
*/
public static Set<String> findColumnsToMultiply(MessageType fileSchema, MessageType tableSchema) {
Contributor Author (yihua):

Could the Avro table schema be passed in for comparison instead of converting it to a Parquet MessageType? The additional Avro-to-Parquet schema conversion introduces another layer of processing, which can be error-prone (e.g., any change in that conversion logic could affect the mitigation in this PR).
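
For context on what the MessageType-to-MessageType comparison under discussion involves, here is a self-contained sketch against the parquet-mr schema API; the class and method names are made up for illustration, and this is not the PR's implementation of findColumnsToMultiply:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public class TimestampColumnScanner {

  /** Collects dotted paths of columns declared as timestamp-micros in the file
   *  schema but timestamp-millis in the table schema (illustrative logic only). */
  public static Set<String> findTimestampColumnsToScale(MessageType fileSchema, MessageType tableSchema) {
    Set<String> paths = new HashSet<>();
    scan(fileSchema, tableSchema, "", paths);
    return paths;
  }

  private static void scan(GroupType file, GroupType table, String prefix, Set<String> out) {
    for (Type fileField : file.getFields()) {
      if (!table.containsField(fileField.getName())) {
        continue; // column missing in the table schema; not relevant here
      }
      Type tableField = table.getType(fileField.getName());
      String path = prefix.isEmpty() ? fileField.getName() : prefix + "." + fileField.getName();
      if (!fileField.isPrimitive() && !tableField.isPrimitive()) {
        // Recurse into nested structs (e.g., "metadata.created_at").
        scan(fileField.asGroupType(), tableField.asGroupType(), path, out);
      } else if (fileField.isPrimitive() && tableField.isPrimitive()
          && isTimestamp(fileField, LogicalTypeAnnotation.TimeUnit.MICROS)
          && isTimestamp(tableField, LogicalTypeAnnotation.TimeUnit.MILLIS)) {
        out.add(path);
      }
    }
  }

  private static boolean isTimestamp(Type field, LogicalTypeAnnotation.TimeUnit unit) {
    LogicalTypeAnnotation annotation = field.getLogicalTypeAnnotation();
    return annotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
        && ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation).getUnit() == unit;
  }
}
```

Comparing at the Parquet level mirrors the file-reader code path, while comparing against the Avro table schema directly, as suggested above, would avoid the extra conversion layer.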

Cast(expr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, DateType) =>
Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
case (LongType, TimestampNTZType) => expr // @ethan I think we just want a no-op here?
Contributor Author (yihua):

Now I kind of get it. Is this because local timestamps (TimestampNTZType) were written as a plain Long type in Parquet before? Also, there is no micros-in-schema vs. millis-in-values regression for TimestampNTZType in published Hudi releases, correct? If so, there is no need for conversion.

}

if (typeChangeInfos.isEmpty) {
if (typeChangeInfos.isEmpty && columnsToMultiply.isEmpty) {
Contributor Author (yihua):

Does this mean that the record-level projection overhead is only incurred if there are columns that need the multiplication applied?

})
}

def recursivelyApplyMultiplication(expr: Expression, columnPath: String, dataType: DataType): Expression = {
Contributor Author (yihua):

I'm wondering if we can change HoodieParquetReadSupport and add a read-support implementation for the Avro Parquet reader to handle the millis interpretation, which is one layer below the current approach. Would that incur less overhead than the projection?
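
Whichever layer ends up doing it, the value-level fix being discussed amounts to scaling the stored long by 1000 for the flagged columns. A hedged sketch (class and method names are invented for illustration):

```java
import java.util.Set;

// Illustrative only: these columns were written with millisecond values under a schema
// that declares timestamp-micros, so reading them correctly means scaling by 1000.
final class TimestampValueScaler {
  private final Set<String> columnsToScale; // e.g., the output of a detection pass like findColumnsToMultiply

  TimestampValueScaler(Set<String> columnsToScale) {
    this.columnsToScale = columnsToScale;
  }

  long readLong(String columnPath, long storedValue) {
    // Scale only the flagged columns; all other long values pass through untouched.
    return columnsToScale.contains(columnPath) ? storedValue * 1_000L : storedValue;
  }
}
```

Performing this inside the Parquet read support would apply the scaling as values are materialized, instead of adding a projection over already-materialized rows.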
