Skip to content

Commit 784adc1

Browse files
committed
kcl: update deps & fix api breakage from franz-go 0.8.0
1 parent 9a9984d commit 784adc1

File tree

6 files changed

+58
-41
lines changed

6 files changed

+58
-41
lines changed

client/client.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ type Cfg struct {
6969

7070
// Client contains kgo client options and a kgo client.
7171
type Client struct {
72-
opts []kgo.Opt
73-
once sync.Once
74-
client *kgo.Client
72+
opts []kgo.Opt
73+
once sync.Once
74+
client *kgo.Client
75+
txnSess *kgo.GroupTransactSession
7576

7677
logLevel string
7778
logFile string
@@ -151,6 +152,15 @@ func (c *Client) Client() *kgo.Client {
151152
return c.client
152153
}
153154

155+
// GroupTransactSession returns a new kgo.GroupTransactSession using all
156+
// buffered options.
157+
//
158+
// This can only be used once, and is incompatible with the Client function.
159+
func (c *Client) GroupTransactSession() *kgo.GroupTransactSession {
160+
c.loadTxnSessOnce()
161+
return c.txnSess
162+
}
163+
154164
// DiskCfg returns the loaded disk configuration.
155165
func (c *Client) DiskCfg() Cfg {
156166
c.loadClientOnce()
@@ -165,11 +175,12 @@ func (c *Client) DefaultCfgPath() string {
165175
// RemakeWithOpts remakes the client with additional opts added. The opts are
166176
// not persisted to the overall Client opts, but the created client does
167177
// persist. This is not concurrent safe.
168-
func (c *Client) RemakeWithOpts(opts ...kgo.Opt) {
178+
func (c *Client) RemakeWithOpts(opts ...kgo.Opt) *kgo.Client {
169179
var err error
170180
c.client.Close()
171181
c.client, err = kgo.NewClient(append(c.opts, opts...)...)
172182
out.MaybeDie(err, "unable to load client: %v", err)
183+
return c.client
173184
}
174185

175186
func (c *Client) loadClientOnce() {
@@ -180,6 +191,14 @@ func (c *Client) loadClientOnce() {
180191
out.MaybeDie(err, "unable to load client: %v", err)
181192
})
182193
}
194+
func (c *Client) loadTxnSessOnce() {
195+
c.once.Do(func() {
196+
c.fillOpts()
197+
var err error
198+
c.txnSess, err = kgo.NewGroupTransactSession(c.opts...)
199+
out.MaybeDie(err, "unable to load group transact session: %v", err)
200+
})
201+
}
183202

184203
func (c *Client) fillOpts() {
185204
c.parseCfgFile() // loads config file if needed

commands/admin/txn/unhang.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/twmb/kcl/out"
1515
)
1616

17-
func unstickLSO(cl *client.Client) *cobra.Command {
17+
func unstickLSO(kcl *client.Client) *cobra.Command {
1818
return &cobra.Command{
1919
Use: "unstick-lso",
2020
Aliases: []string{"unhang-lso"},
@@ -39,12 +39,13 @@ anyway).
3939
Args: cobra.ExactArgs(1),
4040

4141
Run: func(_ *cobra.Command, topics []string) {
42-
unstick(cl.Client(), topics[0])
42+
unstick(kcl, topics[0])
4343
},
4444
}
4545
}
4646

47-
func unstick(cl *kgo.Client, topic string) {
47+
func unstick(kcl *client.Client, topic string) {
48+
cl := kcl.Client()
4849
defer cl.Close()
4950

5051
var nparts int
@@ -140,7 +141,7 @@ func unstick(cl *kgo.Client, topic string) {
140141
{
141142
// First find the stuck LSO.
142143
fmt.Printf("Looking for the last stable offset on a partition in %s...\n", topic)
143-
cl.AssignPartitions(kgo.ConsumeTopics(kgo.NewOffset().AtStart(), topic))
144+
cl = kcl.RemakeWithOpts(kgo.ConsumeTopics(topic))
144145
var found bool
145146
for !found && len(possibleParts) > 0 {
146147
fetches := cl.PollFetches(context.Background())
@@ -171,7 +172,7 @@ func unstick(cl *kgo.Client, topic string) {
171172
// Now we directly consume at that offset to find which
172173
// producer ID and epoch caused this.
173174
fmt.Printf("Found partition %d stuck at offset %d, looking for producer that caused this...\n", stuckPartition, stuckOffset)
174-
cl.AssignPartitions(kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
175+
cl = kcl.RemakeWithOpts(kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
175176
topic: map[int32]kgo.Offset{
176177
stuckPartition: kgo.NewOffset().At(stuckOffset),
177178
},
@@ -198,7 +199,7 @@ func unstick(cl *kgo.Client, topic string) {
198199
var txnid string
199200
{
200201
fmt.Printf("Found causing producer ID and epoch %d/%d, looking in __transaction_state to find its transactional id...\n", pid, epoch)
201-
cl.AssignPartitions(kgo.ConsumeTopics(kgo.NewOffset().AtStart(), "__transaction_state"))
202+
cl = kcl.RemakeWithOpts(kgo.ConsumeTopics("__transaction_state"))
202203
var found bool
203204
for !found {
204205
fetches := cl.PollFetches(context.Background())
@@ -225,7 +226,7 @@ func unstick(cl *kgo.Client, topic string) {
225226
txnid = k.TransactionalID
226227
})
227228
}
228-
cl.AssignPartitions() // stop consuming, drop buffered data
229+
cl = kcl.RemakeWithOpts() // stop consuming, drop buffered data
229230
}
230231

231232
{

commands/consume/consume.go

+9-13
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,10 @@ func (c *consumption) run(topics []string) {
6969
out.Die("__consumer_offsets or __transaction_state must be the only topic listed when trying to consume it")
7070
}
7171

72-
var directOpts []kgo.DirectConsumeOpt
73-
var groupOpts []kgo.GroupOpt
7472
offset := c.parseOffset()
73+
c.cl.AddOpt(kgo.ConsumeResetOffset(offset))
7574
if len(c.partitions) == 0 {
76-
directOpts = append(directOpts, kgo.ConsumeTopics(offset, topics...))
77-
groupOpts = append(groupOpts, kgo.GroupTopics(topics...))
75+
c.cl.AddOpt(kgo.ConsumeTopics(topics...))
7876
} else {
7977
if len(c.group) != 0 {
8078
out.Die("incompatible flag assignment: group consuming cannot be used with direct partition consuming")
@@ -87,11 +85,10 @@ func (c *consumption) run(topics []string) {
8785
}
8886
offsets[topic] = partOffsets
8987
}
90-
directOpts = append(directOpts, kgo.ConsumePartitions(offsets))
88+
c.cl.AddOpt(kgo.ConsumePartitions(offsets))
9189
}
9290
if c.regex {
93-
directOpts = append(directOpts, kgo.ConsumeTopicsRegex())
94-
groupOpts = append(groupOpts, kgo.GroupTopicsRegex())
91+
c.cl.AddOpt(kgo.ConsumeRegex())
9592
}
9693

9794
var balancer kgo.GroupBalancer
@@ -107,10 +104,10 @@ func (c *consumption) run(topics []string) {
107104
default:
108105
out.Die("unrecognized group balancer %q", c.groupAlg)
109106
}
110-
groupOpts = append(groupOpts, kgo.Balancers(balancer))
107+
c.cl.AddOpt(kgo.Balancers(balancer))
111108

112109
if c.instanceID != "" {
113-
groupOpts = append(groupOpts, kgo.InstanceID(c.instanceID))
110+
c.cl.AddOpt(kgo.InstanceID(c.instanceID))
114111
}
115112

116113
sigs := make(chan os.Signal, 2)
@@ -126,13 +123,12 @@ func (c *consumption) run(topics []string) {
126123
c.cl.AddOpt(kgo.FetchMaxWait(c.fetchMaxWait))
127124
c.cl.AddOpt(kgo.Rack(c.rack))
128125

129-
cl := c.cl.Client()
130126
isGroup := len(c.group) > 0 && !(isConsumerOffsets || isTransactionState)
131127
if isGroup {
132-
cl.AssignGroup(c.group, groupOpts...)
133-
} else {
134-
cl.AssignPartitions(directOpts...)
128+
c.cl.AddOpt(kgo.ConsumerGroup(c.group))
135129
}
130+
131+
cl := c.cl.Client()
136132
co := &consumeOutput{
137133
cl: cl,
138134
numPerPartition: c.numPerPartition,

commands/transact/transact.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,9 @@ func Command(cl *client.Client) *cobra.Command {
9696
// regex,
9797
// balancer,
9898
// instance ID
99-
var groupOpts []kgo.GroupOpt
100-
groupOpts = append(groupOpts, kgo.GroupTopics(topics...))
99+
cl.AddOpt(kgo.ConsumeTopics(topics...))
101100
if regex {
102-
groupOpts = append(groupOpts, kgo.GroupTopicsRegex())
101+
cl.AddOpt(kgo.ConsumeRegex())
103102
}
104103
var balancer kgo.GroupBalancer
105104
switch groupAlg {
@@ -114,9 +113,9 @@ func Command(cl *client.Client) *cobra.Command {
114113
default:
115114
out.Die("unrecognized group balancer %q", groupAlg)
116115
}
117-
groupOpts = append(groupOpts, kgo.Balancers(balancer))
116+
cl.AddOpt(kgo.Balancers(balancer))
118117
if instanceID != "" {
119-
groupOpts = append(groupOpts, kgo.InstanceID(instanceID))
118+
cl.AddOpt(kgo.InstanceID(instanceID))
120119
}
121120

122121
cl.AddOpt(kgo.FetchIsolationLevel(kgo.ReadCommitted())) // we will be reading committed
@@ -152,6 +151,7 @@ func Command(cl *client.Client) *cobra.Command {
152151
cl.AddOpt(kgo.TransactionalID(txnID))
153152
cl.AddOpt(kgo.StopOnDataLoss())
154153
cl.AddOpt(kgo.BatchCompression(codec))
154+
cl.AddOpt(kgo.ConsumerGroup(group))
155155

156156
sigs := make(chan os.Signal, 2)
157157
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@@ -162,7 +162,7 @@ func Command(cl *client.Client) *cobra.Command {
162162

163163
// If we made it this far, our options are valid:
164164
// assign our group and begin execing.
165-
txnSess := cl.Client().AssignGroupTransactSession(group, groupOpts...)
165+
txnSess := cl.GroupTransactSession()
166166

167167
quitCtx, cancel := context.WithCancel(context.Background())
168168
go transact(quitCtx, cl.Client(), txnSess, w, r, destTopic, verbose, args...)

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ go 1.13
44

55
require (
66
github.com/BurntSushi/toml v0.3.1
7-
github.com/aws/aws-sdk-go v1.38.45
7+
github.com/aws/aws-sdk-go v1.38.57
88
github.com/spf13/cobra v1.1.3
9-
github.com/twmb/franz-go v0.7.5
9+
github.com/twmb/franz-go v0.8.0
1010
github.com/twmb/go-strftime v0.0.0-20190915101236-e74f7c4fe4fa
1111
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
1212
)

go.sum

+10-9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
2222
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
2323
github.com/aws/aws-sdk-go v1.38.45 h1:pQmv1vT/voRAjENnPsT4WobFBgLwnODDFogrt2kXc7M=
2424
github.com/aws/aws-sdk-go v1.38.45/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
25+
github.com/aws/aws-sdk-go v1.38.57 h1:Jo6uOnWNbj4jL/8t/XUrHOKm1J6pPcYFhGzda20UcUk=
26+
github.com/aws/aws-sdk-go v1.38.57/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
2527
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
2628
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
2729
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
@@ -118,8 +120,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
118120
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
119121
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
120122
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
121-
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
122-
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
123+
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
124+
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
123125
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
124126
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
125127
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
@@ -146,8 +148,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
146148
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
147149
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
148150
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
149-
github.com/pierrec/lz4/v4 v4.1.6 h1:ueMTcBBFrbT8K4uGDNNZPa8Z7LtPV7Cl0TDjaeHxP44=
150-
github.com/pierrec/lz4/v4 v4.1.6/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
151+
github.com/pierrec/lz4/v4 v4.1.7 h1:UDV9geJWhFIufAliH7HQlz9wP3JA0t748w+RwbWMLow=
152+
github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
151153
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
152154
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
153155
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -191,8 +193,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
191193
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
192194
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
193195
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
194-
github.com/twmb/franz-go v0.7.5 h1:31zdvpnfz8CGGZW4vyDV8v99ygtfUpISBiBQ+81LAAk=
195-
github.com/twmb/franz-go v0.7.5/go.mod h1:StwVC7bQkTM3I6DJyNGvmgpnza7Tz11YfLACXwMvQ0k=
196+
github.com/twmb/franz-go v0.8.0 h1:DFe9ptohEBtzuFyDKpUM1d39h+jkuEg/fEudDHqKhyw=
197+
github.com/twmb/franz-go v0.8.0/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0=
196198
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
197199
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
198200
github.com/twmb/go-strftime v0.0.0-20190915101236-e74f7c4fe4fa h1:eq9HJTMjHC3K/GYE7RuvhweBhIxXi9y6BYM6Aox3UbA=
@@ -211,7 +213,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
211213
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
212214
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
213215
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
214-
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
215216
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc=
216217
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
217218
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -249,8 +250,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
249250
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
250251
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
251252
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
252-
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 h1:0PC75Fz/kyMGhL0e1QnypqK2kQMqKt9csD1GnMJR+Zk=
253-
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
253+
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
254+
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
254255
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
255256
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
256257
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

0 commit comments

Comments
 (0)