Skip to content

Commit e5f2a93

Browse files
committed
admin: add alter-partition-assignments, list-partition-reassignments
1 parent 80f6b0f commit e5f2a93

File tree

2 files changed

+180
-0
lines changed

2 files changed

+180
-0
lines changed

commands/admin/admin.go

+138
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package admin
44
import (
55
"context"
66
"fmt"
7+
"sort"
78

89
"github.com/spf13/cobra"
910

@@ -25,6 +26,8 @@ func Command(cl *client.Client) *cobra.Command {
2526
cmd.AddCommand(deleteRecordsCommand(cl))
2627
cmd.AddCommand(logdirsDescribeCommand(cl))
2728
cmd.AddCommand(logdirsAlterReplicasCommand(cl))
29+
cmd.AddCommand(alterPartitionAssignments(cl))
30+
cmd.AddCommand(listPartitionReassignments(cl))
2831

2932
return cmd
3033
}
@@ -118,3 +121,138 @@ To avoid accidental triggers, this command requires a --run flag to run.
118121

119122
return cmd
120123
}
124+
125+
func alterPartitionAssignments(cl *client.Client) *cobra.Command {
126+
var topicPartReplicas []string
127+
128+
cmd := &cobra.Command{
129+
Use: "alter-partition-assignments",
130+
Short: "Alter partition assignments.",
131+
Long: `Alter which brokers partitions are assigned to (Kafka 2.4.0+).
132+
133+
The syntax for each topic is
134+
135+
topic: 1->2,3,4 ; 2->1,2,3
136+
137+
where the first number is the partition, and -> points to the replicas you
138+
want to move the partition to. Note that since this contains a >, you likely
139+
need to quote your input to the flag.
140+
141+
If a replica list is empty for a specific partition, this cancels any active
142+
reassignment for that partition.
143+
`,
144+
Example: "alter-partition-assignments -t 'foo:1->1,2,3' -t 'bar:2->3,4,5;5->3,4,5'",
145+
Run: func(_ *cobra.Command, args []string) {
146+
tprs, err := flagutil.ParseTopicPartitionReplicas(topicPartReplicas)
147+
out.MaybeDie(err, "unable to parse topic partitions replicas: %v", err)
148+
149+
req := &kmsg.AlterPartitionAssignmentsRequest{
150+
TimeoutMillis: cl.TimeoutMillis(),
151+
}
152+
for topic, partitions := range tprs {
153+
t := kmsg.AlterPartitionAssignmentsRequestTopic{
154+
Topic: topic,
155+
}
156+
for partition, replicas := range partitions {
157+
t.Partitions = append(t.Partitions, kmsg.AlterPartitionAssignmentsRequestTopicPartition{
158+
Partition: partition,
159+
Replicas: replicas,
160+
})
161+
}
162+
req.Topics = append(req.Topics, t)
163+
}
164+
165+
kresp, err := cl.Client().Request(context.Background(), req)
166+
out.MaybeDie(err, "unable to alter partition assignments: %v", err)
167+
resp := kresp.(*kmsg.AlterPartitionAssignmentsResponse)
168+
if cl.AsJSON() {
169+
out.ExitJSON(resp)
170+
}
171+
172+
if resp.ErrorCode != 0 {
173+
out.ErrAndMsg(resp.ErrorCode, resp.ErrorMessage)
174+
out.Exit()
175+
}
176+
177+
tw := out.BeginTabWrite()
178+
defer tw.Flush()
179+
for _, topic := range resp.Topics {
180+
for _, partition := range topic.Partitions {
181+
msg := "OK"
182+
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
183+
msg = err.Error()
184+
}
185+
detail := ""
186+
if partition.ErrorMessage != nil {
187+
detail = *partition.ErrorMessage
188+
}
189+
fmt.Fprintf(tw, "%s\t%d\t%s\t%s\n", topic.Topic, partition.Partition, msg, detail)
190+
}
191+
}
192+
},
193+
}
194+
195+
cmd.Flags().StringArrayVarP(&topicPartReplicas, "topic", "t", nil, "topic, partitions, and replica destinations; repeatable")
196+
return cmd
197+
}
198+
199+
func listPartitionReassignments(cl *client.Client) *cobra.Command {
200+
var topicParts []string
201+
202+
cmd := &cobra.Command{
203+
Use: "list-partition-reassignments",
204+
Short: "List partition reassignments.",
205+
Long: `List which partitions are currently being reassigned (Kafka 2.4.0+).
206+
207+
The syntax for each topic is
208+
209+
topic:1,2,3
210+
211+
where the numbers correspond to partitions for a topic.
212+
213+
If no topics are specified, this lists all active reassignments.
214+
`,
215+
Run: func(_ *cobra.Command, args []string) {
216+
tps, err := flagutil.ParseTopicPartitions(topicParts)
217+
out.MaybeDie(err, "unable to parse topic partitions: %v", err)
218+
219+
req := &kmsg.ListPartitionReassignmentsRequest{
220+
TimeoutMillis: cl.TimeoutMillis(),
221+
}
222+
for topic, partitions := range tps {
223+
req.Topics = append(req.Topics, kmsg.ListPartitionReassignmentsRequestTopic{
224+
Topic: topic,
225+
Partitions: partitions,
226+
})
227+
}
228+
229+
kresp, err := cl.Client().Request(context.Background(), req)
230+
out.MaybeDie(err, "unable to list partition reassignments: %v", err)
231+
resp := kresp.(*kmsg.ListPartitionReassignmentsResponse)
232+
if cl.AsJSON() {
233+
out.ExitJSON(resp)
234+
}
235+
236+
if resp.ErrorCode != 0 {
237+
out.ErrAndMsg(resp.ErrorCode, resp.ErrorMessage)
238+
out.Exit()
239+
}
240+
241+
tw := out.BeginTabWrite()
242+
defer tw.Flush()
243+
fmt.Fprint(tw, "TOPIC\tPARTITION\tCURRENT REPLICAS\tADDING\tREMOVING\n")
244+
for _, topic := range resp.Topics {
245+
for _, p := range topic.Partitions {
246+
sort.Slice(p.Replicas, func(i, j int) bool { return p.Replicas[i] < p.Replicas[j] })
247+
sort.Slice(p.AddingReplicas, func(i, j int) bool { return p.AddingReplicas[i] < p.AddingReplicas[j] })
248+
sort.Slice(p.RemovingReplicas, func(i, j int) bool { return p.RemovingReplicas[i] < p.RemovingReplicas[j] })
249+
fmt.Fprintf(tw, "%s\t%d\t%v\t%v\t%v\n", topic.Topic, p.Partition, p.Replicas, p.AddingReplicas, p.RemovingReplicas)
250+
}
251+
}
252+
},
253+
}
254+
255+
cmd.Flags().StringArrayVarP(&topicParts, "topic", "t", nil, "topic and partitions to list partition reassignments for; repeatable")
256+
257+
return cmd
258+
}

