Skip to content

Commit 2bc82a2

Browse files
committed
Update RM
1 parent 075c42d commit 2bc82a2

File tree

2 files changed

+82
-81
lines changed

2 files changed

+82
-81
lines changed

doc/XXL-MQ官方文档.md

Lines changed: 62 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -637,68 +637,70 @@ transaction | 事务开关,开启消息事务性保证只会成功执行一次
637637
1、特性:
638638
- 特性:存算分离、水平扩展、高性能(TPS:Mysql单机1W/Blade 10W)、海量消息(Mysql日百万/Blade日十亿)、消息轨迹、多消费模式(分片/串行/广播)、延迟消息、失败重试(固定/增长/指数)、
639639
- 其他:失败告警、AccessToken、容器化;
640-
2、模型设计:
641-
- _user:用户管理
642-
- _access_token:通讯Token,for OpenAPI
643-
- _registry:【10s一次;广播更新;】Broker动态注册(内置RPC服务) + Consumer动态注册(数据分片用)
644-
- 字段:type + key + data + addtime
645-
- 示例:
646-
- 01(broker) + “broker” + “address01” + "" + now()
647-
- 02(comsumer) + “consumer_uuid” + [{topic01&group, topic02&group}] + now()
648-
- 【RegistryData】不额外存储,内存动态生成;
649-
- Broker:[address01、address02]
650-
- Consumer:topic : [{group : consumer_uuid01, uuid02}]
651-
- _topic:消息主题,配置管理;查看注册节点,借助 “consumer_uuid + group” 分片处理数据;
652-
- 字段:name + store_table(通用/单独表) + 优先级() + author + alarm_email
653-
- 示例:“topic01” + "单独表" + "1" + "" + "" +
654-
- _message:消息队列,物理消息队列;【10min一次,自动数据归档;】
655-
- 字段:msgid + msgbody + topic + group + shardingId + status + retryCount + intervalTime + effectTime + consume_log;
656-
- topic:关联 消息主题;
657-
- group:数据广播
658-
- uuid序号:并行处理
659-
- shardingId:消费分片ID,限制0-1000之内;结合Consumer在线列表,匹配消费分片范围,实现并行分片消费消息;
660-
- _message_archive:归档消息
661-
- 字段:同 message;
662-
3、模块设计:
663-
- 首页:Topic数量、集群数量、消息管数量;
664-
- Topic管理:
665-
- 查询条件:Topic(模糊搜索)
666-
- 管理:Topic,存储配置、优先级(归档等)、告警邮箱;注册信息查看;
667-
- 消息管理:
668-
- 查询条件 》 Topic/必选(精确搜索) + Group + 状态 + 时间;
669-
- 操作 》新增 + 删除 + 状态更新;
670-
- Broker管理:Broker 集群节点;IP : PORT;
671-
4、组件组成:
672-
- Broker:
673-
- Manage:提供 AccessToken、Topic、Message 管理能力;
640+
2、流程:
641+
- pub->broker:根据group生产,服务端处理。
642+
- sub-> broker:根据cId分片消费,服务端分片;服务端滚动预热,抗并发。
643+
- broker :无状态,维护topic注册节点。
644+
- appname:应用服务,注册节点uuid
645+
- topic:关联appname,分片切分。清理及归档配置。
646+
- message:topic,group,shardId
647+
- 归档数据:定时归档,自动清理
648+
3、设计:
649+
- Broker:
650+
- Manage:
651+
- User:服务授权
652+
- AccessToken:能力
653+
- AppName:能力(注册、节点动态更新;用于Topic数据分片;)
654+
- 模型:Instance注册:字段(appname + uuid + register_heartbeat);app维度20s汇总一次,同步至app表;时钟打平,从0开始每20s一次;
655+
- Topic:能力(定义管理 + 查看注册节点 / 节点分片分配情况;)
656+
- 模型:topic + store_table(通用/单独表) + 优先级() + author + alarm_email + timeout
657+
- Message:查看 + 管理(增 + 该状态 + 归档);消息队列,物理消息队列;【10min一次,自动数据归档;】
658+
- 模型:msgid + msgbody + topic + group + shardingId + status + retryCount + intervalTime + effectTime + consume_log;
659+
- 属性:
660+
- topic:关联 消息主题;
661+
- group:数据广播
662+
- uuid序号:并行处理
663+
- shardingId:消费分片ID,限制0-1000之内;结合Consumer在线列表,匹配消费分片范围,实现并行分片消费消息;
664+
- MessageArchive:归档消息,同 message;
674665
- Registry:提供 Consumer 注册、动态发现能力;消息分片消费时使用;
675666
- Broker Server:提供消息存储、读写能力;
676-
- OpenAPI:生产、批量查询、锁定、消费消息;(http+gson;借助 xxl-tool 实现通用 http-rpc 能力;)
677-
- Producer:
678-
- 功能:直连Broker;发起消息生产;
679-
- 性能:内存queue,批量异步推送;异常,写本地磁盘;(xxl-tool)
680-
- 要点:
681-
- 并行消息:指定 topic + group(固定) + shardingid(随机),生产单条消息;借助 shardingid 分片消费;
682-
- 串行消息:指定 topic + group(固定) + shardingid(固定>0),生产单条消息;固定 shardingid 绑定固定节点消费;
683-
- 广播消息:指定 topic + group(uuid) + shardingid(随机);根据在线 group 列表生产多条消息;
684-
- Consumer:
685-
- 功能:查询消息,消费消息,回调消息;
686-
- 性能:批量查询、批量回调;
687-
- 要点:
688-
- 注册:
689-
- 写:instanceUUID : topic + group
690-
- instance01
691-
- topic01 / group01
692-
- topic02 / group02
693-
- 读:instanceIndex / instanceNum;
694-
- topic01
695-
- group01: 3/5
696-
- topic02
697-
- group02: 4/6
698-
- 查询:分片查询:topic + group + 注册分片计算->消息分片范围;
699-
- 属性:
700-
- topic:绑定 Topic,只消费该topic的消息;
701-
- group:绑定 Topic Group,group 维度隔离,只消费该group下消息;同一 topic 支持绑定多group,可借助 group 实效消息广播;
667+
- OpenAPI:统一“Token验证”(http+gson;借助 xxl-tool 实现通用 http-rpc 能力;)
668+
- a 、注册:app+topic初始化 + 节点心跳注册/摘除;
669+
- 数据格式:
670+
- instance01(UUID):
671+
- topic01 / group01
672+
- topic02 / group02
673+
- b、生产:异步队列,批量写入;处理group广播。
674+
- c、批量查询(锁定):pullAndLock,多topic并行查询能力,根据node分配10条,同时锁定。分配不到直接返回。
675+
- 分片数据:
676+
- topic01:
677+
- group01:2/3;
678+
- 逻辑:pullAndLock(topics,group +节点)
679+
- 分片查询:topic + group + 分片范围(分片序号计算;动态计算);默认每个topic取100条数据;
680+
- 数据所动:查询出的数据,锁定执行状态;根基Topic自定义超时时间(默认10min)超时释放;
681+
- d、消费消息:异步队列,批量更新消费结果;
682+
- Client:
683+
- Registry 组件:
684+
- 数据:app + 节点UUID(IP+时间戳) + topics
685+
- 动作:初始化(app + topics绑定) + 节点心跳 + 节点下线摘除;
686+
- Producer 组件:
687+
- 功能:直连Broker;发起消息生产;
688+
- 性能:内存queue,批量异步推送;异常,写本地磁盘;(xxl-tool)
689+
- 要点:
690+
- 并行消息:指定 topic + group(固定) + shardingid(随机),生产单条消息;借助 shardingid 分片消费;
691+
- 串行消息:指定 topic + group(固定) + shardingid(固定>0),生产单条消息;固定 shardingid 绑定固定节点消费;
692+
- 广播消息:指定 topic + group(uuid) + shardingid(随机);根据在线 group 列表生产多条消息;
693+
- Consumer 组件:
694+
- 功能:查询消息,消费消息,回调消息;
695+
- 性能:批量查询、批量回调;
696+
- 要点:pullAndLock前过滤topic,空闲topic才查询;
697+
- 属性:
698+
- topic:绑定 Topic,只消费该topic的消息;
699+
- group:绑定 Topic Group,group 维度隔离,只消费该group下消息;同一 topic 支持绑定多group,可借助 group 实效消息广播;
700+
- ConsumerInvokeThread 组件:
701+
- 功能:消费业务逻辑,执行线程;
702+
- 要点:单Topic单线程;不同Topic隔离;
703+
702704

