Skip to content

Commit

Permalink
[Feature][scaleph-ui-react] upgrade seatunnel connectors to 2.3.3 (#612)
Browse files Browse the repository at this point in the history
* feature: upgrade seatunnel neo4j connectors to 2.3.3

* feature: upgrade seatunnel pulsar connector to 2.3.3

* feature: upgrade seatunnel redis connector to 2.3.3

* feature: upgrade seatunnel starrocks connector to 2.3.3

* feature: upgrade seatunnel plugin to 2.3.3
  • Loading branch information
kalencaya authored Aug 28, 2023
1 parent 19df560 commit 5f5ee29
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ public enum SeaTunnelPluginMapping {
SINK_S3_FILE(SEATUNNEL, SINK, S3_FILE, "connector-file-s3", GA, EXACTLY_ONCE),
SOURCE_OSS_FILE(SEATUNNEL, SOURCE, OSS_FILE, "connector-file-oss", BETA, BATCH, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SINK_OSS_FILE(SEATUNNEL, SINK, OSS_FILE, "connector-file-oss", BETA, EXACTLY_ONCE),
SOURCE_OSS_JINDO_FILE(SEATUNNEL, SOURCE, OSS_JINDO_FILE, "connector-file-oss-jindo", UNKNOWN, BATCH, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SINK_OSS_JINDO_FILE(SEATUNNEL, SINK, OSS_JINDO_FILE, "connector-file-oss-jindo", UNKNOWN, EXACTLY_ONCE),
SOURCE_OSS_JINDO_FILE(SEATUNNEL, SOURCE, OSS_JINDO_FILE, "connector-file-jindo-oss", UNKNOWN, BATCH, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SINK_OSS_JINDO_FILE(SEATUNNEL, SINK, OSS_JINDO_FILE, "connector-file-jindo-oss", UNKNOWN, EXACTLY_ONCE),
SOURCE_COS_FILE(SEATUNNEL, SOURCE, COS_FILE, "connector-file-cos", UNKNOWN, BATCH, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SINK_COS_FILE(SEATUNNEL, SINK, COS_FILE, "connector-file-cos", UNKNOWN, EXACTLY_ONCE),

SOURCE_KAFKA(SEATUNNEL, SOURCE, KAFKA, "connector-kafka", GA, BATCH, STREAM, EXACTLY_ONCE, PARALLELISM),
SINK_KAFKA(SEATUNNEL, SINK, KAFKA, "connector-kafka", GA, EXACTLY_ONCE),
SOURCE_PULSAR(SEATUNNEL, SOURCE, PULSAR, "connector-pulsar", BETA, BATCH, STREAM, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SOURCE_ROCKETMQ(SEATUNNEL, SOURCE, ROCKETMQ, "connector-rocketmq", UNKNOWN, BATCH, STREAM, EXACTLY_ONCE, COLUMN_PROJECTION),
SINK_ROCKETMQ(SEATUNNEL, SINK, ROCKETMQ, "connector-rocketmq", UNKNOWN, EXACTLY_ONCE),
SINK_DATAHUB(SEATUNNEL, SINK, DATAHUB, "connector-datahub", ALPHA),
SOURCE_RABBITMQ(SEATUNNEL, SOURCE, RABBITMQ, "connector-rabbitmq", BETA, STREAM, EXACTLY_ONCE, COLUMN_PROJECTION),
SINK_RABBITMQ(SEATUNNEL, SINK, RABBITMQ, "connector-rabbitmq", BETA),
Expand All @@ -93,6 +97,7 @@ public enum SeaTunnelPluginMapping {
SOURCE_CASSANDRA(SEATUNNEL, SOURCE, CASSANDRA, "connector-cassandra", BETA, BATCH, COLUMN_PROJECTION),
SINK_CASSANDRA(SEATUNNEL, SINK, CASSANDRA, "connector-cassandra", BETA),
SINK_TABLESTORE(SEATUNNEL, SINK, TABLESTORE, "connector-tablestore", ALPHA),
SINK_GOOGLE_FIRE_STORE(SEATUNNEL, SINK, GOOGLE_FIRE_STORE, "connector-google-firestore", UNKNOWN),

SOURCE_MYSQL_CDC(SEATUNNEL, SOURCE, MYSQL_CDC, "connector-cdc-mysql", GA, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SOURCE_SQLSERVER_CDC(SEATUNNEL, SOURCE, SQLSERVER_CDC, "connector-cdc-sqlserver", GA, STREAM, EXACTLY_ONCE, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
Expand All @@ -109,6 +114,8 @@ public enum SeaTunnelPluginMapping {
SINK_STARROCKS(SEATUNNEL, SINK, STARROCKS, "connector-starrocks", ALPHA),
SOURCE_HUDI(SEATUNNEL, SOURCE, HUDI, "connector-hudi", BETA, BATCH, EXACTLY_ONCE, PARALLELISM),
SOURCE_ICEBERG(SEATUNNEL, SOURCE, ICEBERG, "connector-iceberg", BETA, BATCH, STREAM, EXACTLY_ONCE, COLUMN_PROJECTION, PARALLELISM),
SOURCE_PAIMON(SEATUNNEL, SOURCE, PAIMON, "connector-paimon", UNKNOWN, BATCH),
SINK_PAIMON(SEATUNNEL, SINK, PAIMON, "connector-paimon", UNKNOWN, EXACTLY_ONCE),
SINK_S3REDSHIFT(SEATUNNEL, SINK, S3REDSHIFT, "connector-s3-redshift", GA, EXACTLY_ONCE),
SOURCE_MAXCOMPUTE(SEATUNNEL, SOURCE, MAXCOMPUTE, "connector-maxcompute", ALPHA, BATCH, PARALLELISM),
SINK_MAXCOMPUTE(SEATUNNEL, SINK, MAXCOMPUTE, "connector-maxcompute", ALPHA),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ public enum SeaTunnelPluginName implements DictInstance {
S3_FILE("S3File", "S3File"),
OSS_FILE("OssFile", "OssFile"),
OSS_JINDO_FILE("OssJindoFile", "OssJindoFile"),
COS_FILE("CosFile", "CosFile"),

KAFKA("Kafka", "Kafka"),
PULSAR("Pulsar", "Pulsar"),
ROCKETMQ("Rocketmq", "RocketMQ"),
DATAHUB("DataHub", "DataHub"),
RABBITMQ("RabbitMQ", "RabbitMQ"),

Expand All @@ -68,6 +70,7 @@ public enum SeaTunnelPluginName implements DictInstance {
AMAZON_DYNAMODB("AmazonDynamodb", "AmazonDynamodb"),
CASSANDRA("Cassandra", "Cassandra"),
TABLESTORE("Tablestore", "Tablestore"),
GOOGLE_FIRE_STORE("GoogleFirestore", "GoogleFirestore"),

MYSQL_CDC("MySQL-CDC", "MySQL-CDC"),
SQLSERVER_CDC("SqlServer-CDC", "SqlServer-CDC"),
Expand All @@ -81,6 +84,7 @@ public enum SeaTunnelPluginName implements DictInstance {
STARROCKS("StarRocks", "StarRocks"),
HUDI("Hudi", "Hudi"),
ICEBERG("Iceberg", "Iceberg"),
PAIMON("Paimon", "Paimon"),
S3REDSHIFT("S3Redshift", "S3Redshift"),
MAXCOMPUTE("Maxcompute", "MaxCompute"),
HBASE("Hbase", "Hbase"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum Neo4jProperties {
.description("maximum transaction retry time(seconds). transaction fail if exceeded")
.type(PropertyType.INT)
.parser(Parsers.LONG_PARSER)
.defaultValue(30L)
.addValidator(Validators.NON_NEGATIVE_INTEGER_VALIDATOR)
.validateAndBuild();

Expand All @@ -96,6 +97,7 @@ public enum Neo4jProperties {
.description("The maximum amount of time to wait for a TCP connection to be established (seconds)")
.type(PropertyType.INT)
.parser(Parsers.LONG_PARSER)
.defaultValue(30L)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public enum Neo4jSourceProperties {
public static final PropertyDescriptor<JsonNode> SCHEMA = new PropertyDescriptor.Builder()
.name("schema")
.description("returned fields of query")
.type(PropertyType.STRING)
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public PulsarSourcePlugin() {
props.add(CURSOR_RESET_MODE);
props.add(CURSOR_STOP_MODE);
props.add(CURSOR_STOP_TIMESTAMP);
props.add(FORMAT);
props.add(FIELD_DELIMITER);
props.add(SCHEMA);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.RESULT_TABLE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,24 @@ public enum PulsarSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> FORMAT = new PropertyDescriptor.Builder()
.name("format")
.description("Data format.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.defaultValue("json")
.allowableValues("json", "text")
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> FIELD_DELIMITER = new PropertyDescriptor.Builder<String>()
.name("field_delimiter")
.description("The separator between columns in a row of data. Only needed by text and csv file format")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> SCHEMA = new PropertyDescriptor.Builder()
.name("schema")
.description("The schema information of upstream data.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public RedisSinkPlugin() {

final List<PropertyDescriptor> props = new ArrayList<>();
props.add(KEY);
props.add(EXPIRE);
props.add(DATA_TYPE);
props.add(FORMAT);
props.add(CommonProperties.PARALLELISM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ public enum RedisSinkProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<Long> EXPIRE = new PropertyDescriptor.Builder()
.name("expire")
.description("Set redis expiration time.")
.type(PropertyType.INT)
.parser(Parsers.LONG_PARSER)
.defaultValue(-1L)
.addValidator(Validators.LONG_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> DATA_TYPE = new PropertyDescriptor.Builder()
.name("data_type")
.description("Redis data types")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public enum SocketSinkProperties {
.type(PropertyType.INT)
.defaultValue(3)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.NON_NEGATIVE_INTEGER_VALIDATOR)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum StarRocksProperties {
;

public static final PropertyDescriptor<List<String>> NODE_URLS = new PropertyDescriptor.Builder()
.name("node_urls")
.name("nodeUrls")
.description("StarRocks cluster address")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
public enum StarRocksSinkProperties {
;

public static final PropertyDescriptor<String> BASE_URL = new PropertyDescriptor.Builder<>()
public static final PropertyDescriptor<String> BASE_URL = new PropertyDescriptor.Builder()
.name("base-url")
.description("The JDBC URL like jdbc:mysql://localhost:9030/ or jdbc:mysql://localhost:9030 or jdbc:mysql://localhost:9030/db")
.properties(Property.Required)
.description("The JDBC URL")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

Expand Down
3 changes: 3 additions & 0 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ export default {
'pages.project.di.step.redis.hashKeyParseMode': 'Hash Key解析模式',
'pages.project.di.step.redis.format': '格式',
'pages.project.di.step.redis.key': '键',
'pages.project.di.step.redis.expire': '过期时间(秒)',

// pulsar
'pages.project.di.step.pulsar.clientServiceUrl': 'Client Service URL',
Expand All @@ -498,6 +499,8 @@ export default {
'pages.project.di.step.pulsar.cursorStopMode': '游标停止模式',
'pages.project.di.step.pulsar.cursorStopMode.tooltip': '"Never"表示流,其他表示批',
'pages.project.di.step.pulsar.cursorStopTimestamp': '游标停止时间戳',
'pages.project.di.step.pulsar.format': '数据格式',
'pages.project.di.step.pulsar.fieldDelimiter': '分隔符',

// datahub
'pages.project.di.step.datahub.endpoint': '终端节点',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ export const RedisParams = {
hashKeyParseMode: 'hash_key_parse_mode',
format: 'format',
schema: 'schema',
key: 'key'
key: 'key',
expire: 'expire'
};

export const PulsarParams = {
Expand All @@ -437,7 +438,9 @@ export const PulsarParams = {
cursorStartupTimestamp: 'cursor.startup.timestamp',
cursorResetMode: 'cursor.reset.mode',
cursorStopMode: 'cursor.stop.mode',
cursorStopTimestamp: 'cursor.stop.timestamp'
cursorStopTimestamp: 'cursor.stop.timestamp',
format: 'format',
fieldDelimiter: 'field_delimiter'
};

export const DatahubParams = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {Button, Drawer, Form, message} from 'antd';
import {WsDiJob} from '@/services/project/typings';
import {getIntl, getLocale} from 'umi';
import {useEffect} from 'react';
import {ProForm, ProFormSelect, ProFormText} from '@ant-design/pro-components';
import {ProForm, ProFormDigit, ProFormSelect, ProFormText} from '@ant-design/pro-components';
import DataSourceItem from '@/pages/Project/Workspace/Artifact/DI/DiJobFlow/Dag/steps/dataSource';

const SinkRedisStepForm: React.FC<
Expand Down Expand Up @@ -69,6 +69,14 @@ const SinkRedisStepForm: React.FC<
label={intl.formatMessage({id: 'pages.project.di.step.redis.key'})}
rules={[{required: true}]}
/>
<ProFormDigit
name={RedisParams.expire}
label={intl.formatMessage({id: 'pages.project.di.step.redis.expire'})}
initialValue={-1}
fieldProps={{
min: -1
}}
/>
<ProFormSelect
name={RedisParams.dataType}
label={intl.formatMessage({id: 'pages.project.di.step.redis.dataType'})}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,29 @@ const SourcePulsarStepForm: React.FC<
icon: <InfoCircleOutlined/>,
}}
/>
<FieldItem/>
<ProFormSelect
name={'format'}
label={intl.formatMessage({id: 'pages.project.di.step.pulsar.format'})}
rules={[{required: true}]}
initialValue={"json"}
options={["json", "text"]}
/>
<ProFormDependency name={['format']}>
{({format}) => {
if (format == 'json') {
return <FieldItem/>
} else if (format == 'text') {
return (
<ProFormText
name={PulsarParams.fieldDelimiter}
label={intl.formatMessage({id: 'pages.project.di.step.pulsar.fieldDelimiter'})}
/>
);
}
return <ProFormGroup/>;
}}
</ProFormDependency>

<ProFormDigit
name={PulsarParams.pollTimeout}
label={intl.formatMessage({id: 'pages.project.di.step.pulsar.pollTimeout'})}
Expand Down

0 comments on commit 5f5ee29

Please sign in to comment.