From 19df5602f1f2d2aaa21bf33cab070e3377fb5122 Mon Sep 17 00:00:00 2001 From: kalencaya <1942460489@qq.com> Date: Mon, 28 Aug 2023 19:58:42 +0800 Subject: [PATCH] [Feature][scaleph-ui-react] upgrade seatunnel connectors to 2.3.3 (#609) * feature: upgrade seatunnel fakesource connectors to 2.3.3 * feature: upgrade seatunnel file connectors to 2.3.3 * feature: upgrade seatunnel http connectors to 2.3.3 * feature: upgrade seatunnel jdbc connectors to 2.3.3 * feature: upgrade seatunnel kafka source connector to 2.3.3 * feature: upgrade seatunnel kafka sink connector to 2.3.3 * temp commit: upgrade seatunnel mongodb connector to 2.3.3 * resolve conflicts --- .../seatunnel/SeaTunnelPluginMapping.java | 4 +- .../sink/ElasticsearchSinkPlugin.java | 12 ++-- .../email/sink/EmailSinkProperties.java | 16 ++--- .../fake/source/FakeProperties.java | 49 +++++++++++++++ .../fake/source/FakeSourcePlugin.java | 7 +++ .../connectors/file/FileSinkProperties.java | 8 +++ .../connectors/file/FileSourceProperties.java | 15 +++++ .../file/ftp/source/FtpFileSourcePlugin.java | 1 + .../hdfs/source/HDFSFileSourcePlugin.java | 1 + .../local/source/LocalFileSourcePlugin.java | 1 + .../file/oss/source/OSSSourcePlugin.java | 1 + .../ossjindo/source/OSSJindoSourcePlugin.java | 1 + .../file/s3/source/S3SourcePlugin.java | 1 + .../sftp/source/SftpFileSourcePlugin.java | 1 + .../hbase/sink/HbaseSinkProperties.java | 2 + .../hive/source/HiveSourcePlugin.java | 7 +-- .../hive/source/HiveSourceProperties.java | 43 ------------- .../connectors/iotdb/IoTDBProperties.java | 7 +++ .../iotdb/sink/IoTDBSinkProperties.java | 10 +-- .../flink/connectors/jdbc/JdbcProperties.java | 8 +++ .../connectors/jdbc/sink/JdbcSinkPlugin.java | 2 +- .../jdbc/sink/JdbcSinkProperties.java | 9 --- .../jdbc/source/JdbcSourcePlugin.java | 1 + .../connectors/kafka/KafkaProperties.java | 18 ++++++ .../kafka/sink/KafkaSinkProperties.java | 26 ++------ .../kafka/source/KafkaSourceProperties.java | 18 ------ .../connectors/mongodb/MongoDBProperties.java | 11 +++- .../mongodb/sink/MongoDBSinkProperties.java | 50 ++++++--------- .../mongodb/source/MongoDBSourcePlugin.java | 1 - .../source/MongoDBSourceProperties.java | 62 +++++++------------ .../src/locales/zh-CN/pages/project.ts | 44 ++++++++----- .../Artifact/DI/DiJobFlow/Dag/constant.tsx | 20 +++++- .../steps/sink/sink-elasticsearch-step.tsx | 6 +- .../Dag/steps/sink/sink-http-step.tsx | 15 +++++ .../Dag/steps/sink/sink-jdbc-step.tsx | 22 +++---- .../Dag/steps/sink/sink-kafka-step.tsx | 2 +- .../Dag/steps/sink/sink-mongodb-step.tsx | 10 ++- .../Dag/steps/source/source-fake-step.tsx | 50 ++++++++++++--- .../Dag/steps/source/source-ftp-file-step.tsx | 4 ++ .../steps/source/source-hdfs-file-step.tsx | 4 ++ .../Dag/steps/source/source-http-step.tsx | 15 +++++ .../Dag/steps/source/source-jdbc-step.tsx | 8 +++ .../Dag/steps/source/source-kafka-step.tsx | 28 ++++++--- .../steps/source/source-local-file-step.tsx | 4 ++ .../Dag/steps/source/source-mongodb-step.tsx | 38 +++++++++--- .../Dag/steps/source/source-oss-file-step.tsx | 4 ++ .../source/source-ossjindo-file-step.tsx | 4 ++ .../Dag/steps/source/source-s3-file-step.tsx | 4 ++ .../steps/source/source-sftp-file-step.tsx | 4 ++ .../build/scaleph-seatunnel/plugin_config | 25 +------- 50 files changed, 415 insertions(+), 289 deletions(-) delete mode 100644 scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourceProperties.java diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java index ee0496ec3..5491ca1dc 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/seatunnel/SeaTunnelPluginMapping.java @@ -86,8 +86,8 @@ public enum SeaTunnelPluginMapping { SINK_REDIS(SEATUNNEL, SINK, REDIS, "connector-redis", BETA), SOURCE_ELASTICSEARCH(SEATUNNEL, SOURCE, ELASTICSEARCH, "connector-elasticsearch", UNKNOWN, BATCH, COLUMN_PROJECTION), SINK_ELASTICSEARCH(SEATUNNEL, SINK, ELASTICSEARCH, "connector-elasticsearch", GA, CDC), - SOURCE_MONGODB(SEATUNNEL, SOURCE, MONGODB, "connector-mongodb", BETA, BATCH, COLUMN_PROJECTION), - SINK_MONGODB(SEATUNNEL, SINK, MONGODB, "connector-mongodb", BETA, BATCH, STREAM), + SOURCE_MONGODB(SEATUNNEL, SOURCE, MONGODB, "connector-mongodb", BETA, BATCH, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT), + SINK_MONGODB(SEATUNNEL, SINK, MONGODB, "connector-mongodb", BETA, EXACTLY_ONCE, CDC), SOURCE_AMAZON_DYNAMODB(SEATUNNEL, SOURCE, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA, BATCH, COLUMN_PROJECTION), SINK_AMAZON_DYNAMODB(SEATUNNEL, SINK, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA), SOURCE_CASSANDRA(SEATUNNEL, SOURCE, CASSANDRA, "connector-cassandra", BETA, BATCH, COLUMN_PROJECTION), diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/elasticsearch/sink/ElasticsearchSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/elasticsearch/sink/ElasticsearchSinkPlugin.java index fb19093e4..90ec795fb 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/elasticsearch/sink/ElasticsearchSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/elasticsearch/sink/ElasticsearchSinkPlugin.java @@ -54,12 +54,12 @@ public ElasticsearchSinkPlugin() { props.add(KEY_DELIMITER); props.add(MAX_RETRY_COUNT); props.add(MAX_BATCH_SIZE); -// props.add(TLS_VERIFY_CERTIFICATE); -// props.add(TLS_VERIFY_HOSTNAMES); -// props.add(TLS_KEYSTORE_PATH); -// props.add(TLS_KEYSTORE_PASSWORD); -// props.add(TLS_TRUSTSTORE_PATH); -// props.add(TLS_TRUSTSTORE_PASSWORD); + props.add(TLS_VERIFY_CERTIFICATE); + props.add(TLS_VERIFY_HOSTNAMES); + props.add(TLS_KEYSTORE_PATH); + props.add(TLS_KEYSTORE_PASSWORD); + props.add(TLS_TRUSTSTORE_PATH); + props.add(TLS_TRUSTSTORE_PASSWORD); props.add(CommonProperties.PARALLELISM); props.add(CommonProperties.SOURCE_TABLE_NAME); this.supportedProperties = props; diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/email/sink/EmailSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/email/sink/EmailSinkProperties.java index a0f0b4d04..96b7714a6 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/email/sink/EmailSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/email/sink/EmailSinkProperties.java @@ -23,7 +23,7 @@ public enum EmailSinkProperties { ; - public static final PropertyDescriptor EMAIL_HOST = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_HOST = new PropertyDescriptor.Builder() .name("email_host") .description("SMTP server host") .type(PropertyType.STRING) @@ -32,7 +32,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder() .name("email_transport_protocol") .description("The protocol to load the session.") .type(PropertyType.STRING) @@ -41,7 +41,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_FROM_ADDRESS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_FROM_ADDRESS = new PropertyDescriptor.Builder() .name("email_from_address") .description("Sender Email Address") .type(PropertyType.STRING) @@ -50,7 +50,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_SMTP_AUTH = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_SMTP_AUTH = new PropertyDescriptor.Builder() .name("email_smtp_auth") .description("Whether to authenticate the customer") .type(PropertyType.STRING) @@ -59,7 +59,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_AUTHORIZATION_CODE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_AUTHORIZATION_CODE = new PropertyDescriptor.Builder() .name("email_authorization_code") .description("authorization code,You can obtain the authorization code from the mailbox Settings.") .type(PropertyType.STRING) @@ -68,7 +68,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_TO_ADDRESS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_TO_ADDRESS = new PropertyDescriptor.Builder() .name("email_to_address") .description("Receiver Email Address") .type(PropertyType.STRING) @@ -77,7 +77,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_MESSAGE_HEADLINE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_MESSAGE_HEADLINE = new PropertyDescriptor.Builder() .name("email_message_headline") .description("The subject line of the entire message.") .type(PropertyType.STRING) @@ -86,7 +86,7 @@ public enum EmailSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor EMAIL_MESSAGE_CONTENT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor EMAIL_MESSAGE_CONTENT = new PropertyDescriptor.Builder() .name("email_message_content") .description("The body of the entire message.") .type(PropertyType.STRING) diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeProperties.java index b47fc7409..b5d47d39e 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeProperties.java @@ -341,4 +341,53 @@ public enum FakeProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); + + public static final PropertyDescriptor DATE_YEAR_TEMPLATE = new PropertyDescriptor.Builder() + .name("date.year.template") + .description("The template list of year of date like 'yyyy', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor DATE_MONTH_TEMPLATE = new PropertyDescriptor.Builder() + .name("date.month.template") + .description("The template list of month of date like 'MM', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor DATE_DAY_TEMPLATE = new PropertyDescriptor.Builder() + .name("date.day.template") + .description("The template list of day of date like 'dd', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor TIME_HOUR_TEMPLATE = new PropertyDescriptor.Builder() + .name("time.hour.template") + .description("The template list of hour of time like 'HH', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor TIME_MINUTE_TEMPLATE = new PropertyDescriptor.Builder() + .name("time.minute.template") + .description("The template list of minute of time like 'mm', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor TIME_SECOND_TEMPLATE = new PropertyDescriptor.Builder() + .name("time.second.template") + .description("The template list of second of time like 'ss', if user configured it, connector will randomly select an item from the template list") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeSourcePlugin.java index 391346a65..291c24c3a 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/fake/source/FakeSourcePlugin.java @@ -76,6 +76,13 @@ public FakeSourcePlugin() { props.add(DOUBLE_MIN); props.add(DOUBLE_MAX); props.add(DOUBLE_TEMPLATE); + + props.add(DATE_YEAR_TEMPLATE); + props.add(DATE_MONTH_TEMPLATE); + props.add(DATE_DAY_TEMPLATE); + props.add(TIME_HOUR_TEMPLATE); + props.add(TIME_MINUTE_TEMPLATE); + props.add(TIME_SECOND_TEMPLATE); props.add(CommonProperties.PARALLELISM); props.add(CommonProperties.RESULT_TABLE_NAME); supportedProperties = Collections.unmodifiableList(props); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java index a2cfc5697..adc4228dc 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSinkProperties.java @@ -154,4 +154,12 @@ public enum FileSinkProperties { .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); + public static final PropertyDescriptor TMP_PATH = new PropertyDescriptor.Builder() + .name("tmp_path") + .description("Data write temporary path") + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSourceProperties.java index 8b93c07fb..1210543a3 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSourceProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/FileSourceProperties.java @@ -23,6 +23,13 @@ public enum FileSourceProperties { ; + public static final PropertyDescriptor FILE_FILTER_PATTERN = new PropertyDescriptor.Builder() + .name("file_filter_pattern") + .description("File pattern. The connector will filter some files base on the pattern.") + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); public static final PropertyDescriptor FILE_FORMAT_TYPE = new PropertyDescriptor.Builder() .name("file_format_type") @@ -50,6 +57,14 @@ public enum FileSourceProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); + public static final PropertyDescriptor READ_PARTITIONS = new PropertyDescriptor.Builder() + .name("read_partitions") + .description("The partitions that the user want to read") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() .name("schema") .description("The schema information of upstream data.") diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/source/FtpFileSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/source/FtpFileSourcePlugin.java index 525492310..521694e10 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/source/FtpFileSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ftp/source/FtpFileSourcePlugin.java @@ -52,6 +52,7 @@ public FtpFileSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/source/HDFSFileSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/source/HDFSFileSourcePlugin.java index 75b50f656..374cc6bde 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/source/HDFSFileSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/hdfs/source/HDFSFileSourcePlugin.java @@ -54,6 +54,7 @@ public HDFSFileSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/source/LocalFileSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/source/LocalFileSourcePlugin.java index 192810684..625ec5fc6 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/source/LocalFileSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/local/source/LocalFileSourcePlugin.java @@ -45,6 +45,7 @@ public LocalFileSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/source/OSSSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/source/OSSSourcePlugin.java index 1eece86d6..1bf056fc9 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/source/OSSSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/oss/source/OSSSourcePlugin.java @@ -52,6 +52,7 @@ public OSSSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/source/OSSJindoSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/source/OSSJindoSourcePlugin.java index 141c91ce7..9d5029422 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/source/OSSJindoSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/ossjindo/source/OSSJindoSourcePlugin.java @@ -52,6 +52,7 @@ public OSSJindoSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/source/S3SourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/source/S3SourcePlugin.java index dcba3fe65..020abfb7d 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/source/S3SourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/s3/source/S3SourcePlugin.java @@ -53,6 +53,7 @@ public S3SourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/source/SftpFileSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/source/SftpFileSourcePlugin.java index 4d0a01bcd..32503c66d 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/source/SftpFileSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/file/sftp/source/SftpFileSourcePlugin.java @@ -52,6 +52,7 @@ public SftpFileSourcePlugin() { props.add(PATH); props.add(SKIP_HEADER_ROW_NUMBER); props.add(SHEET_NAME); + props.add(FILE_FILTER_PATTERN); props.add(FILE_FORMAT_TYPE); props.add(READ_COLUMNS); props.add(SCHEMA); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hbase/sink/HbaseSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hbase/sink/HbaseSinkProperties.java index f5e4dc0d7..096a172df 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hbase/sink/HbaseSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hbase/sink/HbaseSinkProperties.java @@ -61,6 +61,7 @@ public enum HbaseSinkProperties { .description("The mode of writing null value") .type(PropertyType.STRING) .parser(Parsers.STRING_PARSER) + .allowableValues("skip", "empty") .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); @@ -86,6 +87,7 @@ public enum HbaseSinkProperties { .description("The encoding of string field") .type(PropertyType.STRING) .parser(Parsers.STRING_PARSER) + .allowableValues("utf8", "gbk") .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourcePlugin.java index 3451cb7fa..7feca9171 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourcePlugin.java @@ -24,6 +24,7 @@ import cn.sliew.scaleph.plugin.framework.core.PluginInfo; import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin; +import cn.sliew.scaleph.plugin.seatunnel.flink.connectors.file.FileSourceProperties; import cn.sliew.scaleph.plugin.seatunnel.flink.env.CommonProperties; import cn.sliew.scaleph.plugin.seatunnel.flink.resource.ResourceProperties; import cn.sliew.scaleph.plugin.seatunnel.flink.resource.ResourceProperty; @@ -37,8 +38,6 @@ import java.util.List; import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.hive.HiveProperties.*; -import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.hive.source.HiveSourceProperties.READ_COLUMNS; -import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.hive.source.HiveSourceProperties.READ_PARTITIONS; @AutoService(SeaTunnelConnectorPlugin.class) public class HiveSourcePlugin extends SeaTunnelConnectorPlugin { @@ -50,8 +49,8 @@ public HiveSourcePlugin() { final List props = new ArrayList<>(); props.add(TABLE_NAME); - props.add(READ_PARTITIONS); - props.add(READ_COLUMNS); + props.add(FileSourceProperties.READ_COLUMNS); + props.add(FileSourceProperties.READ_PARTITIONS); props.add(CommonProperties.PARALLELISM); props.add(CommonProperties.RESULT_TABLE_NAME); supportedProperties = Collections.unmodifiableList(props); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourceProperties.java deleted file mode 100644 index 173e331b0..000000000 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/hive/source/HiveSourceProperties.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.hive.source; - -import cn.sliew.scaleph.plugin.framework.property.*; -import com.fasterxml.jackson.databind.JsonNode; - -public enum HiveSourceProperties { - ; - - public static final PropertyDescriptor READ_PARTITIONS = new PropertyDescriptor.Builder() - .name("read_partitions") - .description("The target partitions that user want to read from hive table") - .type(PropertyType.OBJECT) - .parser(Parsers.JSON_PARSER) - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - - public static final PropertyDescriptor READ_COLUMNS = new PropertyDescriptor.Builder() - .name("read_columns") - .description("The read column list of the data source") - .type(PropertyType.OBJECT) - .parser(Parsers.JSON_PARSER) - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - -} diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/IoTDBProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/IoTDBProperties.java index 49c416974..94232bc14 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/IoTDBProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/IoTDBProperties.java @@ -51,4 +51,11 @@ public enum IoTDBProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); + public static final PropertyDescriptor CONNECTION_TIMEOUT_IN_MS = new PropertyDescriptor.Builder() + .name("connection_timeout_in_ms") + .description("The maximum time (in ms) to wait when connect IoTDB") + .type(PropertyType.INT) + .parser(Parsers.INTEGER_PARSER) + .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) + .validateAndBuild(); } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/sink/IoTDBSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/sink/IoTDBSinkProperties.java index 55b7cb3b4..25f3304ba 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/sink/IoTDBSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/iotdb/sink/IoTDBSinkProperties.java @@ -129,13 +129,5 @@ public enum IoTDBSinkProperties { .parser(Parsers.BOOLEAN_PARSER) .addValidator(Validators.BOOLEAN_VALIDATOR) .validateAndBuild(); - - public static final PropertyDescriptor CONNECTION_TIMEOUT_IN_MS = new PropertyDescriptor.Builder() - .name("connection_timeout_in_ms") - .description("The maximum time (in ms) to wait when connect IoTDB") - .type(PropertyType.INT) - .parser(Parsers.INTEGER_PARSER) - .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) - .validateAndBuild(); - + } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/JdbcProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/JdbcProperties.java index b3594cdc9..dab516d0b 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/JdbcProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/JdbcProperties.java @@ -73,4 +73,12 @@ public enum JdbcProperties { .parser(Parsers.INTEGER_PARSER) .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); + + public static final PropertyDescriptor COMPATIBLE_MODE = new PropertyDescriptor.Builder() + .name("compatible_mode") + .description("The compatible mode of database") + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkPlugin.java index bd6c01446..8ed9c22d7 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkPlugin.java @@ -48,6 +48,7 @@ public JdbcSinkPlugin() { final List props = new ArrayList<>(); props.add(CONNECTION_CHECK_TIMEOUT_SEC); + props.add(COMPATIBLE_MODE); props.add(DATABASE); props.add(TABLE); props.add(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST); @@ -56,7 +57,6 @@ public JdbcSinkPlugin() { props.add(QUERY); props.add(MAX_RETRIES); props.add(BATCH_SIZE); - props.add(BATCH_INTERVAL_MS); props.add(IS_EXACTLY_ONCE); props.add(XA_DATA_SOURCE_CLASS_NAME); props.add(MAX_COMMIT_ATTEMPTS); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkProperties.java index f9c43e564..d6d3577ba 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/sink/JdbcSinkProperties.java @@ -90,15 +90,6 @@ public enum JdbcSinkProperties { .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor BATCH_INTERVAL_MS = new PropertyDescriptor.Builder() - .name("batch_interval_ms") - .description("For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms, the data will be flushed into the database") - .type(PropertyType.INT) - .defaultValue(1000) - .parser(Parsers.INTEGER_PARSER) - .addValidator(Validators.POSITIVE_INTEGER_VALIDATOR) - .validateAndBuild(); - public static final PropertyDescriptor IS_EXACTLY_ONCE = new PropertyDescriptor.Builder() .name("is_exactly_once") .description("Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set xa_data_source_class_name.") diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/source/JdbcSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/source/JdbcSourcePlugin.java index 34300d281..b0ac11000 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/source/JdbcSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/jdbc/source/JdbcSourcePlugin.java @@ -48,6 +48,7 @@ public JdbcSourcePlugin() { final List props = new ArrayList<>(); props.add(CONNECTION_CHECK_TIMEOUT_SEC); + props.add(COMPATIBLE_MODE); props.add(QUERY); props.add(PARTITION_COLUMN); props.add(PARTITION_UPPER_BOUND); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/KafkaProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/KafkaProperties.java index bc7a2468f..9ae283f6f 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/KafkaProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/KafkaProperties.java @@ -43,6 +43,24 @@ public enum KafkaProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); + public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() + .name("format") + .description("We support the following file types: text, json") + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .defaultValue("json") + .allowableValues("text", "json", "canal_json", "debezium_json", "compatible_debezium_json", "compatible_kafka_connect_json") + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder() + .name("field_delimiter") + .description("The separator between columns in a row of data. Only needed by text and csv file format") + .type(PropertyType.STRING) + .parser(Parsers.STRING_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + public static final PropertyDescriptor KAFKA_CONF = new PropertyDescriptor.Builder() .name("kafka.config") .description( diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/sink/KafkaSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/sink/KafkaSinkProperties.java index e8e581019..54a9a9eb7 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/sink/KafkaSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/sink/KafkaSinkProperties.java @@ -24,7 +24,7 @@ public enum KafkaSinkProperties { ; - public static final PropertyDescriptor SEMANTIC = new PropertyDescriptor.Builder() + public static final PropertyDescriptor SEMANTIC = new PropertyDescriptor.Builder() .name("semantics") .description("Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.") .type(PropertyType.STRING) @@ -34,7 +34,7 @@ public enum KafkaSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor PARTITION_KEY_FIELDS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PARTITION_KEY_FIELDS = new PropertyDescriptor.Builder() .name("partition_key_fields") .description("Configure which fields are used as the key of the kafka message.") .type(PropertyType.OBJECT) @@ -42,7 +42,7 @@ public enum KafkaSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() .name("partition") .description( "We can specify the partition, all messages will be sent to this partition.") @@ -59,7 +59,7 @@ public enum KafkaSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor TRANSACTION_PREFIX = new PropertyDescriptor.Builder() + public static final PropertyDescriptor TRANSACTION_PREFIX = new PropertyDescriptor.Builder() .name("transaction_prefix") .description("If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction") .type(PropertyType.STRING) @@ -67,22 +67,4 @@ public enum KafkaSinkProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() - .name("format") - .description("Data format.") - .type(PropertyType.STRING) - .parser(Parsers.STRING_PARSER) - .allowableValues("text", "json") - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - - public static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder() - .name("field_delimiter") - .description("Customize the field delimiter for data format.") - .type(PropertyType.STRING) - .parser(Parsers.STRING_PARSER) - .defaultValue(",") - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/source/KafkaSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/source/KafkaSourceProperties.java index 0b050bba5..465556127 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/source/KafkaSourceProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/kafka/source/KafkaSourceProperties.java @@ -64,16 +64,6 @@ public enum KafkaSourceProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() - .name("format") - .description("We support the following file types: text, json") - .type(PropertyType.STRING) - .parser(Parsers.STRING_PARSER) - .defaultValue("json") - .allowableValues("text", "json") - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - public static final PropertyDescriptor FORMAT_ERROR_HANDLE_WAY = new PropertyDescriptor.Builder() .name("format_error_handle_way") .description("The processing method of data format error.") @@ -84,14 +74,6 @@ public enum KafkaSourceProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder() - .name("field_delimiter") - .description("The separator between columns in a row of data. Only needed by text and csv file format") - .type(PropertyType.STRING) - .parser(Parsers.STRING_PARSER) - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - public static final PropertyDescriptor START_MODE = new PropertyDescriptor.Builder() .name("start_mode") .description("The initial consumption pattern of consumers") diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/MongoDBProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/MongoDBProperties.java index f98fc3fe9..c9441b89c 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/MongoDBProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/MongoDBProperties.java @@ -19,6 +19,7 @@ package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.mongodb; import cn.sliew.scaleph.plugin.framework.property.*; +import com.fasterxml.jackson.databind.JsonNode; public enum MongoDBProperties { ; @@ -47,7 +48,15 @@ public enum MongoDBProperties { .type(PropertyType.STRING) .parser(Parsers.STRING_PARSER) .properties(Property.Required) - .addValidator(Validators.POSITIVE_LONG_VALIDATOR) + .addValidator(Validators.NON_BLANK_VALIDATOR) + .validateAndBuild(); + + public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("schema") + .description("The schema information of upstream data.") + .type(PropertyType.OBJECT) + .parser(Parsers.JSON_PARSER) + .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/sink/MongoDBSinkProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/sink/MongoDBSinkProperties.java index 1a25d75ad..128eaf5f5 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/sink/MongoDBSinkProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/sink/MongoDBSinkProperties.java @@ -14,7 +14,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.mongodb.sink; @@ -23,56 +22,47 @@ import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; import cn.sliew.scaleph.plugin.framework.property.PropertyType; import cn.sliew.scaleph.plugin.framework.property.Validators; -import com.fasterxml.jackson.databind.JsonNode; import java.util.Collections; import java.util.List; -@SuppressWarnings("unchecked") -public class MongoDBSinkProperties { - - public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder<>() - .name("schema") - .description("The schema information of upstream data.") - .type(PropertyType.OBJECT) - .parser(Parsers.JSON_PARSER) - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); +public enum MongoDBSinkProperties { + ; - public static final PropertyDescriptor BUFFER_FLUSH_MAX_ROWS = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor BUFFER_FLUSH_MAX_ROWS = new PropertyDescriptor.Builder() .name("buffer-flush.max-rows") .description("Specifies the maximum number of buffered rows per batch request.") - .type(PropertyType.STRING) + .type(PropertyType.INT) .parser(Parsers.INTEGER_PARSER) .defaultValue(1000) .addValidator(Validators.INTEGER_VALIDATOR, Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor BUFFER_FLUSH_INTERVAL = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor BUFFER_FLUSH_INTERVAL = new PropertyDescriptor.Builder() .name("buffer-flush.interval") .description("Specifies the maximum interval of buffered rows per batch request, the unit is millisecond.") - .type(PropertyType.LONG) - .parser(Parsers.LONG_PARSER) - .defaultValue(30000L) - .addValidator(Validators.LONG_VALIDATOR, Validators.POSITIVE_LONG_VALIDATOR) + .type(PropertyType.INT) + .parser(Parsers.INTEGER_PARSER) + .defaultValue(30000) + .addValidator(Validators.LONG_VALIDATOR, Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor RETRY_MAX = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor RETRY_MAX = new PropertyDescriptor.Builder() .name("retry.max") .description("Specifies the max number of retry if writing records to database failed.") .type(PropertyType.INT) .parser(Parsers.INTEGER_PARSER) .defaultValue(3) - .addValidator(Validators.INTEGER_VALIDATOR, Validators.POSITIVE_INTEGER_VALIDATOR) + .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor RETRY_INTERVAL = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor RETRY_INTERVAL = new PropertyDescriptor.Builder() .name("retry.interval") .description("Specifies the retry time interval if writing records to database failed, the unit is millisecond.") - .type(PropertyType.LONG) - .parser(Parsers.LONG_PARSER) - .defaultValue(1000L) - .addValidator(Validators.LONG_VALIDATOR, Validators.POSITIVE_LONG_VALIDATOR) + .type(PropertyType.INT) + .parser(Parsers.INTEGER_PARSER) + .defaultValue(1000) + .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); public static final PropertyDescriptor UPSERT_ENABLE = new PropertyDescriptor.Builder<>() @@ -80,17 +70,16 @@ public class MongoDBSinkProperties { .description("Whether to write documents via upsert mode.") .type(PropertyType.BOOLEAN) .parser(Parsers.BOOLEAN_PARSER) - .defaultValue(false) .addValidator(Validators.BOOLEAN_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor PRIMARY_KEY = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor> PRIMARY_KEY = new PropertyDescriptor.Builder<>() .name("primary-key") .description("The primary keys for upsert/update. Keys are in [\"id\",\"name\",...] format for properties.") .type(PropertyType.OBJECT) - .parser(Parsers.JSON_PARSER) + .parser(Parsers.STRING_ARRAY_PARSER) .defaultValue(Collections.EMPTY_LIST) - .addValidator(Validators.BOOLEAN_VALIDATOR) + .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); public static final PropertyDescriptor TRANSACTION = new PropertyDescriptor.Builder<>() @@ -102,5 +91,4 @@ public class MongoDBSinkProperties { .addValidator(Validators.BOOLEAN_VALIDATOR) .validateAndBuild(); - } diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourcePlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourcePlugin.java index 5a9ae138f..ee64897e8 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourcePlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourcePlugin.java @@ -49,7 +49,6 @@ public MongoDBSourcePlugin() { final List props = new ArrayList<>(); props.add(DATABASE); props.add(COLLECTION); - props.add(MATCH_QUERY); props.add(SCHEMA); props.add(MATCH_QUERY); props.add(MATCH_PROJECTION); diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourceProperties.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourceProperties.java index 01ef96976..60b083cbf 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourceProperties.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/mongodb/source/MongoDBSourceProperties.java @@ -18,38 +18,32 @@ package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.mongodb.source; -import cn.sliew.scaleph.plugin.framework.property.*; -import com.fasterxml.jackson.databind.JsonNode; +import cn.sliew.scaleph.plugin.framework.property.Parsers; +import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor; +import cn.sliew.scaleph.plugin.framework.property.PropertyType; +import cn.sliew.scaleph.plugin.framework.property.Validators; @SuppressWarnings("unchecked") public enum MongoDBSourceProperties { ; - public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder<>() - .name("schema") - .description("The schema information of upstream data.") - .type(PropertyType.OBJECT) - .parser(Parsers.JSON_PARSER) - .addValidator(Validators.NON_BLANK_VALIDATOR) - .validateAndBuild(); - - public static final PropertyDescriptor MATCH_QUERY = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor MATCH_QUERY = new PropertyDescriptor.Builder() .name("match.query") - .description("match.query is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.") + .description("MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.") .type(PropertyType.STRING) .parser(Parsers.STRING_PARSER) .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor MATCH_PROJECTION = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor MATCH_PROJECTION = new PropertyDescriptor.Builder() .name("match.projection") - .description("In MongoDB, Projection is used to control the fields contained in the query results") + .description("MatchQuery is a JSON string that specifies the selection criteria using query operators for the documents to be returned from the collection.") .type(PropertyType.STRING) .parser(Parsers.STRING_PARSER) .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor PARTITION_SPLIT_KEY = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor PARTITION_SPLIT_KEY = new PropertyDescriptor.Builder() .name("partition.split-key") .description("The key of Mongodb fragmentation.") .type(PropertyType.STRING) @@ -58,61 +52,49 @@ public enum MongoDBSourceProperties { .addValidator(Validators.NON_BLANK_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor PARTITION_SPLIT_SIZE = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor PARTITION_SPLIT_SIZE = new PropertyDescriptor.Builder() .name("partition.split-size") .description("The size of Mongodb fragment.") - .type(PropertyType.LONG) + .type(PropertyType.INT) .parser(Parsers.LONG_PARSER) - .defaultValue(67108864L) // default 64M - .addValidator(Validators.NON_BLANK_VALIDATOR) + .defaultValue(1024 * 1024 * 64L) // default 64M + .addValidator(Validators.POSITIVE_LONG_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor CURSOR_NO_TIMEOUT = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor CURSOR_NO_TIMEOUT = new PropertyDescriptor.Builder() .name("cursor.no-timeout") - .description("MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. " + - "Set this option to true to prevent that. However, " + - "if the application takes longer than 30 minutes to process the current batch of documents, " + - "the session is marked as expired and closed.") + .description("MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use") .type(PropertyType.BOOLEAN) .parser(Parsers.BOOLEAN_PARSER) .defaultValue(true) .addValidator(Validators.BOOLEAN_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() .name("fetch.size") - .description("Set the number of documents obtained from the server for each batch." + - " Setting the appropriate batch size can improve query performance and avoid the memory pressure" + - " caused by obtaining a large amount of data at one time.") + .description("Set the number of documents obtained from the server for each batch.") .type(PropertyType.INT) .parser(Parsers.INTEGER_PARSER) .defaultValue(2048) .addValidator(Validators.INTEGER_VALIDATOR, Validators.POSITIVE_INTEGER_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor MAX_TIME_MIN = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor MAX_TIME_MIN = new PropertyDescriptor.Builder() .name("max.time-min") - .description("This parameter is a MongoDB query option that limits the maximum execution time for query operations. " + - "The value of maxTimeMin is in Minute." + - " If the execution time of the query exceeds the specified time limit, " + - "MongoDB will terminate the operation and return an error.") - .type(PropertyType.LONG) + .description("This parameter is a MongoDB query option that limits the maximum execution time for query operations.") + .type(PropertyType.INT) .parser(Parsers.LONG_PARSER) .defaultValue(600L) .addValidator(Validators.LONG_VALIDATOR, Validators.POSITIVE_LONG_VALIDATOR) .validateAndBuild(); - public static final PropertyDescriptor FLAT_SYNC_STRING = new PropertyDescriptor.Builder<>() + public static final PropertyDescriptor FLAT_SYNC_STRING = new PropertyDescriptor.Builder() .name("flat.sync-string") - .description("By utilizing flatSyncString, only one field attribute value can be set," + - " and the field type must be a String." + - " This operation will perform a string mapping on a single MongoDB data entry.") + .description("By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String.") .type(PropertyType.BOOLEAN) .parser(Parsers.BOOLEAN_PARSER) .defaultValue(true) .addValidator(Validators.BOOLEAN_VALIDATOR) .validateAndBuild(); - - } diff --git a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts index 738bafa51..5226a73c2 100644 --- a/scaleph-ui-react/src/locales/zh-CN/pages/project.ts +++ b/scaleph-ui-react/src/locales/zh-CN/pages/project.ts @@ -132,6 +132,8 @@ export default { // jdbc 'pages.project.di.step.jdbc.connectionCheckTimeoutSec': '数据源连接超时(秒)', + 'pages.project.di.step.jdbc.compatibleMode': '数据库兼容模式', + 'pages.project.di.step.jdbc.compatibleMode.tooltip': '如 OceanBase 支持 Oracle、MySQL 模式, 用户需要选择使用 Oracle 或 MySQL', 'pages.project.di.step.jdbc.database': '数据库', 'pages.project.di.step.jdbc.table': '表', 'pages.project.di.step.jdbc.table.tooltip': '优先级高于SQL', @@ -155,7 +157,6 @@ export default { 'pages.project.di.step.jdbc.getsql': '获取SQL', 'pages.project.di.step.jdbc.preview': '预览结果', 'pages.project.di.step.jdbc.batchSize': '批次数量', - 'pages.project.di.step.jdbc.batchIntervalMs': '批次间隔(毫秒)', 'pages.project.di.step.jdbc.batch.tooltip': 'record buffer 到达批次数量或时间到批次间隔(毫秒)时,会提交 record buffer', 'pages.project.di.step.jdbc.maxRetries': '批次重试次数', @@ -204,9 +205,16 @@ export default { 'pages.project.di.step.fake.doubleMin': 'double 最小值', 'pages.project.di.step.fake.doubleMax': 'double 最大值', 'pages.project.di.step.fake.doubleTemplate': 'double 数据', + 'pages.project.di.step.fake.dateYearTemplate': '日期-年(yyyy)', + 'pages.project.di.step.fake.dateMonthTemplate': '日期-月(MM)', + 'pages.project.di.step.fake.dateDayTemplate': '日期-日(dd)', + 'pages.project.di.step.fake.timeHourTemplate': '时间-小时(HH)', + 'pages.project.di.step.fake.timeMinuteTemplate': '时间-分钟(mm)', + 'pages.project.di.step.fake.timeSecondTemplate': '时间-秒(ss)', // base file 'pages.project.di.step.baseFile.path': '路径', + 'pages.project.di.step.baseFile.fileFilterPattern': '文件过滤表达式', 'pages.project.di.step.baseFile.fileFormatType': '文件格式', 'pages.project.di.step.baseFile.readColumns': '列', 'pages.project.di.step.baseFile.schema': '结构', @@ -375,10 +383,13 @@ export default { 'pages.project.di.step.kudu.savemode': '保存模式', // kafka - 'pages.project.di.step.kafka.topic': '主题', + 'pages.project.di.step.kafka.topic': 'Topic', 'pages.project.di.step.kafka.topic.placeholder': 'topic1,topic2', - 'pages.project.di.step.kafka.pattern': 'pattern', + 'pages.project.di.step.kafka.pattern': '启用Topic 正则表达式', + 'pages.project.di.step.kafka.pattern.tooltip': + '使用正则表达式作为 Topic,与正则表达式匹配的所有 Topic 都将被消费者订阅', 'pages.project.di.step.kafka.partitionDiscoveryIntervalMillis': '动态发现主题和分区间隔(毫秒)', + 'pages.project.di.step.kafka.partitionDiscoveryIntervalMillis.tooltip': '-1: 禁用', 'pages.project.di.step.kafka.conf': 'Kafka配置', 'pages.project.di.step.kafka.conf.tooltip': '指定参数的方式是加上前缀"kafka."到原始参数名称。 比如指定auto.offset.reset的方式是:kafka.auto.offset.reset = latest', @@ -387,9 +398,7 @@ export default { 'pages.project.di.step.kafka.conf.key.placeholder': 'auto.offset.reset', 'pages.project.di.step.kafka.conf.value': '配置值', 'pages.project.di.step.kafka.conf.value.placeholder': 'latest', - 'pages.project.di.step.kafka.pattern.tooltip': - '如果 pattern 设置为 true,则为要读取的主题名称模式的正则表达式。 客户端中名称与指定正则表达式匹配的所有主题都将被消费者订阅。', - 'pages.project.di.step.kafka.consumerGroup': '消费者组ID', + 'pages.project.di.step.kafka.consumerGroup': 'Kafka Group Id', 'pages.project.di.step.kafka.consumerGroup.tooltip': 'Kafka consumer group id,用于区分不同的消费组', 'pages.project.di.step.kafka.commit_on_checkpoint': '在Flink Checkpoint时自动提交偏移量', @@ -440,21 +449,23 @@ export default { 'pages.project.di.step.mongodb.uri': 'URI', 'pages.project.di.step.mongodb.database': '数据库', 'pages.project.di.step.mongodb.collection': '集合', - 'pages.project.di.step.mongodb.matchQuery': '查询条件(match.query)', - 'pages.project.di.step.mongodb.matchProjection': '投影(match.projection)', + 'pages.project.di.step.mongodb.matchQuery': '查询 Query', + 'pages.project.di.step.mongodb.matchProjection': '查询 Project', 'pages.project.di.step.mongodb.partitionSplitKey': '分区主键', - 'pages.project.di.step.mongodb.partitionSplitSize': '分区大小(Byte)', - 'pages.project.di.step.mongodb.cursorNoTimeout': '游标不超时', + 'pages.project.di.step.mongodb.partitionSplitSize': '分区大小(Byte)', + 'pages.project.di.step.mongodb.cursorNoTimeout': '禁用服务端 Cursor 超时', 'pages.project.di.step.mongodb.fetchSize': '批量大小', - 'pages.project.di.step.mongodb.maxTimeMin': '最大查询时间(分钟)', + 'pages.project.di.step.mongodb.maxTimeMin': '最大查询时间(分钟)', 'pages.project.di.step.mongodb.flatSyncString': '宽字符串映射(此操作将对单个MongoDB数据条目执行字符串映射。)', - 'pages.project.di.step.mongodb.bufferFlushMaxRows': '批量写入条数', - 'pages.project.di.step.mongodb.bufferFlushInterval': '批量刷写间隔(毫秒)', - 'pages.project.di.step.mongodb.retryMax': '最大重试次数', + + 'pages.project.di.step.mongodb.bufferFlushMaxRows': 'Buffer Size', + 'pages.project.di.step.mongodb.bufferFlushInterval': 'Buffer Flush 频率(毫秒)', + 'pages.project.di.step.mongodb.retryMax': '重试次数', 'pages.project.di.step.mongodb.retryInterval': '重试间隔(毫秒)', - 'pages.project.di.step.mongodb.upsertEnable': '启用Upsert模式', + 'pages.project.di.step.mongodb.upsertEnable': '启用 Upsert', 'pages.project.di.step.mongodb.primaryKey': '主键', - 'pages.project.di.step.mongodb.transaction': '启用事务(MongoDB 4.2+)', + 'pages.project.di.step.mongodb.primaryKey.placeholder': 'id, name', + 'pages.project.di.step.mongodb.transaction': '启用事(MongoDB 4.2+)', // redis 'pages.project.di.step.redis.host': '主机', @@ -850,7 +861,6 @@ export default { 'pages.project.flink.kubernetes.job.detail.suspend': 'Suspend', 'pages.project.flink.kubernetes.job.detail.resume': 'Resume', - 'pages.project.flink.kubernetes.job.detail.savepoint': 'Savepoint', 'pages.project.flink.kubernetes.job.detail.flinkui': 'Flink UI', 'pages.project.flink.kubernetes.job.detail.metrics': 'Metrics', 'pages.project.flink.kubernetes.job.detail.logs': 'Logs', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx index 67b9bd53f..779b4fa63 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/constant.tsx @@ -95,25 +95,31 @@ export const FakeParams = { mapSize: 'map.size', arraySize: 'array.size', bytesLength: 'bytes.length', - stringFakeMode: 'string.fake.mode', stringLength: 'string.length', + + stringFakeMode: 'string.fake.mode', stringTemplate: 'string.template', + tinyintFakeMode: 'tinyint.fake.mode', tinyintMin: 'tinyint.min', tinyintMax: 'tinyint.max', tinyintTemplate: 'tinyint.template', + smallintFakeMode: 'smallint.fake.mode', smallintMin: 'smallint.min', smallintMax: 'smallint.max', smallintTemplate: 'smallint.template', + intFakeMode: 'int.fake.mode', intMin: 'int.min', intMax: 'int.max', intTemplate: 'int.template', + bigintFakeMode: 'bigint.fake.mode', bigintMin: 'bigint.min', bigintMax: 'bigint.max', bigintTemplate: 'bigint.template', + floatFakeMode: 'float.fake.mode', floatMin: 'float.min', floatMax: 'float.max', @@ -122,11 +128,19 @@ export const FakeParams = { doubleFakeMode: 'double.fake.mode', doubleMin: 'double.min', doubleMax: 'double.max', - doubleTemplate: 'double.template' + doubleTemplate: 'double.template', + + dateYearTemplate: 'date.year.template', + dateMonthTemplate: 'date.month.template', + dateDayTemplate: 'date.day.template', + timeHourTemplate: 'time.hour.template', + timeMinuteTemplate: 'time.minute.template', + timeSecondTemplate: 'time.second.template' } export const JdbcParams = { connectionCheckTimeoutSec: 'connection_check_timeout_sec', + compatibleMode: 'compatible_mode', database: 'database', table: 'table', supportUpsert: 'support_upsert_by_query_primary_key_exist', @@ -141,7 +155,6 @@ export const JdbcParams = { partitionNum: 'partition_num', fetchSize: 'fetch_size', batchSize: 'batch_size', - batchIntervalMs: 'batch_interval_ms', maxRetries: 'max_retries', isExactlyOnce: 'is_exactly_once', xaDataSourceClassName: 'xa_data_source_class_name', @@ -152,6 +165,7 @@ export const JdbcParams = { export const BaseFileParams = { path: 'path', + fileFilterPattern: 'file_filter_pattern', fileFormatType: 'file_format_type', readColumns: 'read_columns', schema: 'schema', diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-elasticsearch-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-elasticsearch-step.tsx index b743a90cd..f9f68e4e6 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-elasticsearch-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-elasticsearch-step.tsx @@ -88,15 +88,15 @@ const SinkElasticsearchStepForm: React.FC diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-jdbc-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-jdbc-step.tsx index 18cc145c3..7860ae66a 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-jdbc-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-jdbc-step.tsx @@ -112,6 +112,14 @@ const SinkJdbcStepForm: React.FC + , + }} + /> - , - }} - colProps={{span: 8}} - initialValue={1000} - fieldProps={{ - min: 0, - step: 100 - }} - /> {({format}) => { diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-mongodb-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-mongodb-step.tsx index 15d8430c0..043bcbd8f 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-mongodb-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/sink/sink-mongodb-step.tsx @@ -6,14 +6,10 @@ import {Button, Drawer, Form, message} from 'antd'; import {WsDiJob} from '@/services/project/typings'; import {getIntl, getLocale} from 'umi'; import {useEffect} from 'react'; -import { - ProForm, - ProFormDigit, - ProFormSwitch, - ProFormText -} from '@ant-design/pro-components'; +import {ProForm, ProFormDigit, ProFormSwitch, ProFormText} from '@ant-design/pro-components'; import DataSourceItem from '@/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/dataSource'; import FieldItem from "@/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/fields"; +import {StepSchemaService} from "@/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/helper"; const SinkMongoDBStepForm: React.FC< ModalFormProps<{ @@ -49,6 +45,7 @@ const SinkMongoDBStepForm: React.FC< map.set(STEP_ATTR_TYPE.jobId, jobInfo.id); map.set(STEP_ATTR_TYPE.jobGraph, JSON.stringify(jobGraph)); map.set(STEP_ATTR_TYPE.stepCode, nodeInfo.id); + StepSchemaService.formatSchema(values) map.set(STEP_ATTR_TYPE.stepAttrs, values); WsDiJobService.saveStepAttr(map).then((resp) => { if (resp.success) { @@ -108,6 +105,7 @@ const SinkMongoDBStepForm: React.FC< - { - return DictDataService.listDictDataByType2(DICT_TYPE.seatunnelFakeMode) - }} - /> + { + return DictDataService.listDictDataByType2(DICT_TYPE.seatunnelFakeMode) + }} + /> + + + + + + + + + + diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-jdbc-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-jdbc-step.tsx index d7fbda733..adc175758 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-jdbc-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-jdbc-step.tsx @@ -98,6 +98,14 @@ const SourceJdbcStepForm: React.FC + , + }} + /> - + + {({pattern}) => { + if (pattern) { + return ( + , + }} + initialValue={-1} + /> + ); + } + return ; + }} + + {({format}) => { diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-local-file-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-local-file-step.tsx index 7f7b7c1e1..847103080 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-local-file-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-local-file-step.tsx @@ -72,6 +72,10 @@ const SourceLocalFileStepForm: React.FC + > = ({data, visible, onCancel, onOK}) => { const nodeInfo = data.node.data; @@ -75,45 +75,63 @@ const SourceMongoDBStepForm: React.FC - + + + - ); diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-oss-file-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-oss-file-step.tsx index b860ac599..b6dfd5297 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-oss-file-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/source/source-oss-file-step.tsx @@ -74,6 +74,10 @@ const SourceOSSFileStepForm: React.FC + + + +