Skip to content

Commit cca9028

Browse files
authored
Merge pull request #149 from ing-bank/feature/lineageFilter
whitelist user-agent for lineage
2 parents 37779ae + 9470316 commit cca9028

File tree

5 files changed: 25 additions and 12 deletions.

src/it/scala/com/ing/wbaa/rokku/proxy/provider/LineageProviderAtlasItTest.scala

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ import org.scalatest.Assertion
1212
import org.scalatest.diagrams.Diagrams
1313
import org.scalatest.wordspec.AnyWordSpecLike
1414

15-
import scala.concurrent.ExecutionContext
15+
import scala.concurrent.{ExecutionContext, TimeoutException}
1616

1717
class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with EmbeddedKafka {
1818

@@ -58,7 +58,7 @@ class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with Embe
5858
Thread.sleep(2000)
5959
createCustomTopic(createEventsTopic)
6060
apr.createLineageFromRequest(
61-
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeObject"), userSTS, remoteClientIP)
61+
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeObject").withHeaders(RawHeader("User-Agent", "aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
6262
val message = consumeFirstStringMessageFrom(createEventsTopic)
6363
assert(message.contains("external_object_in/fakeObject"))
6464
}
@@ -69,7 +69,7 @@ class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with Embe
6969
withRunningKafka {
7070
Thread.sleep(2000)
7171
apr.createLineageFromRequest(
72-
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeTags").withHeaders(RawHeader("rokku-metadata", "k1=v1")), userSTS, remoteClientIP)
72+
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeTags").withHeaders(RawHeader("rokku-metadata", "k1=v1"), RawHeader("User-Agent", "aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
7373
val message = consumeFirstStringMessageFrom(createEventsTopic)
7474
assert(message.contains("{\"awsTags\":[{\"attributes\":{\"key\":\"k1\",\"value\":\"v1\"},\"typeName\":\"aws_tag\"}]"))
7575
}
@@ -80,7 +80,7 @@ class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with Embe
8080
withRunningKafka {
8181
Thread.sleep(2000)
8282
apr.createLineageFromRequest(
83-
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeTags").withHeaders(RawHeader("rokku-classifications", "customerPII,secret")), userSTS, remoteClientIP)
83+
fakeIncomingHttpRequest(HttpMethods.PUT, "/fakeBucket/fakeTags").withHeaders(RawHeader("rokku-classifications", "customerPII,secret"), RawHeader("User-Agent", "aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
8484
val message = consumeFirstStringMessageFrom(createEventsTopic)
8585
assert(message.contains("\"classifications\":[{\"typeName\":\"customerPII\"},{\"typeName\":\"secret\"}]"))
8686
}
@@ -91,7 +91,7 @@ class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with Embe
9191
withRunningKafka {
9292
Thread.sleep(2000)
9393
apr.createLineageFromRequest(
94-
fakeIncomingHttpRequest(HttpMethods.GET, "/fakeBucket/fakeObject"), userSTS, remoteClientIP)
94+
fakeIncomingHttpRequest(HttpMethods.GET, "/fakeBucket/fakeObject").withHeaders(RawHeader("User-Agent", "aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
9595
val message = consumeFirstStringMessageFrom(createEventsTopic)
9696
assert(message.contains("external_object_out/fakeObject"))
9797
}
@@ -102,10 +102,20 @@ class LineageProviderAtlasItTest extends AnyWordSpecLike with Diagrams with Embe
102102
withRunningKafka {
103103
Thread.sleep(2000)
104104
apr.createLineageFromRequest(
105-
fakeIncomingHttpRequest(HttpMethods.DELETE, "/fakeBucket/fakeObject"), userSTS, remoteClientIP)
105+
fakeIncomingHttpRequest(HttpMethods.DELETE, "/fakeBucket/fakeObject")withHeaders(RawHeader("User-Agent", "aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
106106
val message = consumeFirstStringMessageFrom(createEventsTopic)
107107
assert(message.contains("fakeObject"))
108108
}
109109
}
110+
111+
"time out exception because the user agent is not whitelisted" in withLineageProviderAtlas() { apr =>
112+
implicit val config = EmbeddedKafkaConfig(kafkaPort = testKafkaPort)
113+
withRunningKafka {
114+
Thread.sleep(2000)
115+
apr.createLineageFromRequest(
116+
fakeIncomingHttpRequest(HttpMethods.DELETE, "/fakeBucket/fakeObject")withHeaders(RawHeader("User-Agent", "no-aws-cli/1.16.68 Python/2.7")), userSTS, remoteClientIP)
117+
assertThrows[TimeoutException](consumeFirstStringMessageFrom(createEventsTopic))
118+
}
119+
}
110120
}
111121
}

src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ rokku {
4545

4646
atlas {
4747
enabled = ${?ROKKU_ATLAS_ENABLED}
48+
whitelistUserAgentSplitByComma = ${?ROKKU_ATLAS_WHITELIST_LOWERCASE_COMMA_SEPARATED_USER_AGENT}
4849
}
4950

5051
kerberos {

src/main/resources/reference.conf

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ rokku {
5454

5555
atlas {
5656
enabled = false
57+
whitelistUserAgentSplitByComma = "aws-cli,boto3"
5758
}
5859

5960
kerberos {

src/main/scala/com/ing/wbaa/rokku/proxy/provider/LineageProviderAtlas.scala

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,8 @@ trait LineageProviderAtlas extends LineageHelpers {
1919

2020
private val logger = new LoggerHandlerWithId
2121

22+
private val whitelistUserAgents = system.settings.config.getString("rokku.atlas.whitelistUserAgentSplitByComma").trim.split(",")
23+
2224
def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User, userIPs: UserIps)(implicit id: RequestId): Future[Done] = {
2325
val lineageHeaders = getLineageHeaders(httpRequest)
2426
val pseudoDir = lineageHeaders.pseduoDir
@@ -29,7 +31,9 @@ trait LineageProviderAtlas extends LineageHelpers {
2931

3032
val extractObjectFromPath = bucketObject.getOrElse("").split("/").takeRight(1).mkString
3133

32-
if (lineageHeaders.bucket.length > 1) {
34+
val isClientTypeWhitelisted = whitelistUserAgents.contains(lineageHeaders.clientType.getOrElse("").toLowerCase())
35+
36+
if (lineageHeaders.bucket.length > 1 && isClientTypeWhitelisted) {
3337
lineageHeaders.method match {
3438
// mb bucket
3539
case HttpMethods.PUT if !lineageHeaders.bucket.isEmpty && pseudoDir.isEmpty && bucketObject.isEmpty =>

src/test/scala/com/ing/wbaa/rokku/proxy/handler/RequestHandlerS3Spec.scala

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,25 +2,22 @@ package com.ing.wbaa.rokku.proxy.handler
22

33
import akka.actor.ActorSystem
44
import akka.http.scaladsl.model._
5-
import com.ing.wbaa.rokku.proxy.config.{ KafkaSettings, StorageS3Settings }
5+
import com.ing.wbaa.rokku.proxy.config.StorageS3Settings
66
import com.ing.wbaa.rokku.proxy.data._
7-
import com.ing.wbaa.rokku.proxy.provider.LineageProviderAtlas
87
import com.ing.wbaa.rokku.proxy.queue.MemoryUserRequestQueue
98
import org.scalatest.diagrams.Diagrams
109
import org.scalatest.wordspec.AsyncWordSpec
1110

1211
import scala.concurrent.{ ExecutionContext, Future }
1312

14-
class RequestHandlerS3Spec extends AsyncWordSpec with Diagrams with RequestHandlerS3 with LineageProviderAtlas with MemoryUserRequestQueue {
13+
class RequestHandlerS3Spec extends AsyncWordSpec with Diagrams with RequestHandlerS3 with MemoryUserRequestQueue {
1514

1615
override implicit val system: ActorSystem = ActorSystem.create("test-system")
1716
override implicit val executionContext: ExecutionContext = system.dispatcher
1817
override val storageS3Settings: StorageS3Settings = new StorageS3Settings(system.settings.config) {
1918
override val storageS3Authority: Uri.Authority = Uri.Authority(Uri.Host("1.2.3.4"), 1234)
2019
}
2120

22-
override val kafkaSettings: KafkaSettings = new KafkaSettings(system.settings.config)
23-
2421
implicit val requestId: RequestId = RequestId("test")
2522

2623
var numFiredRequests = 0

0 commit comments

Comments
 (0)