Skip to content

Commit 878bb8e

Browse files
cl: Hook in remote_cluster_connection to link
Signed-off-by: Michael Boquard <[email protected]>
1 parent e9e81a9 commit 878bb8e

File tree

10 files changed

+232
-85
lines changed

10 files changed

+232
-85
lines changed

src/v/cluster_link/deps.cc

Lines changed: 69 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -145,69 +145,8 @@ class remote_cluster_connection_factory_impl
145145
link_id);
146146
return nullptr;
147147
}
148-
const auto& link_connection = metadata->get().connection;
149-
kafka::client::connection_configuration config;
150-
config.initial_brokers = link_connection.bootstrap_servers;
151-
if (
152-
link_connection.ca.has_value() || link_connection.cert.has_value()
153-
|| link_connection.key.has_value()) {
154-
kafka::client::tls_configuration tls_cfg;
155-
if (link_connection.ca.has_value()) {
156-
ss::visit(
157-
link_connection.ca.value(),
158-
[&](const model::tls_file_path& path) {
159-
tls_cfg.truststore = std::filesystem::path(path());
160-
},
161-
[&](const model::tls_value& value) {
162-
tls_cfg.truststore = value();
163-
});
164-
}
165-
if (
166-
link_connection.cert.has_value()
167-
&& link_connection.key.has_value()) {
168-
kafka::client::key_store k_store;
169-
ss::visit(
170-
link_connection.cert.value(),
171-
[&](const model::tls_file_path& path) {
172-
vassert(
173-
std::holds_alternative<model::tls_file_path>(
174-
link_connection.key.value()),
175-
"Key must be a file path if cert is a file path");
176-
k_store = kafka::client::key_cert_path{
177-
.key = std::filesystem::path(path()),
178-
.cert = std::filesystem::path(
179-
std::get<model::tls_file_path>(
180-
link_connection.key.value())())};
181-
},
182-
[&](const model::tls_value& value) {
183-
vassert(
184-
std::holds_alternative<model::tls_value>(
185-
link_connection.key.value()),
186-
"Key must be a value if cert is a value");
187-
k_store = kafka::client::key_cert{
188-
.key = value(),
189-
.cert = std::get<model::tls_value>(
190-
link_connection.key.value())()};
191-
});
192-
tls_cfg.k_store = std::move(k_store);
193-
}
194-
config.broker_tls = std::move(tls_cfg);
195-
}
196-
if (link_connection.authn_config.has_value()) {
197-
ss::visit(
198-
link_connection.authn_config.value(),
199-
[&](const model::scram_credentials& creds) {
200-
config.sasl_cfg = kafka::client::sasl_configuration{
201-
.mechanism = creds.mechanism,
202-
.username = creds.username,
203-
.password = creds.password};
204-
});
205-
}
206-
207-
config.client_id = ssx::sformat(
208-
"cluster-link-{}-{}", metadata->get().name, metadata->get().uuid);
209-
210-
return remote_cluster_connection::make_default(std::move(config));
148+
return remote_cluster_connection::make_default(
149+
make_configuration(metadata->get()));
211150
}
212151

