Skip to content

Commit a06a4de

Browse files
committed
for ctl commands, read all lines before processing, to prevent out of sync protocol when handling errors.
the protocol is often: read one or more lines. only then return error. if we would return an error after reading 1 line, parsing it and failing, the writer (client connecting) may be busy writing more lines, not reading an error response yet.
1 parent 1a0a396 commit a06a4de

File tree

1 file changed

+43
-24
lines changed

1 file changed

+43
-24
lines changed

ctl.go

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
442442
> id
443443
< "ok" or error
444444
*/
445-
id, err := strconv.ParseInt(ctl.xread(), 10, 64)
445+
idstr := ctl.xread()
446+
id, err := strconv.ParseInt(idstr, 10, 64)
446447
ctl.xcheck(err, "parsing id")
447448
err = queue.HoldRuleRemove(ctx, log, id)
448449
ctl.xcheck(err, "remove hold rule")
@@ -456,10 +457,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
456457
< "ok"
457458
< stream
458459
*/
460+
filterline := ctl.xread()
461+
sortline := ctl.xread()
459462
var f queue.Filter
460-
xparseJSON(ctl, ctl.xread(), &f)
463+
xparseJSON(ctl, filterline, &f)
461464
var s queue.Sort
462-
xparseJSON(ctl, ctl.xread(), &s)
465+
xparseJSON(ctl, sortline, &s)
463466
qmsgs, err := queue.List(ctx, f, s)
464467
ctl.xcheck(err, "listing queue")
465468
ctl.xwriteok()
@@ -487,9 +490,10 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
487490
< count
488491
*/
489492

490-
var f queue.Filter
491-
xparseJSON(ctl, ctl.xread(), &f)
493+
filterline := ctl.xread()
492494
hold := ctl.xread() == "true"
495+
var f queue.Filter
496+
xparseJSON(ctl, filterline, &f)
493497
count, err := queue.HoldSet(ctx, f, hold)
494498
ctl.xcheck(err, "setting on hold status for messages")
495499
ctl.xwriteok()
@@ -505,10 +509,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
505509
< count
506510
*/
507511

508-
var f queue.Filter
509-
xparseJSON(ctl, ctl.xread(), &f)
512+
filterline := ctl.xread()
510513
relnow := ctl.xread()
511-
d, err := time.ParseDuration(ctl.xread())
514+
duration := ctl.xread()
515+
var f queue.Filter
516+
xparseJSON(ctl, filterline, &f)
517+
d, err := time.ParseDuration(duration)
512518
ctl.xcheck(err, "parsing duration for next delivery attempt")
513519
var count int
514520
if relnow == "" {
@@ -529,9 +535,10 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
529535
< count
530536
*/
531537

532-
var f queue.Filter
533-
xparseJSON(ctl, ctl.xread(), &f)
538+
filterline := ctl.xread()
534539
transport := ctl.xread()
540+
var f queue.Filter
541+
xparseJSON(ctl, filterline, &f)
535542
count, err := queue.TransportSet(ctx, f, transport)
536543
ctl.xcheck(err, "adding to next delivery attempts in queue")
537544
ctl.xwriteok()
@@ -546,8 +553,7 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
546553
< count
547554
*/
548555

549-
var f queue.Filter
550-
xparseJSON(ctl, ctl.xread(), &f)
556+
filterline := ctl.xread()
551557
reqtls := ctl.xread()
552558
var req *bool
553559
switch reqtls {
@@ -561,6 +567,8 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
561567
default:
562568
ctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
563569
}
570+
var f queue.Filter
571+
xparseJSON(ctl, filterline, &f)
564572
count, err := queue.RequireTLSSet(ctx, f, req)
565573
ctl.xcheck(err, "setting tls requirements on messages in queue")
566574
ctl.xwriteok()
@@ -574,8 +582,9 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
574582
< count
575583
*/
576584

585+
filterline := ctl.xread()
577586
var f queue.Filter
578-
xparseJSON(ctl, ctl.xread(), &f)
587+
xparseJSON(ctl, filterline, &f)
579588
count, err := queue.Fail(ctx, log, f)
580589
ctl.xcheck(err, "marking messages from queue as failed")
581590
ctl.xwriteok()
@@ -589,8 +598,9 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
589598
< count
590599
*/
591600

601+
filterline := ctl.xread()
592602
var f queue.Filter
593-
xparseJSON(ctl, ctl.xread(), &f)
603+
xparseJSON(ctl, filterline, &f)
594604
count, err := queue.Drop(ctx, log, f)
595605
ctl.xcheck(err, "dropping messages from queue")
596606
ctl.xwriteok()
@@ -626,10 +636,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
626636
< "ok"
627637
< stream
628638
*/
639+
filterline := ctl.xread()
640+
sortline := ctl.xread()
629641
var f queue.RetiredFilter
630-
xparseJSON(ctl, ctl.xread(), &f)
642+
xparseJSON(ctl, filterline, &f)
631643
var s queue.RetiredSort
632-
xparseJSON(ctl, ctl.xread(), &s)
644+
xparseJSON(ctl, sortline, &s)
633645
qmsgs, err := queue.RetiredList(ctx, f, s)
634646
ctl.xcheck(err, "listing retired queue")
635647
ctl.xwriteok()
@@ -688,10 +700,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
688700
< "ok"
689701
< stream
690702
*/
703+
filterline := ctl.xread()
704+
sortline := ctl.xread()
691705
var f queue.HookFilter
692-
xparseJSON(ctl, ctl.xread(), &f)
706+
xparseJSON(ctl, filterline, &f)
693707
var s queue.HookSort
694-
xparseJSON(ctl, ctl.xread(), &s)
708+
xparseJSON(ctl, sortline, &s)
695709
hooks, err := queue.HookList(ctx, f, s)
696710
ctl.xcheck(err, "listing webhooks")
697711
ctl.xwriteok()
@@ -720,10 +734,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
720734
< count
721735
*/
722736

723-
var f queue.HookFilter
724-
xparseJSON(ctl, ctl.xread(), &f)
737+
filterline := ctl.xread()
725738
relnow := ctl.xread()
726-
d, err := time.ParseDuration(ctl.xread())
739+
duration := ctl.xread()
740+
var f queue.HookFilter
741+
xparseJSON(ctl, filterline, &f)
742+
d, err := time.ParseDuration(duration)
727743
ctl.xcheck(err, "parsing duration for next delivery attempt")
728744
var count int
729745
if relnow == "" {
@@ -743,8 +759,9 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
743759
< count
744760
*/
745761

762+
filterline := ctl.xread()
746763
var f queue.HookFilter
747-
xparseJSON(ctl, ctl.xread(), &f)
764+
xparseJSON(ctl, filterline, &f)
748765
count, err := queue.HookCancel(ctx, log, f)
749766
ctl.xcheck(err, "canceling webhooks in queue")
750767
ctl.xwriteok()
@@ -784,10 +801,12 @@ func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
784801
< "ok"
785802
< stream
786803
*/
804+
filterline := ctl.xread()
805+
sortline := ctl.xread()
787806
var f queue.HookRetiredFilter
788-
xparseJSON(ctl, ctl.xread(), &f)
807+
xparseJSON(ctl, filterline, &f)
789808
var s queue.HookRetiredSort
790-
xparseJSON(ctl, ctl.xread(), &s)
809+
xparseJSON(ctl, sortline, &s)
791810
l, err := queue.HookRetiredList(ctx, f, s)
792811
ctl.xcheck(err, "listing retired webhooks")
793812
ctl.xwriteok()

0 commit comments

Comments
 (0)