-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpool.go
More file actions
124 lines (106 loc) · 2.44 KB
/
pool.go
File metadata and controls
124 lines (106 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package pipeline
import (
"context"
"fmt"
"sync"
)
// fixedPool is a Stage implementation that fans incoming data out to a
// fixed-size set of pre-built FIFO worker stages running in parallel.
type fixedPool struct {
	// id is the stage identifier returned by ID().
	id string
	// fifos holds one FIFO worker Stage per pool slot; all are run
	// concurrently by Run and share the same StageParams.
	fifos []Stage
}
// FixedPool returns a Stage backed by a fixed-size pool of num workers
// that process incoming data in parallel and emit their outputs to the
// next stage. It returns nil when num is not positive.
func FixedPool(id string, task Task, num int) Stage {
	if num <= 0 {
		return nil
	}
	workers := make([]Stage, num)
	for i := range workers {
		workers[i] = FIFO("", task)
	}
	return &fixedPool{id: id, fifos: workers}
}
// ID implements Stage. It returns the identifier that was supplied to
// FixedPool when this stage was constructed.
func (p *fixedPool) ID() string {
	return p.id
}
// Run implements Stage. It starts every FIFO worker in the pool on its
// own goroutine, all sharing the same params, and blocks until each
// worker has returned.
func (p *fixedPool) Run(ctx context.Context, params StageParams) {
	var wg sync.WaitGroup
	wg.Add(len(p.fifos))
	for i := range p.fifos {
		go func(worker Stage) {
			defer wg.Done()
			worker.Run(ctx, params)
		}(p.fifos[i])
	}
	wg.Wait()
}
// dynamicPool is a Stage implementation that spawns a worker goroutine
// per input, bounded by a token pool so that at most cap(tokenPool)
// tasks run concurrently.
type dynamicPool struct {
	// id is the stage identifier returned by ID().
	id string
	// task is the processing logic applied to each incoming Data item.
	task Task
	// tokenPool is a buffered channel pre-filled with one token per
	// allowed concurrent worker; a worker takes a token to start and
	// returns it when finished.
	tokenPool chan struct{}
}
// DynamicPool returns a Stage whose worker pool grows on demand up to
// max concurrent tasks, processing incoming inputs in parallel and
// emitting their outputs to the next stage. It returns nil when max is
// not positive.
func DynamicPool(id string, task Task, max int) Stage {
	if max <= 0 {
		return nil
	}
	// Seed the token channel to capacity; each buffered token represents
	// permission to run one worker goroutine.
	tokens := make(chan struct{}, max)
	for i := 0; i < max; i++ {
		tokens <- struct{}{}
	}
	return &dynamicPool{id: id, task: task, tokenPool: tokens}
}
// ID implements Stage. It returns the identifier that was supplied to
// DynamicPool when this stage was constructed.
func (p *dynamicPool) ID() string {
	return p.id
}
// Run implements Stage. It pulls inputs via processStageData until the
// input source is exhausted, then blocks until every spawned worker has
// finished by reclaiming the full set of tokens.
func (p *dynamicPool) Run(ctx context.Context, sp StageParams) {
	for processStageData(ctx, sp, p.executeTask) {
	}
	// Each worker returns its token on exit; once all cap(tokenPool)
	// tokens are back in the channel, no worker can still be running.
	total := cap(p.tokenPool)
	for i := 0; i < total; i++ {
		<-p.tokenPool
	}
}
// executeTask acquires a concurrency token and then processes data on a
// new goroutine, forwarding the task's output to the next stage. The
// token is returned when the worker exits, which is also how Run detects
// that all workers have drained.
func (p *dynamicPool) executeTask(ctx context.Context, data Data, sp StageParams) (Data, error) {
	// Block until a token is available (bounding concurrency at
	// cap(p.tokenPool)) or the context is cancelled, whichever first.
	select {
	case <-ctx.Done():
		// Cancelled before a worker slot opened; the payload is dropped.
		return nil, nil
	case <-p.tokenPool:
	}
	go func(dataIn Data) {
		// Always hand the token back so Run's drain loop can complete.
		defer func() { p.tokenPool <- struct{}{} }()
		dataOut, err := p.task.Process(ctx, dataIn, &taskParams{
			pipeline: sp.Pipeline(),
			registry: sp.Registry(),
		})
		if err != nil {
			// Report the failure to the pipeline's shared error sink
			// rather than returning it: this goroutine has no caller.
			sp.Error().Append(fmt.Errorf("pipeline stage %d: %v", sp.Position(), err))
			return
		}
		// If the task did not output data for the
		// next stage there is nothing we need to do.
		if dataOut == nil {
			return
		}
		// Output processed data; abandon the send if the context ends
		// first so a stalled downstream cannot leak this goroutine.
		select {
		case <-ctx.Done():
		case sp.Output() <- dataOut:
		}
	}(data)
	// NOTE(review): the raw input is returned even though the goroutine
	// forwards the processed output itself — presumably processStageData
	// does not re-emit a non-nil return; confirm, or inputs could be
	// delivered downstream twice.
	return data, nil
}