213152
private:
@@ -232,4 +171,71 @@ remote_cluster_connection_factory::make_default(
232171
return std::make_unique<remote_cluster_connection_factory_impl>(
233172
std::move(link_registry));
234173
}
174+
175+
kafka::client::connection_configuration
176+
remote_cluster_connection_factory::make_configuration(
177+
const model::metadata& md) {
178+
const auto& link_connection = md.connection;
179+
180+
kafka::client::connection_configuration config;
181+
config.initial_brokers = link_connection.bootstrap_servers;
182+
if (
183+
link_connection.ca.has_value() || link_connection.cert.has_value()
184+
|| link_connection.key.has_value()) {
185+
kafka::client::tls_configuration tls_cfg;
186+
if (link_connection.ca.has_value()) {
187+
ss::visit(
188+
link_connection.ca.value(),
189+
[&](const model::tls_file_path& path) {
190+
tls_cfg.truststore = std::filesystem::path(path());
191+
},
192+
[&](const model::tls_value& value) {
193+
tls_cfg.truststore = value();
194+
});
195+
}
196+
if (
197+
link_connection.cert.has_value() && link_connection.key.has_value()) {
198+
kafka::client::key_store k_store;
199+
ss::visit(
200+
link_connection.cert.value(),
201+
[&](const model::tls_file_path& path) {
202+
vassert(
203+
std::holds_alternative<model::tls_file_path>(
204+
link_connection.key.value()),
205+
"Key must be a file path if cert is a file path");
206+
k_store = kafka::client::key_cert_path{
207+
.key = std::filesystem::path(path()),
208+
.cert = std::filesystem::path(
209+
std::get<model::tls_file_path>(
210+
link_connection.key.value())())};
211+
},
212+
[&](const model::tls_value& value) {
213+
vassert(
214+
std::holds_alternative<model::tls_value>(
215+
link_connection.key.value()),
216+
"Key must be a value if cert is a value");
217+
k_store = kafka::client::key_cert{
218+
.key = value(),
219+
.cert = std::get<model::tls_value>(
220+
link_connection.key.value())()};
221+
});
222+
tls_cfg.k_store = std::move(k_store);
223+
}
224+
config.broker_tls = std::move(tls_cfg);
225+
}
226+
if (link_connection.authn_config.has_value()) {
227+
ss::visit(
228+
link_connection.authn_config.value(),
229+
[&](const model::scram_credentials& creds) {
230+
config.sasl_cfg = kafka::client::sasl_configuration{
231+
.mechanism = creds.mechanism,
232+
.username = creds.username,
233+
.password = creds.password};
234+
});
235+
}
236+
237+
config.client_id = ssx::sformat("cluster-link-{}-{}", md.name, md.uuid);
238+
239+
return config;
240+
}
235241
} // namespace cluster_link

src/v/cluster_link/deps.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ class remote_cluster_connection_factory {
100100

101101
static std::unique_ptr<remote_cluster_connection_factory>
102102
make_default(std::unique_ptr<link_registry> link_registry);
103+
104+
static kafka::client::connection_configuration
105+
make_configuration(const model::metadata&);
103106
};
104107

