-
Notifications
You must be signed in to change notification settings - Fork 0
/
process_manager.go
118 lines (99 loc) · 2.75 KB
/
process_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"os"
"os/signal"
"syscall"
"github.com/rs/zerolog/log"
)
// Process couples a worker process started by startProcess with the state
// it finished in.
type Process struct {
	// Process is the handle returned by os.StartProcess for the worker.
	Process *os.Process
	// FinalState is the result of waiting on the worker. It is nil until the
	// goroutine spawned by startProcess has stored the value returned by Wait.
	FinalState *os.ProcessState
}
// startProcess launches a new worker by re-executing the current program.
//
// The worker receives this process's command line with every
// --process-manager / -process-manager switch removed and a trailing
// --notify-parent flag appended, and it shares the manager's stdin, stdout
// and stderr.
//
// A goroutine waits for the worker to exit, records the result in
// Process.FinalState and then sends the *Process on chan_process. That send
// blocks until it is received, so the caller must keep reading chan_process
// for as long as workers are alive.
//
// On failure the error is logged and returned together with a nil *Process.
func startProcess(chan_process chan *Process) (*Process, error) {
	log.Info().Msg("Starting new process")

	// Ensure the child does not have the --process-manager switch, otherwise
	// it would act as a manager itself instead of as a worker.
	args := make([]string, 0, len(os.Args)+1)
	for _, v := range os.Args {
		if v == "--process-manager" || v == "-process-manager" {
			continue
		}
		args = append(args, v)
	}
	args = append(args, "--notify-parent")

	procAttr := os.ProcAttr{
		Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
	}

	// NOTE(review): os.StartProcess does not search PATH, so args[0] must be
	// a path that resolves from the current working directory.
	proc, err := os.StartProcess(args[0], args, &procAttr)
	if err != nil {
		log.Error().Err(err).Msg("Error while starting new process")
		return nil, err
	}
	p := &Process{Process: proc}

	go func() {
		// FinalState stays nil when Wait fails; ProcessState.ExitCode is
		// nil-safe and reports -1 in that case.
		s, err := proc.Wait()
		if err != nil {
			log.Error().Err(err).Int("worker_pid", proc.Pid).Msg("Error while waiting for worker process")
		}
		p.FinalState = s
		chan_process <- p
	}()

	return p, nil
}
// processManager supervises worker processes and returns once every worker
// has exited.
//
// It starts one worker immediately and panics if that fails. It then reacts
// to signals delivered to the manager:
//
//   - SIGINT / SIGTERM: forward SIGTERM to every known worker.
//   - SIGHUP: start a replacement worker, unless one is already starting.
//     A failed start is logged and the existing workers keep running.
//   - SIGUSR1: treated as confirmation that the starting worker is ready
//     (presumably sent by the worker launched with --notify-parent; verify
//     against the worker code). All other workers are sent SIGTERM. The
//     signal is ignored when no worker is starting, so a stray SIGUSR1
//     cannot take down every worker.
//
// Worker exits are reported by startProcess on an unbuffered channel.
func processManager() {
	processes := map[*Process]*Process{}
	exited := make(chan *Process)

	// starting is the worker that has been launched but not yet confirmed
	// through SIGUSR1; nil when no start is in progress.
	var starting *Process

	// Start first worker process
	proc, err := startProcess(exited)
	if err != nil {
		log.Panic().Err(err).Msg("Unable to start the worker process")
	}
	processes[proc] = proc
	starting = proc

	// Signals. signal.Notify never blocks: a signal arriving while the buffer
	// is full is dropped. Four signals share this channel, and a lost SIGUSR1
	// would leave starting set forever, so leave room for bursts.
	signals := make(chan os.Signal, 16)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGUSR1)
	defer signal.Stop(signals)

	// terminate asks a worker to stop and reports delivery failures, e.g.
	// when the worker has already exited.
	terminate := func(p *Process) {
		if err := p.Process.Signal(syscall.SIGTERM); err != nil {
			log.Warn().Err(err).Int("worker_pid", p.Process.Pid).Msg("Unable to send termination signal to worker process")
		}
	}

managerLoop:
	for {
		select {
		case s := <-signals:
			switch s {
			case syscall.SIGINT, syscall.SIGTERM:
				log.Info().Msg("Termination signal received, forwarding to worker processes")
				for _, p := range processes {
					terminate(p)
				}
			case syscall.SIGHUP:
				if starting != nil {
					log.Warn().Msg("Restart signal received but a restart is already ongoing")
					continue managerLoop
				}
				log.Info().Msg("Restart signal received, starting new worker process")
				proc, err := startProcess(exited)
				if err != nil {
					// Do not bring the manager down: the current workers
					// are still healthy and would otherwise be orphaned.
					log.Error().Err(err).Msg("Unable to start the new worker process")
					continue managerLoop
				}
				processes[proc] = proc
				starting = proc
			case syscall.SIGUSR1:
				if starting == nil {
					// With no start in progress every worker would compare
					// unequal to starting and be terminated.
					log.Warn().Msg("Worker start notification received but no worker is starting, ignoring")
					continue managerLoop
				}
				log.Info().Msg("New worker successfully started")
				for _, p := range processes {
					if p != starting {
						terminate(p)
					}
				}
				starting = nil
			}
		case p := <-exited:
			if p == starting {
				log.Error().Int("worker_pid", p.Process.Pid).Int("exit_code", p.FinalState.ExitCode()).Msg("New worker process exited unexpectedly")
				starting = nil
			} else {
				log.Info().Int("worker_pid", p.Process.Pid).Int("exit_code", p.FinalState.ExitCode()).Msg("Worker process exited")
			}
			delete(processes, p)
			if len(processes) == 0 {
				log.Info().Msg("All worker processes have ended")
				break managerLoop
			}
		}
	}
}