Skip to content

Commit c798baa

Browse files
committed
feat(core): 添加注册功能
- 在数据库中增加注册数据字段
1 parent 1736cf3 commit c798baa

File tree

9 files changed

+190
-26
lines changed

9 files changed

+190
-26
lines changed

doc/db/tables_xxl_mq.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ CREATE TABLE `xxl_mq_instance` (
7171
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
7272
`uuid` varchar(50) NOT NULL COMMENT '实例唯一标识',
7373
`register_heartbeat` datetime DEFAULT NULL COMMENT '实例最后心跳时间,动态注册时判定是否过期',
74+
`registry_data` text DEFAULT NULL COMMENT '注册数据,JSON',
7475
`add_time` datetime NOT NULL COMMENT '新增时间',
7576
`update_time` datetime NOT NULL COMMENT '更新时间',
7677
PRIMARY KEY (`id`),
@@ -85,7 +86,7 @@ CREATE TABLE `xxl_mq_application` (
8586
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
8687
`name` varchar(20) NOT NULL COMMENT '服务名称',
8788
`desc` varchar(100) NOT NULL COMMENT '服务描述',
88-
`registry_data` text COMMENT '在线节点列表,数据JSON',
89+
`registry_data` text DEFAULT NULL COMMENT '注册数据,JSON',
8990
`add_time` datetime NOT NULL COMMENT '新增时间',
9091
`update_time` datetime NOT NULL COMMENT '更新时间',
9192
PRIMARY KEY (`id`),

xxl-mq-admin2/src/main/java/com/xxl/mq/admin/model/entity/Application.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class Application implements Serializable {
3131
*/
3232
private String desc;
3333

34+
/**
35+
* 注册数据
36+
*/
37+
private String registryData;
38+
3439
/**
3540
* 新增时间
3641
*/
@@ -74,6 +79,14 @@ public void setDesc(String desc) {
7479
this.desc = desc;
7580
}
7681

82+
public String getRegistryData() {
83+
return registryData;
84+
}
85+
86+
public void setRegistryData(String registryData) {
87+
this.registryData = registryData;
88+
}
89+
7790
public Date getAddTime() {
7891
return addTime;
7992
}

xxl-mq-admin2/src/main/java/com/xxl/mq/admin/model/entity/Instance.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class Instance implements Serializable {
3131
*/
3232
private Date registerHeartbeat;
3333

34+
/**
35+
* 注册数据
36+
*/
37+
private String registryData;
38+
3439
/**
3540
* 新增时间
3641
*/
@@ -74,6 +79,14 @@ public void setRegisterHeartbeat(Date registerHeartbeat) {
7479
this.registerHeartbeat = registerHeartbeat;
7580
}
7681

82+
public String getRegistryData() {
83+
return registryData;
84+
}
85+
86+
public void setRegistryData(String registryData) {
87+
this.registryData = registryData;
88+
}
89+
7790
public Date getAddTime() {
7891
return addTime;
7992
}

xxl-mq-admin2/src/main/java/com/xxl/mq/admin/openapi/biz/BrokerServiceImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xxl.mq.admin.openapi.biz;
22

3+
import com.xxl.mq.admin.openapi.config.BrokerFactory;
34
import com.xxl.mq.core.openapi.BrokerService;
45
import com.xxl.mq.core.openapi.model.*;
56
import com.xxl.tool.core.CollectionTool;
@@ -20,11 +21,19 @@ public class BrokerServiceImpl implements BrokerService {
2021

2122
@Override
2223
public Response<String> registry(RegistryRequest registryRequest) {
23-
return null;
24+
// valid token
25+
if (registryRequest==null || !BrokerFactory.getInstance().validAccessToken(registryRequest.getAccessToken())) {
26+
return Response.ofFail("accessToken invalid");
27+
}
28+
29+
// invoke
30+
boolean ret = BrokerFactory.getInstance().registry(registryRequest);
31+
return ret? Response.ofSuccess() : Response.ofFail();
2432
}
2533

2634
@Override
2735
public Response<String> produce(ProduceRequest produceRequest) {
36+
// todo
2837
return null;
2938
}
3039

xxl-mq-admin2/src/main/java/com/xxl/mq/admin/openapi/config/BrokerFactory.java

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import com.xxl.mq.admin.constant.enums.AccessTokenStatuEnum;
44
import com.xxl.mq.admin.mapper.*;
55
import com.xxl.mq.admin.model.entity.AccessToken;
6+
import com.xxl.mq.admin.model.entity.Instance;
67
import com.xxl.mq.core.openapi.BrokerService;
8+
import com.xxl.mq.core.openapi.model.RegistryRequest;
79
import com.xxl.tool.concurrent.CyclicThread;
10+
import com.xxl.tool.concurrent.MessageQueue;
811
import com.xxl.tool.core.CollectionTool;
912
import com.xxl.tool.gson.GsonTool;
1013
import com.xxl.tool.jsonrpc.JsonRpcServer;
@@ -16,6 +19,7 @@
1619
import org.springframework.context.annotation.Configuration;
1720

1821
import javax.annotation.Resource;
22+
import java.util.Date;
1923
import java.util.List;
2024
import java.util.Objects;
2125
import java.util.Set;
@@ -47,13 +51,9 @@ public static BrokerFactory getInstance() {
4751
private ApplicationMapper applicationMapper;
4852
@Resource
4953
private AccessTokenMapper accessTokenMapper;
54+
@Resource
55+
private InstanceMapper instanceMapper;
5056

51-
public ApplicationMapper getApplicationMapper() {
52-
return applicationMapper;
53-
}
54-
public AccessTokenMapper getAccessTokenMapper() {
55-
return accessTokenMapper;
56-
}
5757

5858
@Override
5959
public void afterPropertiesSet() throws Exception {
@@ -62,12 +62,14 @@ public void afterPropertiesSet() throws Exception {
6262

6363
// 1、AccessTokenThread
6464
startAccessTokenThread();
65+
66+
// 2、Registry MessageQueue
67+
startRegistryMessageQueue();
6568
}
6669

6770
@Override
6871
public void destroy() throws Exception {
69-
// 1、AccessTokenThread
70-
stopAccessTokenThread();
72+
// stop
7173
}
7274

7375

@@ -84,14 +86,16 @@ public JsonRpcServer jsonRpcServer() {
8486

8587
// ---------------------- AccessToken ----------------------
8688

89+
/**
90+
* AccessToken LocalStore
91+
*/
8792
private volatile Set<String> accessTokenStore = new ConcurrentSkipListSet<>();
88-
private CyclicThread accessTokenThread;
8993

9094
/**
91-
* start AccessTokenThread
95+
* start AccessTokenThread (will stop with jvm)
9296
*/
9397
private void startAccessTokenThread() {
94-
accessTokenThread = new CyclicThread("accessTokenThread", true, 30 * 1000, new Runnable() {
98+
CyclicThread accessTokenThread = new CyclicThread("accessTokenThread", true, 30 * 1000, new Runnable() {
9599
@Override
96100
public void run() {
97101
try {
@@ -120,15 +124,6 @@ public void run() {
120124
accessTokenThread.start();
121125
}
122126

123-
/*
124-
* stop AccessTokenThread
125-
*/
126-
private void stopAccessTokenThread() {
127-
if (accessTokenThread != null) {
128-
accessTokenThread.stop();
129-
}
130-
}
131-
132127
/**
133128
* valid AccessToken
134129
*
@@ -139,4 +134,44 @@ public boolean validAccessToken(String accessToken) {
139134
return accessToken!=null && accessTokenStore.contains(accessToken);
140135
}
141136

137+
// ---------------------- Registry ----------------------
138+
139+
private volatile MessageQueue<RegistryRequest> registryMessageQueue;
140+
141+
/**
142+
* start Registry MessageQueue (will stop with jvm)
143+
*/
144+
private void startRegistryMessageQueue() {
145+
registryMessageQueue = new MessageQueue<RegistryRequest>(
146+
"registryMessageQueue",
147+
1000,
148+
messages -> {
149+
for (RegistryRequest registryRequest : messages) {
150+
// write registry
151+
String registryDate = GsonTool.toJson(registryRequest);
152+
153+
Instance newInstance = new Instance();
154+
newInstance.setAppname(registryRequest.getAppname());
155+
newInstance.setUuid(registryRequest.getInstanceUuid());
156+
newInstance.setRegisterHeartbeat(new Date());
157+
newInstance.setRegistryData(registryDate);
158+
159+
instanceMapper.insert(newInstance); // todo, save or update
160+
}
161+
},
162+
1,
163+
1);
164+
}
165+
166+
/**
167+
* registry
168+
*
169+
* @param registryRequest
170+
* @return
171+
*/
172+
public boolean registry(RegistryRequest registryRequest) {
173+
return registryMessageQueue.produce(registryRequest);
174+
}
175+
176+
142177
}

xxl-mq-admin2/src/main/resources/mapper/ApplicationMapper.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<result column="appname" property="appname" />
99
<result column="name" property="name" />
1010
<result column="desc" property="desc" />
11+
<result column="registry_data" property="registryData" />
1112
<result column="add_time" property="addTime" />
1213
<result column="update_time" property="updateTime" />
1314
</resultMap>
@@ -17,6 +18,7 @@
1718
t.`appname`,
1819
t.`name`,
1920
t.`desc`,
21+
t.`registry_data`,
2022
t.`add_time`,
2123
t.`update_time`
2224
</sql>
@@ -26,13 +28,15 @@
2628
`appname`,
2729
`name`,
2830
`desc`,
31+
`registry_data`,
2932
`add_time`,
3033
`update_time`
3134
)
3235
VALUES(
3336
#{application.appname} ,
3437
#{application.name} ,
3538
#{application.desc} ,
39+
#{application.registryData} ,
3640
NOW() ,
3741
NOW()
3842
)
@@ -51,6 +55,7 @@
5155
SET
5256
`name` = #{application.name},
5357
`desc` = #{application.desc},
58+
`registry_data` = #{application.registryData},
5459
`update_time` = NOW()
5560
WHERE `id` = #{application.id}
5661
</update>

xxl-mq-admin2/src/main/resources/mapper/InstanceMapper.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<result column="appname" property="appname" />
99
<result column="uuid" property="uuid" />
1010
<result column="register_heartbeat" property="registerHeartbeat" />
11+
<result column="registry_data" property="registryData" />
1112
<result column="add_time" property="addTime" />
1213
<result column="update_time" property="updateTime" />
1314
</resultMap>
@@ -17,6 +18,7 @@
1718
t.`appname`,
1819
t.`uuid`,
1920
t.`register_heartbeat`,
21+
t.`registry_data`,
2022
t.`add_time`,
2123
t.`update_time`
2224
</sql>
@@ -26,13 +28,15 @@
2628
`appname`,
2729
`uuid`,
2830
`register_heartbeat`,
31+
`registry_data`,
2932
`add_time`,
3033
`update_time`
3134
)
3235
VALUES(
3336
#{instance.appname} ,
3437
#{instance.uuid} ,
3538
#{instance.registerHeartbeat} ,
39+
#{instance.registryData} ,
3640
NOW() ,
3741
NOW()
3842
)
@@ -52,6 +56,7 @@
5256
`appname` = #{instance.appname},
5357
`uuid` = #{instance.uuid},
5458
`register_heartbeat` = #{instance.registerHeartbeat},
59+
`registry_data` = #{instance.registryData},
5560
`update_time` = NOW()
5661
WHERE `id` = #{instance.id}
5762
</update>

0 commit comments

Comments
 (0)