Skip to content

Commit

Permalink
[RND-247] Kafka to OpenSearch - MongoDB (#273)
Browse files Browse the repository at this point in the history
* Initial version of kafka to opensearch connector

* Creating meadowlark-net on docker file to handle communication

* Adding permissions to allow extract and delete executable

* Adding documentation

* Moving image to docker folder

* Updating debezium kafka images

* Fixing debezium package version to avoid using alpha versions

* Moving kafka to backend folder and follow same structure

* Add opensearch to same network

* Fix typo

* Adding json converter to example

* Checking for file integrity before downloading from github

* Changing value in example

* Fix path

* Simplifying opensearch sing config example

* Updating sink json example (writes data without information)

* Updating debezium mongodb configuration to insert json into kafka

* Updating opensearch sink example to write to OpenSearch

* Updating opensearch example

* Add initial kafka connect transform

* Rename project

* Add gradle build to dockerfile

* Removing duplicated license file

* Fix issue with escaped strings

* Adding console consumer documentation

* Fix typo

* Adding connect-meadowlark image

* Fixing order of transforms

* Adding documentation information
  • Loading branch information
andonyns authored Aug 4, 2023
1 parent 83ed101 commit c41fb11
Show file tree
Hide file tree
Showing 27 changed files with 1,515 additions and 117 deletions.
2 changes: 2 additions & 0 deletions Meadowlark-js/backends/meadowlark-kafka-stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
debezium-mongodb.json
opensearch_sink.json
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
{
"name": "DebeziumConnector",
"name": "mongo_kafka",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.name": "edfi",
"topic.prefix": "edfi",

"mongodb.connection.string": "<your-mongo-connection-string>",
"mongodb.user": "<admin-username-goes-here>",
"mongodb.password": "<admin-password-goes-here>",
"mongodb.hosts": "rs0/mongo1:27017,mongo2:27018,mongo3:27019",

"database.history.kafka.bootstrap.servers": "kafka:9092",

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",

"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",

"transforms": "dropTombstone,extractDebeziumAfter",
"transforms": "dropTombstone, extractDebeziumAfter, outbox, extractId",
"transforms.outbox.type" : "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
"transforms.outbox.collection.expand.json.payload" : "true",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "id",
"transforms.dropTombstone.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTombstone.predicate": "isTombstone",
"transforms.extractDebeziumAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
version: '3'
services:

# Zookeeper image from Debezium
zookeeper:
hostname: zookeeper1
container_name: zookeeper1
image: debezium/zookeeper:2.3@sha256:42b73151458b12c9dc1473807ee8369a917724278d6ec08d82702da8c46a9639
networks:
- meadowlark-net
ports:
- 2181:2181
- 2888:2888
- 3888:3888
volumes:
- zookeeper-logs:/var/lib/zookeeper/log
- zookeeper-data:/var/lib/zookeeper/data

# Kafka image from Debezium
kafka:
hostname: kafka1
container_name: kafka1
image: debezium/kafka:2.3@sha256:ffe34d457bff18de31c5ed695f22291680608d9758f041ae55310439224781cf
networks:
- meadowlark-net
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181

connect:
hostname: kafka-connect
container_name: kafka-connect
image: edfialliance/connect-meadowlark:2.3-1@sha256:6605d2f0ad1797ccf7e3f7a4dbe690bb0c9e198dd6a0d5720a7b170d0bc4ca95
ports:
- 8083:8083
networks:
- meadowlark-net
links:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=debezium_config
- OFFSET_STORAGE_TOPIC=debezium_offset
- STATUS_STORAGE_TOPIC=debezium_status

# Kafka Web UI - https://github.com/obsidiandynamics/kafdrop
kafdrop:
hostname: kafdrop
container_name: kafdrop
image: obsidiandynamics/kafdrop:3.31.0@sha256:f89f34f56e72188aa61b557866dbece57238a74c599e88105b200c2532bb804b
ports:
- 9000:9000
networks:
- meadowlark-net
environment:
KAFKA_BROKERCONNECT: kafka1:9092
JVM_OPTS: "-Xms32M -Xmx64M"
SERVER_SERVLET_CONTEXTPATH: "/"

volumes:
zookeeper-logs:
zookeeper-data:

networks:
meadowlark-net:
external: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name":"kafka_opensearch",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"topics": "edfi.meadowlark.documents",
"type_name": "_doc",

"connection.url": "<your-opensearch-url>",
"connection.username": "<your-opensearch-username-goes-here>",
"connection.password": "<your-opensearch-password-goes-here>",

"tasks.max":"1",

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

"schema.ignore": "true",

"compact.map.entries": "true",

"transforms": "removeId, generateIndexFromResource",
"transforms.removeId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.removeId.exclude": "_id",
"transforms.generateIndexFromResource.type":"com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms.GenerateIndexFromResource",
"transforms.generateIndexFromResource.field.name":"projectName, resourceVersion, resourceName",

"behavior.on.version.conflict": "ignore"

}
}
71 changes: 71 additions & 0 deletions Meadowlark-js/backends/meadowlark-kafka-stream/docker/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Using Kafka for Event processing

