Skip to content

Commit 87070a5

Browse files
committed
Update RM
1 parent 1ba365f commit 87070a5

File tree

5 files changed

+313
-75
lines changed

5 files changed

+313
-75
lines changed

doc/XXL-MQ官方文档.md

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -633,51 +633,71 @@ transaction | 事务开关,开启消息事务性保证只会成功执行一次
633633
### 5.7 版本 v1.4.0 Release Notes[迭代中]
634634

635635
### Tmp
636-
636+
```
637637
1、特性:
638638
- 特性:存算分离、水平扩展、高性能(TPS:Mysql单机1W/Blade 10W)、海量消息(Mysql日百万/Blade日十亿)、消息轨迹、多消费模式(分片/串行/广播)、延迟消息、失败重试(固定/增长/指数)、
639639
- 其他:失败告警、AccessToken、容器化;
640-
2、流程:
641-
- pub->broker:根据group生产,服务端处理。
642-
- sub-> broker:根据cId分片消费,服务端分片;服务端滚动预热,抗并发。
643-
- broker :无状态,维护topic注册节点。
644-
- appname:应用服务,注册节点uuid
645-
- topic:关联appname,分片切分。清理及归档配置。
646-
- message:topic,group,shardId
647-
- 归档数据:定时归档,自动清理
648-
3、设计:
640+
2、设计:
649641
- Broker:
650642
- Manage:
651-
- User:服务授权
643+
- User:服务授权
652644
- AccessToken:能力
653-
- AppName:能力(注册、节点动态更新;用于Topic数据分片;);【AppName维度,在线实例信息:实例总数 = instanceNum;序号 = instanceIndex;partitionScope = 3~5;】
645+
- AppName:
646+
- 能力(注册、节点动态更新;用于Topic数据分片;);【AppName维度,在线实例信息:实例总数 = instanceNum;序号 = instanceIndex;partitionScope = 3~5;】
654647
- 模型:Instance注册:字段(appname + uuid + register_heartbeat);app维度20s汇总一次,同步至app表;时钟打平,从0开始每20s一次;
655-
- Topic:能力(定义管理 + 查看注册节点 / 节点分片分配情况;);【Topic】【名称】【负责人】【告警邮箱】【状态】【存储策略】【partition数量】【优先级】【重试次数】【重试间隔策略】【归档策略】
656-
- 模型:topic + store + partition + level + author + alarm_email + timeout
657-
- Message:查看 + 管理(增 + 该状态 + 归档);消息队列,物理消息队列;【10min一次,自动数据归档;】
648+
- Topic:
649+
- 能力(定义管理 + 查看注册节点 / 节点分片分配情况;);【Topic】【名称】【负责人】【告警邮箱】【状态】【存储策略】【partition数量】【优先级】【重试次数】【重试间隔策略】【归档策略】
650+
- 模型:topic + store + partitionCount + level + author + alarm_email + timeout
651+
- Message:
652+
- 能力:查看 + 管理(增 + 该状态 + 归档);消息队列,物理消息队列;【10min一次,自动数据归档;】
658653
- 模型:msgid + msgbody + topic + group + partitionId + status + retryCount + intervalTime + effectTime + consume_log;
659654
- 属性:
660655
- topic:关联 消息主题;
661656
- group:数据广播;【topic向上;广播;】
662657
- uuid序号:并行处理
663658
- partitionKey:分区Key进行hashcode取模,会转成分区ID,限制 [0-10000] 之内;结合Consumer在线列表,匹配消费分片范围,实现并行分片消费消息;【topic向下;并行;同sId保障顺序;根据消费者 partition 信息计算 范围;】
664-
- MessageArchive:归档消息,同 message;
665-
- Registry:提供 Consumer 注册、动态发现能力;消息分片消费时使用;
666-
- Broker Server:提供消息存储、读写能力;
659+
- MessageArchive:
660+
- 能力:定期讲终止态消息,同步归档,清理原始表;
661+
- 模型:同 message;
662+
- Registry:
663+
- 能力:提供 Consumer 注册、动态发现能力;消息分片消费时使用;
664+
- Broker
665+
- 能力:Server:提供消息存储、读写能力;
667666
- OpenAPI:统一“Token验证”(http+gson;借助 xxl-tool 实现通用 http-rpc 能力;)
668667
- a 、注册:app+topic初始化 + 节点心跳注册/摘除;
669-
- 数据格式:
670-
- instance01(UUID):
671-
- topic01 / group01
672-
- topic02 / group02
673-
- b、生产:异步队列,批量写入;处理group广播。
674-
- c、批量查询(锁定):pullAndLock,多topic并行查询能力,根据node分配10条,同时锁定。分配不到直接返回。
675-
- 分片数据:
676-
- topic01:
677-
- group01:2/3;
678-
- 逻辑:pullAndLock(topics,group +节点)【生成唯一标识,lock时写入;根据标识判断锁定值;】
679-
- 分片查询:topic + group + 分片范围(分片序号计算;动态计算);默认每个topic取100条数据;
680-
- 数据所动:查询出的数据,锁定执行状态;根基Topic自定义超时时间(默认10min)超时释放;
668+
- 数据格式:
669+
- app01 :
670+
- instanceUuid01:
671+
- topicList:
672+
- topic01 : default
673+
- topic02 : group02
674+
- RegistryData:存app表;
675+
- app01:
676+
- instanceList:
677+
- instance01:partitionScope: [0-5000]
678+
- instance02:partitionScope:[5001-10000]
679+
- topicList:
680+
- topic01 : [default、group01、group02]
681+
- topic02 : [default]
682+
- RegistryCache:30s一次;
683+
- topic01:
684+
- info:配置信息(app);
685+
- instanceList:
686+
- instance01:partitionScope: [0-5000]
687+
- instance02:partitionScope:[5001-10000]
688+
- groupList:[default、group01、group02]
689+
- app01:基础信息;
690+
- b、生产:
691+
- 能力:异步队列,批量写入;处理group广播。
692+
- 数据格式:topic + group(null广播) + partitionKey(null随机;partitionId [0, 10000]) + msgBody
693+
- c、批量查询(锁定):
694+
- 能力:多topic并行查询;单topic分片查询;能力,根据node分配10条,同时锁定。分配不到直接返回。
695+
- pullAndLock:(topics,group +节点)【生成唯一标识,lock时写入;根据标识判断锁定值;】
696+
- 数据格式:
697+
- topic01 + group01 + clientId(计算partitionScope)
698+
- topic02 + group01 +
699+
- 查询逻辑:多topic并行查询 + 单topic分片查询 + 每topic每次取10个;
700+
- 锁定逻辑:针对查询出的数据,更新锁定态;注意超时释放;
681701
- d、消费消息:异步队列,批量更新消费结果;
682702
- Client:
683703
- Registry 组件:
@@ -687,9 +707,9 @@ transaction | 事务开关,开启消息事务性保证只会成功执行一次
687707
- 功能:直连Broker;发起消息生产;
688708
- 性能:内存queue,批量异步推送;异常,写本地磁盘;(xxl-tool)
689709
- 要点:
690-
- 并行消息:指定 topic + group(固定) + shardingid(随机),生产单条消息;借助 shardingid 分片消费;
691-
- 串行消息:指定 topic + group(固定) + shardingid(固定>0),生产单条消息;固定 shardingid 绑定固定节点消费;
692-
- 广播消息:指定 topic + group(uuid) + shardingid(随机);根据在线 group 列表生产多条消息;
710+
- 并行消息:指定 topic + group(固定) + partitionKey(随机),生产单条消息;借助 partitionKey 分片消费;
711+
- 串行消息:指定 topic + group(固定) + partitionKey(固定>0),生产单条消息;固定 partitionKey 绑定固定节点消费;
712+
- 广播消息:指定 topic + group(uuid) + partitionKey(随机);根据在线 group 列表生产多条消息;
693713
- Consumer 组件:
694714
- 功能:查询消息,消费消息,回调消息;
695715
- 性能:批量查询、批量回调;
@@ -700,7 +720,7 @@ transaction | 事务开关,开启消息事务性保证只会成功执行一次
700720
- ConsumerInvokeThread 组件:
701721
- 功能:消费业务逻辑,执行线程;
702722
- 要点:单Topic单线程;不同Topic隔离;
703-
723+
```
704724

