Skip to content

Commit

Permalink
[Feature][scaleph-engine-sql-gateway] add column info for flink sql query result (#618)
Browse files Browse the repository at this point in the history

feature: add column info for flink sql query result
  • Loading branch information
kalencaya authored Sep 7, 2023
1 parent 55102d0 commit a185e9f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.engine.sql.gateway.services.dto;

import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.apache.commons.codec.binary.Hex;
Expand All @@ -43,6 +44,9 @@
@Builder
public class WsFlinkSqlGatewayQueryResultDTO {

@Schema(description = "SQL 执行状态。NOT_READY: 未就绪,需轮询重试, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据,不在调用",
allowableValues = {"PAYLOAD", "NOT_READY", "EOS"})
private ResultSet.ResultType resultType;
@Schema(description = "结果类型。SUCCESS: 执行成功, SUCCESS_WITH_CONTENT: 执行成功并可获取执行结果",
allowableValues = {"SUCCESS", "SUCCESS_WITH_CONTENT"})
private ResultKind resultKind;
Expand All @@ -52,9 +56,8 @@ public class WsFlinkSqlGatewayQueryResultDTO {
private Long nextToken;
@Schema(description = "是否支持查询数据")
private Boolean isQueryResult;
@Schema(description = "数据就绪状态。NOT_READY: 未就绪, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据",
allowableValues = {"PAYLOAD", "NOT_READY", "EOS"})
private ResultSet.ResultType resultType;
@Schema(description = "数据类型信息")
private List<ColumnInfo> columns;
@Schema(description = "数据")
private List<Map<String, Object>> data;

Expand All @@ -65,18 +68,28 @@ public static WsFlinkSqlGatewayQueryResultDTO fromResultSet(ResultSet resultSet)
builder.resultKind(resultSet.getResultKind()).jobID(resultSet.getJobID().toHexString());
if (resultSet.isQueryResult()) {
List<Column> columns = resultSet.getResultSchema().getColumns();
builder.data(resultSet.getData().stream().map(rowData -> {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
try {
map.put(column.getName(), getDataFromRow(rowData, column.getDataType().getLogicalType(), i));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return map;
}).collect(Collectors.toList()));
builder
.columns(columns.stream().map(column -> {
ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder()
.columnName(column.getName())
.dataType(column.getDataType().getLogicalType().toString())
.isPersist(column.isPersisted())
.isPhysical(column.isPhysical());
column.getComment().ifPresent(columnInfoBuilder::comment);
return columnInfoBuilder.build();
}).collect(Collectors.toList()))
.data(resultSet.getData().stream().map(rowData -> {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
try {
map.put(column.getName(), getDataFromRow(rowData, column.getDataType().getLogicalType(), i));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return map;
}).collect(Collectors.toList()));
builder.nextToken(resultSet.getNextToken());
}
}
Expand Down
2 changes: 1 addition & 1 deletion tools/docker/mysql/init.d/scaleph-ws-mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ VALUES (4, 6, '1.17.1',
INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,
`editor`)
VALUES (5, 7, '1.17.1',
'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;',
'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;',
'1', 'sys', 'sys');

INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,
Expand Down

0 comments on commit a185e9f

Please sign in to comment.