Kafa topic에서 데이터를 가져오고 2개의 Topic을 조인할 수 있는 간단한 커넥터.
Flink의 Session mode로 실행
Read Kafka topic, Sink kafka Topic, Join Topics log data using flink
- kafka source / kafka sink
- Java : 11
- Flink : 1.16.1-bin-scala_2.12
src/main/java/examples/JoinTopics
- Java Class 내 Kafka Property 값을 변경
- Change kafka Property for your system
prop.setProperty("BOOTSTRAP_SERVERS", "localhost:9092");
prop.setProperty("FIRST_TOPIC", { your source topic name });
prop.setProperty("SECOND_TOPIC", { your source topic name });
prop.setProperty("JOIN_TOPIC", { your sink topic name });
- 사용할 Join SQL 쿼리문 변경
- Change Join SQL query
Table result = tableEnv.sqlQuery( "{ set your Query }");
- model, schema 폴더에 POJO 및 serialize , deserialize 코드 추가
- Maven Packaing
.jar
file
src/main/java/examples/ReadTopics
- Change target topic serialization
Change in pom.xml
file
<mainClass>{ target main classpath }</mainClass>
.
├── README.md
├── data
├── pom.xml
├── src
├── main
│ ├── java
│ │ └── examples
│ │ ├── JoinTopics.java
│ │ ├── ReadTopic.java
│ │ ├── model
│ │ │ ├── AddressInfo.java
│ │ │ ├── AddressTopic.java
│ │ │ ├── GetPersonNameTopic.java
│ │ │ ├── JoinTopic.java
│ │ │ └── PersonTopic.java
│ │ └── schema
│ │ ├── AddressTopicDeserializationSchema.java
│ │ ├── GetPersonNameSerializationSchema.java
│ │ ├── JoinTopicSerializationSchema.java
│ │ └── PersonTopicDeserializationSchema.java
│ └── resources
│ └── log4j.properties
└── test
- model 디렉토리
- Topic의 POJO 코드
- Log 데이터 포맷에 맞게 코드 수정 및 추가
- schema
- Serialization / Deserialization 을 위한 코드
Run flink cluster
- Flink Dashboard
./bin/start-cluster.sh
Stop flink cluster
./bin/stop-cluster.sh
Submit Job
- Check Running Job List on Dashboard
./bin/flink run <job name>