From 070b5f11f71a17da29e43c42b7b40dbe9903d3e2 Mon Sep 17 00:00:00 2001 From: liubodong Date: Tue, 22 Aug 2023 00:47:14 +0800 Subject: [PATCH 1/4] [Feature][Seatunnel] StarRocks add base-url property --- .../scaleph/ds/modal/olap/StarRocksDataSource.java | 4 ++++ .../starrocks/sink/StarRocksSinkPlugin.java | 4 ++++ .../starrocks/sink/StarRocksSinkProperties.java | 14 ++++++++++---- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/StarRocksDataSource.java b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/StarRocksDataSource.java index 48d7b9c09..ec530ba7c 100644 --- a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/StarRocksDataSource.java +++ b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/StarRocksDataSource.java @@ -41,6 +41,10 @@ public class StarRocksDataSource extends AbstractDataSource { @Schema(description = "Node Urls") private String nodeUrls; + @NotBlank + @Schema(description = "Base url") + private String baseUrl; + @NotBlank @Schema(description = "username") private String username; diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkPlugin.java index 87eba74e2..d68cab0bb 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkPlugin.java @@ -47,6 +47,7 @@ public StarRocksSinkPlugin() { "Used to send data to StarRocks. Both support streaming and batch mode. The internal implementation of StarRocks sink connector is cached and imported by stream load in batches.", StarRocksSinkPlugin.class.getName()); final List props = new ArrayList<>(); + props.add(BASE_URL); props.add(DATABASE); props.add(TABLE); props.add(LABEL_PREFIX); @@ -81,6 +82,9 @@ public ObjectNode createConf() { if (StringUtils.hasText(dataSource.getPassword())) { conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); } + if (StringUtils.hasText(dataSource.getBaseUrl())) { + conf.putPOJO(BASE_URL.getName(), dataSource.getBaseUrl()); + } return conf; } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkProperties.java index a546b71b3..671ba08ad 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/starrocks/sink/StarRocksSinkProperties.java @@ -18,15 +18,21 @@ package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.starrocks.sink; -import cn.sliew.scaleph.plugin.framework.property.Parsers; -import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; -import cn.sliew.scaleph.plugin.framework.property.PropertyType; -import cn.sliew.scaleph.plugin.framework.property.Validators; +import cn.sliew.scaleph.plugin.framework.property.*; import com.fasterxml.jackson.databind.JsonNode; public enum StarRocksSinkProperties { ; + public static final PropertyDescriptor BASE_URL = new PropertyDescriptor.Builder<>() + .name("base-url") + .description("The JDBC URL like jdbc:mysql://localhost:9030/ or jdbc:mysql://localhost:9030 or jdbc:mysql://localhost:9030/db") + .properties(Property.Required) + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + public static final PropertyDescriptor LABEL_PREFIX = new PropertyDescriptor.Builder() .name("labelPrefix") .description("The prefix of StarRocks stream load label") From 969cc495952339a127aacb988f815e0842ee60c7 Mon Sep 17 00:00:00 2001 From: liubodong Date: Tue, 22 Aug 2023 16:33:05 +0800 Subject: [PATCH 2/4] [Feature][setunnel-plugin-seatunnel-connector] Update starrocks plugin to 2.3.2 --- scaleph-ui-react/src/locales/zh-CN/pages/project.ts | 3 ++- .../Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx | 1 + .../DI/DiJobFlow/Dag/steps/sink/sink-starrocks-step.tsx | 5 +++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index f467196e0..8f4b5cefb 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -570,6 +570,7 @@ export default { // starrocks + 'pages.project.di.step.starrocks.base-url': '基础连接地址', 'pages.project.di.step.starrocks.database': '数据库', 'pages.project.di.step.starrocks.table': '数据表', 'pages.project.di.step.starrocks.labelPrefix': '标签前缀', @@ -616,7 +617,7 @@ export default { // s3redshift 'pages.project.di.step.s3redshift.jdbcUrl': 'Jdbc URL', - 'pages.project.di.step.s3redshift.jdbcUser': 'Jdbc哟你见过户', + 'pages.project.di.step.s3redshift.jdbcUser': 'Jdbc用户名', 'pages.project.di.step.s3redshift.jdbcPassword': 'Jdbc密码', 'pages.project.di.step.s3redshift.executeSql': 'SQL查询语句', 'pages.project.di.step.s3redshift.executeSql.placeholoder': diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx index d0d989602..c41e08fa2 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx @@ -519,6 +519,7 @@ export const DorisParams = { }; export const StarRocksParams = { + baseUrl: 'base-url', database: 'database', table: 'table', labelPrefix: 'labelPrefix', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-starrocks-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-starrocks-step.tsx index 32017c19c..b3428b890 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-starrocks-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-starrocks-step.tsx @@ -75,6 +75,11 @@ const SinkStarRocksStepForm: React.FC< colProps={{span: 24}} /> + Date: Tue, 22 Aug 2023 17:13:03 +0800 Subject: [PATCH 3/4] [Feature][setunnel-plugin-seatunnel-connector] Change file system connector property 'file_format' to 'file_format_type' --- .../flink/connectors/file/FileSinkProperties.java | 4 ++-- .../connectors/file/ftp/sink/FtpFileSinkPlugin.java | 2 +- .../connectors/file/hdfs/sink/HDFSFileSinkPlugin.java | 2 +- .../file/local/sink/LocalFileSinkPlugin.java | 2 +- .../flink/connectors/file/oss/sink/OSSSinkPlugin.java | 2 +- .../file/ossjindo/sink/OSSJindoSinkPlugin.java | 2 +- .../flink/connectors/file/s3/sink/S3SinkPlugin.java | 2 +- .../connectors/file/sftp/sink/SftpFileSinkPlugin.java | 2 +- .../s3redshift/sink/S3RedshiftSinkPlugin.java | 2 +- .../Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx | 1 - .../DI/DiJobFlow/Dag/steps/sink/sink-ftp-file-step.tsx | 10 +++++----- .../DiJobFlow/Dag/steps/sink/sink-hdfs-file-step.tsx | 10 +++++----- .../DiJobFlow/Dag/steps/sink/sink-local-file-step.tsx | 10 +++++----- .../DI/DiJobFlow/Dag/steps/sink/sink-oss-file-step.tsx | 10 +++++----- .../Dag/steps/sink/sink-ossjindo-file-step.tsx | 10 +++++----- .../DI/DiJobFlow/Dag/steps/sink/sink-s3-file-step.tsx | 10 +++++----- .../Dag/steps/sink/sink-s3redshift-file-step.tsx | 8 ++++---- .../DiJobFlow/Dag/steps/sink/sink-sftp-file-step.tsx | 10 +++++----- 18 files changed, 49 insertions(+), 50 deletions(-) diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java index 45304b368..a2cfc5697 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java @@ -45,8 +45,8 @@ public enum FileSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder() - .name("file_format") + public static final PropertyDescriptor FILE_FORMAT_TYPE = new PropertyDescriptor.Builder() + .name("file_format_type") .description("We supported as the following file types:text, csv, excel, parquet, orc, json") .type(PropertyType.STRING) .defaultValue("text") diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/sink/FtpFileSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/sink/FtpFileSinkPlugin.java index 37a39ec99..e85d6d869 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/sink/FtpFileSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/sink/FtpFileSinkPlugin.java @@ -50,7 +50,7 @@ public FtpFileSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/sink/HDFSFileSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/sink/HDFSFileSinkPlugin.java index f5d66441e..66c91c65e 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/sink/HDFSFileSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/sink/HDFSFileSinkPlugin.java @@ -52,7 +52,7 @@ public HDFSFileSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/sink/LocalFileSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/sink/LocalFileSinkPlugin.java index 5832112bb..3b61468ab 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/sink/LocalFileSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/sink/LocalFileSinkPlugin.java @@ -43,7 +43,7 @@ public LocalFileSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/sink/OSSSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/sink/OSSSinkPlugin.java index f65938fad..e0c3ad746 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/sink/OSSSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/sink/OSSSinkPlugin.java @@ -50,7 +50,7 @@ public OSSSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/sink/OSSJindoSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/sink/OSSJindoSinkPlugin.java index cf3f77a08..2ad1997c1 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/sink/OSSJindoSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/sink/OSSJindoSinkPlugin.java @@ -50,7 +50,7 @@ public OSSJindoSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/sink/S3SinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/sink/S3SinkPlugin.java index 0b14e1035..ffabfc8bf 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/sink/S3SinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/sink/S3SinkPlugin.java @@ -51,7 +51,7 @@ public S3SinkPlugin() { final List props = new ArrayList<>(); props.add(HADOOP_S3_PROPERTIES); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/sink/SftpFileSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/sink/SftpFileSinkPlugin.java index d991c2f0b..10167ae99 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/sink/SftpFileSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/sink/SftpFileSinkPlugin.java @@ -50,7 +50,7 @@ public SftpFileSinkPlugin() { final List props = new ArrayList<>(); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(CUSTOM_FILENAME); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/s3redshift/sink/S3RedshiftSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/s3redshift/sink/S3RedshiftSinkPlugin.java index 0676007b4..ffc002777 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/s3redshift/sink/S3RedshiftSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/s3redshift/sink/S3RedshiftSinkPlugin.java @@ -56,7 +56,7 @@ public S3RedshiftSinkPlugin() { props.add(HADOOP_S3_PROPERTIES); props.add(PATH); - props.add(FILE_FORMAT); + props.add(FILE_FORMAT_TYPE); props.add(FILE_NAME_EXPRESSION); props.add(FILENAME_TIME_FORMAT); props.add(FIELD_DELIMITER); diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx index c41e08fa2..d7ed06a10 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx @@ -160,7 +160,6 @@ export const BaseFileParams = { dateFormat: 'date_format', timeFormat: 'time_format', datetimeFormat: 'datetime_format', - fileFormat: 'file_format', customFilename: 'custom_filename', fileNameExpression: 'file_name_expression', filenameTimeFormat: 'filename_time_format', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-ftp-file-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-ftp-file-step.tsx index 3acd8afc4..e30ff03e8 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-ftp-file-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-ftp-file-step.tsx @@ -80,14 +80,14 @@ const SinkFtpFileStepForm: React.FC< colProps={{span: 24}} /> - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( - - {({file_format}) => { - if (file_format == 'text' || file_format == 'csv') { + + {({file_format_type}) => { + if (file_format_type == 'text' || file_format_type == 'csv') { return ( ); } - if (file_format == 'excel') { + if (file_format_type == 'excel') { return ( Date: Tue, 22 Aug 2023 19:50:19 +0800 Subject: [PATCH 4/4] [Bugfix][Seatunnel] Seatunnel pulsar connector property type error. --- .../connectors/pulsar/source/PulsarSourceProperties.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/pulsar/source/PulsarSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/pulsar/source/PulsarSourceProperties.java index ab0750798..fbbb5c340 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/pulsar/source/PulsarSourceProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/pulsar/source/PulsarSourceProperties.java @@ -149,8 +149,8 @@ public enum PulsarSourceProperties { public static final PropertyDescriptor CURSOR_STOP_TIMESTAMP = new PropertyDescriptor.Builder() .name("cursor.stop.timestamp") .description("Stop from the specified epoch timestamp (in milliseconds).") - .type(PropertyType.STRING) - .parser(Parsers.STRING_PARSER) + .type(PropertyType.LONG) + .parser(Parsers.LONG_PARSER) .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild();