|
1 | 1 | package com.warnermedia.kplserver;
|
2 | 2 |
|
| 3 | +import com.amazonaws.auth.AWSCredentialsProvider; |
| 4 | +import com.amazonaws.auth.AWSStaticCredentialsProvider; |
| 5 | +import com.amazonaws.auth.BasicSessionCredentials; |
| 6 | +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; |
| 7 | +import com.amazonaws.auth.profile.ProfileCredentialsProvider; |
3 | 8 | import com.amazonaws.services.kinesis.producer.KinesisProducer;
|
4 | 9 | import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
|
5 | 10 | import com.amazonaws.services.kinesis.producer.UserRecord;
|
6 | 11 | import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
|
7 | 12 | import com.amazonaws.services.kinesis.producer.UserRecordResult;
|
| 13 | +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; |
| 14 | +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; |
| 15 | +import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; |
| 16 | +import com.amazonaws.services.securitytoken.model.AssumeRoleResult; |
| 17 | +import com.amazonaws.services.securitytoken.model.Credentials; |
8 | 18 | import com.google.common.util.concurrent.FutureCallback;
|
9 | 19 | import com.google.common.util.concurrent.Futures;
|
10 | 20 | import com.google.common.util.concurrent.ListenableFuture;
|
@@ -36,14 +46,48 @@ public class KinesisEventPublisher {
|
36 | 46 | ServerSocket errSocket;
|
37 | 47 | Socket errClient;
|
38 | 48 |
|
39 |
| - public KinesisEventPublisher(String stream, String region, String metricsLevel, ServerSocket errSocket) { |
| 49 | + public KinesisEventPublisher(String stream, String region, String metricsLevel, String crossAccountRole, ServerSocket errSocket) { |
40 | 50 | this.stream = stream;
|
41 | 51 | kinesis = new KinesisProducer(new KinesisProducerConfiguration()
|
42 | 52 | .setRegion(region)
|
43 |
| - .setMetricsLevel(metricsLevel)); |
| 53 | + .setMetricsLevel(metricsLevel) |
| 54 | + .setCredentialsProvider(loadCredentials(crossAccountRole))); |
44 | 55 | this.errSocket = errSocket;
|
45 | 56 | }
|
46 | 57 |
|
| 58 | + private static AWSCredentialsProvider loadCredentials(String crossAccountRole) { |
| 59 | + final AWSCredentialsProvider credentialsProvider; |
| 60 | + |
| 61 | + Boolean isCrossAccount = false; |
| 62 | + if (!crossAccountRole.equals("")) { |
| 63 | + isCrossAccount = true; |
| 64 | + } |
| 65 | + |
| 66 | + if (isCrossAccount) { |
| 67 | + AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() |
| 68 | + .withCredentials(new ProfileCredentialsProvider("nonprodjump")) |
| 69 | + .withRegion("us-east-1") |
| 70 | + .build(); |
| 71 | + |
| 72 | + AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) |
| 73 | + .withRoleArn(crossAccountRole) |
| 74 | + .withRoleSessionName("Kinesis_Session"); |
| 75 | + |
| 76 | + AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); |
| 77 | + Credentials creds = assumeRoleResult.getCredentials(); |
| 78 | + |
| 79 | + credentialsProvider = new AWSStaticCredentialsProvider( |
| 80 | + new BasicSessionCredentials(creds.getAccessKeyId(), |
| 81 | + creds.getSecretAccessKey(), |
| 82 | + creds.getSessionToken()) |
| 83 | + ); |
| 84 | + } else { |
| 85 | + credentialsProvider = new DefaultAWSCredentialsProviderChain(); |
| 86 | + } |
| 87 | + |
| 88 | + return credentialsProvider; |
| 89 | + } |
| 90 | + |
47 | 91 | public void runOnce(String line) throws Exception {
|
48 | 92 | // add new line so that downstream systems have an easier time parsing
|
49 | 93 | String finalLine = line + "\n";
|
|
0 commit comments