diff --git a/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java b/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java index 7b2b73162a..5977250275 100644 --- a/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java +++ b/core-common/src/main/java/org/zalando/nakadi/config/KPIEventTypes.java @@ -3,4 +3,7 @@ public final class KPIEventTypes { public static final String ACCESS_LOG = "nakadi.access.log"; public static final String BATCH_PUBLISHED = "nakadi.batch.published"; + public static final String DATA_STREAMED = "nakadi.data.streamed"; + public static final String EVENT_TYPE_LOG = "nakadi.event.type.log"; + public static final String SUBSCRIPTION_LOG = "nakadi.subscription.log"; } diff --git a/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java b/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java index 78c2ffdc7d..ea9013b0df 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/AvroSchema.java @@ -31,7 +31,10 @@ public class AvroSchema { private static final Collection INTERNAL_EVENT_TYPE_NAMES = Set.of( METADATA_KEY, KPIEventTypes.ACCESS_LOG, - KPIEventTypes.BATCH_PUBLISHED); + KPIEventTypes.BATCH_PUBLISHED, + KPIEventTypes.DATA_STREAMED, + KPIEventTypes.EVENT_TYPE_LOG, + KPIEventTypes.SUBSCRIPTION_LOG); private final Map> eventTypeSchema; private final AvroMapper avroMapper; diff --git a/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc new file mode 100644 index 0000000000..ec75998b3a --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc @@ -0,0 +1,51 @@ +{ + "name": "nakadi.data.streamed", + "type": "record", + "doc": "Stores KPI events of type nakadi.data.streamed", + "fields": [ + { + "name": "api", + "description": "This field indicates if data was streamed through low level api or high level api", + "type": "string" + }, + { + "name": "subscription", + "description": "this is optional and is only present when streaming from a subscription", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_type", + "type": "string" + }, + { + "name": "app", + "type": "string" + }, + { + "name": "app_hashed", + "type": "string" + }, + { + "name": "token_realm", + "type": "string" + }, + { + "name": "number_of_events", + "type": "long" + }, + { + "name": "bytes_streamed", + "description": "amount of bytes streamed since last event", + "type": "long" + }, + { + "name": "batches_streamed", + "description": "amount of batches streamed since last event", + "type": "int" + } + ] +} diff --git a/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc new file mode 100644 index 0000000000..cbaf7bb5f6 --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc @@ -0,0 +1,27 @@ +{ + "name": "nakadi.event.type.log", + "type": "record", + "doc": "Stores KPI events of type nakadi.event.type.log", + "fields": [ + { + "name": "event_type", + "type": "string" + }, + { + "name": "status", + "type": "string" + }, + { + "name": "category", + "type": "string" + }, + { + "name": "authz", + "type": "string" + }, + { + "name": "compatibility_mode", + "type": "string" + } + ] +} diff --git a/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc new file mode 100644 index 0000000000..3fd8cc0fa7 --- /dev/null +++ b/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc @@ -0,0 +1,15 @@ +{ + "name": "nakadi.subscription.log", + "type": "record", + "doc": "Stores KPI events of type nakadi.subscription.log", + "fields": [ + { + "name": "subscription_id", + "type": "string" + }, + { + "name": "status", + "type": "string" + } + ] +} diff --git a/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java b/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java new file mode 100644 index 0000000000..c3592f36be --- /dev/null +++ b/core-common/src/test/java/org/zalando/nakadi/service/AvroSchemaTest.java @@ -0,0 +1,40 @@ +package org.zalando.nakadi.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.avro.AvroMapper; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.springframework.core.io.DefaultResourceLoader; +import org.zalando.nakadi.config.KPIEventTypes; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class AvroSchemaTest { + private final AvroSchema avroSchema; + private final Map> allSchemas = Map.of( + AvroSchema.METADATA_KEY, List.of("0", "1"), + KPIEventTypes.ACCESS_LOG, List.of("0", "1"), + KPIEventTypes.BATCH_PUBLISHED, List.of("0"), + KPIEventTypes.DATA_STREAMED, List.of("0"), + KPIEventTypes.EVENT_TYPE_LOG, List.of("0"), + KPIEventTypes.SUBSCRIPTION_LOG, List.of("0") + ); + + public AvroSchemaTest() throws IOException { + final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); + avroSchema = new AvroSchema(new AvroMapper(), new ObjectMapper(), eventTypeRes); + } + + @Test + public void testAllSchemaVersionAreLoadable() { + allSchemas.entrySet().stream() + .flatMap(schemaEntry -> schemaEntry.getValue().stream() + .map(version -> Map.entry(schemaEntry.getKey(), version))) + .forEach(schemaEntry -> Assertions.assertDoesNotThrow( + () -> avroSchema.getEventTypeSchema(schemaEntry.getKey(), schemaEntry.getValue()), + "Schema of " + schemaEntry.getKey() + + ", version " + schemaEntry.getValue() + " unavailable")); + } +} diff --git a/core-common/src/test/resources/log4j.properties b/core-common/src/test/resources/log4j.properties new file mode 100644 index 0000000000..b0e049c640 --- /dev/null +++ b/core-common/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +log4j.rootCategory=INFO, CONSOLE + +LOG_PATTERN=[%d{yyyy-MM-dd HH:mm:ss.SSSXXX}] [%p] [%x] [%t] [%c] --- %m %throwable{compact} %n + +# CONSOLE is set to be a ConsoleAppender using a PatternLayout. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=${LOG_PATTERN} + +log4j.category.org.apache.catalina.startup.DigesterFactory=ERROR +log4j.category.org.apache.catalina.util.LifecycleBase=ERROR +log4j.category.org.apache.coyote.http11.Http11NioProtocol=WARN +log4j.category.org.apache.tomcat.util.net.NioSelectorPool=WARN +log4j.category.org.crsh.plugin=WARN +log4j.category.org.crsh.ssh=WARN +log4j.category.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.category.org.hibernate.validator.internal.util.Version=WARN +log4j.category.org.springframework.boot.actuate.autoconfigure.CrshAutoConfiguration=WARN +log4j.category.org.springframework.boot.actuate.endpoint.jmx=WARN +log4j.category.org.thymeleaf=WARN +log4j.category.org.zalando.nakadi=DEBUG +log4j.category.org.zalando.nakadi.config=INFO +log4j.category.org.apache.kafka=WARN +log4j.category.org.zalando.nakadi.service.ClosedConnectionsCrutch=WARN +log4j.category.org.zalando.nakadi.repository.kafka.KafkaFactory$KafkaCrutchConsumer=WARN