To setup with Debezium and connect to MongoDB and OpenSearch, run the `docker compose up -d`. Then execute the following
steps:

## Configure Debezium

The Debezium Kafka Connector must be configured with the MongoDB admin username and password to listen to MongoDB change
stream. To do this, copy the `debezium-mongodb.json.example` file to `debezium-mongodb.json`. Edit the json file and insert
the MongoDB admin username and password. Then send the configuration to the Debezium Kafka Connector:

Linux:

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @debezium-mongodb.json
```

Windows:

```pwsh
Invoke-RestMethod -Method Post -InFile .\debezium-mongodb.json `
-uri http://localhost:8083/connectors/ -ContentType "application/json"
```

## Send Kafka Events to OpenSearch

The Debezium Kafka Connector must be configured with the OpenSearch admin username and password to send the data streams to opensearch. To do this, copy the `opensearch_sink.json.example` file to `opensearch_sink.json`. Edit the json file and insert
the connection username and password. Then send the configuration to the Debezium Kafka Connector:

Linux:

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @opensearch_sink.json
```

Windows:

```pwsh
Invoke-RestMethod -Method Post -InFile .\opensearch_sink.json `
-uri http://localhost:8083/connectors/ -ContentType "application/json"
```

### Verify configuration

To check that connectors are running, execute:

```bash
curl http://localhost:8083/connector-plugins | jq .
```

```pwsh
Invoke-RestMethod http://localhost:8083/connector-plugins | ConvertTo-Json | ConvertFrom-Json
```

This returns the debezium connectors and the OpenSearch connector information.

### Browsing Kafka Topics and Messages

