-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathstage.go
More file actions
78 lines (62 loc) · 2.01 KB
/
stage.go
File metadata and controls
78 lines (62 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package pipeline
import (
"context"
"github.com/caffix/queue"
)
// StageRegistry is a map of stage labels to data queues.
type StageRegistry map[string]queue.Queue
// StageParams provides the information needed for executing a pipeline
// Stage. The Pipeline passes a StageParams instance to the Run method
// of each stage.
type StageParams interface {
// Pipeline returns the pipeline executing this stage.
Pipeline() *Pipeline
// Position returns the position of this stage in the pipeline.
Position() int
// Input returns the input channel for this stage.
Input() <-chan Data
// Output returns the output channel for this stage.
Output() chan<- Data
// DataQueue returns the alternative data queue for this stage.
DataQueue() queue.Queue
// Error returns the queue that reports errors encountered by the stage.
Error() queue.Queue
// Registry returns a map of stage names to stage input channels.
Registry() StageRegistry
}
// Stage is designed to be executed in sequential order to
// form a multi-stage data pipeline.
type Stage interface {
// ID returns the optional identifier assigned to this stage.
ID() string
// Run executes the processing logic for this stage by reading
// data from the input channel, processing the data and sending
// the results to the output channel. Run blocks until the stage
// input channel is closed, the context expires, or an error occurs.
Run(context.Context, StageParams)
}
type execTask func(context.Context, Data, StageParams) (Data, error)
func processStageData(ctx context.Context, sp StageParams, task execTask) bool {
fromQueue := func() {
if d, ok := sp.DataQueue().Next(); ok {
if data, valid := d.(Data); valid {
_, _ = task(ctx, data, sp)
}
}
}
// Process items from the input channel and data queue
select {
case in, open := <-sp.Input():
if open {
_, _ = task(ctx, in, sp)
return true
} else if sp.DataQueue().Len() > 0 {
fromQueue()
return true
}
case <-sp.DataQueue().Signal():
fromQueue()
return true
}
return false
}