105108
/**
@@ -117,6 +120,7 @@ class link_factory {
117120

118121
virtual std::unique_ptr<link> create_link(
119122
::model::node_id self,
123+
model::id_t link_id,
120124
model::metadata config,
121125
kafka::data::rpc::partition_leader_cache* partition_leader_cache,
122126
kafka::data::rpc::partition_manager* partition_manager)

src/v/cluster_link/link.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ link::link(
5252
ss::lowres_clock::duration task_reconciler_interval,
5353
model::metadata config,
5454
partition_leader_cache* partition_leader_cache,
55-
partition_manager* partition_manager)
55+
partition_manager* partition_manager,
56+
std::unique_ptr<remote_cluster_connection> remote_cluster)
5657
: _self(self)
5758
, _config(std::move(config))
5859
, _partition_leader_cache(partition_leader_cache)
5960
, _partition_manager(partition_manager)
61+
, _remote_cluster(std::move(remote_cluster))
6062
, _task_reconciler_interval(task_reconciler_interval) {}
6163

6264
ss::future<> link::start() {

src/v/cluster_link/link.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#pragma once
1313

14+
#include "cluster_link/deps.h"
1415
#include "cluster_link/model/types.h"
1516
#include "cluster_link/task.h"
1617
#include "cluster_link/types.h"
@@ -29,7 +30,8 @@ class link {
2930
ss::lowres_clock::duration task_reconciler_interval,
3031
model::metadata config,
3132
kafka::data::rpc::partition_leader_cache* partition_leader_cache,
32-
kafka::data::rpc::partition_manager* partition_manager);
33+
kafka::data::rpc::partition_manager* partition_manager,
34+
std::unique_ptr<remote_cluster_connection> remote_cluster);
3335
link(const link&) = delete;
3436
link(link&&) = delete;
3537
link& operator=(const link&) = delete;
@@ -79,6 +81,7 @@ class link {
7981
model::metadata _config;
8082
kafka::data::rpc::partition_leader_cache* _partition_leader_cache;
8183
kafka::data::rpc::partition_manager* _partition_manager;
84+
std::unique_ptr<remote_cluster_connection> _remote_cluster;
8285
notification_list<task_state_change_cb, task_state_notification_id>
8386
_task_state_change_notifications;
8487
ss::lowres_clock::duration _task_reconciler_interval;

src/v/cluster_link/manager.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ ss::future<> manager::handle_on_link_change(model::id_t id) {
130130
auto units = co_await _link_task_reconciler_mutex.get_units(_as);
131131
auto new_link = _link_factory->create_link(
132132
_self,
133+
id,
133134
link_metadata.copy(),
134135
_partition_leader_cache.get(),
135136
_partition_manager.get());

src/v/cluster_link/service.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "cluster/cluster_link/frontend.h"
1515
#include "cluster/partition_manager.h"
16+
#include "cluster_link/deps.h"
1617
#include "cluster_link/link.h"
1718
#include "cluster_link/logger.h"
1819
#include "cluster_link/manager.h"
@@ -52,18 +53,35 @@ class link_registry_adapter : public link_registry {
5253
class default_link_factory : public link_factory {
5354
public:
5455
static constexpr auto link_reconciler_period = 5min;
56+
explicit default_link_factory(
57+
std::unique_ptr<remote_cluster_connection_factory> rccf)
58+
: _rccf(std::move(rccf)) {}
59+
5560
std::unique_ptr<link> create_link(
5661
::model::node_id self,
62+
model::id_t link_id,
5763
model::metadata config,
5864
partition_leader_cache* partition_leader_cache,
5965
partition_manager* partition_manager) override {
66+
auto remote_cluster = _rccf->make_remote_cluster_connection(link_id);
67+
if (!remote_cluster) {
68+
vlog(
69+
cllog.error,
70+
"Failed to create remote cluster connection for link {}",
71+
link_id);
72+
return nullptr;
73+
}
6074
return std::make_unique<link>(
6175
self,
6276
link_reconciler_period,
6377
std::move(config),
6478
partition_leader_cache,
65-
partition_manager);
79+
partition_manager,
80+
std::move(remote_cluster));
6681
}
82+
83+
private:
84+
std::unique_ptr<remote_cluster_connection_factory> _rccf;
6785
};
6886

6987
service::service(
@@ -92,7 +110,9 @@ ss::future<> service::start() {
92110
partition_manager::make_default(
93111
_shard_table, _partition_manager, _smp_group),
94112
std::make_unique<link_registry_adapter>(&_plf->local()),
95-
std::make_unique<default_link_factory>(),
113+
std::make_unique<default_link_factory>(
114+
remote_cluster_connection_factory::make_default(
115+
std::make_unique<link_registry_adapter>(&_plf->local()))),
96116
30s); // Temporary until we have a proper configuration for this
97117

98118
// Register notifications before the manager starts. The manager will have

src/v/cluster_link/tests/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ redpanda_cc_gtest(
3030
":test_deps",
3131
"//src/v/cluster",
3232
"//src/v/cluster/cluster_link/tests:test_lib",
33+
"//src/v/cluster_link:errc",
3334
"//src/v/cluster_link:impl",
35+
"//src/v/kafka/client:configuration",
3436
"//src/v/model",
3537
"//src/v/test_utils:gtest",
3638
"@abseil-cpp//absl/container:flat_hash_map",

src/v/cluster_link/tests/deps.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,67 @@
1212
#pragma once
1313

1414
#include "cluster/cluster_link/table.h"
15+
#include "cluster_link/deps.h"
1516
#include "cluster_link/manager.h"
1617
#include "kafka/data/rpc/deps.h"
1718

1819
#include <seastar/util/defer.hh>
1920
namespace cluster_link::tests {
2021

22+
class unimplemented_connection : public remote_cluster_connection {
23+
public:
24+
explicit unimplemented_connection(
25+
kafka::client::connection_configuration config)
26+
: _config(std::move(config)) {}
27+
28+
ss::future<err_info> start() override { co_return err_info(errc::success); }
29+
ss::future<> stop() override { return ss::now(); }
30+
31+
kafka::client::topic_cache& get_topics() override {
32+
throw std::runtime_error("not implemented");
33+
}
34+
35+
ss::future<> request_metadata_update() override {
36+
throw std::runtime_error("not implemented");
37+
}
38+
39+
bool is_connected() const override { return true; }
40+
41+
ss::future<kafka::describe_configs_response> describe_topics(
42+
chunked_vector<::model::topic>, chunked_vector<ss::sstring>) override {
43+
throw std::runtime_error("not implemented");
44+
}
45+
46+
const kafka::client::connection_configuration& get_config() const {
47+
return _config;
48+
}
49+
50+
private:
51+
kafka::client::connection_configuration _config;
52+
};
53+
54+
class unimplemented_connection_factory
55+
: public remote_cluster_connection_factory {
56+
public:
57+
explicit unimplemented_connection_factory(std::unique_ptr<link_registry> lr)
58+
: _link_registry(std::move(lr)) {}
59+
60+
std::unique_ptr<remote_cluster_connection>
61+
make_remote_cluster_connection(model::id_t link_id) override {
62+
auto metadata = _link_registry->find_link_by_id(link_id);
63+
if (!metadata) {
64+
throw std::runtime_error(
65+
fmt::format("No link found for id {}", link_id()));
66+
}
67+
return std::make_unique<unimplemented_connection>(
68+
remote_cluster_connection_factory::make_configuration(
69+
metadata->get()));
70+
}
71+
72+
private:
73+
std::unique_ptr<link_registry> _link_registry;
74+
};
75+
2176
class test_link_registry : public link_registry {
2277
public:
2378
explicit test_link_registry(cluster::cluster_link::table* table)
@@ -195,6 +250,8 @@ class cluster_link_manager_test_fixture {
195250

196251
link_factory* get_link_factory() { return _lf; }
197252

253+
ss::sharded<cluster::cluster_link::table>& get_table() { return _table; }
254+
198255
private:
199256
chunked_vector<ss::deferred_action<ss::noncopyable_function<void()>>>
200257
_notification_cleanups;
@@ -208,4 +265,25 @@ class cluster_link_manager_test_fixture {
208265
::model::node_id _self;
209266
model::id_t _next_link_id{0};
210267
};
268+
269+
class test_fixture_rccf : public remote_cluster_connection_factory {
270+
public:
271+
explicit test_fixture_rccf(cluster_link_manager_test_fixture* clmtf)
272+
: _clmtf(clmtf) {}
273+
274+
std::unique_ptr<remote_cluster_connection>
275+
make_remote_cluster_connection(model::id_t link_id) override {
276+
auto md = _clmtf->get_table().local().find_link_by_id(link_id);
277+
if (!md) {
278+
throw std::runtime_error(
279+
fmt::format("No link found for id {}", link_id()));
280+
}
281+
282+
return std::make_unique<unimplemented_connection>(
283+
remote_cluster_connection_factory::make_configuration(md->get()));
284+
}
285+
286+
private:
287+
cluster_link_manager_test_fixture* _clmtf;
288+
};
211289
} // namespace cluster_link::tests

0 commit comments

Comments
 (0)