Skip to content

Commit 76f192b

Browse files
authored
API endpoint and associated RPC changes to resolve VMs IP's (#188)
* API endpoint and associated RPC changes to resolve VMs IP's * Fix "Missing expected argument '<name>'" error when doing "tart set" * Implement TestIPEndpoint() and IP() method in controller HTTP client
1 parent 8119b22 commit 76f192b

File tree

18 files changed

+603
-124
lines changed

18 files changed

+603
-124
lines changed

api/openapi.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,31 @@ paths:
321321
description: VM resource with the given name doesn't exist
322322
'503':
323323
description: Failed to establish connection with the worker responsible for the specified VM
324+
/vms/{name}/ip:
325+
get:
326+
summary: "Resolve the VM's IP address on the worker"
327+
tags:
328+
- vms
329+
parameters:
330+
- in: query
331+
name: wait
332+
description: Duration in seconds to wait for the VM to transition into "running" state if not already running.
333+
schema:
334+
type: integer
335+
minimum: 0
336+
maximum: 65535
337+
required: false
338+
responses:
339+
'200':
340+
description: OK
341+
content:
342+
application/json:
343+
schema:
344+
$ref: '#/components/schemas/IP'
345+
'404':
346+
description: VM resource with the given name doesn't exist
347+
'503':
348+
description: Failed to resolve the IP address on the worker responsible for the specified VM
324349
components:
325350
schemas:
326351
Worker:
@@ -369,6 +394,13 @@ components:
369394
type: object
370395
items:
371396
$ref: '#/components/schemas/Event'
397+
IP:
398+
title: Result of VM's IP resolution
399+
type: object
400+
properties:
401+
ip:
402+
type: string
403+
description: The resolved IP address
372404
Event:
373405
title: Generic Resource Event
374406
type: object

internal/controller/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ func (controller *Controller) initAPI() *gin.Engine {
121121
v1.GET("/vms/:name/port-forward", func(c *gin.Context) {
122122
controller.portForwardVM(c).Respond(c)
123123
})
124+
v1.GET("/vms/:name/ip", func(c *gin.Context) {
125+
controller.ip(c).Respond(c)
126+
})
124127
v1.DELETE("/vms/:name", func(c *gin.Context) {
125128
controller.deleteVM(c).Respond(c)
126129
})

internal/controller/api_vms_ip.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"github.com/cirruslabs/orchard/internal/responder"
6+
v1 "github.com/cirruslabs/orchard/pkg/resource/v1"
7+
"github.com/cirruslabs/orchard/rpc"
8+
"github.com/gin-gonic/gin"
9+
"github.com/google/uuid"
10+
"net/http"
11+
"strconv"
12+
"time"
13+
)
14+
15+
func (controller *Controller) ip(ctx *gin.Context) responder.Responder {
16+
if responder := controller.authorize(ctx, v1.ServiceAccountRoleComputeWrite); responder != nil {
17+
return responder
18+
}
19+
20+
// Retrieve and parse path and query parameters
21+
name := ctx.Param("name")
22+
23+
waitRaw := ctx.Query("wait")
24+
wait, err := strconv.ParseUint(waitRaw, 10, 16)
25+
if err != nil {
26+
return responder.Code(http.StatusBadRequest)
27+
}
28+
waitContext, waitContextCancel := context.WithTimeout(ctx, time.Duration(wait)*time.Second)
29+
defer waitContextCancel()
30+
31+
// Look-up the VM
32+
vm, responderImpl := controller.waitForVM(waitContext, name)
33+
if responderImpl != nil {
34+
return responderImpl
35+
}
36+
37+
// Send an IP resolution request and wait for the result
38+
session := uuid.New().String()
39+
boomerangConnCh, cancel := controller.ipRendezvous.Request(ctx, session)
40+
defer cancel()
41+
42+
err = controller.workerNotifier.Notify(ctx, vm.Worker, &rpc.WatchInstruction{
43+
Action: &rpc.WatchInstruction_ResolveIpAction{
44+
ResolveIpAction: &rpc.WatchInstruction_ResolveIP{
45+
Session: session,
46+
VmUid: vm.UID,
47+
},
48+
},
49+
})
50+
if err != nil {
51+
controller.logger.Warnf("failed to request VM's IP from the worker %s: %v",
52+
vm.Worker, err)
53+
54+
return responder.Code(http.StatusServiceUnavailable)
55+
}
56+
57+
select {
58+
case ip := <-boomerangConnCh:
59+
result := struct {
60+
IP string `json:"ip"`
61+
}{
62+
IP: ip,
63+
}
64+
65+
return responder.JSON(http.StatusOK, &result)
66+
case <-ctx.Done():
67+
return responder.Error(ctx.Err())
68+
}
69+
}

