Skip to content

Commit 141cb46

Browse files
committed
fix: 解决遗嘱消息缓存部分丢失问题 & 发布1.0.6-netty
1 parent 39a042a commit 141cb46

File tree

8 files changed

+94
-14
lines changed

8 files changed

+94
-14
lines changed

mqtt-auth/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.0.5-netty</version>
8+
<version>1.0.6-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-broker/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.0.5-netty</version>
8+
<version>1.0.6-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.0.5-netty</version>
8+
<version>1.0.6-netty</version>
99
</parent>
1010

1111
<modelVersion>4.0.0</modelVersion>

mqtt-common/src/main/java/cn/wizzer/iot/mqtt/server/common/session/SessionStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public class SessionStore implements Serializable {
2323

2424
private MqttPublishMessage willMessage;
2525

26+
public SessionStore(){
27+
28+
}
29+
2630
public SessionStore(String clientId, String channelId, boolean cleanSession, MqttPublishMessage willMessage) {
2731
this.clientId = clientId;
2832
this.channelId = channelId;

mqtt-store/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>mqtt-wk</artifactId>
77
<groupId>cn.wizzer</groupId>
8-
<version>1.0.5-netty</version>
8+
<version>1.0.6-netty</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

mqtt-store/src/main/java/cn/wizzer/iot/mqtt/server/store/session/SessionStoreService.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
/**
2-
* Copyright (c) 2018, Mr.Wang ([email protected]) All rights reserved.
2+
* Created by wizzer on 2018
33
*/
44

55
package cn.wizzer.iot.mqtt.server.store.session;
66

77
import cn.wizzer.iot.mqtt.server.common.session.ISessionStoreService;
88
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
9+
import cn.wizzer.iot.mqtt.server.store.util.StoreUtil;
10+
import com.alibaba.fastjson.JSON;
911
import org.nutz.aop.interceptor.async.Async;
1012
import org.nutz.integration.jedis.RedisService;
1113
import org.nutz.ioc.loader.annotation.Inject;
1214
import org.nutz.ioc.loader.annotation.IocBean;
13-
import org.nutz.json.Json;
14-
import org.nutz.json.JsonFormat;
15+
import org.nutz.lang.Strings;
16+
import org.nutz.lang.util.NutMap;
1517

1618
/**
1719
* 会话存储服务
@@ -24,16 +26,19 @@ public class SessionStoreService implements ISessionStoreService {
2426

2527
@Override
2628
public void put(String clientId, SessionStore sessionStore) {
27-
//fastjson需要对象有get/set方法,而MqttPublishMessage对象没有get/set方法造成转换失败,改成nutz的工具类
28-
redisService.set(CACHE_PRE + clientId, Json.toJson(sessionStore, JsonFormat.compact()));
29+
//SessionStore对象不能正常转为JSON,使用工具类类解决
30+
NutMap nutMap = StoreUtil.transPublishToMapBeta(sessionStore);
31+
redisService.set(CACHE_PRE + clientId, JSON.toJSONString(nutMap));
2932
}
3033

3134

3235
@Override
3336
public SessionStore get(String clientId) {
34-
String obj = redisService.get(CACHE_PRE + clientId);
35-
if (obj != null)
36-
return Json.fromJson(SessionStore.class, obj);
37+
String jsonObj = redisService.get(CACHE_PRE + clientId);
38+
if (Strings.isNotBlank(jsonObj)) {
39+
NutMap nutMap = JSON.parseObject(jsonObj, NutMap.class);
40+
return StoreUtil.mapTransToPublishMsgBeta(nutMap);
41+
}
3742
return null;
3843
}
3944

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package cn.wizzer.iot.mqtt.server.store.util;
2+
3+
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
4+
import io.netty.buffer.ByteBuf;
5+
import io.netty.buffer.ByteBufAllocator;
6+
import io.netty.buffer.ByteBufUtil;
7+
import io.netty.handler.codec.mqtt.*;
8+
import org.nutz.lang.util.NutMap;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
/**
13+
* Created by cs on 2018
14+
*/
15+
public class StoreUtil {
16+
17+
private static final Logger LOGGER = LoggerFactory.getLogger(StoreUtil.class);
18+
19+
public static NutMap transPublishToMapBeta(SessionStore store) {
20+
MqttPublishMessage msg = store.getWillMessage();
21+
try {
22+
NutMap sessionStore = new NutMap();
23+
sessionStore.addv("clientId", store.getClientId());
24+
sessionStore.addv("channelId", store.getChannelId());
25+
sessionStore.addv("cleanSession", store.isCleanSession());
26+
sessionStore.addv("payload", new String(msg.payload().array(), "UTF-8"));
27+
28+
sessionStore.addv("messageType", msg.fixedHeader().messageType().value());
29+
sessionStore.addv("isDup", msg.fixedHeader().isDup());
30+
sessionStore.addv("qosLevel", msg.fixedHeader().qosLevel().value());
31+
sessionStore.addv("isRetain", msg.fixedHeader().isRetain());
32+
sessionStore.addv("remainingLength", msg.fixedHeader().remainingLength());
33+
34+
sessionStore.addv("topicName", msg.variableHeader().topicName());
35+
sessionStore.addv("packetId", msg.variableHeader().packetId());
36+
37+
38+
return sessionStore;
39+
} catch (Exception e) {
40+
LOGGER.error(e.getMessage(), e);
41+
}
42+
return null;
43+
}
44+
45+
public static SessionStore mapTransToPublishMsgBeta(NutMap store) {
46+
SessionStore sessionStore = new SessionStore();
47+
48+
String payload = store.getString("payload");
49+
ByteBuf buf = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, payload);
50+
51+
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
52+
MqttMessageType.valueOf(store.getInt("messageType")),
53+
store.getBoolean("isDup"),
54+
MqttQoS.valueOf(store.getInt("qosLevel")),
55+
store.getBoolean("isRetain"),
56+
store.getInt("remainingLength"));
57+
58+
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(store.getString("topicName"),
59+
store.getInt("packetId"));
60+
61+
MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, buf);
62+
63+
64+
sessionStore.setChannelId(store.getString("channelId"));
65+
sessionStore.setClientId(store.getString("clientId"));
66+
sessionStore.setCleanSession(store.getBoolean("cleanSession"));
67+
sessionStore.setWillMessage(mqttPublishMessage);
68+
return sessionStore;
69+
}
70+
71+
}

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<groupId>cn.wizzer</groupId>
77
<artifactId>mqtt-wk</artifactId>
88
<packaging>pom</packaging>
9-
<version>1.0.5-netty</version>
9+
<version>1.0.6-netty</version>
1010
<name>MqttWk</name>
1111
<modules>
1212
<module>mqtt-common</module>
@@ -15,7 +15,7 @@
1515
<module>mqtt-store</module>
1616
</modules>
1717
<properties>
18-
<mqttwk.version>1.0.3-netty</mqttwk.version>
18+
<mqttwk.version>1.0.6-netty</mqttwk.version>
1919
<nutzboot.version>2.3-SNAPSHOT</nutzboot.version>
2020
<netty.version>4.1.28.Final</netty.version>
2121
<fastjson.version>1.2.49</fastjson.version>

0 commit comments

Comments
 (0)