kgo: add Client.RequestCachedMetadata #896

Open · wants to merge 2 commits into master
162 changes: 124 additions & 38 deletions pkg/kgo/client.go
@@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"math/rand"
"net"
"reflect"
@@ -65,6 +66,7 @@ type Client struct {

controllerIDMu sync.Mutex
controllerID int32
clusterID *string // we piggyback updates to clusterID on controllerIDMu

// The following two ensure that we only have one fetchBrokerMetadata
// at once. This avoids unnecessary broker metadata requests and
@@ -858,11 +860,11 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
close(wait.done)
}()

_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true)
_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true, nil)
return wait.err
}

func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics []string) (*broker, *kmsg.MetadataResponse, error) {
func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics []string, intoMapped map[string]mappedMetadataTopic) (*broker, *kmsg.MetadataResponse, error) {
req := kmsg.NewPtrMetadataRequest()
req.AllowAutoTopicCreation = cl.cfg.allowAutoTopicCreation
if all {
@@ -876,10 +878,10 @@ func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics [
req.Topics = append(req.Topics, reqTopic)
}
}
return cl.fetchMetadata(ctx, req, true)
return cl.fetchMetadata(ctx, req, true, intoMapped)
}

func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) {
func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool, intoMapped map[string]mappedMetadataTopic) (*broker, *kmsg.MetadataResponse, error) {
r := cl.retryable()

// We limit retries for internal metadata refreshes, because these do
@@ -897,12 +899,16 @@ func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest,

meta, err := req.RequestWith(ctx, r)
if err == nil {
cl.controllerIDMu.Lock()
if meta.ControllerID >= 0 {
cl.controllerIDMu.Lock()
cl.controllerID = meta.ControllerID
cl.controllerIDMu.Unlock()
}
cl.clusterID = meta.ClusterID
cl.controllerIDMu.Unlock()
cl.updateBrokers(meta.Brokers)

// Cache the mapped metadata, and potentially store each topic in the results.
cl.storeCachedMappedMetadata(meta, intoMapped)
}
return r.last, meta, err
}
@@ -1140,6 +1146,89 @@ func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response,
return merge(resps)
}

// RequestCachedMetadata returns a metadata response, using cached topic
// data where possible. Any topic whose cached data is older than 'limit'
// has its metadata refreshed before being returned. If limit is zero or
// less, MetadataMinAge is used (default 5s).
//
// This function is useful if you run many functions that internally
// fetch metadata to execute. For example, many functions in the kadm
// package require metadata to run; those functions use cached metadata
// as much as possible.
//
// This function does *not* return authorized operations, even if the request
// has IncludeClusterAuthorizedOperations or IncludeTopicAuthorizedOperations
// set to true. This function cannot be used to request topics via TopicID;
// the direct topic name must be used.
func (cl *Client) RequestCachedMetadata(ctx context.Context, req *kmsg.MetadataRequest, limit time.Duration) (*kmsg.MetadataResponse, error) {
topics := make([]string, 0, len(req.Topics))
for _, t := range req.Topics {
if t.Topic == nil || *t.Topic == "" {
return nil, errors.New("unable to request cached metadata with a missing topic name (topic IDs are not supported)")
}
topics = append(topics, *t.Topic)
}
mapped, err := cl.fetchMappedMetadata(ctx, topics, true, limit)
if err != nil {
return nil, err
}

// With potentially cached data, we build the response. We deeply clone
// all cached data so that the end user cannot modify internal data.
dups := func(s *string) *string {
if s == nil {
return nil
}
s2 := *s
return &s2
}
dupp := func(p kmsg.MetadataResponseTopicPartition) kmsg.MetadataResponseTopicPartition {
p2 := p
p2.Replicas = append([]int32(nil), p2.Replicas...)
p2.ISR = append([]int32(nil), p2.ISR...)
p2.OfflineReplicas = append([]int32(nil), p2.OfflineReplicas...)
p2.UnknownTags = kmsg.Tags{}
return p2
}
dupt := func(t kmsg.MetadataResponseTopic) kmsg.MetadataResponseTopic {
t2 := t
t2.Topic = dups(t2.Topic)
t2.Partitions = make([]kmsg.MetadataResponseTopicPartition, 0, len(t2.Partitions))
for _, p := range t.Partitions {
t2.Partitions = append(t2.Partitions, dupp(p))
}
t2.AuthorizedOperations = math.MinInt32
t2.UnknownTags = kmsg.Tags{}
return t2
}

resp := kmsg.NewPtrMetadataResponse()

cl.brokersMu.RLock()
for _, b := range cl.brokers {
resp.Brokers = append(resp.Brokers, kmsg.MetadataResponseBroker{
NodeID: b.meta.NodeID,
Host: b.meta.Host,
Port: b.meta.Port,
Rack: b.meta.Rack,
})
}
cl.brokersMu.RUnlock()

cl.controllerIDMu.Lock()
resp.ClusterID = dups(cl.clusterID)
resp.ControllerID = cl.controllerID
cl.controllerIDMu.Unlock()

for _, t := range mapped {
resp.Topics = append(resp.Topics, dupt(t.t))
}

resp.AuthorizedOperations = math.MinInt32

return resp, nil
}
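
Not part of this diff: a minimal usage sketch of the new RequestCachedMetadata API, assuming a local broker at localhost:9092; the topic names ("orders", "payments") and the 30-second limit are illustrative only.

// Hypothetical usage sketch; broker address, topics, and limit are assumptions.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Build a metadata request for the topics we care about, by name
	// (topic IDs are not supported by RequestCachedMetadata).
	req := kmsg.NewPtrMetadataRequest()
	for _, topic := range []string{"orders", "payments"} {
		t := kmsg.NewMetadataRequestTopic()
		t.Topic = kmsg.StringPtr(topic)
		req.Topics = append(req.Topics, t)
	}

	// Accept cached topic metadata up to 30s old; anything older is
	// refreshed before the response is returned.
	resp, err := cl.RequestCachedMetadata(context.Background(), req, 30*time.Second)
	if err != nil {
		panic(err)
	}
	for _, t := range resp.Topics {
		fmt.Printf("topic %s has %d partitions\n", *t.Topic, len(t.Partitions))
	}
}

Per the doc comment above, repeated calls within the limit reuse the same cache that the sharded kgo requests and kadm helpers populate, avoiding extra Metadata round trips.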

func (cl *Client) retryable() *retryable {
return cl.retryableBrokerFn(func() (*broker, error) { return cl.broker(), nil })
}
@@ -1354,7 +1443,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
case *kmsg.MetadataRequest:
// We hijack any metadata request so as to populate our
// own brokers and controller ID.
br, resp, err := cl.fetchMetadata(ctx, t, false)
br, resp, err := cl.fetchMetadata(ctx, t, false, nil)
return shards(shard(br, req, resp, err)), nil

case kmsg.AdminRequest:
@@ -2383,11 +2472,12 @@ func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (sh
}
}

now := time.Now()
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
for _, t := range ts {
tcached, exists := cl.mappedMeta[t]
if exists && (min == 0 || time.Since(tcached.when) > min) {
if exists && (min == 0 || now.Sub(tcached.when) > min) {
shouldRetry = true
delete(cl.mappedMeta, t)
}
@@ -2401,7 +2491,7 @@ func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (sh
// requests that are sharded and use metadata, and the one this benefits most
// is ListOffsets. Likely, ListOffsets for the same topic will be issued back
// to back, so not caching for so long is ok.
func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMetadataTopic, []string) {
func (cl *Client) fetchCachedMappedMetadata(limit time.Duration, ts ...string) (map[string]mappedMetadataTopic, []string) {
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
if cl.mappedMeta == nil {
@@ -2410,9 +2500,13 @@ func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMeta
cached := make(map[string]mappedMetadataTopic)
needed := ts[:0]

if limit <= 0 {
limit = cl.cfg.metadataMinAge
}

for _, t := range ts {
tcached, exists := cl.mappedMeta[t]
if exists && time.Since(tcached.when) < cl.cfg.metadataMinAge {
if exists && time.Since(tcached.when) < limit {
cached[t] = tcached
} else {
needed = append(needed, t)
@@ -2425,35 +2519,26 @@ func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMeta
// fetchMappedMetadata provides a convenience type of working with metadata;
// this is garbage heavy, so it is only used in one off requests in this
// package.
func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useCache bool) (map[string]mappedMetadataTopic, error) {
var r map[string]mappedMetadataTopic
func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useCache bool, limit time.Duration) (map[string]mappedMetadataTopic, error) {
var intoMapped map[string]mappedMetadataTopic
needed := topics
if useCache {
r, needed = cl.fetchCachedMappedMetadata(topics...)
intoMapped, needed = cl.fetchCachedMappedMetadata(limit, topics...)
if len(needed) == 0 {
return r, nil
return intoMapped, nil
}
}
if r == nil {
r = make(map[string]mappedMetadataTopic)
}

_, meta, err := cl.fetchMetadataForTopics(ctx, false, needed)
if err != nil {
return nil, err
if intoMapped == nil {
intoMapped = make(map[string]mappedMetadataTopic)
}

// Cache the mapped metadata, and also store each topic in the results.
cl.storeCachedMappedMetadata(meta, func(entry mappedMetadataTopic) {
r[*entry.t.Topic] = entry
})

return r, nil
_, _, err := cl.fetchMetadataForTopics(ctx, false, needed, intoMapped)
return intoMapped, err
}

// storeCachedMappedMetadata caches the fetched metadata in the Client and,
// if intoMapped is non-nil, also stores each topic from the MetadataResponse
// into intoMapped.
func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachTopic func(_ mappedMetadataTopic)) {
func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, intoMapped map[string]mappedMetadataTopic) {
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
if cl.mappedMeta == nil {
@@ -2476,16 +2561,17 @@ func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachT
t.ps[partition.Partition] = partition
}

if onEachTopic != nil {
onEachTopic(t)
if intoMapped != nil {
intoMapped[*t.t.Topic] = t
}
}
if len(meta.Topics) != len(cl.mappedMeta) {
now := time.Now()
for topic, mapped := range cl.mappedMeta {
if mapped.when.Equal(when) {
continue
}
if time.Since(mapped.when) > cl.cfg.metadataMinAge {
if now.Sub(mapped.when) > cl.cfg.metadataMinAge {
delete(cl.mappedMeta, topic)
}
}
@@ -2599,7 +2685,7 @@ func (cl *listOffsetsSharder) shard(ctx context.Context, kreq kmsg.Request, _ er
for _, topic := range req.Topics {
need = append(need, topic.Topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
mapping, err := cl.fetchMappedMetadata(ctx, need, true, 0)
if err != nil {
return nil, false, err
}
@@ -3159,7 +3245,7 @@ func (cl *deleteRecordsSharder) shard(ctx context.Context, kreq kmsg.Request, _
for _, topic := range req.Topics {
need = append(need, topic.Topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
mapping, err := cl.fetchMappedMetadata(ctx, need, true, 0)
if err != nil {
return nil, false, err
}
@@ -3280,7 +3366,7 @@ func (cl *offsetForLeaderEpochSharder) shard(ctx context.Context, kreq kmsg.Requ
for _, topic := range req.Topics {
need = append(need, topic.Topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
mapping, err := cl.fetchMappedMetadata(ctx, need, true, 0)
if err != nil {
return nil, false, err
}
@@ -3599,7 +3685,7 @@ func (cl *writeTxnMarkersSharder) shard(ctx context.Context, kreq kmsg.Request,
need = append(need, topic.Topic)
}
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
mapping, err := cl.fetchMappedMetadata(ctx, need, true, 0)
if err != nil {
return nil, false, err
}
@@ -3921,7 +4007,7 @@ func (cl *alterReplicaLogDirsSharder) shard(ctx context.Context, kreq kmsg.Reque
for topic := range needMap {
need = append(need, topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, false) // bypass cache, tricky to manage response
mapping, err := cl.fetchMappedMetadata(ctx, need, false, 0) // bypass cache, tricky to manage response
if err != nil {
return nil, false, err
}
@@ -4069,7 +4155,7 @@ func (cl *describeLogDirsSharder) shard(ctx context.Context, kreq kmsg.Request,
for _, topic := range req.Topics {
need = append(need, topic.Topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, false) // bypass cache, tricky to manage response
mapping, err := cl.fetchMappedMetadata(ctx, need, false, 0) // bypass cache, tricky to manage response
if err != nil {
return nil, false, err
}
@@ -4324,7 +4410,7 @@ func (cl *describeProducersSharder) shard(ctx context.Context, kreq kmsg.Request
for _, topic := range req.Topics {
need = append(need, topic.Topic)
}
mapping, err := cl.fetchMappedMetadata(ctx, need, true)
mapping, err := cl.fetchMappedMetadata(ctx, need, true, 0)
if err != nil {
return nil, false, err
}
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
@@ -403,7 +403,7 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
metaTopics = append(metaTopics, topic)
}

_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics)
_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics, nil)
if err != nil {
return nil, fmt.Errorf("unable to fetch metadata for group topics: %v", err)
}
7 changes: 1 addition & 6 deletions pkg/kgo/metadata.go
@@ -535,16 +535,11 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti
// fetchTopicMetadata fetches metadata for all reqTopics and returns new
// topicPartitionsData for each topic.
func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*metadataTopic, error) {
_, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics)
_, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics, nil)
if err != nil {
return nil, err
}

// Since we've fetched the metadata for some topics we can optimistically cache it
// for mapped metadata too. This may reduce the number of Metadata requests issued
// by the client.
cl.storeCachedMappedMetadata(meta, nil)

topics := make(map[string]*metadataTopic, len(meta.Topics))

// Even if metadata returns a leader epoch, we do not use it unless we