Skip to content

Commit 7ab3e5e

Browse files
committed
feat(data): 添加 Doris 数据库支持
- 实现了 Doris 数据库的自动配置 - 添加了 Doris DDL 操作的接口和实现 - 编写了 Doris 表自动分区管理的功能 - 提供了 Doris集成测试的基类和测试用例
1 parent be34372 commit 7ab3e5e

File tree

15 files changed

+2024
-0
lines changed

15 files changed

+2024
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>cn.fxbin.bubble</groupId>
8+
<artifactId>bubble-starters</artifactId>
9+
<version>2.0.0.BUILD-SNAPSHOT</version>
10+
<relativePath>../pom.xml</relativePath>
11+
</parent>
12+
13+
<artifactId>bubble-starter-data-doris</artifactId>
14+
<name>Bubble Starter Data Doris</name>
15+
<description>Bubble Doris Starter</description>
16+
17+
<properties>
18+
<java.version>17</java.version>
19+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
21+
22+
<testcontainers.version>1.21.0</testcontainers.version>
23+
</properties>
24+
25+
<dependencies>
26+
<!-- bubble-starter -->
27+
<dependency>
28+
<groupId>cn.fxbin.bubble</groupId>
29+
<artifactId>bubble-starter</artifactId>
30+
</dependency>
31+
32+
<dependency>
33+
<groupId>com.mysql</groupId>
34+
<artifactId>mysql-connector-j</artifactId>
35+
<scope>runtime</scope>
36+
</dependency>
37+
38+
<!-- HikariCP connection pool -->
39+
<dependency>
40+
<groupId>com.zaxxer</groupId>
41+
<artifactId>HikariCP</artifactId>
42+
<optional>true</optional>
43+
</dependency>
44+
45+
<!-- Spring JDBC -->
46+
<dependency>
47+
<groupId>org.springframework</groupId>
48+
<artifactId>spring-jdbc</artifactId>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.springframework</groupId>
52+
<artifactId>spring-tx</artifactId>
53+
</dependency>
54+
55+
56+
<!-- 测试依赖 -->
57+
<dependency>
58+
<groupId>cn.fxbin.bubble</groupId>
59+
<artifactId>bubble-starter-test</artifactId>
60+
<scope>test</scope>
61+
</dependency>
62+
63+
<dependency>
64+
<groupId>org.testcontainers</groupId>
65+
<artifactId>junit-jupiter</artifactId>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.testcontainers</groupId>
69+
<artifactId>jdbc</artifactId>
70+
<scope>test</scope>
71+
</dependency>
72+
</dependencies>
73+
74+
<build>
75+
<plugins>
76+
<plugin>
77+
<groupId>org.apache.maven.plugins</groupId>
78+
<artifactId>maven-surefire-plugin</artifactId>
79+
<version>3.5.3</version>
80+
<configuration>
81+
<forkCount>1</forkCount>
82+
<reuseForks>false</reuseForks>
83+
<argLine>@{jacocoArgLine}</argLine>
84+
</configuration>
85+
</plugin>
86+
</plugins>
87+
</build>
88+
89+
</project>
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package cn.fxbin.bubble.data.doris;
2+
3+
import org.springframework.lang.Nullable;
4+
5+
/**
6+
* DorisDdlOperations
7+
* 提供 Doris 数据库通用的 DDL 操作接口,例如执行原生 SQL,检查表/分区是否存在等。
8+
*
9+
* @author fxbin
10+
* @version v1.0
11+
* @since 2024/5/26 0:00
12+
*/
13+
public interface DorisDdlOperations {
14+
15+
/**
16+
* 执行原生 DDL 语句
17+
*
18+
* @param ddl SQL DDL语句
19+
*/
20+
void execute(String ddl);
21+
22+
/**
23+
* 检查表是否存在
24+
*
25+
* @param databaseName 数据库名称
26+
* @param tableName 表名称
27+
* @return 如果表存在则返回 true,否则返回 false
28+
*/
29+
boolean tableExists(@Nullable String databaseName, String tableName);
30+
31+
/**
32+
* 检查分区是否存在
33+
*
34+
* @param databaseName 数据库名称
35+
* @param tableName 表名称
36+
* @param partitionName 分区名称
37+
* @return 如果分区存在则返回 true,否则返回 false
38+
*/
39+
boolean partitionExists(@Nullable String databaseName, String tableName, String partitionName);
40+
41+
/**
42+
* 创建分区
43+
*
44+
* @param databaseName 数据库名称
45+
* @param tableName 表名称
46+
* @param partitionDefinition 分区定义,例如: PARTITION p202301 VALUES LESS THAN ('2023-02-01')
47+
*/
48+
void addPartition(@Nullable String databaseName, String tableName, String partitionDefinition);
49+
50+
/**
51+
* 删除分区
52+
*
53+
* @param databaseName 数据库名称
54+
* @param tableName 表名称
55+
* @param partitionName 分区名称
56+
*/
57+
void dropPartition(@Nullable String databaseName, String tableName, String partitionName);
58+
59+
/**
60+
* 获取自动创建的分区名称,根据传入的日期值
61+
*
62+
* @param dateValue 日期值
63+
* @param granularity 分区粒度 (year, month, day)
64+
* @return 分区名称
65+
*/
66+
String getAutoPartitionName(String dateValue, String granularity);
67+
68+
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package cn.fxbin.bubble.data.doris;
2+
3+
import cn.fxbin.bubble.core.constant.StringPool;
4+
import cn.fxbin.bubble.data.doris.autoconfigure.DorisProperties;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.util.Assert;
8+
import org.springframework.util.StringUtils;
9+
10+
import java.time.LocalDateTime;
11+
import java.time.format.DateTimeFormatter;
12+
import java.util.Locale;
13+
import java.util.regex.Matcher;
14+
import java.util.regex.Pattern;
15+
16+
/**
17+
* DorisTableAutoPartition
18+
*
19+
* @author fxbin
20+
* @version v1.0
21+
* @since 2024/5/26 0:00
22+
*/
23+
public class DorisTableAutoPartition {
24+
25+
private static final Logger log = LoggerFactory.getLogger(DorisTableAutoPartition.class);
26+
27+
private final DorisDdlOperations ddlOperations;
28+
private final DorisProperties dorisProperties;
29+
30+
// 正则表达式用于从 DDL 语句中提取表名和数据库名
31+
// 例如:CREATE TABLE date_table AUTO PARTITION BY RANGE (date_trunc(TIME_STAMP, 'month')) ()
32+
private static final Pattern CREATE_TABLE_PATTERN = Pattern.compile(
33+
"CREATE\\s+TABLE\\s+(?:(?<database>[a-zA-Z_][a-zA-Z0-9_]*)\\.)?(?<tableName>[a-zA-Z_][a-zA-Z0-9_]*)"
34+
+ "\\s+AUTO\\s+PARTITION\\s+BY\\s+(?<partitionType>RANGE|LIST)\\s*\\((?<partitionBy>[^)]+)\\)",
35+
Pattern.CASE_INSENSITIVE
36+
);
37+
38+
// 正则表达式用于提取 date_trunc 函数的粒度
39+
private static final Pattern DATE_TRUNC_PATTERN = Pattern.compile(
40+
"date_trunc\\([^,]+,\\s*'(?<granularity>year|month|day|hour|minute)'\\)",
41+
Pattern.CASE_INSENSITIVE
42+
);
43+
44+
public DorisTableAutoPartition(DorisDdlOperations ddlOperations, DorisProperties dorisProperties) {
45+
this.ddlOperations = ddlOperations;
46+
this.dorisProperties = dorisProperties;
47+
}
48+
49+
/**
50+
* 检查并自动创建分区
51+
* <p>
52+
* 假设数据导入时,可以获取到分区相关的值 (例如时间戳或列表值)
53+
* 在实际应用中,此方法可能需要在数据导入前被调用,或者在拦截数据导入请求时调用。
54+
* 并且需要根据实际的 DDL 来获取表名、数据库名、分区类型、分区字段或表达式、分区粒度。
55+
* </p>
56+
*
57+
* @param databaseName 数据库名称
58+
* @param tableName 表名称
59+
* @param ddl 创建表的DDL语句,用于解析自动分区配置
60+
* @param partitionValue 当前要写入数据的分区值 (时间值或列表值)
61+
*/
62+
public void checkAndCreatePartition(String databaseName, String tableName, String ddl, String partitionValue) {
63+
if (!dorisProperties.getAutoPartition().isEnabled()) {
64+
log.debug("Doris 自动分区功能未启用,跳过分区检查和创建。");
65+
return;
66+
}
67+
68+
Matcher matcher = CREATE_TABLE_PATTERN.matcher(ddl);
69+
if (!matcher.find()) {
70+
log.warn("无法从 DDL 语句中解析出自动分区信息:{}", ddl);
71+
return;
72+
}
73+
74+
// 从 DDL 中获取数据库名、表名、分区类型和分区表达式
75+
String parsedDatabaseName = matcher.group("database");
76+
String parsedTableName = matcher.group("tableName");
77+
String partitionTypeStr = matcher.group("partitionType");
78+
String partitionBy = matcher.group("partitionBy");
79+
80+
// 优先使用传入的数据库名和表名,如果 DDL 中没有指定,则使用传入的
81+
String effectiveDatabaseName = StringUtils.hasText(parsedDatabaseName) ? parsedDatabaseName : databaseName;
82+
String effectiveTableName = StringUtils.hasText(parsedTableName) ? parsedTableName : tableName;
83+
84+
Assert.hasText(effectiveDatabaseName, "数据库名称不能为空");
85+
Assert.hasText(effectiveTableName, "表名称不能为空");
86+
87+
DorisProperties.AutoPartitionType autoPartitionType = DorisProperties.AutoPartitionType.valueOf(partitionTypeStr.toUpperCase(Locale.ROOT));
88+
89+
String partitionName = null;
90+
String partitionDefinition = null;
91+
92+
switch (autoPartitionType) {
93+
case RANGE:
94+
Matcher dateTruncMatcher = DATE_TRUNC_PATTERN.matcher(partitionBy);
95+
if (!dateTruncMatcher.find()) {
96+
log.error("RANGE 自动分区类型必须使用 date_trunc 函数,且指定粒度。DDL: {}", ddl);
97+
return;
98+
}
99+
String granularity = dateTruncMatcher.group("granularity");
100+
// 假设 partitionValue 是一个符合粒度的日期字符串,例如 "2023-01-15"
101+
partitionName = ddlOperations.getAutoPartitionName(partitionValue, granularity);
102+
// 获取下一个分区的值,例如对于 'month','2023-01-15' 对应分区 'p202301',下一个分区应为 '2023-02-01'
103+
String nextPartitionValue = calculateNextRangePartitionValue(partitionValue, granularity);
104+
partitionDefinition = String.format("PARTITION %s VALUES LESS THAN ('%s')", partitionName, nextPartitionValue);
105+
break;
106+
case LIST:
107+
// LIST 类型直接使用 partitionValue 作为分区键
108+
// Doris 的 LIST 分区 VALUES IN 语法
109+
partitionName = "p_" + partitionValue.replaceAll("[^a-zA-Z0-9_]", "_"); // 简单处理分区名,避免特殊字符
110+
partitionDefinition = String.format("PARTITION %s VALUES IN ('%s')", partitionName, partitionValue);
111+
112+
if (partitionBy.contains(StringPool.COMMA)) {
113+
// 支持多列 LIST 分区
114+
String[] partitionColumns = partitionBy.split(StringPool.COMMA);
115+
String[] partitionValues = partitionValue.split(StringPool.COMMA);
116+
if (partitionColumns.length != partitionValues.length) {
117+
log.error("LIST 自动分区多列分区时,分区列数与分区值数不匹配. 列: {}, 值: {}", partitionBy, partitionValue);
118+
return;
119+
}
120+
// 构建多列分区值,例如: (value1, value2)
121+
partitionDefinition = String.format("PARTITION %s VALUES IN ((%s))", partitionName, String.join(StringPool.COMMA, partitionValues));
122+
}
123+
break;
124+
default:
125+
log.error("不支持的自动分区类型: {}", autoPartitionType);
126+
return;
127+
}
128+
129+
if (partitionName == null || partitionDefinition == null) {
130+
log.error("无法构建分区名称或分区定义,跳过自动分区创建。");
131+
return;
132+
}
133+
134+
// 检查分区是否存在,如果不存在则创建
135+
if (!ddlOperations.partitionExists(effectiveDatabaseName, effectiveTableName, partitionName)) {
136+
log.info("分区 {} 不存在于表 {}.{} 中,正在自动创建 ...", partitionName, effectiveDatabaseName, effectiveTableName);
137+
ddlOperations.addPartition(effectiveDatabaseName, effectiveTableName, partitionDefinition);
138+
log.info("分区 {} 已成功创建在表 {}.{} 中。", partitionName, effectiveDatabaseName, effectiveTableName);
139+
} else {
140+
log.debug("分区 {} 已存在于表 {}.{} 中,无需重复创建。", partitionName, effectiveDatabaseName, effectiveTableName);
141+
}
142+
}
143+
144+
/**
145+
* 计算 RANGE 分区的下一个边界值
146+
*
147+
* @param currentValue 当前分区值,例如 "2023-01-15"
148+
* @param granularity 分区粒度 (year, month, day, hour, minute)
149+
* @return 下一个分区的边界值,例如 "2023-02-01"
150+
*/
151+
private String calculateNextRangePartitionValue(String currentValue, String granularity) {
152+
LocalDateTime dateTime;
153+
DateTimeFormatter formatter;
154+
155+
// 根据粒度解析当前值
156+
switch (granularity.toLowerCase(Locale.ROOT)) {
157+
case "year":
158+
formatter = DateTimeFormatter.ofPattern("yyyy");
159+
dateTime = LocalDateTime.parse(currentValue + "-01-01T00:00:00", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
160+
break;
161+
case "month":
162+
formatter = DateTimeFormatter.ofPattern("yyyy-MM");
163+
dateTime = LocalDateTime.parse(currentValue + "-01T00:00:00", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
164+
break;
165+
case "day":
166+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
167+
dateTime = LocalDateTime.parse(currentValue + "T00:00:00", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
168+
break;
169+
case "hour":
170+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH");
171+
dateTime = LocalDateTime.parse(currentValue + ":00:00", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
172+
break;
173+
case "minute":
174+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
175+
dateTime = LocalDateTime.parse(currentValue + ":00", DateTimeFormatter.ISO_LOCAL_DATE_TIME);
176+
break;
177+
default:
178+
throw new IllegalArgumentException("不支持的粒度: " + granularity);
179+
}
180+
181+
// 计算下一个日期时间
182+
LocalDateTime nextDateTime;
183+
switch (granularity.toLowerCase(Locale.ROOT)) {
184+
case "year":
185+
nextDateTime = dateTime.plusYears(1);
186+
formatter = DateTimeFormatter.ofPattern("yyyy-01-01"); // Doris RANGE 分区通常需要完整日期
187+
break;
188+
case "month":
189+
nextDateTime = dateTime.plusMonths(1);
190+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-01"); // Doris RANGE 分区通常需要完整日期
191+
break;
192+
case "day":
193+
nextDateTime = dateTime.plusDays(1);
194+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
195+
break;
196+
case "hour":
197+
nextDateTime = dateTime.plusHours(1);
198+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00");
199+
break;
200+
case "minute":
201+
nextDateTime = dateTime.plusMinutes(1);
202+
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
203+
break;
204+
default:
205+
throw new IllegalArgumentException("不支持的粒度: " + granularity);
206+
}
207+
208+
return nextDateTime.format(formatter);
209+
}
210+
}

0 commit comments

Comments
 (0)