Skip to content

Commit 02f8d95

Browse files
committed
reverting back to redis backed cache
1 parent f193fd3 commit 02f8d95

File tree

6 files changed

+119
-69
lines changed

6 files changed

+119
-69
lines changed

apps/threat-detection/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@
132132
<version>2.24.2</version>
133133
</dependency>
134134

135+
<dependency>
136+
<groupId>com.github.ben-manes.caffeine</groupId>
137+
<artifactId>caffeine</artifactId>
138+
<version>2.9.3</version>
139+
</dependency>
140+
135141

136142
</dependencies>
137143
<build>

apps/threat-detection/src/main/java/com/akto/threat/detection/cache/CounterCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ public interface CounterCache {
1010

1111
boolean exists(String key);
1212

13-
void clear(String key);
13+
void reset(String key);
1414
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.akto.threat.detection.cache;
2+
3+
import com.github.benmanes.caffeine.cache.Cache;
4+
import com.github.benmanes.caffeine.cache.Caffeine;
5+
import io.lettuce.core.RedisClient;
6+
import io.lettuce.core.api.StatefulRedisConnection;
7+
8+
import java.util.HashMap;
9+
import java.util.HashSet;
10+
import java.util.Map;
11+
import java.util.Set;
12+
import java.util.concurrent.*;
13+
14+
public class RedisBackedCounterCache implements CounterCache {
15+
private final StatefulRedisConnection<String, Long> redis;
16+
17+
private final Cache<String, Long> localCache;
18+
19+
private final String prefix;
20+
private final ConcurrentLinkedQueue<Object> pendingOps;
21+
22+
static class PendingCounterOp {
23+
private final String key;
24+
private final long value;
25+
26+
public PendingCounterOp(String key, long value) {
27+
this.key = key;
28+
this.value = value;
29+
}
30+
31+
public String getKey() {
32+
return key;
33+
}
34+
35+
public long getValue() {
36+
return value;
37+
}
38+
}
39+
40+
public RedisBackedCounterCache(RedisClient redisClient, String prefix) {
41+
this.prefix = prefix;
42+
this.redis = redisClient.connect(new LongValueCodec());
43+
this.localCache = Caffeine.newBuilder().maximumSize(10000).expireAfterWrite(3, TimeUnit.HOURS).build();
44+
this.pendingOps = new ConcurrentLinkedQueue<>();
45+
}
46+
47+
@Override
48+
public void increment(String key) {
49+
this.incrementBy(key, 1);
50+
}
51+
52+
@Override
53+
public void incrementBy(String key, long val) {
54+
long cv = this.get(key);
55+
this.localCache.put(key, cv + val);
56+
57+
this.pendingOps.add(new PendingCounterOp(key, val));
58+
if (this.pendingOps.size() >= 100) {
59+
this.flush();
60+
}
61+
}
62+
63+
@Override
64+
public long get(String key) {
65+
if (this.localCache.asMap().containsKey(key)) {
66+
return this.localCache.asMap().get(key);
67+
}
68+
69+
Long rv = this.redis.sync().hget(prefix, key);
70+
71+
this.localCache.put(key, rv != null ? rv : 0L);
72+
return rv != null ? rv : 0L;
73+
}
74+
75+
@Override
76+
public boolean exists(String key) {
77+
if (this.localCache.asMap().containsKey(key)) {
78+
return true;
79+
}
80+
81+
return this.redis.sync().hexists(prefix, key);
82+
}
83+
84+
@Override
85+
public void reset(String key) {
86+
this.localCache.put(key, 0L);
87+
this.redis.async().hset(prefix, key, 0L);
88+
}
89+
90+
private void flush() {
91+
Set<String> keys = new HashSet<>();
92+
while (!this.pendingOps.isEmpty()) {
93+
PendingCounterOp op = (PendingCounterOp) this.pendingOps.poll();
94+
keys.add(op.getKey());
95+
}
96+
97+
Map<String, Long> val = new HashMap<>();
98+
for (String key : keys) {
99+
long cv = this.localCache.asMap().getOrDefault(key, 0L);
100+
val.put(key, cv);
101+
}
102+
103+
this.redis.async().hset(prefix, val);
104+
val.forEach((k, v) -> this.redis.async().expire(k, 3 * 60 * 60));
105+
106+
this.pendingOps.clear();
107+
}
108+
109+
}

apps/threat-detection/src/main/java/com/akto/threat/detection/cache/RedisCounterCache.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

apps/threat-detection/src/main/java/com/akto/threat/detection/smart_event_detector/window_based/WindowBasedThresholdNotifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Result shouldNotify(String aggKey, SampleMaliciousRequest maliciousEvent,
7070
boolean thresholdBreached = windowCount >= rule.getCondition().getMatchCount();
7171

7272
if (thresholdBreached) {
73-
this.cache.clear(cacheKey);
73+
this.cache.reset(cacheKey);
7474
}
7575

7676
return new Result(thresholdBreached);

apps/threat-detection/src/main/java/com/akto/threat/detection/tasks/MaliciousTrafficDetectorTask.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.akto.dto.HttpResponseParams;
1010
import com.akto.dto.RawApi;
1111
import com.akto.dto.api_protection_parse_layer.AggregationRules;
12-
import com.akto.dto.api_protection_parse_layer.Condition;
1312
import com.akto.dto.api_protection_parse_layer.Rule;
1413
import com.akto.dto.monitoring.FilterConfig;
1514
import com.akto.dto.test_editor.YamlTemplate;
@@ -26,7 +25,7 @@
2625
import com.akto.test_editor.execution.VariableResolver;
2726
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
2827
import com.akto.threat.detection.actor.SourceIPActorGenerator;
29-
import com.akto.threat.detection.cache.RedisCounterCache;
28+
import com.akto.threat.detection.cache.RedisBackedCounterCache;
3029
import com.akto.threat.detection.constants.KafkaTopic;
3130
import com.akto.threat.detection.kafka.KafkaProtoProducer;
3231
import com.akto.threat.detection.smart_event_detector.window_based.WindowBasedThresholdNotifier;
@@ -82,7 +81,7 @@ public MaliciousTrafficDetectorTask(
8281

8382
this.windowBasedThresholdNotifier =
8483
new WindowBasedThresholdNotifier(
85-
new RedisCounterCache(redisClient, "wbt"),
84+
new RedisBackedCounterCache(redisClient, "wbt"),
8685
new WindowBasedThresholdNotifier.Config(100, 10 * 60));
8786

8887
this.internalKafka = new KafkaProtoProducer(internalConfig);

0 commit comments

Comments
 (0)