diff --git a/src/main/java/com/purbon/kafka/topology/model/Topic.java b/src/main/java/com/purbon/kafka/topology/model/Topic.java index 2b621fa7..8397ca53 100644 --- a/src/main/java/com/purbon/kafka/topology/model/Topic.java +++ b/src/main/java/com/purbon/kafka/topology/model/Topic.java @@ -90,7 +90,8 @@ public Topic( dataType, config, appConfig, - appConfig.getTopicPrefixFormat()); + appConfig.getTopicPrefixFormat(), + null); } public Topic( @@ -100,7 +101,34 @@ public Topic( Optional dataType, Map config, Configuration appConfig) { - this(name, producers, consumers, dataType, config, appConfig, appConfig.getTopicPrefixFormat()); + this( + name, + producers, + consumers, + dataType, + config, + appConfig, + appConfig.getTopicPrefixFormat(), + null); + } + + public Topic( + String name, + List producers, + List consumers, + Optional dataType, + Map config, + Configuration appConfig, + String plan) { + this( + name, + producers, + consumers, + dataType, + config, + appConfig, + appConfig.getTopicPrefixFormat(), + plan); } public Topic( @@ -110,7 +138,8 @@ public Topic( Optional dataType, Map config, Configuration appConfig, - String topicNamePattern) { + String topicNamePattern, + String plan) { this.name = name; this.dlqPrefix = ""; // this topic is not a dlq topic this.producers = producers; @@ -130,6 +159,8 @@ public Topic( } subjectNameStrategy = Optional.empty(); this.topicNamePattern = topicNamePattern; + + this.plan = plan; } public SubjectNameStrategy getSubjectNameStrategy() { @@ -219,7 +250,8 @@ public Topic clone() { getDataType(), getConfig(), getAppConfig(), - topicNamePattern); + topicNamePattern, + getPlan()); } } } diff --git a/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java b/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java index 34a1a20e..658314fc 100644 --- a/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java +++ b/src/main/java/com/purbon/kafka/topology/serdes/TopicCustomDeserializer.java @@ -90,7 +90,15 @@ public Topic deserialize(JsonParser parser, DeserializationContext context) thro "Topic \"" + name + "\" references non-existing plan \"" + planLabel + "\""); } }); - Topic topic = new Topic(name, producers, consumers, optionalDataType, config, this.config); + Topic topic = + new Topic( + name, + producers, + consumers, + optionalDataType, + config, + this.config, + optionalPlanLabel.isPresent() ? optionalPlanLabel.get().asText() : null); Optional subjectNameStrategy = Optional.ofNullable(rootNode.get("subject.name.strategy")) diff --git a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java index 43bc514f..d1fd09bc 100644 --- a/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java +++ b/src/test/java/com/purbon/kafka/topology/TopologyObjectBuilderTest.java @@ -130,6 +130,9 @@ public void testConfigUpdateWhenUsingCustomPlans() throws IOException { config.put("bar", "3"); assertThat(topic.getConfig()).containsAllEntriesOf(config); + // should include the name of the plan for validation uses + assertThat(topic.getPlan()).isEqualTo("gold"); + // should respect values from the original config if not present in the plan description topic = map.get("barFoo"); assertThat(topic.getConfig()).containsEntry("replication.factor", "1");