Skip to content

Commit

Permalink
kadm: use cached metadata where possible
Browse files Browse the repository at this point in the history
As a follow up to #800, we convert kadm to using cached metadata
everywhere except for the actual Metadata function.

With a quick local test using `rpk group describe`, this brings the
prior 4 metadata requests down to 1.
  • Loading branch information
twmb committed Jan 22, 2025
1 parent 7ba4756 commit 6eddacd
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/kadm/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.21

toolchain go1.22.0

replace github.com/twmb/franz-go => ../../

require (
github.com/twmb/franz-go v1.18.1
github.com/twmb/franz-go/pkg/kmsg v1.9.0
Expand Down
16 changes: 11 additions & 5 deletions pkg/kadm/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func int32s(is []int32) []int32 {
// ListBrokers issues a metadata request and returns BrokerDetails. This
// returns an error if the request fails to be issued, or an *AuthError.
func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error) {
m, err := cl.Metadata(ctx)
m, err := cl.BrokerMetadata(ctx)
if err != nil {
return nil, err
}
Expand All @@ -212,7 +212,7 @@ func (cl *Client) ListBrokers(ctx context.Context) (BrokerDetails, error) {
//
// This returns an error if the request fails to be issued, or an *AuthErr.
func (cl *Client) BrokerMetadata(ctx context.Context) (Metadata, error) {
return cl.metadata(ctx, true, nil)
return cl.metadata(ctx, true, nil, true)
}

// Metadata issues a metadata request and returns it. Specific topics to
Expand All @@ -224,10 +224,10 @@ func (cl *Client) Metadata(
ctx context.Context,
topics ...string,
) (Metadata, error) {
return cl.metadata(ctx, false, topics)
return cl.metadata(ctx, false, topics, false)
}

func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string) (Metadata, error) {
func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string, useCache bool) (Metadata, error) {
req := kmsg.NewPtrMetadataRequest()
req.IncludeClusterAuthorizedOperations = true
req.IncludeTopicAuthorizedOperations = true
Expand All @@ -239,7 +239,13 @@ func (cl *Client) metadata(ctx context.Context, noTopics bool, topics []string)
if noTopics {
req.Topics = []kmsg.MetadataRequestTopic{}
}
resp, err := req.RequestWith(ctx, cl.cl)
var resp *kmsg.MetadataResponse
var err error
if useCache {
resp, err = cl.cl.RequestCachedMetadata(ctx, req, -1)
} else {
resp, err = req.RequestWith(ctx, cl.cl)
}
if err != nil {
return Metadata{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kadm/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (cl *Client) ListTopicsWithInternal(
ctx context.Context,
topics ...string,
) (TopicDetails, error) {
m, err := cl.Metadata(ctx, topics...)
m, err := cl.metadata(ctx, false, topics, true)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kadm/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ func (ds DescribedProducersTopics) EachProducer(fn func(DescribedProducer)) {
// This may return *ShardErrors or *AuthError.
func (cl *Client) DescribeProducers(ctx context.Context, s TopicsSet) (DescribedProducersTopics, error) {
if len(s) == 0 {
m, err := cl.Metadata(ctx)
m, err := cl.metadata(ctx, false, nil, true)
if err != nil {
return nil, err
}
s = m.Topics.TopicsSet()
} else if e := s.EmptyTopics(); len(e) > 0 {
m, err := cl.Metadata(ctx, e...)
m, err := cl.metadata(ctx, false, e, true)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 6eddacd

Please sign in to comment.