Setting up a Flink cluster with the provided connector takes several steps:
- Set up a Flink cluster with version 1.14+ and Java 8+ installed.
- Download the connector SQL jars from the Download page (or build them yourself).
- Put the downloaded jars under `FLINK_HOME/lib/`.
- Restart the Flink cluster.
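
The copy step above can be scripted. What follows is a minimal sketch, assuming the jars have already been downloaded to a local directory; the function name `install_connector_jars` and both directory arguments are hypothetical and not part of the connector. Restarting is left out: on a standalone cluster it is typically done with Flink's own `bin/stop-cluster.sh` and `bin/start-cluster.sh`, and other deployments will differ.

```python
import shutil
from pathlib import Path


def install_connector_jars(download_dir, flink_home):
    """Copy every *.jar in download_dir into <flink_home>/lib.

    Hypothetical helper for illustration. Returns the copied file names.
    """
    lib_dir = Path(flink_home) / "lib"
    if not lib_dir.is_dir():
        raise FileNotFoundError(f"{lib_dir} does not exist; is FLINK_HOME correct?")

    copied = []
    for jar in sorted(Path(download_dir).glob("*.jar")):
        # copy2 keeps file metadata such as the modification time
        shutil.copy2(jar, lib_dir / jar.name)
        copied.append(jar.name)
    return copied
```

Files without the `.jar` extension are ignored, and the function fails early when `lib/` is missing rather than creating it, since a missing `lib/` usually means `FLINK_HOME` points at the wrong place.
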
The following example shows how to create a table using the connector.

    -- create a table source with json format
    CREATE TEMPORARY TABLE basic_qot (
      security row<code string, market int>,
      openPrice double,
      lowPrice double,
      highPrice double,
      curPrice double,
      volume double,
      updateTime string
    )
    WITH (
      'subType' = 'Basic',
      'codes' = 'HK|00700,HSI2302',
      'connector' = 'FutuOpenD',
      'hostname' = 'futu-opend.default',
      'port' = '11111',
      'format' = 'json'
    );

    -- read updated basic quote data
    select * from basic_qot;

Alternatively, use the raw format to view the data structure of each subType:

    CREATE TEMPORARY TABLE basic_qot (
      data string
    )
    WITH (
      'subType' = 'Basic',
      'codes' = 'HK|00700,HSI2302',
      'connector' = 'FutuOpenD',
      'hostname' = 'futu-opend.default',
      'port' = '11111',
      'format' = 'raw'
    );

| Option | Required | Default | Description |
|---|---|---|---|
| connector | required | (none) | Specifies which connector to use; here it should be 'FutuOpenD'. |
| hostname | required | 127.0.0.1 | The hostname of the FutuOpenD server. |
| port | required | 11111 | The port of the FutuOpenD server. |
| subType | required | Basic | The subType of the data source. *(1) |
| codes | required | HK\|00700,HSI2302 | The codes of the data source. *(2) |
| format | required | json | The format of the data source. Currently only the json and raw formats are supported. |

- *(1) The value refers to the `SubType` enum in the FutuOpenD API (#subType), with the `SubType_` prefix removed, e.g. `SubType_Basic` -> `Basic`, `SubType_OrderBook` -> `OrderBook`, `SubType_KL_5Min` -> `KL_5Min`.
- *(2) Codes in the same market are separated by commas, e.g. `HK|00700,HSI2302`. Codes in different markets are separated by semicolons, e.g. `HK|00700,HSI2302;US|AAPL,BA`.
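
The two value formats described above are easy to get wrong by hand, so here is a small sketch that builds them programmatically. The helper names `subtype_option` and `codes_option` are hypothetical and are not provided by the connector; they only encode the rules stated in the notes.

```python
def subtype_option(enum_name):
    """Strip the 'SubType_' prefix from an enum name (hypothetical helper)."""
    prefix = "SubType_"
    return enum_name[len(prefix):] if enum_name.startswith(prefix) else enum_name


def codes_option(codes_by_market):
    """Build the 'codes' option value from a {market: [codes]} mapping.

    Codes within one market are joined with commas,
    and the market groups are joined with semicolons.
    """
    groups = []
    for market, codes in codes_by_market.items():
        if not codes:
            raise ValueError(f"no codes given for market {market!r}")
        groups.append(f"{market}|{','.join(codes)}")
    return ";".join(groups)


print(subtype_option("SubType_KL_5Min"))
# prints: KL_5Min
print(codes_option({"HK": ["00700", "HSI2302"], "US": ["AAPL", "BA"]}))
# prints: HK|00700,HSI2302;US|AAPL,BA
```

The resulting strings can be pasted into the `'subType'` and `'codes'` entries of the `WITH` clause.
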