Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1382 from zalando/nakadi-batch-published-to-avro
Browse files Browse the repository at this point in the history
Bugfix: metadata should have version of event-type schema
  • Loading branch information
thomasabraham authored Mar 30, 2022
2 parents ed827c3 + 0a316d6 commit d95baf9
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ public void publishNakadiBatchPublishedEvent(
avroSchema.getLatestEventTypeSchemaVersion(AvroSchema.METADATA_KEY);
final byte metadataVersion = Byte.parseByte(latestMeta.getKey());

final GenericRecord metadata = buildMetaDataGenericRecord(
KPIEventTypes.BATCH_PUBLISHED, latestMeta.getValue(), latestMeta.getKey());

final Map.Entry<String, Schema> latestSchema =
avroSchema.getLatestEventTypeSchemaVersion(KPIEventTypes.BATCH_PUBLISHED);

final GenericRecord metadata = buildMetaDataGenericRecord(
KPIEventTypes.BATCH_PUBLISHED, latestMeta.getValue(), latestSchema.getKey());

final GenericRecord event = new GenericRecordBuilder(latestSchema.getValue())
.set("event_type", eventTypeName)
.set("app", applicationName)
Expand Down Expand Up @@ -143,12 +144,13 @@ public void publishAccessLogEvent(final String method,
avroSchema.getLatestEventTypeSchemaVersion(AvroSchema.METADATA_KEY);
final byte metadataVersion = Byte.parseByte(latestMeta.getKey());

final Map.Entry<String, Schema> latestSchema =
avroSchema.getLatestEventTypeSchemaVersion(KPIEventTypes.ACCESS_LOG);

final GenericRecord metadata = buildMetaDataGenericRecord(
KPIEventTypes.ACCESS_LOG, latestMeta.getValue(), latestMeta.getKey(), user);
KPIEventTypes.ACCESS_LOG, latestMeta.getValue(), latestSchema.getKey(), user);


final Map.Entry<String, Schema> latestSchema =
avroSchema.getLatestEventTypeSchemaVersion(KPIEventTypes.ACCESS_LOG);
final GenericRecord event = new GenericRecordBuilder(latestSchema.getValue())
.set("method", method)
.set("path", path)
Expand Down

0 comments on commit d95baf9

Please sign in to comment.