Skip to content

Commit fb3056d

Browse files
authored
Refactorings for simplify readability (#35)
1 parent 3ecf98c commit fb3056d

File tree

21 files changed

+359
-875
lines changed

21 files changed

+359
-875
lines changed

internal/controller/api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"errors"
77
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
88
"github.com/cirruslabs/orchard/internal/responder"
9-
"github.com/cirruslabs/orchard/pkg/client"
109
v1pkg "github.com/cirruslabs/orchard/pkg/resource/v1"
10+
"github.com/cirruslabs/orchard/rpc"
1111
"github.com/deckarep/golang-set/v2"
1212
"github.com/gin-gonic/gin"
1313
"google.golang.org/grpc/metadata"
@@ -169,11 +169,11 @@ func (controller *Controller) authorizeGRPC(ctx context.Context, scopes ...v1pkg
169169
return true
170170
}
171171

172-
name := metadata.ValueFromIncomingContext(ctx, client.MetadataServiceAccountNameKey)
172+
name := metadata.ValueFromIncomingContext(ctx, rpc.MetadataServiceAccountNameKey)
173173
if len(name) != 1 {
174174
return false
175175
}
176-
token := metadata.ValueFromIncomingContext(ctx, client.MetadataServiceAccountTokenKey)
176+
token := metadata.ValueFromIncomingContext(ctx, rpc.MetadataServiceAccountTokenKey)
177177
if len(token) != 1 {
178178
return false
179179
}

internal/controller/api_vms_portforward.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,35 +45,36 @@ func (controller *Controller) portForwardVM(ctx *gin.Context) responder.Responde
4545
}
4646

4747
// Sanity-check
48-
if vm.Worker == "" {
48+
if vm.WorkerUID == "" {
4949
return responder.Code(http.StatusServiceUnavailable)
5050
}
5151

52-
// Request and wait for a rendez-vous with a worker
53-
token := uuid.New().String()
54-
55-
rendezvousConnCh, cancel := controller.proxy.Request(ctx, token)
52+
// Request and wait for a connection with a worker
53+
session := uuid.New().String()
54+
boomerangConnCh, cancel := controller.proxy.Request(ctx, session)
5655
defer cancel()
5756

58-
err = controller.watcher.Notify(ctx, vm.Worker, &rpc.WatchFromController{
59-
Action: &rpc.WatchFromController_PortForwardAction{
60-
PortForwardAction: &rpc.WatchFromController_PortForward{
61-
Token: token,
62-
VmUid: vm.UID,
63-
VmPort: uint32(port),
57+
// send request to worker to initiate port-forwarding connection back to us
58+
err = controller.workerNotifier.Notify(ctx, vm.WorkerUID, &rpc.WatchInstruction{
59+
Action: &rpc.WatchInstruction_PortForwardAction{
60+
PortForwardAction: &rpc.WatchInstruction_PortForward{
61+
Session: session,
62+
VmUid: vm.UID,
63+
VmPort: uint32(port),
6464
},
6565
},
6666
})
6767
if err != nil {
68-
controller.logger.Warnf("failed to rendez-vous with the worker %s: %v", vm.Worker, err)
68+
controller.logger.Warnf("failed to request port-forwarding from the worker %s: %v", vm.WorkerUID, err)
6969

7070
return responder.Code(http.StatusServiceUnavailable)
7171
}
7272

73+
// worker will asynchronously start port-forwarding so we wait
7374
select {
74-
case rendezvousConn := <-rendezvousConnCh:
75+
case fromWorkerConnection := <-boomerangConnCh:
7576
websocket.Handler(func(wsConn *websocket.Conn) {
76-
if err := proxy.Connections(wsConn, rendezvousConn); err != nil {
77+
if err := proxy.Connections(wsConn, fromWorkerConnection); err != nil {
7778
controller.logger.Warnf("failed to port-forward: %v", err)
7879
}
7980
}).ServeHTTP(ctx.Writer, ctx.Request)

internal/controller/controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import (
55
"crypto/tls"
66
"errors"
77
"fmt"
8-
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
8+
"github.com/cirruslabs/orchard/internal/controller/notifier"
9+
"github.com/cirruslabs/orchard/internal/controller/proxy"
910
storepkg "github.com/cirruslabs/orchard/internal/controller/store"
1011
"github.com/cirruslabs/orchard/internal/controller/store/badger"
1112
"github.com/cirruslabs/orchard/internal/netconstants"
@@ -37,16 +38,16 @@ type Controller struct {
3738
store storepkg.Store
3839
logger *zap.SugaredLogger
3940
grpcServer *grpc.Server
40-
watcher *rendezvous.Watcher
41-
proxy *rendezvous.Proxy
41+
workerNotifier *notifier.Notifier
42+
proxy *proxy.Proxy
4243

4344
rpc.UnimplementedControllerServer
4445
}
4546

4647
func New(opts ...Option) (*Controller, error) {
4748
controller := &Controller{
48-
watcher: rendezvous.NewWatcher(),
49-
proxy: rendezvous.NewProxy(),
49+
workerNotifier: notifier.NewNotifier(),
50+
proxy: proxy.NewProxy(),
5051
}
5152

5253
// Apply options
@@ -129,7 +130,7 @@ func (controller *Controller) EnsureServiceAccount(serviceAccount *v1.ServiceAcc
129130

130131
func (controller *Controller) Run(ctx context.Context) error {
131132
// Run the scheduler so that each VM will eventually
132-
// be assigned to a specific Worker
133+
// be assigned to a specific WorkerUID
133134
go func() {
134135
err := runScheduler(controller.store)
135136
if err != nil {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package notifier
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/cirruslabs/orchard/internal/concurrentmap"
8+
"github.com/cirruslabs/orchard/rpc"
9+
)
10+
11+
var ErrNoWorker = errors.New("no worker registered with this name")
12+
13+
type Notifier struct {
14+
workers *concurrentmap.ConcurrentMap[*WorkerSlot]
15+
}
16+
17+
type WorkerSlot struct {
18+
ctx context.Context
19+
ch chan *rpc.WatchInstruction
20+
}
21+
22+
func NewNotifier() *Notifier {
23+
return &Notifier{
24+
workers: concurrentmap.NewConcurrentMap[*WorkerSlot](),
25+
}
26+
}
27+
28+
func (watcher *Notifier) Register(ctx context.Context, workerUID string) (chan *rpc.WatchInstruction, func()) {
29+
subCtx, cancel := context.WithCancel(ctx)
30+
workerCh := make(chan *rpc.WatchInstruction)
31+
32+
watcher.workers.Store(workerUID, &WorkerSlot{
33+
ctx: subCtx,
34+
ch: workerCh,
35+
})
36+
37+
return workerCh, func() {
38+
watcher.workers.Delete(workerUID)
39+
cancel()
40+
}
41+
}
42+
43+
func (watcher *Notifier) Notify(ctx context.Context, workerUID string, msg *rpc.WatchInstruction) error {
44+
slot, ok := watcher.workers.Load(workerUID)
45+
if !ok {
46+
return fmt.Errorf("%w: %s", ErrNoWorker, workerUID)
47+
}
48+
49+
select {
50+
case slot.ch <- msg:
51+
return nil
52+
case <-slot.ctx.Done():
53+
return slot.ctx.Err()
54+
case <-ctx.Done():
55+
return ctx.Err()
56+
}
57+
}

internal/controller/rendezvous/watcher_test.go renamed to internal/controller/notifier/notifier_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
1-
package rendezvous_test
1+
package notifier_test
22

33
import (
44
"context"
55
"fmt"
6-
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
6+
"github.com/cirruslabs/orchard/internal/controller/notifier"
77
"github.com/google/uuid"
88
"github.com/stretchr/testify/require"
99
"sync"
1010
"testing"
1111
"time"
1212
)
1313

14-
func TestWatcher(t *testing.T) {
14+
func TestNotifier(t *testing.T) {
1515
ctx := context.Background()
1616

17-
watcher := rendezvous.NewWatcher()
17+
notifier := notifier.NewNotifier()
1818

1919
var topic = uuid.New().String()
2020

21-
msgCh, cancel := watcher.Subscribe(context.Background(), topic)
21+
msgCh, cancel := notifier.Register(context.Background(), topic)
2222
defer cancel()
2323

2424
var wg sync.WaitGroup
2525
wg.Add(1)
2626

2727
go func() {
28-
require.NoError(t, watcher.Notify(ctx, topic, nil))
28+
require.NoError(t, notifier.Notify(ctx, topic, nil))
2929

3030
time.Sleep(time.Second)
3131

32-
require.NoError(t, watcher.Notify(ctx, topic, nil))
32+
require.NoError(t, notifier.Notify(ctx, topic, nil))
3333

3434
wg.Done()
3535
}()

internal/controller/proxy/proxy.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/cirruslabs/orchard/internal/concurrentmap"
7+
"net"
8+
)
9+
10+
var ErrInvalidToken = errors.New("invalid proxy token")
11+
12+
type Proxy struct {
13+
sessions *concurrentmap.ConcurrentMap[*TokenSlot]
14+
}
15+
16+
type TokenSlot struct {
17+
ctx context.Context
18+
ch chan net.Conn
19+
}
20+
21+
func NewProxy() *Proxy {
22+
return &Proxy{
23+
sessions: concurrentmap.NewConcurrentMap[*TokenSlot](),
24+
}
25+
}
26+
27+
func (proxy *Proxy) Request(ctx context.Context, session string) (chan net.Conn, func()) {
28+
tokenSlot := &TokenSlot{
29+
ctx: ctx,
30+
ch: make(chan net.Conn),
31+
}
32+
33+
proxy.sessions.Store(session, tokenSlot)
34+
35+
return tokenSlot.ch, func() {
36+
proxy.sessions.Delete(session)
37+
}
38+
}
39+
40+
func (proxy *Proxy) Respond(session string, conn net.Conn) (context.Context, error) {
41+
tokenSlot, ok := proxy.sessions.Load(session)
42+
if !ok {
43+
return nil, ErrInvalidToken
44+
}
45+
46+
tokenSlot.ch <- conn
47+
48+
return tokenSlot.ctx, nil
49+
}

internal/controller/rendezvous/proxy_test.go renamed to internal/controller/proxy/proxy_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
package rendezvous_test
1+
package proxy_test
22

33
import (
44
"context"
5-
"github.com/cirruslabs/orchard/internal/controller/rendezvous"
5+
"github.com/cirruslabs/orchard/internal/controller/proxy"
66
"github.com/google/uuid"
77
"github.com/stretchr/testify/require"
88
"net"
@@ -15,7 +15,7 @@ func TestProxy(t *testing.T) {
1515

1616
expectedConn, _ := net.Pipe()
1717

18-
proxy := rendezvous.NewProxy()
18+
proxy := proxy.NewProxy()
1919

2020
token := uuid.New().String()
2121

internal/controller/rendezvous/proxy.go

Lines changed: 0 additions & 49 deletions
This file was deleted.

internal/controller/rendezvous/watcher.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)