From 9670c6f91aa0e0b3aa12227b76ac81d74e32d4f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Alberto=20Leiva=20Obando?= <56046999+jleiva-gap@users.noreply.github.com> Date: Tue, 29 Aug 2023 16:04:33 -0600 Subject: [PATCH] [RND-629] Fix issues with kafka sink to opensearch transformation (#294) * [RND-629] Fix issues with kafka sink to opensearch transformation - Update GenerateIndexFromResource to update index and replace dots. - Remove validation to append descriptor part to index name. * Update test * Update README.md Add details about gradle-wrapper generation. * Update README.md --- .../ed-fi-kafka-connect-transforms/README.md | 16 ++++++++++++++++ .../transforms/GenerateIndexFromResource.java | 6 +----- .../GenerateIndexFromResourceTest.java | 13 +++++-------- eng/Run-DevContainers.ps1 | 12 ++++++------ 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/README.md b/docker/kafka/ed-fi-kafka-connect-transforms/README.md index 13ac3889..f8f9e1ad 100644 --- a/docker/kafka/ed-fi-kafka-connect-transforms/README.md +++ b/docker/kafka/ed-fi-kafka-connect-transforms/README.md @@ -34,6 +34,22 @@ transforms.GenerateIndexFromResource.field.name=projectName,resourceVersion,reso ## Running transformations +### Prerequisites + +- Install, if you don't have it, gradle version 7.2.4 according to the [installation guide](https://gradle.org/install/) +- To verify your installation, pen a console (or a Windows command prompt) and run gradle -v to run gradle and display the version, e.g.: +`> gradle -v` +Result: +``` +------------------------------------------------------------ +Gradle 7.2.4 +------------------------------------------------------------ +``` + +- To run the transforms locally for the first time you need to build the gradle-wrapper.jar. To generate it, run the following command, this will add the gradle-wrapper.jar in the gradle\wrapper folder +`> gradle wrapper` + +### Tasks This project includes a series of *gradle* tasks: - `./gradlew build`: Compile code diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java index 2aee27c0..c55307d3 100644 --- a/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/main/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResource.java @@ -70,15 +70,11 @@ public R apply(final R record) { topicResult.append( topicNameFromNamedField(record.toString(), schemaAndValue.value(), - field).get() + separator); + field).get().replace(".", "-") + separator); }); topicResult.replace(topicResult.length() - 1, topicResult.length(), ""); - if (record.toString().contains("isDescriptor=true")) { - topicResult.append("descriptor"); - } - newTopic = Optional.of(topicResult.toString()); if (newTopic.isPresent()) { diff --git a/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java index 29749a44..39c1a0fb 100644 --- a/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java +++ b/docker/kafka/ed-fi-kafka-connect-transforms/src/test/java/com/github/edfiexchangeoss/meadowlark/kafka/connect/transforms/GenerateIndexFromResourceTest.java @@ -118,24 +118,21 @@ void generateIndex_ReceivingObject_WithCommaSeparatedList(final String isDescrip final SinkRecord originalRecord; final var project = "project"; final var resourceVersion = "resourceVersion"; + final var resourceVersionValue = "major.minor.patch"; final var resourceName = "resourceName"; - + final var resourceNameValue = "resourceName" + (isDescriptor.equals("true") ? "descriptor" : ""); final Map receivedObject = Stream.of(new String[][] { {"project", project}, - {"resourceVersion", resourceVersion}, - {"resourceName", resourceName}, + {"resourceVersion", resourceVersionValue}, + {"resourceName", resourceNameValue}, {"additionalData", "additionalData"}, {"isDescriptor", isDescriptor}, }).collect(Collectors.toMap(data -> data[0], data -> data[1])); originalRecord = record(receivedObject); + final var expectedResult = (project + "$" + resourceVersionValue + "$" + resourceNameValue).replace(".", "-"); final var params = project + "," + resourceVersion + "," + resourceName; - var expectedResult = project + "$" + resourceVersion + "$" + resourceName; - if (isDescriptor.equals("true")) { - expectedResult += "descriptor"; - } - final SinkRecord result = transformation(params).apply(originalRecord); assertThat(result).isEqualTo(setNewTopic(originalRecord, expectedResult)); } diff --git a/eng/Run-DevContainers.ps1 b/eng/Run-DevContainers.ps1 index 3b13d105..2f9697e4 100644 --- a/eng/Run-DevContainers.ps1 +++ b/eng/Run-DevContainers.ps1 @@ -9,23 +9,23 @@ PostgreSQL. The MongoDB collection also includes Kafka with Debezium. #> param( - + [Parameter(Mandatory=$false)] [Switch] $ElasticSearch, - + [Parameter(Mandatory=$false)] [Switch] $OpenSearch, - + [Parameter(Mandatory=$false)] [Switch] $MongoDB, - + [Parameter(Mandatory=$false)] [Switch] $PostgreSQL, - + [Parameter(Mandatory=$false)] [Switch] $Kafka @@ -34,9 +34,9 @@ param( $ErrorActionPreference = "Stop" $start = @() +if ($MongoDB) { $start += "meadowlark-mongodb-backend" } if ($ElasticSearch) { $start += "meadowlark-elasticsearch-backend" } if ($OpenSearch) { $start += "meadowlark-opensearch-backend"} -if ($MongoDB) { $start += "meadowlark-mongodb-backend" } if ($PostgreSQL) { $start += "meadowlark-postgresql-backend" } if ($Kafka) { $start += "meadowlark-kafka-stream" }