Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1385 from zalando/avro-schema-for-kpi-events
Browse files Browse the repository at this point in the history
Avro schema for kpi events
  • Loading branch information
thomasabraham authored Apr 1, 2022
2 parents d95baf9 + 751e491 commit fc62219
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@
public final class KPIEventTypes {
public static final String ACCESS_LOG = "nakadi.access.log";
public static final String BATCH_PUBLISHED = "nakadi.batch.published";
public static final String DATA_STREAMED = "nakadi.data.streamed";
public static final String EVENT_TYPE_LOG = "nakadi.event.type.log";
public static final String SUBSCRIPTION_LOG = "nakadi.subscription.log";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ public class AvroSchema {
private static final Collection<String> INTERNAL_EVENT_TYPE_NAMES = Set.of(
METADATA_KEY,
KPIEventTypes.ACCESS_LOG,
KPIEventTypes.BATCH_PUBLISHED);
KPIEventTypes.BATCH_PUBLISHED,
KPIEventTypes.DATA_STREAMED,
KPIEventTypes.EVENT_TYPE_LOG,
KPIEventTypes.SUBSCRIPTION_LOG);

private final Map<String, TreeMap<String, Schema>> eventTypeSchema;
private final AvroMapper avroMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "nakadi.data.streamed",
"type": "record",
"doc": "Stores KPI events of type nakadi.data.streamed",
"fields": [
{
"name": "api",
"description": "This field indicates if data was streamed through low level api or high level api",
"type": "string"
},
{
"name": "subscription",
"description": "this is optional and is only present when streaming from a subscription",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "event_type",
"type": "string"
},
{
"name": "app",
"type": "string"
},
{
"name": "app_hashed",
"type": "string"
},
{
"name": "token_realm",
"type": "string"
},
{
"name": "number_of_events",
"type": "long"
},
{
"name": "bytes_streamed",
"description": "amount of bytes streamed since last event",
"type": "long"
},
{
"name": "batches_streamed",
"description": "amount of batches streamed since last event",
"type": "int"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "nakadi.event.type.log",
"type": "record",
"doc": "Stores KPI events of type nakadi.event.type.log",
"fields": [
{
"name": "event_type",
"type": "string"
},
{
"name": "status",
"type": "string"
},
{
"name": "category",
"type": "string"
},
{
"name": "authz",
"type": "string"
},
{
"name": "compatibility_mode",
"type": "string"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "nakadi.subscription.log",
"type": "record",
"doc": "Stores KPI events of type nakadi.subscription.log",
"fields": [
{
"name": "subscription_id",
"type": "string"
},
{
"name": "status",
"type": "string"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.zalando.nakadi.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.springframework.core.io.DefaultResourceLoader;
import org.zalando.nakadi.config.KPIEventTypes;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class AvroSchemaTest {
private final AvroSchema avroSchema;
private final Map<String, List<String>> allSchemas = Map.of(
AvroSchema.METADATA_KEY, List.of("0", "1"),
KPIEventTypes.ACCESS_LOG, List.of("0", "1"),
KPIEventTypes.BATCH_PUBLISHED, List.of("0"),
KPIEventTypes.DATA_STREAMED, List.of("0"),
KPIEventTypes.EVENT_TYPE_LOG, List.of("0"),
KPIEventTypes.SUBSCRIPTION_LOG, List.of("0")
);

public AvroSchemaTest() throws IOException {
final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/");
avroSchema = new AvroSchema(new AvroMapper(), new ObjectMapper(), eventTypeRes);
}

@Test
public void testAllSchemaVersionAreLoadable() {
allSchemas.entrySet().stream()
.flatMap(schemaEntry -> schemaEntry.getValue().stream()
.map(version -> Map.entry(schemaEntry.getKey(), version)))
.forEach(schemaEntry -> Assertions.assertDoesNotThrow(
() -> avroSchema.getEventTypeSchema(schemaEntry.getKey(), schemaEntry.getValue()),
"Schema of " + schemaEntry.getKey()
+ ", version " + schemaEntry.getValue() + " unavailable"));
}
}
25 changes: 25 additions & 0 deletions core-common/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
log4j.rootCategory=INFO, CONSOLE

LOG_PATTERN=[%d{yyyy-MM-dd HH:mm:ss.SSSXXX}] [%p] [%x] [%t] [%c] --- %m %throwable{compact} %n

# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=${LOG_PATTERN}

log4j.category.org.apache.catalina.startup.DigesterFactory=ERROR
log4j.category.org.apache.catalina.util.LifecycleBase=ERROR
log4j.category.org.apache.coyote.http11.Http11NioProtocol=WARN
log4j.category.org.apache.tomcat.util.net.NioSelectorPool=WARN
log4j.category.org.crsh.plugin=WARN
log4j.category.org.crsh.ssh=WARN
log4j.category.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.category.org.hibernate.validator.internal.util.Version=WARN
log4j.category.org.springframework.boot.actuate.autoconfigure.CrshAutoConfiguration=WARN
log4j.category.org.springframework.boot.actuate.endpoint.jmx=WARN
log4j.category.org.thymeleaf=WARN
log4j.category.org.zalando.nakadi=DEBUG
log4j.category.org.zalando.nakadi.config=INFO
log4j.category.org.apache.kafka=WARN
log4j.category.org.zalando.nakadi.service.ClosedConnectionsCrutch=WARN
log4j.category.org.zalando.nakadi.repository.kafka.KafkaFactory$KafkaCrutchConsumer=WARN

0 comments on commit fc62219

Please sign in to comment.