[RND-629] Fix issues with kafka sink to opensearch transformation (#294)
* [RND-629] Fix issues with kafka sink to opensearch transformation

- Update GenerateIndexFromResource to build the index name and replace dots with dashes.
- Remove the validation that appended a descriptor part to the index name.

* Update test

* Update README.md

Add details about gradle-wrapper generation.

* Update README.md
jleiva-gap authored Aug 29, 2023
1 parent b56e26e commit 9670c6f
Showing 4 changed files with 28 additions and 19 deletions.
16 changes: 16 additions & 0 deletions docker/kafka/ed-fi-kafka-connect-transforms/README.md
@@ -34,6 +34,22 @@ transforms.GenerateIndexFromResource.field.name=projectName,resourceVersion,resourceName

## Running transformations

### Prerequisites

- If you don't already have it, install Gradle version 7.2.4 following the [installation guide](https://gradle.org/install/).
- To verify your installation, open a console (or a Windows command prompt) and run `gradle -v` to display the installed version, e.g.:
`> gradle -v`
Result:
```
------------------------------------------------------------
Gradle 7.2.4
------------------------------------------------------------
```

- To run the transforms locally for the first time, you need to build the gradle-wrapper.jar. To generate it, run the following command; this adds gradle-wrapper.jar to the gradle\wrapper folder (see the note below):
  `> gradle wrapper`

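Assuming a standard Gradle wrapper layout (the paths below are the files the wrapper task normally creates, not something specific to this repository), you should then see:

```
gradle/wrapper/gradle-wrapper.jar
gradle/wrapper/gradle-wrapper.properties
gradlew
gradlew.bat
```
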
### Tasks
This project includes a series of *gradle* tasks:

- `./gradlew build`: Compile code
@@ -70,15 +70,11 @@ public R apply(final R record) {
             topicResult.append(
                 topicNameFromNamedField(record.toString(),
                     schemaAndValue.value(),
-                    field).get() + separator);
+                    field).get().replace(".", "-") + separator);
         });

         topicResult.replace(topicResult.length() - 1, topicResult.length(), "");

-        if (record.toString().contains("isDescriptor=true")) {
-            topicResult.append("descriptor");
-        }
-
         newTopic = Optional.of(topicResult.toString());

         if (newTopic.isPresent()) {
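To illustrate the effect of the change above, here is a minimal, self-contained sketch of how an index (topic) name is now assembled. This is not the project's actual transform code: the class name and field values below are made up, and the "$" separator is taken from the accompanying test. Each configured field value is appended with the separator, dots are replaced with dashes, and the transform no longer appends a "descriptor" suffix on its own.

```
import java.util.List;

public class IndexNameSketch {
    public static void main(String[] args) {
        // Hypothetical field values read from a sink record:
        // project name, resource version, resource name.
        List<String> fieldValues = List.of("ed-fi", "5.3.0", "studentSchoolAssociation");
        String separator = "$";

        StringBuilder topicResult = new StringBuilder();
        for (String value : fieldValues) {
            // Append each value followed by the separator, replacing dots with dashes
            // (e.g. a version such as "5.3.0" becomes "5-3-0").
            topicResult.append(value.replace(".", "-")).append(separator);
        }
        // Drop the trailing separator, mirroring the transform's StringBuilder handling.
        topicResult.replace(topicResult.length() - 1, topicResult.length(), "");

        System.out.println(topicResult); // ed-fi$5-3-0$studentSchoolAssociation
    }
}
```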
@@ -118,24 +118,21 @@ void generateIndex_ReceivingObject_WithCommaSeparatedList(final String isDescriptor)
         final SinkRecord originalRecord;
         final var project = "project";
         final var resourceVersion = "resourceVersion";
+        final var resourceVersionValue = "major.minor.patch";
         final var resourceName = "resourceName";
-
+        final var resourceNameValue = "resourceName" + (isDescriptor.equals("true") ? "descriptor" : "");
         final Map<String, String> receivedObject = Stream.of(new String[][] {
             {"project", project},
-            {"resourceVersion", resourceVersion},
-            {"resourceName", resourceName},
+            {"resourceVersion", resourceVersionValue},
+            {"resourceName", resourceNameValue},
             {"additionalData", "additionalData"},
             {"isDescriptor", isDescriptor},
         }).collect(Collectors.toMap(data -> data[0], data -> data[1]));

         originalRecord = record(receivedObject);
+        final var expectedResult = (project + "$" + resourceVersionValue + "$" + resourceNameValue).replace(".", "-");

         final var params = project + "," + resourceVersion + "," + resourceName;
-        var expectedResult = project + "$" + resourceVersion + "$" + resourceName;
-        if (isDescriptor.equals("true")) {
-            expectedResult += "descriptor";
-        }
-
         final SinkRecord result = transformation(params).apply(originalRecord);
         assertThat(result).isEqualTo(setNewTopic(originalRecord, expectedResult));
     }
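In short, the updated test feeds a resourceVersion value of "major.minor.patch" and builds the descriptor suffix into the resource name value itself, so the expected index name becomes project$major-minor-patch$resourceName (or project$major-minor-patch$resourceNamedescriptor when isDescriptor is true); the transform is no longer expected to append a separate descriptor part.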
12 changes: 6 additions & 6 deletions eng/Run-DevContainers.ps1
@@ -9,23 +9,23 @@
PostgreSQL. The MongoDB collection also includes Kafka with Debezium.
#>
param(

[Parameter(Mandatory=$false)]
[Switch]
$ElasticSearch,

[Parameter(Mandatory=$false)]
[Switch]
$OpenSearch,

[Parameter(Mandatory=$false)]
[Switch]
$MongoDB,

[Parameter(Mandatory=$false)]
[Switch]
$PostgreSQL,

[Parameter(Mandatory=$false)]
[Switch]
$Kafka
@@ -34,9 +34,9 @@ param(
$ErrorActionPreference = "Stop"

$start = @()
-if ($MongoDB) { $start += "meadowlark-mongodb-backend" }
if ($ElasticSearch) { $start += "meadowlark-elasticsearch-backend" }
if ($OpenSearch) { $start += "meadowlark-opensearch-backend"}
+if ($MongoDB) { $start += "meadowlark-mongodb-backend" }
if ($PostgreSQL) { $start += "meadowlark-postgresql-backend" }
if ($Kafka) { $start += "meadowlark-kafka-stream" }

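As a usage example (assuming the script is invoked from the repository's eng folder), running `./Run-DevContainers.ps1 -OpenSearch -Kafka` would add "meadowlark-opensearch-backend" and "meadowlark-kafka-stream" to the list of containers to start.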