flagutil/flagutil.go

+42
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,45 @@ func ParseTopicPartitions(list []string) (map[string][]int32, error) {
3333
}
3434
return tps, nil
3535
}
36+
37+
// ParseTopicPartitionReplicas parses a list of the following, spaces trimmed:
38+
//
39+
// topic: 4->3,2,1 ; 5->3,2,1
40+
func ParseTopicPartitionReplicas(list []string) (map[string]map[int32][]int32, error) {
41+
tprs := make(map[string]map[int32][]int32)
42+
for _, item := range list {
43+
tps := strings.SplitN(item, ":", 2)
44+
if len(tps) != 2 {
45+
return nil, fmt.Errorf("item %q invalid empty topic", item)
46+
}
47+
48+
topic := strings.TrimSpace(tps[0])
49+
prs := make(map[int32][]int32)
50+
tprs[topic] = prs
51+
52+
for _, partitionReplicasRaw := range strings.Split(tps[1], ";") {
53+
partitionReplicas := strings.SplitN(partitionReplicasRaw, "->", 2)
54+
if len(partitionReplicas) != 2 {
55+
return nil, fmt.Errorf("item %q invalid partition->replicas bit %q", item, partitionReplicasRaw)
56+
}
57+
partition, err := strconv.Atoi(strings.TrimSpace(partitionReplicas[0]))
58+
if err != nil {
59+
return nil, fmt.Errorf("item %q invalid partition in %q", item, partitionReplicasRaw)
60+
}
61+
p := int32(partition)
62+
prs[p] = nil
63+
for _, r := range strings.Split(partitionReplicas[1], ",") {
64+
r = strings.TrimSpace(r)
65+
if len(r) == 0 {
66+
continue
67+
}
68+
replica, err := strconv.Atoi(r)
69+
if err != nil {
70+
return nil, fmt.Errorf("item %q invalid replica in %q", item, partitionReplicasRaw)
71+
}
72+
prs[p] = append(prs[p], int32(replica))
73+
}
74+
}
75+
}
76+
return tprs, nil
77+
}

0 commit comments

Comments
 (0)