Skip to content

Commit

Permalink
[Feature][scaleph-engine-sql-gateway] optimize Flink TimestampData an…
Browse files Browse the repository at this point in the history
…d DecimalData format (#619)

feature: optimize Flink TimestampData and DecimalData format
  • Loading branch information
kalencaya authored Sep 8, 2023
1 parent a185e9f commit 8a47056
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.*;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.types.logical.*;

Expand Down Expand Up @@ -128,8 +126,9 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
return dataClass.getDeclaredMethod("getDouble", int.class).invoke(rowData, index);
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
return dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class)
DecimalData decimalData = (DecimalData) dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class)
.invoke(rowData, index, decimalType.getPrecision(), decimalType.getScale());
return decimalData.toBigDecimal().doubleValue();
case BIGINT:
case INTERVAL_DAY_TIME:
return dataClass.getDeclaredMethod("getLong", int.class).invoke(rowData, index);
Expand Down Expand Up @@ -178,7 +177,8 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
return list;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision());
TimestampData timestampData = (TimestampData) dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision());
return timestampData.toTimestamp();
case DISTINCT_TYPE:
DistinctType distinctType = (DistinctType) logicalType;
LogicalType sourceType = distinctType.getSourceType();
Expand Down
2 changes: 1 addition & 1 deletion tools/docker/mysql/init.d/scaleph-ws-mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ VALUES (4, 6, '1.17.1',
INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,
`editor`)
VALUES (5, 7, '1.17.1',
'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;',
'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `money` decimal(64, 4),\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100000\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;',
'1', 'sys', 'sys');

INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`,
Expand Down

0 comments on commit 8a47056

Please sign in to comment.