-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathsystem.go
107 lines (100 loc) · 4.25 KB
/
system.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright 2018 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package bigmachine
import (
"context"
"encoding/gob"
"io"
"net/http"
"os"
"sync"
"time"
"github.com/grailbio/base/must"
)
// A System implements a set of methods to set up a bigmachine and
// start new machines. Systems are also responsible for providing an
// HTTP client that can be used to communicate between machines
// and drivers.
type System interface {
// Name is the name of this system. It is used to multiplex multiple system
// implementations, and thus should be unique among systems.
Name() string
// Init initializes this system for use by a bigmachine.B. For convenience,
// it is called by bigmachine.Start, so implementations must be idempotent
// and return nil in subsequent calls if the first call returned nil.
Init() error
// Main is called to start a machine. The system is expected to take over
// the process; the bigmachine fails if Main returns (and if it does, it
// should always return with an error).
Main() error
// Event logs an event of type typ with (key, value) fields given in
// fieldPairs as k0, v0, k1, v1, ...kn, vn. For example:
//
//	s.Event("bigmachine:machineStart", "addr", "https://m0")
//
// These semi-structured events are used for analytics.
Event(typ string, fieldPairs ...interface{})
// HTTPClient returns an HTTP client that can be used to communicate
// from drivers to machines as well as between machines.
HTTPClient() *http.Client
// ListenAndServe serves the provided handler on an HTTP server that
// is reachable from other instances in the bigmachine cluster. If addr
// is the empty string, the default cluster address is used.
ListenAndServe(addr string, handle http.Handler) error
// Start launches up to n new machines. The returned machines can be in
// Unstarted state, but should eventually become available.
Start(ctx context.Context, b *B, n int) ([]*Machine, error)
// Exit is called to terminate a machine with the provided exit code.
Exit(int)
// Shutdown is called on graceful driver exit. It should be used to
// perform system teardown. It is not guaranteed to be called.
Shutdown()
// Maxprocs returns the maximum number of processors per machine,
// as configured. It returns 0 if the value is dynamic.
Maxprocs() int
// KeepaliveConfig returns the various keepalive timeouts that should
// be used to maintain keepalives for machines started by this system.
KeepaliveConfig() (period, timeout, rpcTimeout time.Duration)
// Tail returns a reader that follows the bigmachine process logs of
// the provided machine.
Tail(ctx context.Context, m *Machine) (io.Reader, error)
// Read returns a reader that reads the contents of the provided filename
// on the host. This is done outside of the supervisor to support external
// monitoring of the host.
Read(ctx context.Context, m *Machine, filename string) (io.Reader, error)
// KeepaliveFailed notifies the system of a failed call to
// Supervisor.Keepalive. This might be an intermittent failure that will be
// retried. The system can use this notification as a hint to otherwise
// probe machine health.
KeepaliveFailed(ctx context.Context, m *Machine)
}
// systems maps a system name to the System instance registered under
// that name. RegisterSystem holds systemsMu while it updates the map.
var (
	systemsMu sync.Mutex
	systems   = map[string]System{}
)
// RegisterSystem is called by system implementations to make
// themselves known to bigmachine under the given name. The provided
// value is first registered with gob, so that instances can be
// transmitted over the wire. It is then recorded as the default
// System for name, which is what bigmachine.Init looks up. A name may
// be registered only once: a second registration under the same name
// fails the must.Nil assertion.
func RegisterSystem(name string, system System) {
	gob.Register(system)

	systemsMu.Lock()
	defer systemsMu.Unlock()

	// A missing map entry yields a nil System, so a non-nil value
	// here means the name is already taken.
	existing := systems[name]
	must.Nil(existing, "system ", name, " already registered")
	systems[name] = system
}
// Init initializes bigmachine. It should be called after flag
// parsing and global setup in bigmachine-based processes. Init is a
// no-op if the binary is not running as a bigmachine worker; if it
// is, Init never returns.
func Init() {
name := os.Getenv("BIGMACHINE_SYSTEM")
if name == "" {
return
}
system, ok := systems[name]
must.True(ok, "system ", name, " not found")
must.Nil(system.Init(), "system initialization error")
must.Never("start returned: ", Start(system))
}