Skip to content

Commit 7138ed0

Browse files
committed
update: 更新nutzboot/fastjson版本 & 优化掉redis keys -> scan 提升性能
1 parent 3186a76 commit 7138ed0

File tree

9 files changed

+57
-49
lines changed

9 files changed

+57
-49
lines changed

mqtt-auth/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.1.2-netty</version>
8+
<version>1.2.0-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-broker/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.1.2-netty</version>
8+
<version>1.2.0-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.1.2-netty</version>
8+
<version>1.2.0-netty</version>
99
</parent>
1010

1111
<modelVersion>4.0.0</modelVersion>

mqtt-store/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.1.2-netty</version>
8+
<version>1.2.0-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-store/src/main/java/cn/wizzer/iot/mqtt/server/store/cache/RetainMessageCache.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import org.nutz.ioc.impl.PropertiesProxy;
88
import org.nutz.ioc.loader.annotation.Inject;
99
import org.nutz.ioc.loader.annotation.IocBean;
10+
import redis.clients.jedis.ScanParams;
11+
import redis.clients.jedis.ScanResult;
1012

1113
import java.util.HashMap;
1214
import java.util.Map;
@@ -43,14 +45,15 @@ public void remove(String topic) {
4345

4446
public Map<String, RetainMessageStore> all() {
4547
Map<String, RetainMessageStore> map = new HashMap<>();
46-
Set<String> set=redisService.keys(CACHE_PRE + "*");
47-
if(set!=null&&!set.isEmpty()) {
48-
set.forEach(
49-
entry -> {
50-
map.put(entry.substring(CACHE_PRE.length()), JSONObject.parseObject(redisService.get(entry), RetainMessageStore.class));
51-
}
52-
);
53-
}
48+
ScanParams match = new ScanParams().match(CACHE_PRE + "*");
49+
ScanResult<String> scan = null;
50+
do {
51+
scan = redisService.scan(scan == null ? ScanParams.SCAN_POINTER_START : scan.getStringCursor(), match);
52+
for (String key : scan.getResult()) {
53+
map.put(key.substring(CACHE_PRE.length()), JSONObject.parseObject(redisService.get(key), RetainMessageStore.class));
54+
55+
}
56+
} while (!scan.isCompleteIteration());
5457
return map;
5558
}
5659
}

mqtt-store/src/main/java/cn/wizzer/iot/mqtt/server/store/cache/SubscribeNotWildcardCache.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import org.nutz.integration.jedis.RedisService;
77
import org.nutz.ioc.loader.annotation.Inject;
88
import org.nutz.ioc.loader.annotation.IocBean;
9+
import redis.clients.jedis.ScanParams;
10+
import redis.clients.jedis.ScanResult;
911

