diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java index dffc62344..4209cf1e2 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/dto/WsFlinkSqlGatewayQueryResultDTO.java @@ -24,9 +24,7 @@ import org.apache.commons.codec.binary.Hex; import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.data.MapData; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.*; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.types.logical.*; @@ -128,8 +126,9 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return dataClass.getDeclaredMethod("getDouble", int.class).invoke(rowData, index); case DECIMAL: DecimalType decimalType = (DecimalType) logicalType; - return dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class) + DecimalData decimalData = (DecimalData) dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class) .invoke(rowData, index, decimalType.getPrecision(), decimalType.getScale()); + return decimalData.toBigDecimal().doubleValue(); case BIGINT: case INTERVAL_DAY_TIME: return dataClass.getDeclaredMethod("getLong", int.class).invoke(rowData, index); @@ -178,7 +177,8 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in return list; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision()); + TimestampData timestampData = (TimestampData) dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision()); + return timestampData.toTimestamp(); case DISTINCT_TYPE: DistinctType distinctType = (DistinctType) logicalType; LogicalType sourceType = distinctType.getSourceType(); diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index d8979e631..6d3da50f1 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -112,7 +112,7 @@ VALUES (4, 6, '1.17.1', INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`, `editor`) VALUES (5, 7, '1.17.1', - 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;', + 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `money` decimal(64, 4),\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;', '1', 'sys', 'sys'); INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,