Commit bcdf2a2

Merge pull request #223 from aiven/unqualified-table-names

Enable use of unqualified table names in source connector

2 parents 20fc05e + 6afa658
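
This change lets the source connector's table filtering match bare table names where a fully qualified name (schema plus table) was previously required. As a rough sketch of what it enables, a source connector configuration might now look like the following Java map, written in the style of the integration tests below; the connection URL and names are illustrative, while table.whitelist, mode, incrementing.column.name, and topic.prefix are standard JDBC source connector properties:

import java.util.HashMap;
import java.util.Map;

// Sketch only; the values are illustrative and not taken from this commit.
final Map<String, String> config = new HashMap<>();
config.put("name", "jdbc-source");
config.put("connector.class", "io.aiven.connect.jdbc.JdbcSourceConnector");
config.put("connection.url", "jdbc:postgresql://localhost:5432/test");
config.put("mode", "incrementing");
config.put("incrementing.column.name", "id");
config.put("topic.prefix", "jdbc-");
// Previously this entry typically had to be schema-qualified, e.g.
// "public.orders"; with this change an unqualified name is accepted too.
config.put("table.whitelist", "orders");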

File tree

13 files changed: +543 -107 lines

build.gradle

Lines changed: 0 additions & 1 deletion
@@ -214,7 +214,6 @@ task integrationTest(type: Test) {
     testClassesDirs = sourceSets.integrationTest.output.classesDirs
     classpath = sourceSets.integrationTest.runtimeClasspath

-    // Integration tests run independently from unit tests
     dependsOn testClasses, distTar

     useJUnitPlatform()

checkstyle/suppressions.xml

Lines changed: 2 additions & 2 deletions
@@ -22,13 +22,13 @@
     <!-- Legacy suppressions -->
     <!-- TODO: must be fixed -->
     <suppress checks="CyclomaticComplexity"
-              files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>
+              files="(BufferedRecords|DataConverter|DatabaseDialect|FieldsMetadata|HanaDialect|JdbcSourceTask|JdbcSourceConnector|MySqlDatabaseDialect|OracleDatabaseDialect|PostgreSqlDatabaseDialect|PreparedStatementBinder|SqlServerDatabaseDialect|SqliteDatabaseDialect|TimestampIncrementingTableQuerier|VerticaDatabaseDialect|SapHanaDatabaseDialect|TableId|ColumnDefinition|TableMonitorThread).java"/>

     <suppress checks="ClassDataAbstractionCoupling"
               files="(DbDialect|JdbcSourceTask|GenericDatabaseDialect).java"/>

     <suppress checks="NPathComplexity"
-              files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>
+              files="(DataConverter|FieldsMetadata|JdbcSourceTask|JdbcSourceConnector|GenericDatabaseDialect).java"/>

     <suppress checks="JavaNCSS"
               files="(DataConverter|FieldsMetadata|JdbcSourceTask|GenericDatabaseDialect).java"/>

src/integrationTest/java/io/aiven/kafka/connect/jdbc/AbstractIT.java

Lines changed: 26 additions & 4 deletions
@@ -27,6 +27,8 @@
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;

@@ -51,6 +53,7 @@ public abstract class AbstractIT {
             DockerImageName.parse("confluentinc/cp-kafka")
                 .withTag(DEFAULT_KAFKA_TAG);
     protected static KafkaProducer<String, GenericRecord> producer;
+    protected static KafkaConsumer<String, GenericRecord> consumer;
     @Container
     protected KafkaContainer kafkaContainer = new KafkaContainer(DEFAULT_IMAGE_NAME)
         .withNetwork(Network.newNetwork())

@@ -69,6 +72,7 @@ void startKafka() throws Exception {
         final Path pluginDir = setupPluginDir();
         setupKafkaConnect(pluginDir);
         producer = createProducer();
+        consumer = createConsumer();
     }

     private static Path setupPluginDir() throws Exception {

@@ -85,15 +89,19 @@ private static Path setupPluginDir() throws Exception {
         return pluginDir;
     }

-    private void setupKafka() throws Exception {
-        LOGGER.info("Setup Kafka");
+
+    protected void createTopic(final String topic, final int numPartitions) throws Exception {
         try (final AdminClient adminClient = createAdminClient()) {
-            LOGGER.info("Create topic {}", TEST_TOPIC_NAME);
-            final NewTopic newTopic = new NewTopic(TEST_TOPIC_NAME, 4, (short) 1);
+            LOGGER.info("Create topic {}", topic);
+            final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
             adminClient.createTopics(List.of(newTopic)).all().get();
         }
     }

+    private void setupKafka() throws Exception {
+        createTopic(TEST_TOPIC_NAME, 4);
+    }
+
     protected AdminClient createAdminClient() {
         final Properties adminClientConfig = new Properties();
         adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());

@@ -118,10 +126,24 @@ protected KafkaProducer<String, GenericRecord> createProducer() {
         return new KafkaProducer<>(producerProps);
     }

+    protected KafkaConsumer<String, GenericRecord> createConsumer() {
+        LOGGER.info("Create kafka consumer");
+        final Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
+        consumerProps.put("schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+        return new KafkaConsumer<>(consumerProps);
+    }
+
     @AfterEach
     final void tearDown() {
         connectRunner.stop();
         producer.close();
+        consumer.close();

         connectRunner.awaitStop();
     }
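
The consumer added above gives integration tests a way to read back records that a source connector produced to Kafka. A minimal polling helper a test could layer on top of it might look like this; the helper is a sketch, not part of this commit, and assumes it lives in a class extending AbstractIT:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.avro.generic.GenericRecord;

// Hypothetical helper: polls the shared 'consumer' until the expected
// number of records has been received, then returns their values.
protected List<GenericRecord> pollValues(final String topic, final int expectedCount) {
    consumer.subscribe(List.of(topic));
    final List<GenericRecord> values = new ArrayList<>();
    while (values.size() < expectedCount) {
        for (final ConsumerRecord<String, GenericRecord> record
                 : consumer.poll(Duration.ofMillis(500))) {
            values.add(record.value());
        }
    }
    return values;
}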

src/integrationTest/java/io/aiven/kafka/connect/jdbc/ConnectRunner.java

Lines changed: 17 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;

 import org.slf4j.Logger;

@@ -102,6 +103,22 @@ public void createConnector(final Map<String, String> config) throws ExecutionEx
         assert connectorInfoCreated.created();
     }

+    public void restartTask(final String connector, final int task) throws ExecutionException, InterruptedException {
+        assert herder != null;
+
+        final FutureCallback<Void> cb = new FutureCallback<>(
+            (error, ignored) -> {
+                if (error != null) {
+                    LOGGER.error("Failed to restart task {}-{}", connector, task, error);
+                } else {
+                    LOGGER.info("Restarted task {}-{}", connector, task);
+                }
+            });
+
+        herder.restartTask(new ConnectorTaskId(connector, task), cb);
+        cb.get();
+    }
+
     void stop() {
         connect.stop();
     }
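
The new restartTask method wraps Herder.restartTask in a FutureCallback so a test can restart a single task of a running connector and block until the restart completes or fails. A hypothetical call from a test, with an illustrative connector name:

// Restart task 0 of the connector and wait; a failed restart surfaces
// as an ExecutionException from cb.get() inside restartTask().
connectRunner.restartTask("test-source-connector", 0);
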
src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/AbstractPostgresIT.java

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
+/*
+ * Copyright 2022 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.jdbc.postgres;
+
+import javax.sql.DataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.aiven.kafka.connect.jdbc.AbstractIT;
+
+import org.assertj.core.util.Arrays;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.utility.DockerImageName;
+
+public class AbstractPostgresIT extends AbstractIT {
+
+    public static final String DEFAULT_POSTGRES_TAG = "10.20";
+    private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
+        DockerImageName.parse("postgres")
+            .withTag(DEFAULT_POSTGRES_TAG);
+
+    @Container
+    protected final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);
+
+    protected void executeUpdate(final String updateStatement) throws SQLException {
+        try (final Connection connection = getDatasource().getConnection();
+             final Statement statement = connection.createStatement()) {
+            statement.executeUpdate(updateStatement);
+        }
+    }
+
+    protected DataSource getDatasource() {
+        final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
+        pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
+        pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
+        pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
+        pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
+        pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
+        return pgSimpleDataSource;
+    }
+
+    protected Map<String, String> basicConnectorConfig() {
+        final HashMap<String, String> config = new HashMap<>();
+        config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
+        config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+        config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
+        config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
+        config.put("tasks.max", "1");
+        config.put("connection.url", postgreSqlContainer.getJdbcUrl());
+        config.put("connection.user", postgreSqlContainer.getUsername());
+        config.put("connection.password", postgreSqlContainer.getPassword());
+        config.put("dialect.name", "PostgreSqlDatabaseDialect");
+        return config;
+    }
+
+}

src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/PartitionedTableIntegrationTest.java

Lines changed: 6 additions & 51 deletions
@@ -16,14 +16,9 @@

 package io.aiven.kafka.connect.jdbc.postgres;

-import javax.sql.DataSource;
-
-import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;

@@ -33,27 +28,18 @@
 import org.apache.kafka.clients.producer.RecordMetadata;

 import io.aiven.connect.jdbc.JdbcSinkConnector;
-import io.aiven.kafka.connect.jdbc.AbstractIT;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.assertj.core.util.Arrays;
 import org.assertj.db.type.Table;
 import org.junit.jupiter.api.Test;
-import org.postgresql.ds.PGSimpleDataSource;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;

 import static org.apache.avro.generic.GenericData.Record;
 import static org.assertj.db.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;

-@Testcontainers
-public class PartitionedTableIntegrationTest extends AbstractIT {
+public class PartitionedTableIntegrationTest extends AbstractPostgresIT {

-    public static final String DEFAULT_POSTGRES_TAG = "10.20";
     private static final String CONNECTOR_NAME = "test-sink-connector";
     private static final int TEST_TOPIC_PARTITIONS = 1;
     private static final Schema VALUE_RECORD_SCHEMA =

@@ -82,17 +68,11 @@ public class PartitionedTableIntegrationTest extends AbstractIT {
     private static final String CREATE_PARTITION =
         "create table partition partition of \"" + TEST_TOPIC_NAME
             + "\" for values from ('2022-03-03') to ('2122-03-03');";
-    private static final DockerImageName DEFAULT_POSTGRES_IMAGE_NAME =
-        DockerImageName.parse("postgres")
-            .withTag(DEFAULT_POSTGRES_TAG);
-
-    @Container
-    private final PostgreSQLContainer<?> postgreSqlContainer = new PostgreSQLContainer<>(DEFAULT_POSTGRES_IMAGE_NAME);

     @Test
     final void testBasicDelivery() throws ExecutionException, InterruptedException, SQLException {
         executeUpdate(CREATE_TABLE);
-        connectRunner.createConnector(basicConnectorConfig());
+        connectRunner.createConnector(basicSinkConnectorConfig());

         sendTestData(1000);

@@ -104,31 +84,14 @@ final void testBasicDelivery() throws ExecutionException, InterruptedException,
     final void testBasicDeliveryForPartitionedTable() throws ExecutionException, InterruptedException, SQLException {
         executeUpdate(CREATE_TABLE_WITH_PARTITION);
         executeUpdate(CREATE_PARTITION);
-        connectRunner.createConnector(basicConnectorConfig());
+        connectRunner.createConnector(basicSinkConnectorConfig());

         sendTestData(1000);

         await().atMost(Duration.ofSeconds(15)).pollInterval(Duration.ofMillis(100))
             .untilAsserted(() -> assertThat(new Table(getDatasource(), TEST_TOPIC_NAME)).hasNumberOfRows(1000));
     }

-    private void executeUpdate(final String updateStatement) throws SQLException {
-        try (final Connection connection = getDatasource().getConnection();
-             final Statement statement = connection.createStatement()) {
-            statement.executeUpdate(updateStatement);
-        }
-    }
-
-    public DataSource getDatasource() {
-        final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
-        pgSimpleDataSource.setServerNames(Arrays.array(postgreSqlContainer.getHost()));
-        pgSimpleDataSource.setPortNumbers(new int[] {postgreSqlContainer.getMappedPort(5432)});
-        pgSimpleDataSource.setDatabaseName(postgreSqlContainer.getDatabaseName());
-        pgSimpleDataSource.setUser(postgreSqlContainer.getUsername());
-        pgSimpleDataSource.setPassword(postgreSqlContainer.getPassword());
-        return pgSimpleDataSource;
-    }
-
     private void sendTestData(final int numberOfRecords) throws InterruptedException, ExecutionException {
         final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
         for (int i = 0; i < numberOfRecords; i++) {

@@ -155,21 +118,13 @@ private Record createRecord(final String name, final String value) {
         return valueRecord;
     }

-    private Map<String, String> basicConnectorConfig() {
-        final HashMap<String, String> config = new HashMap<>();
+    private Map<String, String> basicSinkConnectorConfig() {
+        final Map<String, String> config = basicConnectorConfig();
         config.put("name", CONNECTOR_NAME);
         config.put("connector.class", JdbcSinkConnector.class.getName());
         config.put("topics", TEST_TOPIC_NAME);
-        config.put("key.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("key.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
-        config.put("value.converter", "io.confluent.connect.avro.AvroConverter");
-        config.put("value.converter.schema.registry.url", schemaRegistryContainer.getSchemaRegistryUrl());
-        config.put("tasks.max", "1");
-        config.put("connection.url", postgreSqlContainer.getJdbcUrl());
-        config.put("connection.user", postgreSqlContainer.getUsername());
-        config.put("connection.password", postgreSqlContainer.getPassword());
         config.put("insert.mode", "insert");
-        config.put("dialect.name", "PostgreSqlDatabaseDialect");
         return config;
     }
+
 }
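
With the PostgreSQL base class, the topic and consumer helpers, and the task restart hook in place, a source-connector test for the feature in this PR could be composed roughly as follows. The sketch is hypothetical: the class, table, and connector names are illustrative, the source connector class is assumed to be io.aiven.connect.jdbc.JdbcSourceConnector, and only basicConnectorConfig(), executeUpdate(), createTopic(), and connectRunner come from this commit:

package io.aiven.kafka.connect.jdbc.postgres;

import java.util.Map;

import org.junit.jupiter.api.Test;

// Hypothetical test, not part of this commit.
public class UnqualifiedTableNameIT extends AbstractPostgresIT {

    @Test
    void readsFromUnqualifiedTable() throws Exception {
        // Create and populate a table without qualifying its schema.
        executeUpdate("create table source_table (id serial primary key, name text)");
        executeUpdate("insert into source_table (name) values ('one'), ('two')");
        createTopic("jdbc-source_table", 1); // topic.prefix + table name

        final Map<String, String> config = basicConnectorConfig();
        config.put("name", "test-source-connector");
        config.put("connector.class", "io.aiven.connect.jdbc.JdbcSourceConnector");
        config.put("mode", "incrementing");
        config.put("incrementing.column.name", "id");
        config.put("topic.prefix", "jdbc-");
        config.put("table.whitelist", "source_table"); // unqualified table name
        connectRunner.createConnector(config);

        // Read the rows back with the shared consumer from AbstractIT,
        // e.g. by polling "jdbc-source_table" until two records arrive.
    }
}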