1012
import java.util.*;
1113
import java.util.concurrent.ConcurrentHashMap;
@@ -50,21 +52,21 @@ public void removeForClient(String clientId) {
5052

5153
public Map<String, ConcurrentHashMap<String, SubscribeStore>> all() {
5254
Map<String, ConcurrentHashMap<String, SubscribeStore>> map = new HashMap<>();
53-
Set<String> set = redisService.keys(CACHE_PRE + "*");
54-
if (set != null && !set.isEmpty()) {
55-
set.forEach(
56-
entry -> {
57-
ConcurrentHashMap<String, SubscribeStore> map1 = new ConcurrentHashMap<>();
58-
Map<String, String> map2 = redisService.hgetAll(entry);
59-
if (map2 != null && !map2.isEmpty()) {
60-
map2.forEach((k, v) -> {
61-
map1.put(k, JSONObject.parseObject(v, SubscribeStore.class));
62-
});
63-
map.put(entry.substring(CACHE_PRE.length()), map1);
64-
}
65-
}
66-
);
67-
}
55+
ScanParams match = new ScanParams().match(CACHE_PRE + "*");
56+
ScanResult<String> scan = null;
57+
do {
58+
scan = redisService.scan(scan == null ? ScanParams.SCAN_POINTER_START : scan.getStringCursor(), match);
59+
for (String key : scan.getResult()) {
60+
ConcurrentHashMap<String, SubscribeStore> map1 = new ConcurrentHashMap<>();
61+
Map<String, String> map2 = redisService.hgetAll(key);
62+
if (map2 != null && !map2.isEmpty()) {
63+
map2.forEach((k, v) -> {
64+
map1.put(k, JSONObject.parseObject(v, SubscribeStore.class));
65+
});
66+
map.put(key.substring(CACHE_PRE.length()), map1);
67+
}
68+
}
69+
} while (!scan.isCompleteIteration());
6870
return map;
6971
}
7072

mqtt-store/src/main/java/cn/wizzer/iot/mqtt/server/store/cache/SubscribeWildcardCache.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package cn.wizzer.iot.mqtt.server.store.cache;
22

3+
import cn.wizzer.iot.mqtt.server.common.message.RetainMessageStore;
34
import cn.wizzer.iot.mqtt.server.common.subscribe.SubscribeStore;
45
import com.alibaba.fastjson.JSONObject;
56
import org.nutz.aop.interceptor.async.Async;
67
import org.nutz.integration.jedis.RedisService;
78
import org.nutz.ioc.loader.annotation.Inject;
89
import org.nutz.ioc.loader.annotation.IocBean;
10+
import redis.clients.jedis.ScanParams;
11+
import redis.clients.jedis.ScanResult;
912

1013
import java.util.*;
1114
import java.util.concurrent.ConcurrentHashMap;
@@ -50,21 +53,21 @@ public void removeForClient(String clientId) {
5053

5154
public Map<String, ConcurrentHashMap<String, SubscribeStore>> all() {
5255
Map<String, ConcurrentHashMap<String, SubscribeStore>> map = new HashMap<>();
53-
Set<String> set = redisService.keys(CACHE_PRE + "*");
54-
if (set != null && !set.isEmpty()) {
55-
set.forEach(
56-
entry -> {
57-
ConcurrentHashMap<String, SubscribeStore> map1 = new ConcurrentHashMap<>();
58-
Map<String, String> map2 = redisService.hgetAll(entry);
59-
if (map2 != null && !map2.isEmpty()) {
60-
map2.forEach((k, v) -> {
61-
map1.put(k, JSONObject.parseObject(v, SubscribeStore.class));
62-
});
63-
map.put(entry.substring(CACHE_PRE.length()), map1);
64-
}
65-
}
66-
);
67-
}
56+
ScanParams match = new ScanParams().match(CACHE_PRE + "*");
57+
ScanResult<String> scan = null;
58+
do {
59+
scan = redisService.scan(scan == null ? ScanParams.SCAN_POINTER_START : scan.getStringCursor(), match);
60+
for (String key : scan.getResult()) {
61+
ConcurrentHashMap<String, SubscribeStore> map1 = new ConcurrentHashMap<>();
62+
Map<String, String> map2 = redisService.hgetAll(key);
63+
if (map2 != null && !map2.isEmpty()) {
64+
map2.forEach((k, v) -> {
65+
map1.put(k, JSONObject.parseObject(v, SubscribeStore.class));
66+
});
67+
map.put(key.substring(CACHE_PRE.length()), map1);
68+
}
69+
}
70+
} while (!scan.isCompleteIteration());
6871
return map;
6972
}
7073

mqtt-zoo/mqtt-test-kafka/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<groupId>cn.wizzer</groupId>
66
<artifactId>mqtt-test-kafka</artifactId>
7-
<version>1.1.2-netty</version>
7+
<version>1.2.0-netty</version>
88
<modelVersion>4.0.0</modelVersion>
99
<packaging>jar</packaging>
1010
<properties>
11-
<mqttwk.version>1.1.2-netty</mqttwk.version>
12-
<nutzboot.version>2.3.5.v20190516</nutzboot.version>
11+
<mqttwk.version>1.2.0-netty</mqttwk.version>
12+
<nutzboot.version>2.4.1.v20201014</nutzboot.version>
1313
<netty.version>4.1.28.Final</netty.version>
1414
<fastjson.version>1.2.55</fastjson.version>
1515
<hutool.version>4.1.2</hutool.version>

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<groupId>cn.wizzer</groupId>
77
<artifactId>mqtt-wk</artifactId>
88
<packaging>pom</packaging>
9-
<version>1.1.2-netty</version>
9+
<version>1.2.0-netty</version>
1010
<name>MqttWk</name>
1111
<modules>
1212
<module>mqtt-common</module>
@@ -15,10 +15,10 @@
1515
<module>mqtt-store</module>
1616
</modules>
1717
<properties>
18-
<mqttwk.version>1.1.2-netty</mqttwk.version>
19-
<nutzboot.version>2.3.5.v20190516</nutzboot.version>
18+
<mqttwk.version>1.2.0-netty</mqttwk.version>
19+
<nutzboot.version>2.4.1.v20201014</nutzboot.version>
2020
<netty.version>4.1.28.Final</netty.version>
21-
<fastjson.version>1.2.60</fastjson.version>
21+
<fastjson.version>1.2.74</fastjson.version>
2222
<hutool.version>4.1.2</hutool.version>
2323
<kafka_2.12.version>2.0.0</kafka_2.12.version>
2424
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

0 commit comments

Comments
 (0)