
Commit f5fbb0c · authored Oct 10, 2024 · 1 parent 9cc2b21

Example to use Testcontainers to test Kafka and Timeplus (#1)

* first version of the testcontainers example
* fix bullet point number
* remove imports
* Update KafkaPrimalityRouterTest.java
* Create README.md

6 files changed: +730 −0 lines
 

‎.gitignore (+2): added `.gradle` and `build` to the ignore list, alongside the existing `.DS_Store`.

‎testcontainers/README.md (+396)
# Automation with Testcontainers

[Testcontainers](https://github.com/testcontainers/testcontainers-java) is a Java library that supports [JUnit](https://junit.org/) tests, providing lightweight, throwaway instances of common databases (including Timeplus), message brokers, or anything else that can run in a Docker container. You don't need to write docker-compose files, call the Docker API, or mock the infrastructure yourself. Simply run your test code, and the containers will be created and deleted automatically.

You can run any Docker image with Testcontainers. But starting from [Testcontainers v1.20.2](https://github.com/testcontainers/testcontainers-java/releases/tag/1.20.2), a Timeplus module is supported out of the box. You can start an instance of Timeplus via:

```java
TimeplusContainer timeplus = new TimeplusContainer("timeplus/timeplusd:2.3.31");
timeplus.start();
```

Compared to a [GenericContainer](https://java.testcontainers.org/features/creating_container/), you can call various methods on [TimeplusContainer](https://github.com/testcontainers/testcontainers-java/blob/main/modules/timeplus/src/main/java/org/testcontainers/timeplus/TimeplusContainer.java), such as setting the username or password, or running SQL queries with the default JDBC driver.
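For instance, here is a minimal sketch of that fluent configuration, assuming the 1.20.2 module (`withUsername`, `withPassword`, and `getJdbcUrl` are inherited from the `JdbcDatabaseContainer` base class; adjust if your module version differs):

```java
// A sketch, not from the official docs: set credentials, then grab the JDBC endpoint.
TimeplusContainer timeplus = new TimeplusContainer("timeplus/timeplusd:2.3.31")
    .withUsername("default")
    .withPassword("topsecret");
timeplus.start();

String jdbcUrl = timeplus.getJdbcUrl(); // run SQL against this with the JDBC driver
```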
In this tutorial, we will walk through how to set up Apache Kafka and Timeplus via Testcontainers for Java, create a few Kafka topics, generate data, apply streaming ETL and routing, and finally tear down the services. With [the GraalVM based native Kafka image](https://hub.docker.com/r/apache/kafka-native) and the C++ based Timeplus, all of these operations can be done within 4 to 5 seconds.

The source code of the tutorial is available on [GitHub](https://github.com/timeplus-io/examples/tree/main/testcontainers). The data generator and demo scenario are inspired by Confluent's blog: [How to integration test a Kafka application with a native (non-JVM) Kafka binary with Testcontainers](https://developer.confluent.io/confluent-tutorials/kafka-native-testcontainers/kafka/).

## Set up the dependencies

We will be using [Gradle](https://gradle.org/), a popular open source build system for the Java ecosystem. In the `build.gradle` file, we load the required dependencies, namely:

```gradle
dependencies {
    implementation 'org.apache.kafka:kafka-clients:3.8.0'
    testImplementation 'org.testcontainers:kafka:1.20.2'
    testImplementation 'org.testcontainers:timeplus:1.20.2'
    testRuntimeOnly 'com.timeplus:timeplus-native-jdbc:2.0.4'
}
```

Please load version 1.20.2 or newer of the Testcontainers Kafka and Timeplus modules. If you need to run SQL queries against Timeplus in your test code, also load the Timeplus JDBC driver via `com.timeplus:timeplus-native-jdbc:2.0.4`.
## Write test code

You don't need to write any code in the `src/main/java` folder, since this tutorial focuses only on test automation. Create a Java source file under the `src/test/java` folder, and use the `org.junit.jupiter.api.Test` annotation to mark a function as a test case.

```java
package com.timeplus.examples;

import org.junit.jupiter.api.Test;

public class KafkaPrimalityRouterTest {

    @Test
    public void testPrimalityRouter() {
        ..
    }
}
```
### Start the Kafka container

As a best practice, you can start the Kafka test container in a try-with-resources block, so that the container will be torn down automatically.

```java
@Test
public void testPrimalityRouter() {
    try (
        Network network = Network.newNetwork();
        KafkaContainer kafka = new KafkaContainer(
            "apache/kafka-native:3.8.0"
        )
            .withListener("kafka:19092")
            .withNetwork(network);
    ) {
        // Step 1: start Apache Kafka (we will start the Timeplus container when data is ready)
        kafka.start();
```

Please note that we create a network object and set an extra listener for the Kafka broker, so that our Timeplus instance can reach the Kafka broker without hardcoding an IP address.
### Create Kafka topics

We will be using multiple Kafka topics. One topic is for the input, literally `input-topic`. If a number is prime, it goes to the `primes` topic; if it is not prime, it goes to the `composites` topic. If the data is not a number, which is not supposed to happen, it goes to the `dlq` topic (Dead Letter Queue), a common practice for handling dirty data or retries.

```mermaid
flowchart LR
    input-topic --> primes
    input-topic --> composites
    input-topic --> dlq
```

There are many ways to create Kafka topics, such as shell commands or tools like kcat. Since we are writing Java code, the easiest option is to use the Kafka AdminClient to create the topics programmatically.
```java
// Step 2: create topics
try (
    var admin = AdminClient.create(
        Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafka.getBootstrapServers()
        )
    )
) {
    admin.createTopics(
        List.of(
            new NewTopic(INPUT_TOPIC, 1, (short) 1),
            new NewTopic(PRIME_TOPIC, 1, (short) 1),
            new NewTopic(COMPOSITE_TOPIC, 1, (short) 1),
            new NewTopic(DLQ_TOPIC, 1, (short) 1)
        )
    );
}
```

Since we are running the Kafka broker without authentication, you can retrieve the bootstrap URL with the Testcontainers `kafka.getBootstrapServers()` method, then use the AdminClient to create the 4 topics, each with 1 partition and 1 replica.
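Note that `createTopics` is asynchronous and returns futures. The test works as-is, but if you prefer to block until the topics definitely exist before producing, a small variation (not in the original code) is:

```java
// Block until topic creation completes; all().get() throws if creation fails,
// so the enclosing test method would need to declare `throws Exception`.
admin
    .createTopics(List.of(new NewTopic(INPUT_TOPIC, 1, (short) 1)))
    .all()
    .get();
```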
### Put 1..100 into the input topic

For happy-path testing, we will produce the numbers 1 to 100 to the input topic.

```java
// Step 3.1: produce 100 ints that should go to prime / composite topics
try (
    final Producer<String, String> producer = buildProducer(
        kafka.getBootstrapServers(),
        StringSerializer.class
    )
) {
    for (int i = 1; i <= 100; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            INPUT_TOPIC,
            "" + i,
            "" + i
        );
        producer.send(record);
    }
}
...
protected static Producer buildProducer(
    String bootstrapServers,
    Class serializerClass
) {
    final Properties producerProps = new Properties() {
        {
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                serializerClass
            );
            put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                serializerClass
            );
        }
    };

    return new KafkaProducer(producerProps);
}
```

Each number is serialized as a string, used as both the key and the value of the Kafka message.
### Put a dirty message in the input topic

We will also put our favorite `hello world` string in the input topic to test our error handling.

```java
// Step 3.2: produce strings to test DLQ routing
try (
    final Producer<String, String> producer = buildProducer(
        kafka.getBootstrapServers(),
        StringSerializer.class
    )
) {
    ProducerRecord<String, String> record = new ProducerRecord<>(
        INPUT_TOPIC,
        "hello",
        "world"
    );
    producer.send(record);
}
```
### Start the Timeplus container

Now let's start the Timeplus container to process the data and validate the data pipeline.

```java
// Step 4: start Timeplus container and run init.sql to create ETL pipelines
TimeplusContainer timeplus = new TimeplusContainer(
    "timeplus/timeplusd:2.3.31"
)
    .withNetwork(network)
    .withInitScript("init.sql"); // inside src/test/resources
timeplus.start();
```

Please note:

1. `timeplus/timeplusd:2.3.31` is the latest build as of this writing. Feel free to change it to a newer version, if available.

2. We attach the same network to the Timeplus container, so that the two containers can reach each other.

3. The `withInitScript` method is called to set up Timeplus (an optional JDBC verification sketch follows below).
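Because `timeplus-native-jdbc` is on the test runtime classpath, you can optionally verify that the init script ran by querying Timeplus directly. This is a sketch under the assumption that `SHOW STREAMS` lists the streams created by `init.sql`; `getJdbcUrl()`, `getUsername()` and `getPassword()` come from the Testcontainers `JdbcDatabaseContainer` base class (add the usual `java.sql` imports):

```java
// Optional sanity check, not part of the original test:
try (
    Connection conn = DriverManager.getConnection(
        timeplus.getJdbcUrl(),
        timeplus.getUsername(),
        timeplus.getPassword()
    );
    Statement stmt = conn.createStatement();
    ResultSet rs = stmt.executeQuery("SHOW STREAMS")
) {
    while (rs.next()) {
        System.out.println(rs.getString(1)); // expect input, primes, composites, dlq
    }
} // SQLException is checked: declare `throws Exception` on the test method
```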
### Define stream processing logic in Timeplus

We will create an `init.sql` file under `src/test/resources`, so that the file is available on the runtime classpath.

#### Create Kafka external streams

First we will create 4 [Kafka External Streams](https://docs.timeplus.com/proton-kafka) so that we can read and write Kafka data.

```sql
CREATE EXTERNAL STREAM input(raw string)
SETTINGS type='kafka', brokers='kafka:19092', topic='input-topic';

CREATE EXTERNAL STREAM primes(raw string, _tp_message_key string default raw)
SETTINGS type='kafka', brokers='kafka:19092', topic='primes';

CREATE EXTERNAL STREAM composites(raw string, _tp_message_key string default raw)
SETTINGS type='kafka', brokers='kafka:19092', topic='composites';

CREATE EXTERNAL STREAM dlq(raw string)
SETTINGS type='kafka', brokers='kafka:19092', topic='dlq';
```

Please note:

1. `brokers` is set to `'kafka:19092'`, since the Kafka and Timeplus containers run in the same Docker network.

2. Kafka External Streams in Timeplus are bi-directional. You can read Kafka data via `SELECT .. FROM`, or write data to Kafka via `INSERT INTO` or a Materialized View target stream.

3. For the `primes` and `composites` external streams, we also define the [_tp_message_key](https://docs.timeplus.com/proton-kafka#_tp_message_key) virtual column to set the message key on write. The key defaults to the message value (`default raw`).
#### Create a JavaScript UDF to check prime numbers

There is no built-in SQL function to determine whether a number is prime. This can be done by defining a [JavaScript UDF](https://docs.timeplus.com/js-udf) in Timeplus, as follows:

```sql
CREATE FUNCTION is_prime(values int8)
RETURNS bool
LANGUAGE JAVASCRIPT AS $$
function _check_prime(num, limit) {
    // trial division by odd numbers up to the limit (sqrt of num)
    for (let start = 3; start <= limit; start += 2) {
        if (0 === num % start) {
            return false;
        }
    }
    return num > 1;
};
function is_prime(values) {
    var bools = [];
    for (let i = 0; i < values.length; i++) {
        var number = values[i];
        bools.push(number === 2 || (number % 2 !== 0 && _check_prime(number, Math.sqrt(number))));
    }
    return bools;
}
$$;
```

Please note:

1. The input type is `int8`, a small int, since our test data only covers 1 to 100.

2. The return type is `bool`, either `true` or `false`.

3. To improve performance, Timeplus batches the input values. That's why `values` in the `is_prime` function is an array of `int8`: you check whether each element is prime and return the results as an array of `bool`.

4. The primality-check algorithm is based on [a discussion on Stack Overflow](https://stackoverflow.com/questions/17389350/detecting-prime-numbers-in-javascript). A quick sanity check follows this list.
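One way to sanity-check the UDF is a one-off query, e.g. in the `timeplusd` SQL client (a hypothetical check, assuming the usual `to_int8` cast function):

```sql
-- 97 is prime, 98 is not, so this should return (true, false)
SELECT is_prime(to_int8(97)) AS p97, is_prime(to_int8(98)) AS p98;
```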
#### Create materialized views to process data

Materialized Views in Timeplus are long-running queries. They leverage the full power of streaming SQL and can read from any number of sources, unlike a ClickHouse materialized view, which only acts on the block of data being inserted into its source table.

First, let's handle the case where the input value is not a number. This can be done via the following SQL:

```sql
CREATE MATERIALIZED VIEW mv_dlq INTO dlq AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)=0;
```

Please note:

1. This materialized view reads data from `input-topic` via the `input` Kafka External Stream. We set `_tp_time>earliest_ts()` in the `WHERE` clause so that all existing messages in the Kafka topic will be read.

2. `to_int8_or_zero(string)` is a built-in SQL function that parses the string as an `int8` and returns 0 if parsing fails. Such rows are routed to the `dlq` topic; see the quick illustration after this list.

3. `INTO dlq` tells the materialized view to send its results to that target stream, instead of using the materialized view's internal storage.
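To illustrate the routing predicate with a one-off query (not part of `init.sql`):

```sql
-- to_int8_or_zero parses the string, or returns 0 if it is not a valid int8
SELECT to_int8_or_zero('42') AS ok, to_int8_or_zero('world') AS dirty;
-- ok = 42, dirty = 0, so a message like 'world' matches the mv_dlq filter
```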
Similarly, we can define the other Materialized Views to check whether the numbers are prime, then send them to the corresponding topics.

```sql
CREATE MATERIALIZED VIEW mv_prime INTO primes AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)>0 AND is_prime(to_int8_or_zero(raw));

CREATE MATERIALIZED VIEW mv_not_prime INTO composites AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)>0 AND NOT is_prime(to_int8_or_zero(raw));
```

The `is_prime` UDF is called only after making sure the input value is a number.

### Validate the processing logic

Now back to the test code. We will create a Kafka Java Consumer to read from both the `primes` and `composites` topics, to make sure:

1. Each message in the `primes` topic is a prime, and each message in `composites` is not.

2. There are exactly 100 messages across the `primes` and `composites` topics. No more, no less.

3. In the Dead Letter Queue topic, there is a single `hello world` message.

We call the `buildConsumer` helper (shown after the validation code below) to set up a Kafka Java Consumer with `StringDeserializer`, and use JUnit `assert..` methods to validate the results.
```java
// Step 5: validate prime / composite routing
try (
    final Consumer<String, String> consumer = buildConsumer(
        kafka.getBootstrapServers(),
        "test-group-id",
        StringDeserializer.class
    )
) {
    consumer.subscribe(List.of(PRIME_TOPIC, COMPOSITE_TOPIC));

    int numConsumed = 0;
    for (int i = 0; i < 10 && numConsumed < 100; i++) {
        final ConsumerRecords<String, String> consumerRecords =
            consumer.poll(Duration.ofSeconds(5));
        numConsumed += consumerRecords.count();

        for (ConsumerRecord<String, String> record : consumerRecords) {
            int key = Integer.parseInt(record.key());
            String expectedTopic = isPrime(key)
                ? PRIME_TOPIC
                : COMPOSITE_TOPIC;
            assertEquals(expectedTopic, record.topic());
        }
    }
    assertEquals(100, numConsumed);

    // make sure no more events show up in prime / composite topics
    assertEquals(0, consumer.poll(Duration.ofMillis(200)).count());
}

// validate DLQ routing
try (
    final Consumer<String, String> dlqConsumer = buildConsumer(
        kafka.getBootstrapServers(),
        "test-group-id",
        StringDeserializer.class
    )
) {
    dlqConsumer.subscribe(List.of(DLQ_TOPIC));

    int numConsumed = 0;
    for (int i = 0; i < 10 && numConsumed < 1; i++) {
        final ConsumerRecords<String, String> consumerRecords =
            dlqConsumer.poll(Duration.ofSeconds(5));
        numConsumed += consumerRecords.count();

        for (ConsumerRecord<String, String> record : consumerRecords) {
            assertEquals("world", record.value());
        }
    }
    assertEquals(1, numConsumed);

    // make sure no more events show up in DLQ topic
    assertEquals(
        0,
        dlqConsumer.poll(Duration.ofMillis(200)).count()
    );
}
```
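For reference, here is the `buildConsumer` helper from the full test source. Setting `auto.offset.reset` to `earliest` is what lets the consumer read messages that were produced before it subscribed:

```java
protected static Consumer buildConsumer(
    String bootstrapServers,
    String consumerGroupId,
    Class deserializerClass
) {
    final Properties consumerProps = new Properties() {
        {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerClass);
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerClass);
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        }
    };

    return new KafkaConsumer<>(consumerProps);
}
```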
### Tear down the Kafka and Timeplus containers

At the end of the test code, tear down Timeplus first, then Kafka: the materialized views in Timeplus are still reading from Kafka, so stopping the broker first would leave them polling a dead endpoint.

```java
timeplus.stop();
kafka.stop();
```

## Run the test

If you have the Gradle CLI configured, just run `gradle test` in the `testcontainers` folder.

```
gradle test

> Task :test
..

BUILD SUCCESSFUL in 4s
4 actionable tasks: 1 executed, 3 up-to-date
```

It's amazing that only about 4 seconds are spent to start the Kafka and Timeplus containers, create the topics, send test data, set up and verify the data pipeline, and tear everything down.
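If you rerun the suite frequently, Testcontainers' opt-in reuse feature may shave the startup time further. A hedged sketch: it requires `testcontainers.reuse.enable=true` in `~/.testcontainers.properties`, and reused containers are not auto-removed, so it does not combine well with the try-with-resources cleanup used above:

```java
// Hypothetical variation: keep the Timeplus container alive across test runs.
TimeplusContainer timeplus = new TimeplusContainer("timeplus/timeplusd:2.3.31")
    .withNetwork(network)
    .withInitScript("init.sql")
    .withReuse(true);
```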

‎testcontainers/build.gradle (+43)
```gradle
plugins {
    id "java"
    id "com.diffplug.spotless" version "7.0.0.BETA2"
}

repositories {
    mavenCentral()
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

dependencies {
    implementation 'org.slf4j:slf4j-simple:2.0.7'
    implementation 'org.apache.kafka:kafka-clients:3.8.0'
    implementation 'org.apache.commons:commons-math3:3.6.1'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
    testImplementation 'org.testcontainers:kafka:1.20.2'
    testImplementation 'org.testcontainers:timeplus:1.20.2'
    testRuntimeOnly 'com.timeplus:timeplus-native-jdbc:2.0.4'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
}

test {
    useJUnitPlatform()

    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

spotless {
    java {
        importOrder()
        removeUnusedImports()
    }
}
```

‎testcontainers/settings.gradle (+10)
```gradle
/*
 * This file was generated by the Gradle 'init' task.
 *
 * The settings file is used to specify which projects to include in your build.
 *
 * Detailed information about configuring a multi-project build in Gradle can be found
 * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
 */

rootProject.name = 'testcontainers'
```
‎testcontainers/src/test/java/com/timeplus/examples/KafkaPrimalityRouterTest.java (+238)
```java
package com.timeplus.examples;

import static org.apache.commons.math3.primes.Primes.isPrime;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.timeplus.TimeplusContainer;

public class KafkaPrimalityRouterTest {

    protected static final String INPUT_TOPIC = "input-topic";
    protected static final String PRIME_TOPIC = "primes";
    protected static final String COMPOSITE_TOPIC = "composites";
    protected static final String DLQ_TOPIC = "dlq";

    @Test
    public void testPrimalityRouter() {
        try (
            Network network = Network.newNetwork();
            KafkaContainer kafka = new KafkaContainer(
                "apache/kafka-native:3.8.0"
            )
                .withListener("kafka:19092")
                .withNetwork(network);
        ) {
            // Step 1: start Apache Kafka (we will start the Timeplus container when data is ready)
            kafka.start();

            // Step 2: create topics
            try (
                var admin = AdminClient.create(
                    Map.of(
                        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                        kafka.getBootstrapServers()
                    )
                )
            ) {
                admin.createTopics(
                    List.of(
                        new NewTopic(INPUT_TOPIC, 1, (short) 1),
                        new NewTopic(PRIME_TOPIC, 1, (short) 1),
                        new NewTopic(COMPOSITE_TOPIC, 1, (short) 1),
                        new NewTopic(DLQ_TOPIC, 1, (short) 1)
                    )
                );
            }

            // Step 3.1: produce 100 ints that should go to prime / composite topics
            try (
                final Producer<String, String> producer = buildProducer(
                    kafka.getBootstrapServers(),
                    StringSerializer.class
                )
            ) {
                for (int i = 1; i <= 100; i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(
                        INPUT_TOPIC,
                        "" + i,
                        "" + i
                    );
                    producer.send(record);
                }
            }

            // Step 3.2: produce strings to test DLQ routing
            try (
                final Producer<String, String> producer = buildProducer(
                    kafka.getBootstrapServers(),
                    StringSerializer.class
                )
            ) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    INPUT_TOPIC,
                    "hello",
                    "world"
                );
                producer.send(record);
            }

            // Step 4: start Timeplus container and run init.sql to create ETL pipelines
            TimeplusContainer timeplus = new TimeplusContainer(
                "timeplus/timeplusd:2.3.31"
            )
                .withNetwork(network)
                .withInitScript("init.sql"); // inside src/test/resources
            timeplus.start();

            // Step 5: validate prime / composite routing
            try (
                final Consumer<String, String> consumer = buildConsumer(
                    kafka.getBootstrapServers(),
                    "test-group-id",
                    StringDeserializer.class
                )
            ) {
                consumer.subscribe(List.of(PRIME_TOPIC, COMPOSITE_TOPIC));

                int numConsumed = 0;
                for (int i = 0; i < 10 && numConsumed < 100; i++) {
                    final ConsumerRecords<String, String> consumerRecords =
                        consumer.poll(Duration.ofSeconds(5));
                    numConsumed += consumerRecords.count();

                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        int key = Integer.parseInt(record.key());
                        String expectedTopic = isPrime(key)
                            ? PRIME_TOPIC
                            : COMPOSITE_TOPIC;
                        assertEquals(expectedTopic, record.topic());
                    }
                }
                assertEquals(100, numConsumed);

                // make sure no more events show up in prime / composite topics
                assertEquals(0, consumer.poll(Duration.ofMillis(200)).count());
            }

            // validate DLQ routing
            try (
                final Consumer<String, String> dlqConsumer = buildConsumer(
                    kafka.getBootstrapServers(),
                    "test-group-id",
                    StringDeserializer.class
                )
            ) {
                dlqConsumer.subscribe(List.of(DLQ_TOPIC));

                int numConsumed = 0;
                for (int i = 0; i < 10 && numConsumed < 1; i++) {
                    final ConsumerRecords<String, String> consumerRecords =
                        dlqConsumer.poll(Duration.ofSeconds(5));
                    numConsumed += consumerRecords.count();

                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        assertEquals("world", record.value());
                    }
                }
                assertEquals(1, numConsumed);

                // make sure no more events show up in DLQ topic
                assertEquals(
                    0,
                    dlqConsumer.poll(Duration.ofMillis(200)).count()
                );
            }

            timeplus.stop();
            kafka.stop();
        }
    }

    /**
     * Helper to build a producer.
     *
     * @param bootstrapServers bootstrap servers endpoint
     * @param serializerClass serializer to use
     * @return Producer instance
     */
    protected static Producer buildProducer(
        String bootstrapServers,
        Class serializerClass
    ) {
        final Properties producerProps = new Properties() {
            {
                put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    serializerClass
                );
                put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    serializerClass
                );
            }
        };

        return new KafkaProducer(producerProps);
    }

    /**
     * Helper to build a consumer with auto.offset.reset set to earliest.
     *
     * @param bootstrapServers bootstrap servers endpoint
     * @param consumerGroupId consumer group ID
     * @param deserializerClass deserializer to use
     * @return Consumer instance
     */
    protected static Consumer buildConsumer(
        String bootstrapServers,
        String consumerGroupId,
        Class deserializerClass
    ) {
        final Properties consumerProps = new Properties() {
            {
                put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                put(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    deserializerClass
                );
                put(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    deserializerClass
                );
                put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            }
        };

        return new KafkaConsumer<>(consumerProps);
    }
}
```
‎testcontainers/src/test/resources/init.sql (+41)
```sql
CREATE EXTERNAL STREAM input(raw string)
SETTINGS type='kafka', brokers='kafka:19092', topic='input-topic';

CREATE EXTERNAL STREAM primes(raw string, _tp_message_key string default raw)
SETTINGS type='kafka', brokers='kafka:19092', topic='primes';

CREATE EXTERNAL STREAM composites(raw string, _tp_message_key string default raw)
SETTINGS type='kafka', brokers='kafka:19092', topic='composites';

CREATE EXTERNAL STREAM dlq(raw string)
SETTINGS type='kafka', brokers='kafka:19092', topic='dlq';

CREATE FUNCTION is_prime(values int8)
RETURNS bool
LANGUAGE JAVASCRIPT AS $$
function _check_prime(num, limit) {
    for (let start = 3; start <= limit; start += 2) {
        if (0 === num % start) {
            return false;
        }
    }
    return num > 1;
};
function is_prime(values) {
    var bools = [];
    for (let i = 0; i < values.length; i++) {
        var number = values[i];
        bools.push(number === 2 || (number % 2 !== 0 && _check_prime(number, Math.sqrt(number))));
    }
    return bools;
}
$$;

CREATE MATERIALIZED VIEW mv_dlq INTO dlq AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)=0;

CREATE MATERIALIZED VIEW mv_prime INTO primes AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)>0 AND is_prime(to_int8_or_zero(raw));

CREATE MATERIALIZED VIEW mv_not_prime INTO composites AS
SELECT raw FROM input WHERE _tp_time>earliest_ts() AND to_int8_or_zero(raw)>0 AND NOT is_prime(to_int8_or_zero(raw));
```