internal/controller/api_vms_portforward.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (controller *Controller) portForward(
5959
) responder.Responder {
6060
// Request and wait for a connection with a worker
6161
session := uuid.New().String()
62-
boomerangConnCh, cancel := controller.proxy.Request(ctx, session)
62+
boomerangConnCh, cancel := controller.connRendezvous.Request(ctx, session)
6363
defer cancel()
6464

6565
// send request to worker to initiate port-forwarding connection back to us

internal/controller/controller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"github.com/cirruslabs/orchard/internal/controller/notifier"
9-
"github.com/cirruslabs/orchard/internal/controller/proxy"
9+
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
1010
"github.com/cirruslabs/orchard/internal/controller/scheduler"
1111
"github.com/cirruslabs/orchard/internal/controller/sshserver"
1212
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
@@ -54,7 +54,8 @@ type Controller struct {
5454
logger *zap.SugaredLogger
5555
grpcServer *grpc.Server
5656
workerNotifier *notifier.Notifier
57-
proxy *proxy.Proxy
57+
connRendezvous *rendezvous.Rendezvous[net.Conn]
58+
ipRendezvous *rendezvous.Rendezvous[string]
5859
enableSwaggerDocs bool
5960
workerOfflineTimeout time.Duration
6061
maxWorkersPerLicense uint
@@ -69,7 +70,8 @@ type Controller struct {
6970

7071
func New(opts ...Option) (*Controller, error) {
7172
controller := &Controller{
72-
proxy: proxy.NewProxy(),
73+
connRendezvous: rendezvous.New[net.Conn](),
74+
ipRendezvous: rendezvous.New[string](),
7375
workerOfflineTimeout: 3 * time.Minute,
7476
maxWorkersPerLicense: maxWorkersPerDefaultLicense,
7577
}
@@ -125,7 +127,7 @@ func New(opts ...Option) (*Controller, error) {
125127
// Instantiate the SSH server (if configured)
126128
if controller.sshListenAddr != "" && controller.sshSigner != nil {
127129
controller.sshServer, err = sshserver.NewSSHServer(controller.sshListenAddr, controller.sshSigner,
128-
store, controller.proxy, controller.workerNotifier, controller.sshNoClientAuth, controller.logger)
130+
store, controller.connRendezvous, controller.workerNotifier, controller.sshNoClientAuth, controller.logger)
129131
if err != nil {
130132
return nil, err
131133
}

internal/controller/proxy/proxy.go

Lines changed: 0 additions & 49 deletions
This file was deleted.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package rendezvous
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/cirruslabs/orchard/internal/concurrentmap"
7+
)
8+
9+
var ErrInvalidToken = errors.New("invalid rendezvous token")
10+
11+
type Rendezvous[T any] struct {
12+
sessions *concurrentmap.ConcurrentMap[*TokenSlot[T]]
13+
}
14+
15+
type TokenSlot[T any] struct {
16+
ctx context.Context
17+
ch chan T
18+
}
19+
20+
func New[T any]() *Rendezvous[T] {
21+
return &Rendezvous[T]{
22+
sessions: concurrentmap.NewConcurrentMap[*TokenSlot[T]](),
23+
}
24+
}
25+
26+
func (rendezvous *Rendezvous[T]) Request(ctx context.Context, session string) (chan T, func()) {
27+
tokenSlot := &TokenSlot[T]{
28+
ctx: ctx,
29+
ch: make(chan T),
30+
}
31+
32+
rendezvous.sessions.Store(session, tokenSlot)
33+
34+
return tokenSlot.ch, func() {
35+
rendezvous.sessions.Delete(session)
36+
}
37+
}
38+
39+
func (rendezvous *Rendezvous[T]) Respond(session string, conn T) (context.Context, error) {
40+
tokenSlot, ok := rendezvous.sessions.Load(session)
41+
if !ok {
42+
return nil, ErrInvalidToken
43+
}
44+
45+
tokenSlot.ch <- conn
46+
47+
return tokenSlot.ctx, nil
48+
}

internal/controller/proxy/proxy_test.go renamed to internal/controller/rendezvous/rendezvous_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
package proxy_test
1+
package rendezvous_test
22

33
import (
44
"context"
5-
"github.com/cirruslabs/orchard/internal/controller/proxy"
5+
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
66
"github.com/google/uuid"
77
"github.com/stretchr/testify/require"
88
"net"
@@ -15,7 +15,7 @@ func TestProxy(t *testing.T) {
1515

1616
expectedConn, _ := net.Pipe()
1717

18-
proxy := proxy.NewProxy()
18+
proxy := rendezvous.New[net.Conn]()
1919

2020
token := uuid.New().String()
2121

internal/controller/rpc.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package controller
22

33
import (
4+
"context"
45
v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1"
56
"github.com/cirruslabs/orchard/rpc"
67
"google.golang.org/grpc/metadata"
@@ -61,8 +62,8 @@ func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServe
6162
}),
6263
}
6364

64-
// make proxy aware of the connection
65-
proxyCtx, err := controller.proxy.Respond(sessionMetadataValue[0], conn)
65+
// make connection rendezvous aware of the connection
66+
proxyCtx, err := controller.connRendezvous.Respond(sessionMetadataValue[0], conn)
6667
if err != nil {
6768
return err
6869
}
@@ -74,3 +75,22 @@ func (controller *Controller) PortForward(stream rpc.Controller_PortForwardServe
7475
return stream.Context().Err()
7576
}
7677
}
78+
79+
func (controller *Controller) ResolveIP(ctx context.Context, request *rpc.ResolveIPResult) (*emptypb.Empty, error) {
80+
if !controller.authorizeGRPC(ctx, v1pkg.ServiceAccountRoleComputeWrite) {
81+
return nil, status.Errorf(codes.Unauthenticated, "auth failed")
82+
}
83+
84+
sessionMetadataValue := metadata.ValueFromIncomingContext(ctx, rpc.MetadataWorkerPortForwardingSessionKey)
85+
if len(sessionMetadataValue) == 0 {
86+
return nil, status.Errorf(codes.InvalidArgument, "no session in metadata")
87+
}
88+
89+
// Respond with the resolved IP address
90+
_, err := controller.ipRendezvous.Respond(sessionMetadataValue[0], request.Ip)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
return &emptypb.Empty{}, nil
96+
}

internal/controller/sshserver/sshserver.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"github.com/cirruslabs/orchard/internal/controller/notifier"
9-
proxypkg "github.com/cirruslabs/orchard/internal/controller/proxy"
9+
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
1010
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
1111
"github.com/cirruslabs/orchard/internal/proxy"
1212
"github.com/cirruslabs/orchard/pkg/resource/v1"
@@ -31,7 +31,7 @@ type SSHServer struct {
3131
listener net.Listener
3232
serverConfig *ssh.ServerConfig
3333
store storepkg.Store
34-
proxy *proxypkg.Proxy
34+
connRendezvous *rendezvous.Rendezvous[net.Conn]
3535
workerNotifier *notifier.Notifier
3636
logger *zap.SugaredLogger
3737
}
@@ -40,14 +40,14 @@ func NewSSHServer(
4040
address string,
4141
signer ssh.Signer,
4242
store storepkg.Store,
43-
proxy *proxypkg.Proxy,
43+
connRendezvous *rendezvous.Rendezvous[net.Conn],
4444
workerNotifier *notifier.Notifier,
4545
noClientAuth bool,
4646
logger *zap.SugaredLogger,
4747
) (*SSHServer, error) {
4848
server := &SSHServer{
4949
store: store,
50-
proxy: proxy,
50+
connRendezvous: connRendezvous,
5151
workerNotifier: workerNotifier,
5252
logger: logger,
5353
}
@@ -232,7 +232,7 @@ func (server *SSHServer) handleDirectTCPIP(ctx context.Context, newChannel ssh.N
232232
// The user wants to connect to an existing VM, request and wait
233233
// for a connection with the worker before accepting the channel
234234
session := uuid.New().String()
235-
boomerangConnCh, cancel := server.proxy.Request(ctx, session)
235+
boomerangConnCh, cancel := server.connRendezvous.Request(ctx, session)
236236
defer cancel()
237237

238238
err = server.workerNotifier.Notify(ctx, vm.Worker, &rpc.WatchInstruction{

0 commit comments

Comments
 (0)