Skip to content

Commit 3ea653b

Browse files
authored
Merge 7700ac4 into 7bb33eb
2 parents 7bb33eb + 7700ac4 commit 3ea653b

File tree

3 files changed

+55
-5
lines changed

3 files changed

+55
-5
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
<dependency>
2727
<groupId>com.amazonaws</groupId>
2828
<artifactId>amazon-kinesis-producer</artifactId>
29-
<version>0.14.0</version>
29+
<version>0.14.12</version>
3030
</dependency>
3131
<dependency>
3232
<groupId>com.amazonaws</groupId>
3333
<artifactId>aws-java-sdk</artifactId>
34-
<version>1.11.327</version>
34+
<version>1.12.198</version>
3535
</dependency>
3636
<dependency>
3737
<groupId>javax.xml.bind</groupId>

src/main/java/com/warnermedia/kplserver/App.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public static void main(String[] args) throws Exception {
2525
ServerSocket errSocket = new ServerSocket(port);
2626
errSocket.setSoTimeout(100);
2727

28-
KinesisEventPublisher kinesisEventPublisher = new KinesisEventPublisher(stream, getRegion(), getMetricsLevel(), errSocket);
28+
KinesisEventPublisher kinesisEventPublisher = new KinesisEventPublisher(stream, getRegion(), getMetricsLevel(), getCrossAccountRole(), errSocket);
2929

3030
// graceful shutdowns
3131
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -88,4 +88,11 @@ static String getMetricsLevel() {
8888
return p;
8989
}
9090

91+
static String getCrossAccountRole() {
92+
String p = System.getenv("CROSS_ACCOUNT_ROLE");
93+
if (p == null || p.equals("")) {
94+
return "";
95+
}
96+
return p;
97+
}
9198
}

src/main/java/com/warnermedia/kplserver/KinesisEventPublisher.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
package com.warnermedia.kplserver;
22

3+
import com.amazonaws.auth.AWSCredentialsProvider;
4+
import com.amazonaws.auth.AWSStaticCredentialsProvider;
5+
import com.amazonaws.auth.BasicSessionCredentials;
6+
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
7+
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
38
import com.amazonaws.services.kinesis.producer.KinesisProducer;
49
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
510
import com.amazonaws.services.kinesis.producer.UserRecord;
611
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
712
import com.amazonaws.services.kinesis.producer.UserRecordResult;
13+
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
14+
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
15+
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
16+
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
17+
import com.amazonaws.services.securitytoken.model.Credentials;
818
import com.google.common.util.concurrent.FutureCallback;
919
import com.google.common.util.concurrent.Futures;
1020
import com.google.common.util.concurrent.ListenableFuture;
@@ -36,14 +46,47 @@ public class KinesisEventPublisher {
3646
ServerSocket errSocket;
3747
Socket errClient;
3848

39-
public KinesisEventPublisher(String stream, String region, String metricsLevel, ServerSocket errSocket) {
49+
public KinesisEventPublisher(String stream, String region, String metricsLevel, String crossAccountRole, ServerSocket errSocket) {
4050
this.stream = stream;
4151
kinesis = new KinesisProducer(new KinesisProducerConfiguration()
4252
.setRegion(region)
43-
.setMetricsLevel(metricsLevel));
53+
.setMetricsLevel(metricsLevel)
54+
.setCredentialsProvider(loadCredentials(crossAccountRole, region)));
4455
this.errSocket = errSocket;
4556
}
4657

58+
private static AWSCredentialsProvider loadCredentials(String crossAccountRole, String region) {
59+
final AWSCredentialsProvider credentialsProvider;
60+
61+
Boolean isCrossAccount = false;
62+
if (!crossAccountRole.equals("")) {
63+
isCrossAccount = true;
64+
}
65+
66+
if (isCrossAccount) {
67+
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
68+
.withRegion(region)
69+
.build();
70+
71+
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
72+
.withRoleArn(crossAccountRole)
73+
.withRoleSessionName("Kinesis_Session");
74+
75+
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
76+
Credentials creds = assumeRoleResult.getCredentials();
77+
78+
credentialsProvider = new AWSStaticCredentialsProvider(
79+
new BasicSessionCredentials(creds.getAccessKeyId(),
80+
creds.getSecretAccessKey(),
81+
creds.getSessionToken())
82+
);
83+
} else {
84+
credentialsProvider = new DefaultAWSCredentialsProviderChain();
85+
}
86+
87+
return credentialsProvider;
88+
}
89+
4790
public void runOnce(String line) throws Exception {
4891
// add new line so that downstream systems have an easier time parsing
4992
String finalLine = line + "\n";

0 commit comments

Comments
 (0)