[CORE-12817] - Hook kafka::client::cluster into link #26857

Open
wants to merge 15 commits into
base: dev
from

Conversation

michael-redpanda
Contributor

This PR introduces the cluster_link::remote_cluster_connection abstract class that is used by cluster_link::link and relevant tasks to communicate with the source cluster.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x
  • v24.2.x

Release Notes

  • None

Copilot

This comment was marked as outdated.

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from 878bb8e to 7897ce2 Compare July 16, 2025 20:28
@michael-redpanda
Contributor Author

Force push:

  • Added client_id to model config

@michael-redpanda michael-redpanda requested a review from Copilot July 16, 2025 20:31
Copilot

This comment was marked as outdated.

@vbotbuildovich
Collaborator

vbotbuildovich commented Jul 17, 2025

CI test results

test results on build#69127

| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason |
|---|---|---|---|---|---|---|---|
| DataMigrationsApiTest | test_creating_and_listing_migrations | | ducktape | https://buildkite.com/redpanda/redpanda/builds/69127#0198151f-1371-4f06-90bf-dc7c4d2c7035 | FLAKY | 20/21 | upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS |
| DatalakeE2ETests | test_json_schema_unicode | {"catalog_type": "rest_hadoop", "cloud_storage_type": 1, "query_engine": "trino"} | ducktape | https://buildkite.com/redpanda/redpanda/builds/69127#0198151f-1371-4f06-90bf-dc7c4d2c7035 | FLAKY | 19/21 | upstream reliability is '98.18181818181819'. current run reliability is '90.47619047619048'. drift is 7.70563 and the allowed drift is set to 50. The test should PASS |
| RandomNodeOperationsTest | test_node_operations | {"cloud_storage_type": 1, "compaction_mode": "sliding_window", "enable_failures": false, "mixed_versions": false, "with_iceberg": false} | ducktape | https://buildkite.com/redpanda/redpanda/builds/69127#01981520-4813-476f-9434-221ee3d1efe4 | FLAKY | 20/21 | upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS |

test results on build#69201

| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason |
|---|---|---|---|---|---|---|---|
| MasterTestSuite | test_replica_pair_frequency | | unit | https://buildkite.com/redpanda/redpanda/builds/69201#0198194c-43d5-4769-86ab-de9cffbc14cb | FAIL | 0/1 | |
| DataMigrationsApiTest | test_creating_and_listing_migrations | | ducktape | https://buildkite.com/redpanda/redpanda/builds/69201#01981968-73cb-41e3-a72b-f2971405bfe8 | FLAKY | 20/21 | upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS |

test results on build#69351

| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason |
|---|---|---|---|---|---|---|---|
| NodePoolMigrationTest | test_migrating_redpanda_nodes_to_new_pool | {"balancing_mode": "node_add", "cleanup_policy": "compact,delete", "test_mode": "tiered_storage"} | ducktape | https://buildkite.com/redpanda/redpanda/builds/69351#01982d99-c01f-4bb4-b4c3-9608ef03e748 | FLAKY | 20/21 | upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS |

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from 7897ce2 to 1301ddd Compare July 17, 2025 13:42
@michael-redpanda michael-redpanda requested a review from Copilot July 17, 2025 13:42
@michael-redpanda
Contributor Author

Force push:

  • Rebased off dev to fix merge conflicts

Copilot

This comment was marked as outdated.

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from 1301ddd to 9010228 Compare July 17, 2025 14:31
@michael-redpanda
Contributor Author

Force push:

  • Fixed rebase snafu

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from 9010228 to 001c62f Compare July 17, 2025 15:53
Member

@oleiman oleiman left a comment

all looks pretty sensible to me. handful of nits and questions.

};

struct topic_metadata {
int32_t authorized_operations
Member

nitpick: i agree with the bot. i had to look this up.

Comment on lines 202 to 204
vassert(
std::holds_alternative<model::tls_file_path>(
link_connection.key.value()),
Member

@oleiman oleiman Jul 17, 2025

^^ this (Copilot's suggestion) is phrased stupidly, but it might be a bit neater w/ a visitor that takes multiple variant instances

like this:

struct visitor {
    auto operator()(model::tls_file_path&, model::tls_file_path&) {
        // OK!
        return stuff;
    }
    ....
    template<typename T1, typename T2>
    requires(!std::is_same_v<T1, T2>)
    auto operator()(T1&, T2&) {
        vassert(false, "OH NO!");
    }
};

then you can call it like

auto my_stuff = std::visit(visitor{}, connection.cert.value(), connection.key.value());
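
A self-contained version of that idea might look like the sketch below. The types `tls_file_path`, `tls_value`, and `key_store_kind` are stand-ins invented for illustration, not the actual `model::` or `kafka::client` types. The mismatch overload returns a sentinel instead of calling `vassert` so the example can run to completion, and it uses `std::enable_if_t` so it also compiles as C++17; a `requires` clause is the C++20 equivalent.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical stand-ins for the named TLS types discussed in this PR.
struct tls_file_path { std::string path; };
struct tls_value { std::string pem; };
using tls_file_or_value = std::variant<tls_file_path, tls_value>;

// Hypothetical result type; every visitor overload must return the same type.
enum class key_store_kind { from_files, from_values, mismatched };

// Binary visitor: std::visit dispatches on the active alternative of BOTH variants.
struct key_store_visitor {
    key_store_kind operator()(const tls_file_path&, const tls_file_path&) const {
        return key_store_kind::from_files;
    }
    key_store_kind operator()(const tls_value&, const tls_value&) const {
        return key_store_kind::from_values;
    }
    // Catch-all that only participates when the two alternatives differ.
    template<
      typename T1,
      typename T2,
      typename = std::enable_if_t<!std::is_same_v<T1, T2>>>
    key_store_kind operator()(const T1&, const T2&) const {
        return key_store_kind::mismatched;
    }
};

inline key_store_kind
classify(const tls_file_or_value& cert, const tls_file_or_value& key) {
    return std::visit(key_store_visitor{}, cert, key);
}
```

Calling `classify` with a path for the cert and a PEM value for the key lands in the catch-all overload, which is where an assert or an error return would go.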

Comment on lines 202 to 204
vassert(
std::holds_alternative<model::tls_file_path>(
link_connection.key.value()),
Member

my other question is whether this should actually assert. where is the sameness of the key & cert variant types enforced, and if violated does that indicate a logic error in the code?
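
One way to make the question concrete: if the sameness were checked when the configuration is accepted, a mismatch would surface as a validation error rather than a crash. The sketch below is an assumption about what such a check could look like, not what this PR does; `connection_config` and `validate_tls` are hypothetical names.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <variant>

// Hypothetical stand-ins for the TLS types in this PR.
struct tls_file_path { std::string path; };
struct tls_value { std::string pem; };
using tls_file_or_value = std::variant<tls_file_path, tls_value>;

// Hypothetical, trimmed-down connection config.
struct connection_config {
    std::optional<tls_file_or_value> cert;
    std::optional<tls_file_or_value> key;
};

// Returns a description of the problem, or std::nullopt if the config is consistent.
inline std::optional<std::string> validate_tls(const connection_config& c) {
    if (c.cert.has_value() != c.key.has_value()) {
        return "cert and key must be provided together";
    }
    // index() identifies the active alternative, so equal indexes mean same kind.
    if (c.cert && c.cert->index() != c.key->index()) {
        return "cert and key must both be file paths or both be PEM values";
    }
    return std::nullopt;
}
```

With a check like this at the boundary, an assert further down would only fire on a genuine logic error.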

@@ -49,6 +52,13 @@ class topic_cache {
std::optional<kafka::leader_epoch>
leader_epoch(model::topic_partition_view) const;

chunked_vector<model::topic> topics() const;
Member

Why do we need a copy? It would be better to return a const reference to topics_t.

Member

Copilot also suggests using a view, which is another option here.
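
To illustrate the trade-off, here is a minimal sketch with a hypothetical `topic_cache_sketch` whose `topics_t` is a plain `std::map`; the real `topic_cache` container type is not shown in this thread, so treat the names and layout as assumptions.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical cache: maps topic name to partition count.
class topic_cache_sketch {
public:
    using topics_t = std::map<std::string, int>;

    void upsert(std::string topic, int partitions) {
        assert(partitions > 0);
        _topics[std::move(topic)] = partitions;
    }

    // Copying accessor: allocates and copies every topic name on each call.
    std::vector<std::string> topics_copy() const {
        std::vector<std::string> out;
        out.reserve(_topics.size());
        for (const auto& [name, partitions] : _topics) {
            out.push_back(name);
        }
        return out;
    }

    // Borrowing accessor: no allocation, but the reference is only safe
    // to use while the cache is not modified.
    const topics_t& topics() const { return _topics; }

private:
    topics_t _topics;
};
```

The borrowing accessor is cheaper, but a caller that holds the reference across a point where the cache can change would be reading invalidated state, which may be one reason to prefer a copy.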

/// The CA file to use
std::optional<tls_file_or_value> ca;
/// The client ID to use
ss::sstring client_id;
Member

This could be autogenerated by the cluster linking subsystem, e.g. based on the target cluster ID. That way we would be able to identify the client logs related to a particular link.

Contributor Author

I was thinking of using the cluster link name as part of the client id
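
As a rough sketch of that idea: the helper name `make_client_id` and the `cluster-link-` prefix below are assumptions made up for illustration; nothing in this PR fixes the actual format.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical helper: derives a Kafka client ID from the cluster link name
// so that client logs can be traced back to a specific link.
inline std::string make_client_id(std::string_view link_name) {
    assert(!link_name.empty());
    std::string id{"cluster-link-"}; // assumed prefix, not taken from the PR
    id.append(link_name);
    return id;
}
```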

@@ -105,6 +105,11 @@ struct scram_credentials
auto serde_fields() { return std::tie(username, password, mechanism); }
};

using tls_file_path = named_type<ss::sstring, struct tls_file_path_tag>;
Member

Do we even need paths?

Contributor Author

I guess it depends on how the customer wants to deploy the solution... the option is there.

Member

@mmaslankaprv mmaslankaprv left a comment

overall looks good, left some comments

@michael-redpanda
Contributor Author

@mmaslankaprv when I was working on my next task, I discovered that you already kind of built in a 'mock' system within cluster that allows us to provide 'mocked' brokers. I think that's a better system than the remote_cluster_connection I did in here. I'm going to make some changes to use that instead.

cluster::cluster_link::errc error_code) const {
if (config.bootstrap_servers.empty()) {
vlog(
cluster::clusterlog.info,
Contributor

warn?

virtual ~link_factory() = default;

virtual std::unique_ptr<link> create_link(
::model::node_id self,
Contributor

nit: curious why ::model..

Contributor Author

Because there's a cluster_link::model namespace
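
The lookup rule behind this can be shown in isolation. The namespaces below mirror the names in the thread, but the contents (`where()`, `node_id`, `self_id`) are invented for the example.

```cpp
#include <cassert>
#include <string>

namespace model {
struct node_id { int value; };
inline std::string where() { return "::model"; }
} // namespace model

namespace cluster_link {
namespace model {
inline std::string where() { return "cluster_link::model"; }
} // namespace model

// Inside cluster_link, an unqualified `model` resolves to the nested
// cluster_link::model first, because the innermost enclosing scope wins.
inline std::string unqualified() { return model::where(); }

// A leading :: forces lookup to start at the global namespace.
inline std::string qualified() { return ::model::where(); }

// Writing model::node_id here would fail to compile: cluster_link::model
// has no node_id. The leading :: is what makes this signature work.
inline int self_id(::model::node_id self) { return self.value; }
} // namespace cluster_link
```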

Comment on lines 77 to 79
if (res.code() != errc::success) {
co_await handle_exception([this] { return _cluster.stop(); });
}
Contributor

Can we ensure that anything that is start()-ed is also stop()-ed? That would avoid these patterns.
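
The general shape of such a guarantee is a scope guard that pairs the two calls. The sketch below is a synchronous analogue only: `service`, `started_guard`, and `init` are hypothetical, and the real `stop()` in this code is asynchronous, so the actual fix would need an async-aware equivalent rather than a destructor.

```cpp
#include <cassert>

// Hypothetical synchronous stand-in for a component with start()/stop().
struct service {
    bool running = false;
    int stops = 0;
    void start() { running = true; }
    void stop() { running = false; ++stops; }
};

// Starts the service on construction and stops it on destruction,
// unless ownership of the running service is released first.
class started_guard {
public:
    explicit started_guard(service& s) : _s(&s) { _s->start(); }
    started_guard(const started_guard&) = delete;
    started_guard& operator=(const started_guard&) = delete;
    ~started_guard() {
        if (_s != nullptr) {
            _s->stop();
        }
    }
    // Call once initialization succeeded and the service should keep running.
    void release() { _s = nullptr; }

private:
    service* _s;
};

// Returns true on success. On failure the guard stops the service, so no
// explicit stop() call is needed on the error path.
inline bool init(service& s, bool handshake_ok) {
    started_guard guard{s};
    if (!handshake_ok) {
        return false; // guard's destructor calls stop()
    }
    guard.release();
    return true;
}
```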


try {
auto describe_configs_version
= co_await _cluster.supported_api_versions(
Contributor

What's the retry story? This may be flaky if even one broker doesn't respond.
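
For reference, a bounded retry with exponential backoff could look roughly like the sketch below. It is a generic, blocking illustration: `retry_with_backoff` is a hypothetical helper, and the `std::this_thread::sleep_for` call stands in for whatever non-blocking sleep the surrounding coroutine code would use.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <optional>
#include <thread>

// Calls `attempt` up to `max_attempts` times. An empty optional means the
// attempt failed; the delay doubles after every failed attempt.
template<typename T>
std::optional<T> retry_with_backoff(
  const std::function<std::optional<T>()>& attempt,
  int max_attempts,
  std::chrono::milliseconds initial_delay) {
    assert(max_attempts > 0);
    auto delay = initial_delay;
    for (int i = 0; i < max_attempts; ++i) {
        if (auto result = attempt()) {
            return result;
        }
        // Do not sleep after the final failed attempt.
        if (i + 1 < max_attempts) {
            std::this_thread::sleep_for(delay);
            delay *= 2;
        }
    }
    return std::nullopt;
}
```

The result type has to be spelled out at the call site, e.g. `retry_with_backoff<int>(fn, 5, std::chrono::milliseconds(10))`, because a lambda cannot be deduced against `std::function`.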

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from de352f3 to 4e5bf97 Compare July 19, 2025 02:01
@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces infrastructure to hook the kafka::client::cluster component into the cluster link system, enabling tasks to communicate with remote clusters. The changes include extending the Kafka client's topic cache with new metadata fields, creating utility functions to convert cluster link metadata to Kafka client configuration, and updating the link and task systems to pass cluster connections through the dependency chain.

  • Extends Kafka client topic cache to track authorized operations and provide new accessor methods
  • Adds utility functions to convert cluster link metadata to Kafka client configuration with TLS and SASL support
  • Updates link and task creation to include cluster connection dependencies
  • Adds comprehensive test coverage for the new utilities and integration points

Reviewed Changes

Copilot reviewed 37 out of 37 changed files in this pull request and generated 4 comments.

| File | Description |
|---|---|
| src/v/kafka/client/topic_cache.h | Adds authorized_operations field and new accessor methods |
| src/v/kafka/client/topic_cache.cc | Implements new topic cache methods for operations and partition counts |
| src/v/kafka/client/test/direct_consumer_test.cc | Updates test mocks to use new topic_data structure |
| src/v/kafka/client/test/cluster_test_with_mock.cc | Adds test for topic metadata functionality |
| src/v/kafka/client/test/cluster_mock.h | Extends mock to support topic metadata with authorized operations |
| src/v/kafka/client/test/cluster_mock.cc | Implements topic metadata support in cluster mock |
| src/v/kafka/client/cluster.cc | Enables topic authorized operations in metadata requests |
| src/v/kafka/client/BUILD | Makes cluster library publicly visible |
| src/v/cluster_link/utils.h | Declares utility for converting metadata to Kafka config |
| src/v/cluster_link/utils.cc | Implements metadata to Kafka config conversion with TLS/SASL support |
| src/v/cluster_link/tests/utils_test.cc | Comprehensive tests for utility functions |
| src/v/cluster_link/task.h | Updates task constructor to accept link pointer |
| src/v/cluster_link/task.cc | Implements link pointer storage in task base class |
| src/v/cluster_link/link.h | Updates link to store cluster connection |
| src/v/cluster_link/link.cc | Implements cluster connection lifecycle management |
| src/v/cluster_link/manager.h | Updates manager to use cluster factory |
| src/v/cluster_link/manager.cc | Implements cluster factory integration |
| src/v/cluster_link/deps.h | Defines cluster factory and updated link factory interfaces |
| src/v/cluster_link/deps.cc | Implements default cluster factory |
| src/v/cluster_link/model/types.h | Updates connection config with new TLS field types |
| src/v/cluster_link/model/types.cc | Implements formatters for new TLS types |

Comments suppressed due to low confidence (1)

src/v/cluster_link/tests/utils_test.cc:86

  • [nitpick] Using EXPECT_DEATH in unit tests can be fragile and may not work reliably in all environments. Consider testing the error condition through a more controlled mechanism.
    EXPECT_DEATH(


template<typename T1, typename T2>
requires(!std::is_same_v<T1, T2>)
kafka::client::key_store operator()(const T1&, const T2&) {
Copilot AI Jul 19, 2025

[nitpick] The requires clause and vassert combination creates complex error handling. Consider using a more explicit approach with clear error types or exceptions that provide better diagnostic information.

}

if (it != end && *it != '}') {
throw fmt::format_error("invalid format specifier for principal");
Copilot AI Jul 19, 2025

The error message mentions 'principal' but this formatter is for tls_file_or_value. The error message should be updated to reflect the correct type being formatted.

Suggested change
- throw fmt::format_error("invalid format specifier for principal");
+ throw fmt::format_error("invalid format specifier for tls_file_or_value");

}

if (it != end && *it != '}') {
throw fmt::format_error("invalid format specifier for principal");
Copilot AI Jul 19, 2025

The error message mentions 'principal' but this formatter is for optional<tls_file_or_value>. The error message should be updated to reflect the correct type being formatted.

Suggested change
- throw fmt::format_error("invalid format specifier for principal");
+ throw fmt::format_error("invalid format specifier for optional<tls_file_or_value>");

@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch 2 times, most recently from 135c05a to f27d2ab Compare July 20, 2025 02:13
Added named types for topic and cluster authorized operations field in
metadata response.

Signed-off-by: Michael Boquard <[email protected]>
@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch 2 times, most recently from 5e73d0b to cc730c2 Compare July 21, 2025 11:55
TLS settings can now be provided via a file path or the PEM value

Signed-off-by: Michael Boquard <[email protected]>
Improved validation of TLS configuration

Signed-off-by: Michael Boquard <[email protected]>
Useful for dependencies

Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Used to create instances of kafka::client::cluster

Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
Signed-off-by: Michael Boquard <[email protected]>
While looping through the task factories and registering new tasks, it
is possible that task_factories is modified while awaiting the
register coroutine. Lock the task reconciler mutex while registering
new task factories to ensure that doesn't happen.

Signed-off-by: Michael Boquard <[email protected]>
When a task is created, give it a pointer back to the owning link so it
can be referenced by the task.  Will be helpful for when tasks need
access to the remote_cluster_connection instance.

Signed-off-by: Michael Boquard <[email protected]>
@michael-redpanda michael-redpanda force-pushed the dr/task/auto-topic/core-12817 branch from cc730c2 to dfb67d7 Compare July 21, 2025 14:45

5 participants