From bdd36c4ad86b9e209c8c9a145128f7d1097e8362 Mon Sep 17 00:00:00 2001 From: Thomas Abraham Date: Thu, 31 Mar 2022 18:36:34 +0200 Subject: [PATCH 1/2] Adding schemas for the remaining KPI events --- .../nakadi.data.streamed.0.avsc | 51 +++++++++++++++++++ .../nakadi.event.type.log.0.avsc | 27 ++++++++++ .../nakadi.subscription.log.0.avsc | 15 ++++++ 3 files changed, 93 insertions(+) create mode 100644 core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc create mode 100644 core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc create mode 100644 core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc diff --git a/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc new file mode 100644 index 0000000000..f079ae1801 --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc @@ -0,0 +1,51 @@ +{ + "name": "nakadi.data.streamed", + "type": "record", + "doc": "Stores KPI events of type nakadi.data.streamed", + "fields": [ + { + "name": "api", + "description": "This field indicates if data was streamed through low level api or high level api", + "type": "string" + }, + { + "name": "subscription", + "description": "this is optional and is only present when streaming from a subscription", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_type", + "type": "string" + }, + { + "name": "app", + "type": "string" + }, + { + "name": "app_hashed", + "type": "string" + }, + { + "name": "token_realm", + "type": "string" + }, + { + "name": "number_of_events", + "type": "long" + }, + { + "name": "bytes_streamed", + "description": "amount of bytes streamed since last event", + "type": "long" + }, + { + "name": "batches_streamed", + "description": "amount of batches streamed since last event", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc new file mode 100644 index 0000000000..20cd869c9c --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc @@ -0,0 +1,27 @@ +{ + "name": "nakadi.event.type.log", + "type": "record", + "doc": "Stores KPI events of type nakadi.event.type.log", + "fields": [ + { + "name": "event_type", + "type": "string" + }, + { + "name": "status", + "type": "string" + }, + { + "name": "category", + "type": "string" + }, + { + "name": "authz", + "type": "string" + }, + { + "name": "compatibility_mode", + "type": "string" + } + ] +} \ No newline at end of file diff --git a/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc new file mode 100644 index 0000000000..118c602eee --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc @@ -0,0 +1,15 @@ +{ + "name": "nakadi.subscription.log", + "type": "record", + "doc": "Stores KPI events of type nakadi.subscription.log", + "fields": [ + { + "name": "subscription_id", + "type": "string" + }, + { + "name": "status", + "type": "string" + } + ] +} \ No newline at end of file From 751e491676f00a6e1f777707ff3c15ba794ea521 Mon Sep 17 00:00:00 2001 From: Thomas Abraham Date: Fri, 1 Apr 2022 11:44:47 +0200 Subject: [PATCH 2/2] Testing that every schema version is loadable --- .../zalando/nakadi/config/KPIEventTypes.java | 3 ++ .../zalando/nakadi/service/AvroSchema.java | 5 ++- .../nakadi.data.streamed.0.avsc | 2 +- .../nakadi.event.type.log.0.avsc | 2 +- .../nakadi.subscription.log.0.avsc | 2 +- .../nakadi/service/AvroSchemaTest.java | 40 +++++++++++++++++++ .../src/test/resources/log4j.properties | 25 ++++++++++++ 7 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java create mode 100644 core-common/src/test/resources/log4j.properties diff --git a/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java b/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java index 7b2b73162a..5977250275 100644 --- a/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java +++ b/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java @@ -3,4 +3,7 @@ public final class KPIEventTypes { public static final String ACCESS_LOG = "nakadi.access.log"; public static final String BATCH_PUBLISHED = "nakadi.batch.published"; + public static final String DATA_STREAMED = "nakadi.data.streamed"; + public static final String EVENT_TYPE_LOG = "nakadi.event.type.log"; + public static final String SUBSCRIPTION_LOG = "nakadi.subscription.log"; } diff --git a/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java b/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java index 78c2ffdc7d..ea9013b0df 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java @@ -31,7 +31,10 @@ public class AvroSchema { private static final Collection INTERNAL_EVENT_TYPE_NAMES = Set.of( METADATA_KEY, KPIEventTypes.ACCESS_LOG, - KPIEventTypes.BATCH_PUBLISHED); + KPIEventTypes.BATCH_PUBLISHED, + KPIEventTypes.DATA_STREAMED, + KPIEventTypes.EVENT_TYPE_LOG, + KPIEventTypes.SUBSCRIPTION_LOG); private final Map> eventTypeSchema; private final AvroMapper avroMapper; diff --git a/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc index f079ae1801..ec75998b3a 100644 --- a/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc +++ b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc @@ -48,4 +48,4 @@ "type": "int" } ] -} \ No newline at end of file +} diff --git a/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc index 20cd869c9c..cbaf7bb5f6 100644 --- a/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc +++ b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc @@ -24,4 +24,4 @@ "type": "string" } ] -} \ No newline at end of file +} diff --git a/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc index 118c602eee..3fd8cc0fa7 100644 --- a/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc +++ b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc @@ -12,4 +12,4 @@ "type": "string" } ] -} \ No newline at end of file +} diff --git a/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java b/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java new file mode 100644 index 0000000000..c3592f36be --- /dev/null +++ b/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java @@ -0,0 +1,40 @@ +package org.zalando.nakadi.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.avro.AvroMapper; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.springframework.core.io.DefaultResourceLoader; +import org.zalando.nakadi.config.KPIEventTypes; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class AvroSchemaTest { + private final AvroSchema avroSchema; + private final Map> allSchemas = Map.of( + AvroSchema.METADATA_KEY, List.of("0", "1"), + KPIEventTypes.ACCESS_LOG, List.of("0", "1"), + KPIEventTypes.BATCH_PUBLISHED, List.of("0"), + KPIEventTypes.DATA_STREAMED, List.of("0"), + KPIEventTypes.EVENT_TYPE_LOG, List.of("0"), + KPIEventTypes.SUBSCRIPTION_LOG, List.of("0") + ); + + public AvroSchemaTest() throws IOException { + final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); + avroSchema = new AvroSchema(new AvroMapper(), new ObjectMapper(), eventTypeRes); + } + + @Test + public void testAllSchemaVersionAreLoadable() { + allSchemas.entrySet().stream() + .flatMap(schemaEntry -> schemaEntry.getValue().stream() + .map(version -> Map.entry(schemaEntry.getKey(), version))) + .forEach(schemaEntry -> Assertions.assertDoesNotThrow( + () -> avroSchema.getEventTypeSchema(schemaEntry.getKey(), schemaEntry.getValue()), + "Schema of " + schemaEntry.getKey() + + ", version " + schemaEntry.getValue() + " unavailable")); + } +} diff --git a/core-common/src/test/resources/log4j.properties b/core-common/src/test/resources/log4j.properties new file mode 100644 index 0000000000..b0e049c640 --- /dev/null +++ b/core-common/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +log4j.rootCategory=INFO, CONSOLE + +LOG_PATTERN=[%d{yyyy-MM-dd HH:mm:ss.SSSXXX}] [%p] [%x] [%t] [%c] --- %m %throwable{compact} %n + +# CONSOLE is set to be a ConsoleAppender using a PatternLayout. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=${LOG_PATTERN} + +log4j.category.org.apache.catalina.startup.DigesterFactory=ERROR +log4j.category.org.apache.catalina.util.LifecycleBase=ERROR +log4j.category.org.apache.coyote.http11.Http11NioProtocol=WARN +log4j.category.org.apache.tomcat.util.net.NioSelectorPool=WARN +log4j.category.org.crsh.plugin=WARN +log4j.category.org.crsh.ssh=WARN +log4j.category.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.category.org.hibernate.validator.internal.util.Version=WARN +log4j.category.org.springframework.boot.actuate.autoconfigure.CrshAutoConfiguration=WARN +log4j.category.org.springframework.boot.actuate.endpoint.jmx=WARN +log4j.category.org.thymeleaf=WARN +log4j.category.org.zalando.nakadi=DEBUG +log4j.category.org.zalando.nakadi.config=INFO +log4j.category.org.apache.kafka=WARN +log4j.category.org.zalando.nakadi.service.ClosedConnectionsCrutch=WARN +log4j.category.org.zalando.nakadi.repository.kafka.KafkaFactory$KafkaCrutchConsumer=WARN