Skip to content

Commit 676a48c

Browse files
authored
Merge pull request #6531 from tonistiigi/exec-order-fix
gateway: fix exec process lifecycle ordering
2 parents bb491e1 + 6f005f2 commit 676a48c

File tree

2 files changed

+12
-8
lines changed

2 files changed

+12
-8
lines changed

frontend/gateway/gateway.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,12 +1571,20 @@ func (lbf *llbBridgeForwarder) ExecProcess(srv pb.LLBBridge_ExecProcessServer) e
15711571
return stack.Enable(err)
15721572
})
15731573

1574+
// startedSent gates the proc.Wait goroutine until
1575+
// Started is sent and output readers are spawned,
1576+
// preventing Exit-before-Started and a deadlock
1577+
// where pio.Close() races with reader setup.
1578+
startedSent := make(chan struct{})
1579+
15741580
eg.Go(func() error {
15751581
defer func() {
15761582
pio.Close()
15771583
}()
15781584
err := proc.Wait()
15791585

1586+
<-startedSent
1587+
15801588
var statusCode uint32
15811589
var exitError *pb.ExitError
15821590
var statusError *spb.Status
@@ -1626,6 +1634,7 @@ func (lbf *llbBridgeForwarder) ExecProcess(srv pb.LLBBridge_ExecProcessServer) e
16261634
},
16271635
})
16281636
if err != nil {
1637+
close(startedSent)
16291638
return stack.Enable(err)
16301639
}
16311640

@@ -1669,6 +1678,7 @@ func (lbf *llbBridgeForwarder) ExecProcess(srv pb.LLBBridge_ExecProcessServer) e
16691678
return stack.Enable(err)
16701679
})
16711680
}
1681+
close(startedSent)
16721682
}
16731683
}
16741684
})

frontend/gateway/grpcclient/client.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -820,9 +820,8 @@ func (c *grpcClient) Inputs(ctx context.Context) (map[string]llb.State, error) {
820820
// communication channel between the process and the ExecProcess message
821821
// stream.
822822
type procMessageForwarder struct {
823-
done chan struct{}
824-
closeOnce sync.Once
825-
msgs chan *pb.ExecMessage
823+
done chan struct{}
824+
msgs chan *pb.ExecMessage
826825
}
827826

828827
func newProcMessageForwarder() *procMessageForwarder {
@@ -836,9 +835,6 @@ func (b *procMessageForwarder) Send(ctx context.Context, m *pb.ExecMessage) {
836835
select {
837836
case <-ctx.Done():
838837
case <-b.done:
839-
b.closeOnce.Do(func() {
840-
close(b.msgs)
841-
})
842838
case b.msgs <- m:
843839
}
844840
}
@@ -856,8 +852,6 @@ func (b *procMessageForwarder) Recv(ctx context.Context) (m *pb.ExecMessage, ok
856852

857853
func (b *procMessageForwarder) Close() {
858854
close(b.done)
859-
b.Recv(context.Background()) // flush any messages in queue
860-
b.Send(context.Background(), nil) // ensure channel is closed
861855
}
862856

863857
// messageForwarder manages a single grpc stream for ExecProcess to facilitate

0 commit comments

Comments
 (0)