703705
### TODO
704706
- 会考虑移除 mysql 强依赖的,迁移 jpa 进一步提升通用型。

doc/db/tables_xxl_mq.sql

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,34 +60,33 @@ CREATE TABLE `xxl_mq_message_archive` (
6060
KEY `i_t_g_1` (`topic`, `group`, `sharding_id`)
6161
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息数据归档表';
6262

63-
## —————————————————————— registry of application ——————————————————
64-
65-
CREATE TABLE `xxl_mq_registry` (
66-
`id` bigint(20) NOT NULL AUTO_INCREMENT,
67-
`type` tinyint(4) NOT NULL COMMENT '注册类型:1-Broker,2-Consumer',
68-
`key` varchar(255) NOT NULL COMMENT '注册Key(broker,consumer—uuid)',
69-
`data` text NOT NULL COMMENT '数据正文,json结构体(address01,topic01&group)',
70-
`add_time` datetime NOT NULL COMMENT '新增时间',
71-
`update_time` datetime NOT NULL COMMENT '更新时间',
72-
PRIMARY KEY (`id`)
73-
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='注册表';
74-
75-
76-
77-
63+
## —————————————————————— registry ——————————————————
64+
65+
CREATE TABLE `xxl_mq_instance` (
66+
`id` bigint(20) NOT NULL AUTO_INCREMENT,
67+
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
68+
`uuid` varchar(50) NOT NULL COMMENT '节点唯一标识',
69+
`register_heartbeat` datetime DEFAULT NULL COMMENT '节点最后心跳时间,动态注册时判定是否过期',
70+
`add_time` datetime NOT NULL COMMENT '新增时间',
71+
`update_time` datetime NOT NULL COMMENT '更新时间',
72+
PRIMARY KEY (`id`),
73+
UNIQUE KEY `uni_instance` (`appname`, `uuid`) USING BTREE,
74+
KEY `i_e_a` (`appname`)
75+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='服务注册实例';
7876

7977
## —————————————————————— user and token and application ——————————————————
8078

8179
CREATE TABLE `xxl_mq_application` (
8280
`id` int(11) NOT NULL AUTO_INCREMENT,
83-
`appname` varchar(50) NOT NULL COMMENT 'AppName(应用唯一标识)',
84-
`name` varchar(20) NOT NULL COMMENT '应用名称',
85-
`desc` varchar(100) NOT NULL COMMENT '应用描述',
81+
`appname` varchar(50) NOT NULL COMMENT 'AppName(服务唯一标识)',
82+
`name` varchar(20) NOT NULL COMMENT '服务名称',
83+
`desc` varchar(100) NOT NULL COMMENT '服务描述',
84+
`registry_instance` text COMMENT '在线节点列表,数据JSON',
8685
`add_time` datetime NOT NULL COMMENT '新增时间',
8786
`update_time` datetime NOT NULL COMMENT '更新时间',
8887
PRIMARY KEY (`id`),
8988
UNIQUE KEY `i_appname` (`appname`) USING BTREE
90-
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='应用';
89+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='服务';
9190

9291
CREATE TABLE `xxl_mq_user` (
9392
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',
@@ -97,7 +96,7 @@ CREATE TABLE `xxl_mq_user` (
9796
`status` tinyint(4) NOT NULL COMMENT '状态:0-正常、1-禁用',
9897
`real_name` varchar(50) DEFAULT NULL COMMENT '真实姓名',
9998
`role` varchar(20) NOT NULL COMMENT '角色:ADMIN-管理员,NORMAL-普通用户',
100-
`permission` varchar(255) DEFAULT NULL COMMENT '权限:应用ID列表,多个逗号分割',
99+
`permission` varchar(255) DEFAULT NULL COMMENT '权限:服务ID列表,多个逗号分割',
101100
`add_time` datetime NOT NULL COMMENT '新增时间',
102101
`update_time` datetime NOT NULL COMMENT '更新时间',
103102
PRIMARY KEY (`id`),
@@ -123,7 +122,7 @@ INSERT INTO `xxl_mq_access_token` (id, `access_token`, `status`, add_time, updat
123122
VALUES (1, 'defaultaccesstoken', 0, now(), now());
124123

125124
INSERT INTO `xxl_mq_application` (id, appname, name, `desc`, add_time, update_time)
126-
VALUES (1, 'xxl-mq-sample', '示例应用', '示例应用,演示使用', '2025-01-18 20:03:13', '2025-01-18 20:03:13');
125+
VALUES (1, 'xxl-mq-sample', '示例服务', '示例服务,演示使用', '2025-01-18 20:03:13', '2025-01-18 20:03:13');
127126

128127

129128
commit;

0 commit comments

Comments
 (0)