Skip to content

Commit c122dc2

Browse files
authored
Merge 120284c into 38bba80
2 parents 38bba80 + 120284c commit c122dc2

File tree

6 files changed

+110
-1
lines changed

6 files changed

+110
-1
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ building a data ingestion app using Kinesis in a language other than Java.
1313
- Point the server to your kinesis stream by setting the `AWS_DEFAULT_REGION` and `KINESIS_STREAM` environment variables. Once the server is
1414
up and running, you can send data to Kinesis by opening a socket connection and sending utf-8 data.
1515
- Each record you send should be delimited by a new line.
16+
- If you don't pass in a hash key a random hash key is generated for you.
17+
- To pass in a hash key, add the hash key as kdshashkey on the root of your JSON object.
1618

1719
When service starts, it exposes two ports:
1820
1. Inlet Port: This port is used to receive the message from your app to be sent to kinesis. The server defaults to port `3000` but can be overridden by setting the `PORT` environment variable.

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
<version>4.13.1</version>
1919
<scope>test</scope>
2020
</dependency>
21+
<dependency>
22+
<groupId>com.google.code.gson</groupId>
23+
<artifactId>gson</artifactId>
24+
<version>2.8.9</version>
25+
</dependency>
2126
<dependency>
2227
<groupId>com.amazonaws</groupId>
2328
<artifactId>amazon-kinesis-producer</artifactId>

src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.google.common.util.concurrent.FutureCallback;
99
import com.google.common.util.concurrent.Futures;
1010
import com.google.common.util.concurrent.ListenableFuture;
11+
import com.google.gson.Gson;
12+
import com.google.gson.JsonSyntaxException;
1113
import org.apache.commons.lang.StringUtils;
1214
import org.apache.commons.logging.Log;
1315
import org.apache.commons.logging.LogFactory;
@@ -46,14 +48,33 @@ public void runOnce(String line) throws Exception {
4648

4749
ByteBuffer data = ByteBuffer.wrap(finalLine.getBytes(java.nio.charset.StandardCharsets.UTF_8));
4850

51+
// Need to serialize this to an object to get the key.
52+
String hashKey;
53+
54+
Gson gson = new Gson();
55+
try {
56+
MinimalKey minimal = gson.fromJson(line, MinimalKey.class);
57+
if (minimal.kdsHashKey != null) {
58+
hashKey = minimal.kdsHashKey;
59+
log.debug("Using passed in hash key");
60+
} else {
61+
hashKey = randomExplicitHashKey();
62+
log.debug("Using random hash key");
63+
}
64+
}
65+
catch (JsonSyntaxException e) {
66+
hashKey = randomExplicitHashKey();
67+
log.debug("Using random hash key");
68+
}
69+
4970
//This is a measure of the backpressure in the system, which should be checked before putting more records,
5071
//to avoid exhausting system resources.
5172
while (kinesis.getOutstandingRecordsCount() > 1e4) {
5273
log.info("Too many outstanding records pending in the queue. Waiting for a second.");
5374
Thread.sleep(500);
5475
}
5576

56-
UserRecord userRecord = new UserRecord(stream, " ", randomExplicitHashKey(), data);
77+
UserRecord userRecord = new UserRecord(stream, " ", hashKey, data);
5778
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord(userRecord);
5879

5980
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.warnermedia.kplserver;
2+
3+
public class MinimalKey {
4+
String kdsHashKey = null;
5+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.warnermedia.kplserver;
2+
3+
import com.google.gson.Gson;
4+
5+
import java.io.*;
6+
import java.net.Socket;
7+
8+
9+
public class TestClientJSONKey {
10+
public static class TestWithKey {
11+
String kdsHashKey;
12+
String testa;
13+
}
14+
15+
public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {
16+
17+
System.out.println("starting client");
18+
19+
// establish socket connection to server
20+
Socket socket = new Socket("127.0.0.1", 3000);
21+
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
22+
23+
System.out.println("Sending request to Socket Server");
24+
25+
TestWithKey tst = new TestWithKey();
26+
tst.kdsHashKey = "mykey";
27+
tst.testa ="hello";
28+
29+
Gson gson = new Gson();
30+
String jsonResult = gson.toJson(tst);
31+
String jsonFinal = jsonResult + "\n";
32+
33+
out.write(jsonFinal);
34+
out.flush();
35+
Thread.sleep(100);
36+
37+
socket.close();
38+
}
39+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.warnermedia.kplserver;
2+
3+
import com.google.gson.Gson;
4+
5+
import java.io.*;
6+
import java.net.Socket;
7+
8+
9+
public class TestClientJSONNoKey {
10+
public static class TestWithNoKey {
11+
String testa;
12+
}
13+
14+
public static void main(String[] args) throws Exception, IOException, ClassNotFoundException, InterruptedException {
15+
16+
System.out.println("starting client");
17+
18+
// establish socket connection to server
19+
Socket socket = new Socket("127.0.0.1", 3000);
20+
OutputStreamWriter out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
21+
22+
System.out.println("Sending request to Socket Server");
23+
24+
TestWithNoKey tst = new TestWithNoKey();
25+
tst.testa ="hello";
26+
27+
Gson gson = new Gson();
28+
String jsonResult = gson.toJson(tst);
29+
String jsonFinal = jsonResult + "\n";
30+
31+
out.write(jsonFinal);
32+
out.flush();
33+
Thread.sleep(100);
34+
35+
socket.close();
36+
}
37+
}

0 commit comments

Comments
 (0)