[CORE-12817] - Hook kafka::client::cluster into link #26857
base: dev
Force push: 878bb8e to 7897ce2
CI test results:
test results on build#69127
test results on build#69201
test results on build#69351
Force push: 7897ce2 to 1301ddd
Force push: 1301ddd to 9010228
Force push: 9010228 to 001c62f
all looks pretty sensible to me. handful of nits and questions.
};

struct topic_metadata {
    int32_t authorized_operations
nitpick: i agree with the bot. i had to look this up.
src/v/cluster_link/deps.cc
Outdated
vassert(
  std::holds_alternative<model::tls_file_path>(
    link_connection.key.value()),
^^ this (Copilot's suggestion) is phrased stupidly, but it might be a bit neater w/ a visitor that takes multiple variant instances
like this:
struct visitor {
    auto operator()(model::tls_file_path&, model::tls_file_path&) {
        // OK!
        return stuff;
    }
    ....
    template<typename T1, typename T2>
    requires(!std::is_same_v<T1, T2>)
    auto operator()(T1&, T2&) {
        vassert(false, "OH NO!");
    }
};
then you can call it like
auto my_stuff = std::visit(visitor{}, connection.cert.value(), connection.key.value());
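A minimal, self-contained sketch of the multi-variant visitor idea, for readers who want to try it outside the codebase. The types `file_path`, `pem_value` and `key_store_desc`, and the function `make_key_store`, are hypothetical stand-ins rather than the PR's types. The sketch uses `std::enable_if_t` in place of the `requires` clause so that it also compiles as C++17, and it throws on mismatched alternatives where the comment's version calls `vassert`.

```cpp
#include <stdexcept>
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical stand-ins for model::tls_file_path and a PEM-value type.
struct file_path { std::string path; };
struct pem_value { std::string pem; };
using file_or_value = std::variant<file_path, pem_value>;

// Hypothetical result type; the PR builds a kafka::client::key_store instead.
struct key_store_desc {
    std::string kind;
    std::string cert;
    std::string key;
};

struct key_store_visitor {
    // Both alternatives are file paths.
    key_store_desc operator()(const file_path& c, const file_path& k) const {
        return {"files", c.path, k.path};
    }
    // Both alternatives are inline PEM values.
    key_store_desc operator()(const pem_value& c, const pem_value& k) const {
        return {"values", c.pem, k.pem};
    }
    // Mixed alternatives: only this overload is viable when T1 != T2.
    template<
      typename T1,
      typename T2,
      std::enable_if_t<!std::is_same_v<T1, T2>, int> = 0>
    key_store_desc operator()(const T1&, const T2&) const {
        throw std::invalid_argument(
          "cert and key must use the same representation");
    }
};

// std::visit dispatches on the active alternative of both variants at once.
inline key_store_desc
make_key_store(const file_or_value& cert, const file_or_value& key) {
    return std::visit(key_store_visitor{}, cert, key);
}
```

Whether a mismatch should throw, assert, or be rejected earlier during validation depends on where the invariant is enforced; the sketch does not settle that.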
src/v/cluster_link/deps.cc
Outdated
vassert(
  std::holds_alternative<model::tls_file_path>(
    link_connection.key.value()),
my other question is whether this should actually assert. where is the sameness of the key & cert variant types enforced, and if violated does that indicate a logic error in the code?
src/v/kafka/client/topic_cache.h
Outdated
@@ -49,6 +52,13 @@ class topic_cache {
    std::optional<kafka::leader_epoch>
    leader_epoch(model::topic_partition_view) const;

    chunked_vector<model::topic> topics() const;
Why do we need a copy? It would be better to return a const reference to topics_t.
Copilot also suggests using a view, which is another option here.
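To make the trade-off concrete, here is a small sketch contrasting a copying accessor with a const-reference accessor. `topic_cache_sketch` and its members are hypothetical and much simpler than `kafka::client::topic_cache`; in particular, a `std::map` of names to partition counts stands in for the real `topics_t`.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for kafka::client::topic_cache.
class topic_cache_sketch {
public:
    // Assumed layout: topic name -> partition count.
    using topics_t = std::map<std::string, int>;

    void add(std::string name, int partitions) {
        _topics.emplace(std::move(name), partitions);
    }

    // Copying accessor: allocates and copies every name on each call.
    std::vector<std::string> topic_names_copy() const {
        std::vector<std::string> out;
        out.reserve(_topics.size());
        for (const auto& [name, partitions] : _topics) {
            (void)partitions; // only the names are copied
            out.push_back(name);
        }
        return out;
    }

    // Const-reference accessor: no allocation, but the caller must not keep
    // the reference across a point where the cache may be modified.
    const topics_t& topics() const { return _topics; }

private:
    topics_t _topics;
};
```

One caveat with the reference version in coroutine-heavy code: a reference held across a suspension point can observe a modified or invalidated container, so a copy can still be the safer choice for callers that suspend while iterating.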
/// The CA file to use
std::optional<tls_file_or_value> ca;
/// The client ID to use
ss::sstring client_id;
This could be autogenerated by the cluster linking subsystem, e.g. based on the target cluster ID. That way we would be able to identify the client logs related to a given link.
I was thinking of using the cluster link name as part of the client id
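A tiny sketch of what deriving the client ID from the link name could look like. The function name `make_client_id` and the `cluster-link-` prefix are assumptions made for illustration; the thread does not fix a format.

```cpp
#include <string>
#include <string_view>

// Hypothetical helper: builds a client ID that embeds the link name so that
// client logs can be attributed to a specific link.
inline std::string make_client_id(std::string_view link_name) {
    std::string id = "cluster-link-"; // assumed prefix, not from the PR
    id.append(link_name);
    return id;
}
```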
@@ -105,6 +105,11 @@ struct scram_credentials
    auto serde_fields() { return std::tie(username, password, mechanism); }
};

using tls_file_path = named_type<ss::sstring, struct tls_file_path_tag>;
do we even need paths?
I guess it depends on how the customer wants to deploy the solution... the option is there
overall looks good, left some comments
@mmaslankaprv when I was working on my next task, I discovered that you already kind of built in a 'mock' system within |
cluster::cluster_link::errc error_code) const {
    if (config.bootstrap_servers.empty()) {
        vlog(
          cluster::clusterlog.info,
warn?
virtual ~link_factory() = default;

virtual std::unique_ptr<link> create_link(
  ::model::node_id self,
nit: curious why ::model..
Because there's a cluster_link::model namespace
src/v/cluster_link/deps.cc
Outdated
if (res.code() != errc::success) {
    co_await handle_exception([this] { return _cluster.stop(); });
}
Can we ensure that anything that is start()-ed is also stop()-ed? That would avoid patterns like this one.
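One way to read this suggestion, sketched synchronously: `start()` never cleans up after itself, `stop()` is idempotent, and the owner always calls `stop()` whatever `start()` returned. Everything here (`connection_sketch`, `run_once`, the `simulate_failure` flag) is hypothetical, and the real code is asynchronous, which this sketch does not model.

```cpp
// Hypothetical synchronous stand-in for a remote cluster connection.
class connection_sketch {
public:
    // Acquires resources, then reports success or failure. It does not clean
    // up on failure; that is left to stop().
    bool start(bool simulate_failure) {
        _open = true;
        return !simulate_failure;
    }

    // Idempotent: safe after a failed start() and safe to call twice.
    void stop() {
        if (_open) {
            _open = false;
            ++_stop_count;
        }
    }

    bool open() const { return _open; }
    int stop_count() const { return _stop_count; }

private:
    bool _open = false;
    int _stop_count = 0;
};

// Owner-side rule: every start() is followed by a stop(), regardless of the
// start() result, so start() needs no failure-path cleanup of its own.
inline bool run_once(connection_sketch& c, bool simulate_failure) {
    const bool ok = c.start(simulate_failure);
    // ... the connection would be used here only when ok is true ...
    c.stop();
    return ok;
}
```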
src/v/cluster_link/deps.cc
Outdated
try {
    auto describe_configs_version
      = co_await _cluster.supported_api_versions(
What's the retry story? This may be flaky if even one broker doesn't respond.
Force push: de352f3 to 4e5bf97
Pull Request Overview
This PR introduces infrastructure to hook the kafka::client::cluster component into the cluster link system, enabling tasks to communicate with remote clusters. The changes include extending the Kafka client's topic cache with new metadata fields, creating utility functions to convert cluster link metadata to Kafka client configuration, and updating the link and task systems to pass cluster connections through the dependency chain.
- Extends Kafka client topic cache to track authorized operations and provide new accessor methods
- Adds utility functions to convert cluster link metadata to Kafka client configuration with TLS and SASL support
- Updates link and task creation to include cluster connection dependencies
- Adds comprehensive test coverage for the new utilities and integration points
Reviewed Changes
Copilot reviewed 37 out of 37 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
src/v/kafka/client/topic_cache.h | Adds authorized_operations field and new accessor methods |
src/v/kafka/client/topic_cache.cc | Implements new topic cache methods for operations and partition counts |
src/v/kafka/client/test/direct_consumer_test.cc | Updates test mocks to use new topic_data structure |
src/v/kafka/client/test/cluster_test_with_mock.cc | Adds test for topic metadata functionality |
src/v/kafka/client/test/cluster_mock.h | Extends mock to support topic metadata with authorized operations |
src/v/kafka/client/test/cluster_mock.cc | Implements topic metadata support in cluster mock |
src/v/kafka/client/cluster.cc | Enables topic authorized operations in metadata requests |
src/v/kafka/client/BUILD | Makes cluster library publicly visible |
src/v/cluster_link/utils.h | Declares utility for converting metadata to Kafka config |
src/v/cluster_link/utils.cc | Implements metadata to Kafka config conversion with TLS/SASL support |
src/v/cluster_link/tests/utils_test.cc | Comprehensive tests for utility functions |
src/v/cluster_link/task.h | Updates task constructor to accept link pointer |
src/v/cluster_link/task.cc | Implements link pointer storage in task base class |
src/v/cluster_link/link.h | Updates link to store cluster connection |
src/v/cluster_link/link.cc | Implements cluster connection lifecycle management |
src/v/cluster_link/manager.h | Updates manager to use cluster factory |
src/v/cluster_link/manager.cc | Implements cluster factory integration |
src/v/cluster_link/deps.h | Defines cluster factory and updated link factory interfaces |
src/v/cluster_link/deps.cc | Implements default cluster factory |
src/v/cluster_link/model/types.h | Updates connection config with new TLS field types |
src/v/cluster_link/model/types.cc | Implements formatters for new TLS types |
Comments suppressed due to low confidence (1)
src/v/cluster_link/tests/utils_test.cc:86
- [nitpick] Using EXPECT_DEATH in unit tests can be fragile and may not work reliably in all environments. Consider testing the error condition through a more controlled mechanism.
EXPECT_DEATH(
template<typename T1, typename T2>
requires(!std::is_same_v<T1, T2>)
kafka::client::key_store operator()(const T1&, const T2&) {
[nitpick] The requires clause and vassert combination creates complex error handling. Consider using a more explicit approach with clear error types or exceptions that provide better diagnostic information.
}

if (it != end && *it != '}') {
    throw fmt::format_error("invalid format specifier for principal");
The error message mentions 'principal' but this formatter is for tls_file_or_value. The error message should be updated to reflect the correct type being formatted.
Suggested change:
- throw fmt::format_error("invalid format specifier for principal");
+ throw fmt::format_error("invalid format specifier for tls_file_or_value");
}

if (it != end && *it != '}') {
    throw fmt::format_error("invalid format specifier for principal");
The error message mentions 'principal' but this formatter is for optional<tls_file_or_value>. The error message should be updated to reflect the correct type being formatted.
Suggested change:
- throw fmt::format_error("invalid format specifier for principal");
+ throw fmt::format_error("invalid format specifier for optional<tls_file_or_value>");
Force push: 135c05a to f27d2ab
Added named types for topic and cluster authorized operations field in metadata response. Signed-off-by: Michael Boquard <[email protected]>
Force push: 5e73d0b to cc730c2
Signed-off-by: Michael Boquard <[email protected]>
TLS settings can now be provided via a file path or the PEM value Signed-off-by: Michael Boquard <[email protected]>
Improved validation of TLS configuration Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Useful for dependencies Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Used to create instances of kafka::client::cluster Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
While looping through the test factories and registering new tasks, it is possible that the task_factories would be modified while awaiting the register coroutine. Lock the task reconciler mutex while registering new task factories to ensure that doesn't happen. Signed-off-by: Michael Boquard <[email protected]>
When a task is created, give it a pointer back to the owning link so it can be referenced by the task. Will be helpful for when tasks need access to the remote_cluster_connection instance. Signed-off-by: Michael Boquard <[email protected]>
Force push: cc730c2 to dfb67d7
This PR introduces the cluster_link::remote_cluster_connection abstract class that is used by cluster_link::link and relevant tasks to communicate with the source cluster.
Backports Required
Release Notes