diff --git a/Meadowlark-js/backends/meadowlark-kafka-stream/.gitignore b/Meadowlark-js/backends/meadowlark-kafka-stream/.gitignore new file mode 100644 index 00000000..baa92a73 --- /dev/null +++ b/Meadowlark-js/backends/meadowlark-kafka-stream/.gitignore @@ -0,0 +1,2 @@ +debezium-mongodb.json +opensearch_sink.json \ No newline at end of file diff --git a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/debezium-mongodb.json.example b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/debezium-mongodb.json.example similarity index 52% rename from Meadowlark-js/backends/meadowlark-mongodb-backend/docker/debezium-mongodb.json.example rename to Meadowlark-js/backends/meadowlark-kafka-stream/docker/debezium-mongodb.json.example index 6a88a623..b4d05507 100644 --- a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/debezium-mongodb.json.example +++ b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/debezium-mongodb.json.example @@ -1,23 +1,27 @@ { - "name": "DebeziumConnector", + "name": "mongo_kafka", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "tasks.max": "1", - "mongodb.name": "edfi", + "topic.prefix": "edfi", + + "mongodb.connection.string": "", "mongodb.user": "", "mongodb.password": "", - "mongodb.hosts": "rs0/mongo1:27017,mongo2:27018,mongo3:27019", + "database.history.kafka.bootstrap.servers": "kafka:9092", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", "predicates": "isTombstone", "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone", - "transforms": "dropTombstone,extractDebeziumAfter", + "transforms": "dropTombstone, extractDebeziumAfter, 
outbox, extractId", + "transforms.outbox.type" : "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter", + "transforms.outbox.collection.expand.json.payload" : "true", + "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key", + "transforms.extractId.field": "id", "transforms.dropTombstone.type": "org.apache.kafka.connect.transforms.Filter", "transforms.dropTombstone.predicate": "isTombstone", "transforms.extractDebeziumAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value", diff --git a/Meadowlark-js/backends/meadowlark-kafka-stream/docker/docker-compose.yml b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/docker-compose.yml new file mode 100644 index 00000000..fb8def9f --- /dev/null +++ b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/docker-compose.yml @@ -0,0 +1,70 @@ +version: '3' +services: + + # Zookeeper image from Debezium + zookeeper: + hostname: zookeeper1 + container_name: zookeeper1 + image: debezium/zookeeper:2.3@sha256:42b73151458b12c9dc1473807ee8369a917724278d6ec08d82702da8c46a9639 + networks: + - meadowlark-net + ports: + - 2181:2181 + - 2888:2888 + - 3888:3888 + volumes: + - zookeeper-logs:/var/lib/zookeeper/log + - zookeeper-data:/var/lib/zookeeper/data + + # Kafka image from Debezium + kafka: + hostname: kafka1 + container_name: kafka1 + image: debezium/kafka:2.3@sha256:ffe34d457bff18de31c5ed695f22291680608d9758f041ae55310439224781cf + networks: + - meadowlark-net + ports: + - 9092:9092 + links: + - zookeeper + environment: + - ZOOKEEPER_CONNECT=zookeeper:2181 + + connect: + hostname: kafka-connect + container_name: kafka-connect + image: edfialliance/connect-meadowlark:2.3-1@sha256:6605d2f0ad1797ccf7e3f7a4dbe690bb0c9e198dd6a0d5720a7b170d0bc4ca95 + ports: + - 8083:8083 + networks: + - meadowlark-net + links: + - kafka + environment: + - BOOTSTRAP_SERVERS=kafka:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=debezium_config + - OFFSET_STORAGE_TOPIC=debezium_offset + - 
STATUS_STORAGE_TOPIC=debezium_status + + # Kafka Web UI - https://github.com/obsidiandynamics/kafdrop + kafdrop: + hostname: kafdrop + container_name: kafdrop + image: obsidiandynamics/kafdrop:3.31.0@sha256:f89f34f56e72188aa61b557866dbece57238a74c599e88105b200c2532bb804b + ports: + - 9000:9000 + networks: + - meadowlark-net + environment: + KAFKA_BROKERCONNECT: kafka1:9092 + JVM_OPTS: "-Xms32M -Xmx64M" + SERVER_SERVLET_CONTEXTPATH: "/" + +volumes: + zookeeper-logs: + zookeeper-data: + +networks: + meadowlark-net: + external: true diff --git a/Meadowlark-js/backends/meadowlark-kafka-stream/docker/opensearch_sink.json.example b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/opensearch_sink.json.example new file mode 100644 index 00000000..a84c4731 --- /dev/null +++ b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/opensearch_sink.json.example @@ -0,0 +1,32 @@ +{ + "name":"kafka_opensearch", + "config": { + "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector", + "topics": "edfi.meadowlark.documents", + "type_name": "_doc", + + "connection.url": "", + "connection.username": "", + "connection.password": "", + + "tasks.max":"1", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "schema.ignore": "true", + + "compact.map.entries": "true", + + "transforms": "removeId, generateIndexFromResource", + "transforms.removeId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", + "transforms.removeId.exclude": "_id", + "transforms.generateIndexFromResource.type":"com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms.GenerateIndexFromResource", + "transforms.generateIndexFromResource.field.name":"projectName, resourceVersion, resourceName", + + "behavior.on.version.conflict": "ignore" + + } +} diff --git 
a/Meadowlark-js/backends/meadowlark-kafka-stream/docker/readme.md b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/readme.md new file mode 100644 index 00000000..e7dcc6d1 --- /dev/null +++ b/Meadowlark-js/backends/meadowlark-kafka-stream/docker/readme.md @@ -0,0 +1,71 @@ +# Using Kafka for Event processing + +To set up Kafka with Debezium and connect it to MongoDB and OpenSearch, run `docker compose up -d`. Then execute the following +steps: + +## Configure Debezium + +The Debezium Kafka Connector must be configured with the MongoDB connection string, admin username and password to listen to the MongoDB change +stream. To do this, copy the `debezium-mongodb.json.example` file to `debezium-mongodb.json`. Edit the json file and insert +the MongoDB connection string, admin username and password. Then send the configuration to the Debezium Kafka Connector: + +Linux: + +```bash +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ + http://localhost:8083/connectors/ -d @debezium-mongodb.json +``` + +Windows: + +```pwsh +Invoke-RestMethod -Method Post -InFile .\debezium-mongodb.json ` + -uri http://localhost:8083/connectors/ -ContentType "application/json" +``` + +## Send Kafka Events to OpenSearch + +The OpenSearch sink connector must be configured with the OpenSearch connection URL, admin username and password to send the data streams to OpenSearch. To do this, copy the `opensearch_sink.json.example` file to `opensearch_sink.json`. Edit the json file and insert +the connection URL, username and password. 
Then send the configuration to Kafka Connect: + +Linux: + +```bash +curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ + http://localhost:8083/connectors/ -d @opensearch_sink.json +``` + +Windows: + +```pwsh +Invoke-RestMethod -Method Post -InFile .\opensearch_sink.json ` + -uri http://localhost:8083/connectors/ -ContentType "application/json" +``` + +### Verify configuration + +To check that the connector plugins are installed, execute: + +```bash +curl http://localhost:8083/connector-plugins | jq . +``` + +```pwsh +Invoke-RestMethod http://localhost:8083/connector-plugins | ConvertTo-Json | ConvertFrom-Json +``` + +This returns the installed Debezium connector plugins and the OpenSearch connector plugin. + +### Browsing Kafka Topics and Messages + +[Kafdrop](https://github.com/obsidiandynamics/kafdrop), a free Kafka Web UI, is +bundled with this deployment. Browse the Kafka instance with Kafdrop at +`http://localhost:9000/`. + +### View Logs + +To view the messages published to a topic, use the `kafka-console-consumer.sh` script inside the `kafka1` container (replace `172.18.0.9` with the address of your own Kafka container): + +```bash +docker exec -it kafka1 ./bin/kafka-console-consumer.sh --bootstrap-server 172.18.0.9:9092 --topic edfi.meadowlark.documents --from-beginning --max-messages 1 | jq . 
+``` diff --git a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/docker-compose.yml b/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/docker-compose.yml index b8f32222..fc48f162 100644 --- a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/docker-compose.yml +++ b/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/docker-compose.yml @@ -11,7 +11,7 @@ services: ports: - 27017:27017 networks: - - mongo-net + - meadowlark-net restart: always command: [ @@ -43,7 +43,7 @@ services: ports: - 27018:27018 networks: - - mongo-net + - meadowlark-net restart: always command: [ @@ -76,7 +76,7 @@ services: ports: - 27019:27019 networks: - - mongo-net + - meadowlark-net restart: always command: [ @@ -99,70 +99,6 @@ services: - mongo-log3:/var/log/mongodb - mongo-auth:/auth - # Zookeeper image from Debezium - zookeeper: - hostname: zookeeper1 - container_name: zookeeper1 - image: debezium/zookeeper:1.9 - ports: - - 2181:2181 - - 2888:2888 - - 3888:3888 - networks: - - mongo-net - volumes: - - zookeeper-logs:/var/lib/zookeeper/log - - zookeeper-data:/var/lib/zookeeper/data - - # Kafka image from Debezium - kafka: - hostname: kafka1 - container_name: kafka1 - image: debezium/kafka:1.9 - ports: - - 9092:9092 - networks: - - mongo-net - links: - - zookeeper - environment: - - ZOOKEEPER_CONNECT=zookeeper:2181 - - # Kafka Connect from Debezium, includes Debezium connectors - connect: - hostname: kafka-connect - container_name: kafka-connect - image: debezium/connect:1.9 - ports: - - 8083:8083 - networks: - - mongo-net - links: - - kafka - - mongo1 - - mongo2 - - mongo3 - environment: - - BOOTSTRAP_SERVERS=kafka:9092 - - GROUP_ID=1 - - CONFIG_STORAGE_TOPIC=debezium_config - - OFFSET_STORAGE_TOPIC=debezium_offset - - STATUS_STORAGE_TOPIC=debezium_status - - # Kafka Web UI - https://github.com/obsidiandynamics/kafdrop - kafdrop: - hostname: kafdrop - container_name: kafdrop - image: obsidiandynamics/kafdrop:3.30.0 - ports: - - 9000:9000 - networks: - - 
mongo-net - environment: - KAFKA_BROKERCONNECT: kafka1:9092 - JVM_OPTS: "-Xms32M -Xmx64M" - SERVER_SERVLET_CONTEXTPATH: "/" - volumes: mongo-data1: mongo-log1: @@ -172,9 +108,8 @@ volumes: mongo-log3: mongo-auth: external: true - zookeeper-logs: - zookeeper-data: networks: - mongo-net: + meadowlark-net: + name: meadowlark-net diff --git a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/readme.md b/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/readme.md index b91b56b7..068b6ee6 100644 --- a/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/readme.md +++ b/Meadowlark-js/backends/meadowlark-mongodb-backend/docker/readme.md @@ -93,31 +93,3 @@ connection string to use for this MongoDB replica set. The logs for `mongo1`, `mongo2`, and `mongo3` can be found on docker volumes `mongo-log1`, `mongo-log2`, and `mongo-log3` respectively. When not mounted to an active container, they can be browsed via the VS Code Docker extension. - -### Configure Debezium - -The Debezium Kafka Connector must be configured with the MongoDB admin username -and password to listen to MongoDB change stream. To do this, copy the -`debezium-mongodb.json.example` file to `debezium-mongodb.json`. Edit the json -file and insert the MongoDB admin username and password. Then send the -configuration to the Debezium Kafka Connector: - -Linux: - -```bash -curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ - http://localhost:8083/connectors/ -d @debezium-mongodb.json -``` - -Windows: - -```pwsh -Invoke-RestMethod -Method Post -InFile .\debezium-mongodb.json ` - -uri http://localhost:8083/connectors/ -ContentType "application/json" -``` - -### Browsing Kafka Topics and Messages - -[Kafdrop](https://github.com/obsidiandynamics/kafdrop), a free Kafka Web UI, is -bundled with this deployment. Browse the Kafka instance with Kafdrop at -`http://localhost:9000/`. 
diff --git a/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/docker-compose.yml b/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/docker-compose.yml index 54b6bf80..3a078b31 100644 --- a/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/docker-compose.yml +++ b/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/docker-compose.yml @@ -24,7 +24,7 @@ services: - 9200:9200 - 9600:9600 # required for Performance Analyzer networks: - - opensearch-net + - meadowlark-net restart: unless-stopped opensearch-dashboards: @@ -38,11 +38,13 @@ services: OPENSEARCH_HOSTS: '["http://opensearch-node1:9200"]' DISABLE_SECURITY_DASHBOARDS_PLUGIN: true # disables security dashboards plugin in OpenSearch Dashboards networks: - - opensearch-net + - meadowlark-net restart: unless-stopped volumes: opensearch-data1: networks: - opensearch-net: + meadowlark-net: + external: true + diff --git a/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/readme.md b/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/readme.md index ec9e8ee6..80e1e24c 100644 --- a/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/readme.md +++ b/Meadowlark-js/backends/meadowlark-opensearch-backend/docker/readme.md @@ -44,10 +44,3 @@ Delete all indices: ```none DELETE * ``` - -## Test dependency on older "docker-compose" versus newer "docker compose" - -The integration tests use the testcontainers library to spin up an Opensearch instance. As of -Feb 2023, it uses the legacy "docker-compose" command from Compose V1. If tests fail -with "Error: spawn docker-compose ENOENT", you will need to either [install Compose V1 -standalone](https://docs.docker.com/compose/install/other/) or `alias docker-compose='docker compose'`. 
diff --git a/NOTICES.md b/NOTICES.md index 5b5f62f0..d772159b 100644 --- a/NOTICES.md +++ b/NOTICES.md @@ -37,3 +37,27 @@ under the following license: COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +-- + +The `ed-fi-kafka-connect-transforms` project structure and the transformation `GenerateIndexFromResource` were adapted from the [ExtractTopic +transformation](https://github.com/Aiven-Open/transforms-for-apache-kafka-connect/blob/master/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java) +by Aiven Open under the following license: + + Copyright 2019 Aiven Oy + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Licensed under Apache License, Version 2.0. + +-- diff --git a/docker/kafka/Dockerfile b/docker/kafka/Dockerfile new file mode 100644 index 00000000..571fa130 --- /dev/null +++ b/docker/kafka/Dockerfile @@ -0,0 +1,28 @@ +# SPDX-License-Identifier: Apache-2.0 +# Licensed to the Ed-Fi Alliance under one or more agreements. +# The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. +# See the LICENSE and NOTICES files in the project root for more information. 
+ +FROM gradle:8.2.1-jdk-focal@sha256:78b95c865c1bcb50ced01501e6a9c6c9bcc181be93c1ea723ff4b58b2f7e9a96 AS build +COPY --chown=gradle:gradle /ed-fi-kafka-connect-transforms /home/gradle/src +WORKDIR /home/gradle/src +RUN gradle installDist --no-daemon + +FROM debezium/connect:2.3@sha256:dfa59c008a03f45c7b286d2874f2e6dbe04f3db6f26b6f01806c136abb07381a +LABEL maintainer="Ed-Fi Alliance, LLC and Contributors " + +ARG package=opensearch-connector-for-apache-kafka-3.1.0.tar + +ADD --chown=kafka --chmod=600 https://github.com/aiven/opensearch-connector-for-apache-kafka/releases/download/v3.1.0/${package} \ + /kafka/connect/ + +RUN cd /kafka/connect/ && \ + originalSha=58c27bdb0b8883e2e3291a2aaa42151b77240a33d3361ad620c656e775da14d2 && \ + newSha=$(sha256sum ${package} | awk '{print $1}') && \ + if [ $originalSha != $newSha ]; then exit 1; fi + +RUN cd /kafka/connect/ && \ + tar -xvf ${package} && \ + rm ${package} + +COPY --from=build /home/gradle/src/build/libs /kafka/connect/ed-fi-kafka-connect-transforms diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/.gitignore b/docker/kafka/ed-fi-kafka-connect-transforms/.gitignore new file mode 100644 index 00000000..41b76225 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/.gitignore @@ -0,0 +1,5 @@ +bin +.gradle +build + +*.jar diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/README.md b/docker/kafka/ed-fi-kafka-connect-transforms/README.md new file mode 100644 index 00000000..13ac3889 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/README.md @@ -0,0 +1,44 @@ +# Ed-Fi Transformations for Apache Kafka® Connect + +[Single Message Transformations +(SMTs)](https://kafka.apache.org/documentation/#connect_transforms) for Apache +Kafka Connect. + +## Transformations + +See [the Kafka +documentation](https://kafka.apache.org/documentation/#connect_transforms) for +more details about configuring transformations on how to install transforms. 
+ +### `GenerateIndexFromResource` + +This transformation builds an index based on a group of values contained in the +body of the result, separated by $. + +- `com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms.GenerateIndexFromResource` + - works on values. + +The transformation defines the following configurations: + +- `field.name` - Comma-separated list of fields used to build the + index. These fields will be separated by $, and `descriptor` will be added at the end + if the resource is marked as such. + +Here is an example of this transformation configuration: + +```properties +transforms=GenerateIndexFromResource +transforms.GenerateIndexFromResource.type=com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms.GenerateIndexFromResource +transforms.GenerateIndexFromResource.field.name=projectName,resourceVersion,resourceName +``` + +## Running transformations + +This project includes a series of *gradle* tasks: + +- `./gradlew build`: Compile code + +- `./gradlew test`: Run unit tests + +- `./gradlew installDist`: Creates a jar distributable file, located under + `/build/install/ed-fi-kafka-connect-transforms/ed-fi-kafka-connect-transforms-{version}.jar` diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/build.gradle b/docker/kafka/ed-fi-kafka-connect-transforms/build.gradle new file mode 100644 index 00000000..4b9f9640 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/build.gradle @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Licensed to the Ed-Fi Alliance under one or more agreements. +// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. +// See the LICENSE and NOTICES files in the project root for more information. 
+ +import org.gradle.util.DistributionLocator +import org.gradle.util.GradleVersion +import org.gradle.util.VersionNumber + +plugins { + // https://docs.gradle.org/current/userguide/java_library_plugin.html + id 'java-library' + + // https://docs.gradle.org/current/userguide/distribution_plugin.html + id 'distribution' + + // https://docs.gradle.org/current/userguide/checkstyle_plugin.html + id 'checkstyle' +} + +repositories { + mavenCentral() +} + +sourceCompatibility = JavaVersion.VERSION_11 +targetCompatibility = JavaVersion.VERSION_11 + +ext { + kafkaVersion = "2.3.0" + debeziumVersion = "2.3.0.Final" +} + +distributions { + main { + contents { + from jar + from configurations.runtimeClasspath + } + } +} + +wrapper { + distributionType = 'ALL' + doLast { + final DistributionLocator locator = new DistributionLocator() + final GradleVersion version = GradleVersion.version(wrapper.gradleVersion) + final URI distributionUri = locator.getDistributionFor(version, wrapper.distributionType.name().toLowerCase(Locale.ENGLISH)) + final URI sha256Uri = new URI(distributionUri.toString() + ".sha256") + final String sha256Sum = new String(sha256Uri.toURL().bytes) + wrapper.getPropertiesFile() << "distributionSha256Sum=${sha256Sum}\n" + println "Added checksum to wrapper properties" + } +} + +dependencies { + compileOnly "org.apache.kafka:connect-api:$kafkaVersion" + compileOnly "io.debezium:debezium-api:$debeziumVersion" + + implementation "org.slf4j:slf4j-api:1.7.36" + + testImplementation "org.junit.jupiter:junit-jupiter:5.10.0" + testImplementation "org.apache.kafka:connect-api:$kafkaVersion" + testImplementation "io.debezium:debezium-api:$debeziumVersion" + testImplementation "org.assertj:assertj-core:3.24.2" + + testRuntimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:2.20.0" + testRuntimeOnly "org.apache.logging.log4j:log4j-api:2.20.0" + testRuntimeOnly "org.apache.logging.log4j:log4j-core:2.20.0" +} + +checkstyle { + toolVersion "8.21" +} + +test { + 
useJUnitPlatform { + includeEngines 'junit-jupiter' + } +} + +jar { + manifest { + attributes( + 'Version': "${getArchiveVersion()}" + ) + } +} + +def setVersionInGradleProperties(ver) { + logger.quiet('Changing version in gradle.properties to {}', ver) + file('gradle.properties.new').withWriter { writer -> + file('gradle.properties').eachLine { line -> + if (line ==~ /version=.*/) { + writer.writeLine("version=${ver}") + } else { + writer.writeLine(line) + } + } + } + file('gradle.properties.new').renameTo('gradle.properties') +} + +def releaseTag() { + return 'v' + releaseVersion +} diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/checkstyle.xml b/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/checkstyle.xml new file mode 100644 index 00000000..b8c21a03 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/checkstyle.xml @@ -0,0 +1,328 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/java.header b/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/java.header new file mode 100644 index 00000000..7191208e --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/config/checkstyle/java.header @@ -0,0 +1,15 @@ +/\* + \* Copyright 20(19|2[0-9]) Aiven Oy + \* + \* Licensed under the Apache License, Version 2.0 \(the "License"\); + \* you may not use this file except in 
compliance with the License. + \* You may obtain a copy of the License at + \* + \* http://www.apache.org/licenses/LICENSE-2.0 + \* + \* Unless required by applicable law or agreed to in writing, software + \* distributed under the License is distributed on an "AS IS" BASIS, + \* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + \* See the License for the specific language governing permissions and + \* limitations under the License. + \*/ diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/gradle.properties b/docker/kafka/ed-fi-kafka-connect-transforms/gradle.properties new file mode 100644 index 00000000..aef125e0 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/gradle.properties @@ -0,0 +1 @@ +version=1.0.0 diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/gradle/wrapper/gradle-wrapper.properties b/docker/kafka/ed-fi-kafka-connect-transforms/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..746f3422 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionSha256Sum=e6d864e3b5bc05cc62041842b306383fc1fefcec359e70cebb1d470a6094ca82 diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/gradlew b/docker/kafka/ed-fi-kafka-connect-transforms/gradlew new file mode 100644 index 00000000..1b6c7873 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. 
+# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! 
-x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. 
+ # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/gradlew.bat b/docker/kafka/ed-fi-kafka-connect-transforms/gradlew.bat new file mode 100644 index 00000000..107acd32 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. 
+@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. 
+echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/settings.gradle b/docker/kafka/ed-fi-kafka-connect-transforms/settings.gradle new file mode 100644 index 00000000..a0cfb4ce --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'ed-fi-kafka-connect-transforms' diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java new file mode 100644 index 00000000..2aee27c0 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java @@ -0,0 +1,139 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * Licensed to the Ed-Fi Alliance under one or more agreements. + * The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. + * See the LICENSE and NOTICES files in the project root for more information. + */ + +package com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.Transformation; + +public class GenerateIndexFromResource> implements Transformation { + + private GenerateIndexFromResourceConfig config; + + @Override + public ConfigDef config() { + return GenerateIndexFromResourceConfig.config(); + } + + @Override + public void configure(final Map settings) { + this.config = new GenerateIndexFromResourceConfig(settings); + } + + @Override + public R apply(final R record) { + final SchemaAndValue schemaAndValue = getSchemaAndValue(record); + + final Optional newTopic; + + if (!config.fieldName().isPresent()) { + throw new DataException("value must specify one or more field names comma separated."); + } + + final List fieldList = Stream.of(config.fieldName().get().split(",")) + .map(String::trim) + .collect(Collectors.toList()); + + final StringBuilder 
topicResult = new StringBuilder(); + final String separator = "$"; + + fieldList.forEach(field -> { + topicResult.append( + topicNameFromNamedField(record.toString(), + schemaAndValue.value(), + field).get() + separator); + }); + + topicResult.replace(topicResult.length() - 1, topicResult.length(), ""); + + if (record.toString().contains("isDescriptor=true")) { + topicResult.append("descriptor"); + } + + newTopic = Optional.of(topicResult.toString()); + + if (newTopic.isPresent()) { + return record.newRecord( + newTopic.get(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + record.timestamp(), + record.headers()); + } else { + return record; + } + } + + protected SchemaAndValue getSchemaAndValue(final R record) { + return new SchemaAndValue(record.valueSchema(), record.value()); + } + + private Optional topicNameFromNamedField(final String recordStr, + final Object value, + final String fieldName) { + if (value == null) { + throw new DataException("value can't be null: " + recordStr); + } + + if (!(value instanceof Map)) { + throw new DataException("value type must be an object: " + recordStr); + } + + @SuppressWarnings("unchecked") + final Map valueMap = (Map) value; + + final Optional result = Optional.ofNullable(valueMap.get(fieldName)) + .map(field -> { + if (!field.getClass().equals(String.class)) { + throw new DataException(fieldName + " type in value " + + value + + " must be a comma separated string: " + + recordStr); + } + return field; + }) + .map(Object::toString); + + if (result.isPresent() && !result.get().isBlank()) { + return result; + } else { + throw new DataException(fieldName + " in value can't be null or empty: " + recordStr); + } + } + + @Override + public void close() { + } +} diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfig.java 
b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfig.java new file mode 100644 index 00000000..57d9b4ed --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * Licensed to the Ed-Fi Alliance under one or more agreements. + * The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. + * See the LICENSE and NOTICES files in the project root for more information. + */ + + +package com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms; + +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +class GenerateIndexFromResourceConfig extends AbstractConfig { + public static final String FIELD_NAME_CONFIG = "field.name"; + private static final String FIELD_NAME_DOC = + "The list of properties separated by comma which should be used as the topic name. 
"; + + GenerateIndexFromResourceConfig(final Map originals) { + super(config(), originals); + } + + static ConfigDef config() { + return new ConfigDef() + .define( + FIELD_NAME_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + FIELD_NAME_DOC); + } + + Optional fieldName() { + final String rawFieldName = getString(FIELD_NAME_CONFIG); + if (null == rawFieldName || "".equals(rawFieldName)) { + return Optional.empty(); + } + return Optional.of(rawFieldName); + } +} diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfigTest.java b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfigTest.java new file mode 100644 index 00000000..da1b59a1 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceConfigTest.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * Licensed to the Ed-Fi Alliance under one or more agreements. + * The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. + * See the LICENSE and NOTICES files in the project root for more information. 
+ */ + + +package com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class GenerateIndexFromResourceConfigTest { + + @Test + void emptyFieldName() { + final Map props = new HashMap<>(); + props.put("field.name", ""); + final GenerateIndexFromResourceConfig config = new GenerateIndexFromResourceConfig(props); + assertThat(config.fieldName()).isNotPresent(); + } + + @Test + void definedFieldName() { + final Map props = new HashMap<>(); + props.put("field.name", "test"); + final GenerateIndexFromResourceConfig config = new GenerateIndexFromResourceConfig(props); + assertThat(config.fieldName()).hasValue("test"); + } +} diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java new file mode 100644 index 00000000..29749a44 --- /dev/null +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * SPDX-License-Identifier: Apache-2.0 + * Licensed to the Ed-Fi Alliance under one or more agreements. + * The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0. + * See the LICENSE and NOTICES files in the project root for more information. + */ + +package com.github.edfiexchangeoss.meadowlark.kafka.connect.transforms; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class GenerateIndexFromResourceTest { + + private static final String FIELD = "test_field"; + private static final String NEW_TOPIC = "new_topic"; + + @Test + void nullSchema() { + final SinkRecord originalRecord = record(null); + assertThatThrownBy(() -> transformation(FIELD).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage("value can't be null: " + originalRecord); + } + + @Test + void generateIndex_UnsupportedValueType() { + final SinkRecord originalRecord = record(new HashMap()); + assertThatThrownBy(() -> transformation(null).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage("value must specify one or more field names comma separated."); + } + + @Test + void generateIndex_NonObject() { + final SinkRecord originalRecord = record("some"); + assertThatThrownBy(() -> transformation(FIELD).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage("value type must be an object: " + 
originalRecord); + } + + @Test + void generateIndex_NullStruct() { + final SinkRecord originalRecord = record(null); + assertThatThrownBy(() -> transformation(FIELD).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage("value can't be null: " + originalRecord); + } + + @Test + void generateIndex_NotReceivingExpectedObject() { + final var field = Map.of(FIELD, Map.of()); + final SinkRecord originalRecord = record(field); + assertThatThrownBy(() -> transformation(FIELD).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage(FIELD + " type in value " + + field + " must be a comma separated string: " + originalRecord); + } + + @ParameterizedTest + @ValueSource(strings = "missing") + @NullAndEmptySource + void generateIndex_ReceivingObject_NullOrEmptyValue(final String value) { + final Map valueMap = new HashMap<>(); + valueMap.put("another", "value"); + if (!"missing".equals(value)) { + valueMap.put(FIELD, value); + } + final SinkRecord originalRecord = record(valueMap); + assertThatThrownBy(() -> transformation(FIELD).apply(originalRecord)) + .isInstanceOf(DataException.class) + .hasMessage(FIELD + " in value can't be null or empty: " + originalRecord); + } + + @Test + void generateIndex_ReceivingObject_NormalStringValue() { + final SinkRecord originalRecord; + final var receivedObject = Map.of(FIELD, NEW_TOPIC); + originalRecord = record(receivedObject); + final SinkRecord result = transformation(FIELD).apply(originalRecord); + assertThat(result).isEqualTo(setNewTopic(originalRecord, NEW_TOPIC)); + } + + @ParameterizedTest + @ValueSource(strings = { "true", "false" }) + void generateIndex_ReceivingObject_WithCommaSeparatedList(final String isDescriptor) { + final SinkRecord originalRecord; + final var project = "project"; + final var resourceVersion = "resourceVersion"; + final var resourceName = "resourceName"; + + final Map receivedObject = Stream.of(new String[][] { + {"project", project}, + {"resourceVersion", 
resourceVersion}, + {"resourceName", resourceName}, + {"additionalData", "additionalData"}, + {"isDescriptor", isDescriptor}, + }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + + originalRecord = record(receivedObject); + + final var params = project + "," + resourceVersion + "," + resourceName; + var expectedResult = project + "$" + resourceVersion + "$" + resourceName; + if (isDescriptor.equals("true")) { + expectedResult += "descriptor"; + } + + final SinkRecord result = transformation(params).apply(originalRecord); + assertThat(result).isEqualTo(setNewTopic(originalRecord, expectedResult)); + } + + private GenerateIndexFromResource transformation(final String fieldName) { + + final Map props = new HashMap<>(); + if (fieldName != null) { + props.put("field.name", fieldName); + } + final GenerateIndexFromResource transform = createTransformationObject(); + transform.configure(props); + return transform; + } + + protected GenerateIndexFromResource createTransformationObject() { + return new GenerateIndexFromResource<>(); + } + + protected SinkRecord record(final Object data) { + return record(null, null, null, data); + } + + protected SinkRecord record(final Schema keySchema, + final Object key, + final Schema valueSchema, + final Object value) { + return new SinkRecord("original_topic", 0, + keySchema, key, + valueSchema, value, + 123L, + 456L, TimestampType.CREATE_TIME); + } + + private SinkRecord setNewTopic(final SinkRecord record, final String newTopic) { + return record.newRecord(newTopic, + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + record.timestamp(), + record.headers()); + } +} diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 834ce87f..d1bb7c87 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -11,7 +11,7 @@ The MongoDB configuration needs to have a replica set to support atomic transact only a single node, though we recommend at least three 
nodes. OpenSearch clustering has not been tested, and the development team does not yet know how clustering works with this tool. -Ideall, all three services would be in the same network segment and able to communicate through unencrypted channels. Indeed, +Ideally, all three services would be in the same network segment and able to communicate through unencrypted channels. Indeed, the development team has not tested any alternative. However, only the API port should be open to outside traffic, except as needed for debugging. diff --git a/docs/LOCALHOST.md b/docs/LOCALHOST.md index 6b65d219..c51bb77c 100644 --- a/docs/LOCALHOST.md +++ b/docs/LOCALHOST.md @@ -28,6 +28,10 @@ Instructions for running a local "developer" environment on localhost: 10. Seup other environment variables. See [CONFIGURATION](CONFIGURATION.md) for more details. 11. Run `npm run start:local` to start the Meadowlark API service +## Using Kafka + +Alternatively, you can use Kafka and Kafka Connect to listen for MongoDB changes and write them to OpenSearch (support for PostgreSQL and Elasticsearch will be added in the future). To do so, run the [Kafka](../Meadowlark-js/backends/meadowlark-kafka-stream/docker) setup and set `LISTENER1_PLUGIN` to an empty value in the `.env` file. + ## Clearing Out Local Databases Sometimes it is useful to reset your local environment to a fresh state, with no