705725
### TODO
706726
- 会考虑移除 mysql 强依赖的,迁移 jpa 进一步提升通用型。

doc/db/tables_xxl_mq.sql

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,65 +8,67 @@ use `xxl_mq`;
88
SET NAMES utf8mb4;
99

1010

11-
## —————————————————————— config data ——————————————————
11+
## —————————————————————— topic and message ——————————————————
1212

1313
CREATE TABLE `xxl_mq_topic`(
14-
`id` int(11) NOT NULL AUTO_INCREMENT,
15-
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
16-
`desc` varchar(100) NOT NULL COMMENT '消息主题名称',
17-
`store_type` tinyint(4) NOT NULL COMMENT '存储类型:0-通用存储,2-单独存储',
18-
`level` int(11) NOT NULL COMMENT '优先级',
19-
`owner` varchar(50) NOT NULL COMMENT '负责人',
20-
`alarm_email` varchar(255) DEFAULT NULL COMMENT '告警配置(邮箱)',
21-
`add_time` datetime NOT NULL COMMENT '新增时间',
22-
`update_time` datetime NOT NULL COMMENT '更新时间',
14+
`id` bigint(20) NOT NULL AUTO_INCREMENT,
15+
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
16+
`name` varchar(100) NOT NULL COMMENT '消息主题名称',
17+
`owner` varchar(50) NOT NULL COMMENT '负责人',
18+
`alarm_email` varchar(255) DEFAULT NULL COMMENT '告警配置(邮箱)',
19+
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-禁用',
20+
`store_strategy` tinyint(4) NOT NULL COMMENT '存储策略:0-统一存储,2-隔离存储',
21+
`partition_strategy` tinyint(4) NOT NULL COMMENT '分区策略:0-Hash分区,1-随机分区,2-轮询分区',
22+
`level` int(11) NOT NULL COMMENT '优先级',
23+
`retry_type` varchar(100) NOT NULL COMMENT '重试策略(固定;增长;指数;不重试;)',
24+
`retry_count` int(11) NOT NULL COMMENT '重试次数',
25+
`retry_interval` int(11) NOT NULL COMMENT '重试间隔,单位秒(3s;3/6/9;3/9/27)',
26+
`execution_timeout` int(11) NOT NULL COMMENT '执行超时时间',
27+
`add_time` datetime NOT NULL COMMENT '新增时间',
28+
`update_time` datetime NOT NULL COMMENT '更新时间',
2329
PRIMARY KEY (`id`),
2430
UNIQUE KEY `uni_topic` (`topic`) USING BTREE
2531
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='消息主题';
2632

2733
CREATE TABLE `xxl_mq_message`(
28-
`id` bigint(20) NOT NULL AUTO_INCREMENT,
29-
`data` text NOT NULL COMMENT '消息数据',
30-
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
31-
`group` varchar(255) NOT NULL COMMENT '消息分组',
32-
`sharding_id` int(11) NOT NULL COMMENT '消息分片序号',
33-
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-运行中、2-成功、3-失败',
34-
`retry_count` int(11) NOT NULL COMMENT '重试次数',
35-
`retry_type` varchar(100) NOT NULL COMMENT '重试策略(固定;增长;指数)',
36-
`retry_interval` int(11) NOT NULL COMMENT '重试间隔(3s;2/4/6;2/4/8)',
37-
`effect_time` datetime NOT NULL COMMENT '生效时间',
38-
`consume_log` text DEFAULT NULL COMMENT '消费地址',
39-
`add_time` datetime NOT NULL COMMENT '新增时间',
40-
`update_time` datetime NOT NULL COMMENT '更新时间',
34+
`id` bigint(20) NOT NULL AUTO_INCREMENT,
35+
`data` text NOT NULL COMMENT '消息数据',
36+
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
37+
`group` varchar(255) NOT NULL COMMENT '消息主题分组',
38+
`partition_id` int(11) NOT NULL COMMENT '消息分片ID',
39+
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-执行中、2-成功、3-失败、4-超时失败',
40+
`effect_time` datetime NOT NULL COMMENT '生效时间',
41+
`consume_log` text DEFAULT NULL COMMENT '消费日志',
42+
`consume_instance_uuid` varchar(50) DEFAULT NULL COMMENT '消费实例实例唯一标识',
43+
`add_time` datetime NOT NULL COMMENT '新增时间',
44+
`update_time` datetime NOT NULL COMMENT '更新时间',
4145
PRIMARY KEY (`id`),
42-
KEY `i_t_g_1` (`topic`, `group`, `sharding_id`)
46+
KEY `i_t_g_p` (`topic`, `group`, `partition_id`)
4347
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='消息数据表';
4448

4549
CREATE TABLE `xxl_mq_message_archive` (
46-
`id` bigint(20) NOT NULL AUTO_INCREMENT,
47-
`data` text NOT NULL COMMENT '消息数据',
48-
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
49-
`group` varchar(255) NOT NULL COMMENT '消息分组',
50-
`sharding_id` int(11) NOT NULL COMMENT '消息分片序号',
51-
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-运行中、2-成功、3-失败',
52-
`retry_count` int(11) NOT NULL COMMENT '重试次数',
53-
`retry_type` varchar(100) NOT NULL COMMENT '重试策略(固定;增长;指数)',
54-
`retry_interval` int(11) NOT NULL COMMENT '重试间隔(3s;2/4/6;2/4/8)',
55-
`effect_time` datetime NOT NULL COMMENT '生效时间',
56-
`consume_log` text DEFAULT NULL COMMENT '消费日志',
57-
`add_time` datetime NOT NULL COMMENT '新增时间',
58-
`update_time` datetime NOT NULL COMMENT '更新时间',
50+
`id` bigint(20) NOT NULL AUTO_INCREMENT,
51+
`data` text NOT NULL COMMENT '消息数据',
52+
`topic` varchar(255) NOT NULL COMMENT '消息主题Topic',
53+
`group` varchar(255) NOT NULL COMMENT '消息主题分组',
54+
`partition_id` int(11) NOT NULL COMMENT '消息分片ID',
55+
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-执行中、2-成功、3-失败、4-超时失败',
56+
`effect_time` datetime NOT NULL COMMENT '生效时间',
57+
`consume_log` text DEFAULT NULL COMMENT '消费日志',
58+
`consume_instance_uuid` varchar(50) DEFAULT NULL COMMENT '消费实例实例唯一标识',
59+
`add_time` datetime NOT NULL COMMENT '新增时间',
60+
`update_time` datetime NOT NULL COMMENT '更新时间',
5961
PRIMARY KEY (`id`),
60-
KEY `i_t_g_1` (`topic`, `group`, `sharding_id`)
62+
KEY `i_t_g_p` (`topic`, `group`, `partition_id`)
6163
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息数据归档表';
6264

6365
## —————————————————————— registry ——————————————————
6466

6567
CREATE TABLE `xxl_mq_instance` (
6668
`id` bigint(20) NOT NULL AUTO_INCREMENT,
6769
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
68-
`uuid` varchar(50) NOT NULL COMMENT '节点唯一标识',
69-
`register_heartbeat` datetime DEFAULT NULL COMMENT '节点最后心跳时间,动态注册时判定是否过期',
70+
`uuid` varchar(50) NOT NULL COMMENT '实例唯一标识',
71+
`register_heartbeat` datetime DEFAULT NULL COMMENT '实例最后心跳时间,动态注册时判定是否过期',
7072
`add_time` datetime NOT NULL COMMENT '新增时间',
7173
`update_time` datetime NOT NULL COMMENT '更新时间',
7274
PRIMARY KEY (`id`),
@@ -81,7 +83,7 @@ CREATE TABLE `xxl_mq_application` (
8183
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
8284
`name` varchar(20) NOT NULL COMMENT '服务名称',
8385
`desc` varchar(100) NOT NULL COMMENT '服务描述',
84-
`registry_instance` text COMMENT '在线节点列表,数据JSON',
86+
`registry_data` text COMMENT '在线节点列表,数据JSON',
8587
`add_time` datetime NOT NULL COMMENT '新增时间',
8688
`update_time` datetime NOT NULL COMMENT '更新时间',
8789
PRIMARY KEY (`id`),
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.xxl.mq.admin.mapper;
2+
3+
import com.xxl.mq.admin.model.entity.Instance;
4+
import org.apache.ibatis.annotations.Param;
5+
import org.apache.ibatis.annotations.Mapper;
6+
7+
import java.util.List;
8+
9+
/**
10+
* Instance Mapper
11+
*
12+
* Created by xuxueli on '2025-03-16 12:29:49'.
13+
*/
14+
@Mapper
15+
public interface InstanceMapper {
16+
17+
/**
18+
* 新增
19+
*/
20+
public int insert(@Param("instance") Instance instance);
21+
22+
/**
23+
* 删除
24+
*/
25+
public int delete(@Param("ids") List<Integer> ids);
26+
27+
/**
28+
* 更新
29+
*/
30+
public int update(@Param("instance") Instance instance);
31+
32+
/**
33+
* Load查询
34+
*/
35+
public Instance load(@Param("id") int id);
36+
37+
/**
38+
* 分页查询Data
39+
*/
40+
public List<Instance> pageList(@Param("offset") int offset, @Param("pagesize") int pagesize);
41+
42+
/**
43+
* 分页查询Count
44+
*/
45+
public int pageListCount(@Param("offset") int offset, @Param("pagesize") int pagesize);
46+
47+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.xxl.mq.admin.model.entity;
2+
3+
import java.io.Serializable;
4+
import java.util.Date;
5+
6+
/**
7+
* Instance Entity
8+
*
9+
* Created by xuxueli on '2025-03-16 12:28:40'.
10+
*/
11+
public class Instance implements Serializable {
12+
private static final long serialVersionUID = 42L;
13+
14+
/**
15+
* id
16+
*/
17+
private long id;
18+
19+
/**
20+
* AppName(服务唯一标识)
21+
*/
22+
private String appname;
23+
24+
/**
25+
* 节点唯一标识
26+
*/
27+
private String uuid;
28+
29+
/**
30+
* 节点最后心跳时间,动态注册时判定是否过期
31+
*/
32+
private Date registerHeartbeat;
33+
34+
/**
35+
* 新增时间
36+
*/
37+
private Date addTime;
38+
39+
/**
40+
* 更新时间
41+
*/
42+
private Date updateTime;
43+
44+
45+
public long getId() {
46+
return id;
47+
}
48+
49+
public void setId(long id) {
50+
this.id = id;
51+
}
52+
53+
public String getAppname() {
54+
return appname;
55+
}
56+
57+
public void setAppname(String appname) {
58+
this.appname = appname;
59+
}
60+
61+
public String getUuid() {
62+
return uuid;
63+
}
64+
65+
public void setUuid(String uuid) {
66+
this.uuid = uuid;
67+
}
68+
69+
public Date getRegisterHeartbeat() {
70+
return registerHeartbeat;
71+
}
72+
73+
public void setRegisterHeartbeat(Date registerHeartbeat) {
74+
this.registerHeartbeat = registerHeartbeat;
75+
}
76+
77+
public Date getAddTime() {
78+
return addTime;
79+
}
80+
81+
public void setAddTime(Date addTime) {
82+
this.addTime = addTime;
83+
}
84+
85+
public Date getUpdateTime() {
86+
return updateTime;
87+
}
88+
89+
public void setUpdateTime(Date updateTime) {
90+
this.updateTime = updateTime;
91+
}
92+
93+
}

0 commit comments

Comments
 (0)