[Kafdrop](https://github.com/obsidiandynamics/kafdrop), a free Kafka Web UI, is
bundled with this deployment. Browse the Kafka instance with Kafdrop at
`http://localhost:9000/`.

### View Logs

To view logs and additional information, use the `kafka-console-consumer.sh` script inside kafka-connect:

```bash
docker exec -it kafka1 ./bin/kafka-console-consumer.sh --bootstrap-server 172.18.0.9:9092 --topic edfi.meadowlark.documents --from-beginning --max-messages 1 | jq .
```
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ services:
ports:
- 27017:27017
networks:
- mongo-net
- meadowlark-net
restart: always
command:
[
Expand Down Expand Up @@ -43,7 +43,7 @@ services:
ports:
- 27018:27018
networks:
- mongo-net
- meadowlark-net
restart: always
command:
[
Expand Down Expand Up @@ -76,7 +76,7 @@ services:
ports:
- 27019:27019
networks:
- mongo-net
- meadowlark-net
restart: always
command:
[
Expand All @@ -99,70 +99,6 @@ services:
- mongo-log3:/var/log/mongodb
- mongo-auth:/auth

# Zookeeper image from Debezium
zookeeper:
hostname: zookeeper1
container_name: zookeeper1
image: debezium/zookeeper:1.9
ports:
- 2181:2181
- 2888:2888
- 3888:3888
networks:
- mongo-net
volumes:
- zookeeper-logs:/var/lib/zookeeper/log
- zookeeper-data:/var/lib/zookeeper/data

# Kafka image from Debezium
kafka:
hostname: kafka1
container_name: kafka1
image: debezium/kafka:1.9
ports:
- 9092:9092
networks:
- mongo-net
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181

# Kafka Connect from Debezium, includes Debezium connectors
connect:
hostname: kafka-connect
container_name: kafka-connect
image: debezium/connect:1.9
ports:
- 8083:8083
networks:
- mongo-net
links:
- kafka
- mongo1
- mongo2
- mongo3
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=debezium_config
- OFFSET_STORAGE_TOPIC=debezium_offset
- STATUS_STORAGE_TOPIC=debezium_status

# Kafka Web UI - https://github.com/obsidiandynamics/kafdrop
kafdrop:
hostname: kafdrop
container_name: kafdrop
image: obsidiandynamics/kafdrop:3.30.0
ports:
- 9000:9000
networks:
- mongo-net
environment:
KAFKA_BROKERCONNECT: kafka1:9092
JVM_OPTS: "-Xms32M -Xmx64M"
SERVER_SERVLET_CONTEXTPATH: "/"

volumes:
mongo-data1:
mongo-log1:
Expand All @@ -172,9 +108,8 @@ volumes:
mongo-log3:
mongo-auth:
external: true
zookeeper-logs:
zookeeper-data:


networks:
mongo-net:
meadowlark-net:
name: meadowlark-net
28 changes: 0 additions & 28 deletions Meadowlark-js/backends/meadowlark-mongodb-backend/docker/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,3 @@ connection string to use for this MongoDB replica set.
The logs for `mongo1`, `mongo2`, and `mongo3` can be found on docker volumes
`mongo-log1`, `mongo-log2`, and `mongo-log3` respectively. When not mounted to
an active container, they can be browsed via the VS Code Docker extension.

### Configure Debezium

The Debezium Kafka Connector must be configured with the MongoDB admin username
and password to listen to MongoDB change stream. To do this, copy the
`debezium-mongodb.json.example` file to `debezium-mongodb.json`. Edit the json
file and insert the MongoDB admin username and password. Then send the
configuration to the Debezium Kafka Connector:

Linux:

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ -d @debezium-mongodb.json
```

Windows:

```pwsh
Invoke-RestMethod -Method Post -InFile .\debezium-mongodb.json `
-uri http://localhost:8083/connectors/ -ContentType "application/json"
```

### Browsing Kafka Topics and Messages

[Kafdrop](https://github.com/obsidiandynamics/kafdrop), a free Kafka Web UI, is
bundled with this deployment. Browse the Kafka instance with Kafdrop at
`http://localhost:9000/`.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
- 9200:9200
- 9600:9600 # required for Performance Analyzer
networks:
- opensearch-net
- meadowlark-net
restart: unless-stopped

opensearch-dashboards:
Expand All @@ -38,11 +38,13 @@ services:
OPENSEARCH_HOSTS: '["http://opensearch-node1:9200"]'
DISABLE_SECURITY_DASHBOARDS_PLUGIN: true # disables security dashboards plugin in OpenSearch Dashboards
networks:
- opensearch-net
- meadowlark-net
restart: unless-stopped

volumes:
opensearch-data1:

networks:
opensearch-net:
meadowlark-net:
external: true

Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,3 @@ Delete all indices:
```none
DELETE *
```

## Test dependency on older "docker-compose" versus newer "docker compose"

The integration tests use the testcontainers library to spin up an Opensearch instance. As of
Feb 2023, it uses the legacy "docker-compose" command from Compose V1. If tests fail
with "Error: spawn docker-compose ENOENT", you will need to either [install Compose V1
standalone](https://docs.docker.com/compose/install/other/) or `alias docker-compose='docker compose'`.
Loading

0 comments on commit c41fb11

Please sign in to comment.