-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathpool.go
284 lines (245 loc) · 7.24 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/**
* Copyright © 2019 Hamed Yousefi <[email protected]>.
*/
package gowl
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/hamed-yousefi/gowl/status/pool"
"github.com/hamed-yousefi/gowl/status/process"
"github.com/hamed-yousefi/gowl/status/worker"
)
const (
// defaultWorkerName is the fmt format string used to build a worker's name
// from its zero-based index, producing names such as "W0" and "W1".
defaultWorkerName = "W%d"
)
type (
// WorkerName is a string type that identifies a worker of the pool.
WorkerName string
// PID is a string type that identifies a process.
PID string
// Process is a unit of work that can be registered in a Pool.
Process interface {
// Start runs the process. It returns a non-nil error if anything goes
// wrong at runtime. The workerPool implementation passes a cancellable
// context that is cancelled when the process is killed.
Start(ctx context.Context) error
// Name returns the process name.
Name() string
// PID returns the process id.
PID() PID
}
// Pool is a mechanism to dispatch processes between a group of workers.
Pool interface {
// Start runs the pool.
Start() error
// Register adds the given processes to the pool queue.
Register(p ...Process)
// Close stops a running pool.
Close() error
// Kill cancels the process identified by pid.
Kill(pid PID)
// Monitor returns the pool monitor.
Monitor() Monitor
}
// Monitor is a read-only view used to observe process and pool stats.
Monitor interface {
// PoolStatus returns the pool status.
PoolStatus() pool.Status
// Error returns the error recorded for the process with the given id.
Error(PID) error
// WorkerList returns the list of worker names of the pool.
WorkerList() []WorkerName
// WorkerStatus returns the status of the worker with the given name.
WorkerStatus(name WorkerName) worker.Status
// ProcessStats returns the stats of the process with the given id.
ProcessStats(pid PID) ProcessStats
}
// ProcessStats represents process statistics.
ProcessStats struct {
// WorkerName is the name of the worker that picked this process up.
WorkerName WorkerName
// Process is the process that these stats belong to.
Process Process
// Status represents the current state of the process.
Status process.Status
// StartedAt is the time at which a worker picked the process up.
StartedAt time.Time
// FinishedAt is the time at which the worker finished handling the
// process.
FinishedAt time.Time
// err is the error returned by Process.Start, if any. It is exposed
// through Monitor.Error.
err error
}
// workerPool is an implementation of Pool and Monitor interfaces.
workerPool struct {
// status is the current pool state: Created on construction, Running
// after Start, Closed after Close.
status pool.Status
// size is the number of workers created by run and the buffer capacity
// of queue.
size int
// queue carries registered processes to the workers.
queue chan Process
// wg tracks the worker goroutines; Close waits on it.
wg *sync.WaitGroup
// processes stores the ProcessStats of every registered process by PID.
processes *processStatusMap
// workers holds the names of the workers created by run.
workers []WorkerName
// workersStats stores the status of every worker by name.
workersStats *workerStatsMap
// controlPanel stores the context and cancel function of every
// registered process by PID.
controlPanel *controlPanelMap
// mutex guards isClosed and serializes sending on and closing of queue.
mutex *sync.Mutex
// isClosed is set by Close; publishers started by Register check it
// before sending on queue.
isClosed bool
}
)
// NewPool builds a new Pool. It accepts the pool size, which is used both as
// the number of workers and as the buffer capacity of the process queue. The
// returned pool is in the Created state; workers are not created until Start
// is called.
func NewPool(size int) Pool {
	wp := new(workerPool)
	wp.status = pool.Created
	wp.size = size
	wp.queue = make(chan Process, size)
	// Start from an empty, non-nil worker list; run appends to it.
	wp.workers = []WorkerName{}
	wp.processes = new(processStatusMap)
	wp.workersStats = new(workerStatsMap)
	wp.controlPanel = new(controlPanelMap)
	wp.mutex = new(sync.Mutex)
	wp.wg = new(sync.WaitGroup)
	return wp
}
// Start runs the pool: it moves the pool to the Running state and calls
// workerPool.run() to create the workers.
//
// It returns an error, and leaves the pool untouched, when the pool is already
// running or has already been closed. A closed pool cannot be restarted
// because Close closes the queue for good: starting it again would spawn
// workers on a closed queue, append duplicate names to the worker list, and
// make a subsequent Close panic by closing the queue a second time.
func (w *workerPool) Start() error {
	if w.status == pool.Running || w.status == pool.Closed {
		return errors.New("unable to start the pool, status: " + w.status.String())
	}

	w.status = pool.Running
	w.run()
	return nil
}
// run spawns w.size worker goroutines. Each worker is named after its index,
// recorded in w.workers, and keeps consuming processes from the queue until
// the queue is closed and drained.
func (w *workerPool) run() {
	for idx := 0; idx < w.size; idx++ {
		name := WorkerName(fmt.Sprintf(defaultWorkerName, idx))
		w.workers = append(w.workers, name)

		// Every worker is tracked by the pool wait group so Close can wait
		// for it to exit.
		w.wg.Add(1)
		go func(self WorkerName) {
			defer w.wg.Done()

			for proc := range w.queue {
				w.workersStats.put(self, worker.Busy)

				// Record that this worker has picked the process up.
				picked := w.processes.get(proc.PID())
				picked.Status = process.Running
				picked.StartedAt = time.Now()
				picked.WorkerName = self
				w.processes.put(proc.PID(), picked)

				// The process executes in its own goroutine; the worker waits
				// on done until that goroutine has stored the outcome.
				var done sync.WaitGroup
				done.Add(1)
				go func() {
					outcome := w.processes.get(proc.PID())
					defer func() {
						w.processes.put(proc.PID(), outcome)
						done.Done()
					}()

					pc := w.controlPanel.get(proc.PID())
					select {
					case <-pc.ctx.Done():
						// The context was cancelled before the process got a
						// chance to start, so it is never run.
						log.Printf("processFunc with id %s has been killed.\n", proc.PID().String())
						outcome.Status = process.Killed
						return
					default:
					}

					err := proc.Start(pc.ctx) //nolint:typecheck
					switch {
					case err == nil:
						outcome.Status = process.Succeeded
					case errors.Is(pc.ctx.Err(), context.Canceled):
						// A failure with a cancelled context counts as a kill.
						outcome.err = err
						outcome.Status = process.Killed
					default:
						outcome.err = err
						outcome.Status = process.Failed
					}
					// Release the resources held by the process context.
					pc.cancel()
				}()
				done.Wait()

				finished := w.processes.get(proc.PID())
				finished.FinishedAt = time.Now()
				w.processes.put(proc.PID(), finished)

				w.workersStats.put(self, worker.Waiting)
			}
		}(name)
	}
}
// Register adds the given processes to the pool queue.
//
// For every process it synchronously creates a cancellable context (stored in
// the control panel, used by Kill) and an initial ProcessStats entry with the
// Waiting status. The processes are then published to the queue from a
// separate goroutine, so Register itself does not block on a full queue and
// every call acts as an independent asynchronous publisher.
//
// Publishing stops as soon as the pool is found to be closed; processes that
// were not yet published stay in the Waiting status.
func (w *workerPool) Register(args ...Process) {
	// Create control panel for each process and make process stat for each of them.
	for _, p := range args {
		ctx, cancel := context.WithCancel(context.Background())
		w.controlPanel.put(p.PID(), &processContext{
			ctx:    ctx,
			cancel: cancel,
		})
		w.processes.put(p.PID(), ProcessStats{
			Process: p,
			Status:  process.Waiting,
		})
	}

	// Publish processes to the queue.
	go func(args ...Process) {
		// enqueue sends one process under the mutex and reports whether the
		// pool was still open. The deferred Unlock releases the mutex on every
		// path; leaving it locked when the pool is closed would block every
		// later publisher forever.
		//
		// NOTE: the send happens while the mutex is held, so Close cannot
		// close the queue in the middle of a send, and it waits for a pending
		// send to complete.
		enqueue := func(p Process) bool {
			w.mutex.Lock()
			defer w.mutex.Unlock()

			if w.isClosed {
				return false
			}
			w.queue <- p
			return true
		}

		for i := range args {
			if !enqueue(args[i]) {
				break
			}
		}
	}(args...)
}
// Close shuts a running pool down. It returns an error if the pool is not in
// the Running state. Otherwise it closes the queue, waits for the workers to
// drain it and exit, and then marks the pool as Closed.
func (w *workerPool) Close() error {
	if current := w.status; current != pool.Running {
		return errors.New("pool is not running, status " + current.String())
	}

	// Flag the pool as closed and close the queue in one step under the mutex;
	// publishers started by Register check the flag under the same mutex
	// before sending.
	w.mutex.Lock()
	w.isClosed = true
	close(w.queue)
	w.mutex.Unlock()

	// Workers leave their receive loop once the closed queue is empty.
	w.wg.Wait()

	w.status = pool.Closed
	return nil
}
// WorkerList reports the names of the workers created by the pool so far. The
// returned slice is the pool's own slice, not a copy.
func (w *workerPool) WorkerList() []WorkerName {
	names := w.workers
	return names
}
// Kill cancels the context of the process identified by pid.
//
// A process that is still waiting in the queue is marked as Killed by its
// worker and is never started. A process that is already running observes the
// cancellation through the context passed to its Start method.
//
// Kill does nothing when the control panel yields no entry for pid, instead of
// dereferencing a nil entry.
// NOTE(review): this assumes controlPanel.get returns nil for an unknown pid —
// confirm against controlPanelMap.
func (w *workerPool) Kill(pid PID) {
	pc := w.controlPanel.get(pid)
	if pc == nil {
		return
	}
	pc.cancel()
}
// Monitor exposes the pool as a Monitor. workerPool implements the Monitor
// interface itself, so the pool is returned as-is.
func (w *workerPool) Monitor() Monitor {
	var m Monitor = w
	return m
}
// String converts the process id to a plain string.
func (p PID) String() string {
	s := string(p)
	return s
}
// PoolStatus reports the current state of the pool.
func (w *workerPool) PoolStatus() pool.Status {
	current := w.status
	return current
}
// Error looks up the stats stored for pid and returns the error recorded in
// them, which is nil when no error has been recorded.
func (w *workerPool) Error(pid PID) error {
	stats := w.processes.get(pid)
	return stats.err
}
// WorkerStatus reports the status stored for the worker with the given name.
func (w *workerPool) WorkerStatus(name WorkerName) worker.Status {
	st := w.workersStats.get(name)
	return st
}
// ProcessStats reports the stats stored for the process with the given id.
func (w *workerPool) ProcessStats(pid PID) ProcessStats {
	stats := w.processes.get(pid)
	return stats
}