From a185e9f11a1d415c1618509f1cce31edbede69c5 Mon Sep 17 00:00:00 2001 From: kalencaya <1942460489@qq.com> Date: Thu, 7 Sep 2023 21:32:17 +0800 Subject: [PATCH] [Feature][scaleph-engine-sql-gateway] add column info for flink sql query result (#618) feature: add column info for flink sql query result --- .../dto/WsFlinkSqlGatewayQueryResultDTO.java | 43 ++++++++++++------- .../docker/mysql/init.d/scaleph-ws-mysql.sql | 2 +- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java index 163969aec..dffc62344 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.engine.sql.gateway.services.dto; +import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.ColumnInfo; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import org.apache.commons.codec.binary.Hex; @@ -43,6 +44,9 @@ @Builder public class WsFlinkSqlGatewayQueryResultDTO { + @Schema(description = "SQL 执行状态。NOT_READY: 未就绪,需轮询重试, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据,不在调用", + allowableValues = {"PAYLOAD", "NOT_READY", "EOS"}) + private ResultSet.ResultType resultType; @Schema(description = "结果类型。SUCCESS: 执行成功, SUCCESS_WITH_CONTENT: 执行成功并可获取执行结果", allowableValues = {"SUCCESS", "SUCCESS_WITH_CONTENT"}) private ResultKind resultKind; @@ -52,9 +56,8 @@ public class WsFlinkSqlGatewayQueryResultDTO { private Long nextToken; @Schema(description = "是否支持查询数据") private Boolean isQueryResult; - @Schema(description = "数据就绪状态。NOT_READY: 未就绪, PAYLOAD: 可查询, EOS: 数据查询已至末尾,后续无数据", - allowableValues = {"PAYLOAD", "NOT_READY", "EOS"}) - private ResultSet.ResultType resultType; + @Schema(description = "数据类型信息") + private List columns; @Schema(description = "数据") private List> data; @@ -65,18 +68,28 @@ public static WsFlinkSqlGatewayQueryResultDTO fromResultSet(ResultSet resultSet) builder.resultKind(resultSet.getResultKind()).jobID(resultSet.getJobID().toHexString()); if (resultSet.isQueryResult()) { List columns = resultSet.getResultSchema().getColumns(); - builder.data(resultSet.getData().stream().map(rowData -> { - Map map = new HashMap<>(); - for (int i = 0; i < columns.size(); i++) { - Column column = columns.get(i); - try { - map.put(column.getName(), getDataFromRow(rowData, column.getDataType().getLogicalType(), i)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return map; - }).collect(Collectors.toList())); + builder + .columns(columns.stream().map(column -> { + ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder() + .columnName(column.getName()) + .dataType(column.getDataType().getLogicalType().toString()) + .isPersist(column.isPersisted()) + .isPhysical(column.isPhysical()); + column.getComment().ifPresent(columnInfoBuilder::comment); + return columnInfoBuilder.build(); + }).collect(Collectors.toList())) + .data(resultSet.getData().stream().map(rowData -> { + Map map = new HashMap<>(); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + try { + map.put(column.getName(), getDataFromRow(rowData, column.getDataType().getLogicalType(), i)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return map; + }).collect(Collectors.toList())); builder.nextToken(resultSet.getNextToken()); } } diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index 3274d869d..d8979e631 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -112,7 +112,7 @@ VALUES (4, 6, '1.17.1', INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`, `editor`) VALUES (5, 7, '1.17.1', - 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;', + 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;', '1', 'sys', 'sys'); INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,