-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpipeline_test.go
More file actions
107 lines (90 loc) · 1.91 KB
/
pipeline_test.go
File metadata and controls
107 lines (90 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package pipeline
import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"testing"
)
// TestDataFlow pushes three string items through a chain of ten pass-through
// test stages and checks that the sink ends up holding exactly what the source
// supplied.
func TestDataFlow(t *testing.T) {
	const stageCount = 10

	chain := make([]Stage, 0, stageCount)
	for n := 0; n < stageCount; n++ {
		chain = append(chain, testStage{t: t})
	}

	source := &sourceStub{data: stringDataValues(3)}
	collector := new(sinkStub)

	pipe := NewPipeline(chain...)
	execErr := pipe.Execute(context.TODO(), source, collector)
	if execErr != nil {
		t.Errorf("Error executing the Pipeline: %v", execErr)
	}

	// Compare even after an Execute error so both failures get reported.
	got, want := collector.data, source.data
	if !reflect.DeepEqual(got, want) {
		t.Errorf("Data does not match.\nWanted:%v\nGot:%v\n", want, got)
	}
}
// TestTaskErrorHandling builds a ten-stage pipeline in which the stage at
// index 5 is configured with an error, and checks that Execute returns an
// error whose message contains "task error".
func TestTaskErrorHandling(t *testing.T) {
	const (
		stageCount   = 10
		failingIndex = 5
	)
	taskErr := errors.New("task error")

	chain := make([]Stage, 0, stageCount)
	for n := 0; n < stageCount; n++ {
		stage := testStage{t: t}
		if n == failingIndex {
			// Only this stage is given the error; all others keep a nil err.
			stage.err = taskErr
		}
		chain = append(chain, stage)
	}

	source := &sourceStub{data: stringDataValues(3)}
	collector := new(sinkStub)

	pipe := NewPipeline(chain...)
	// (?s) lets '.' match newlines, so the text may appear on any line of a
	// multi-line error message.
	pattern := regexp.MustCompile("(?s).*task error.*")

	execErr := pipe.Execute(context.TODO(), source, collector)
	if execErr == nil || !pattern.MatchString(execErr.Error()) {
		t.Errorf("Error did not match the expectation: %v", execErr)
	}
}
// testStage is a configurable Stage used by the tests in this file. With the
// zero configuration its Run method forwards every input item to the output.
type testStage struct {
// id is the value returned by ID.
id string
// t is the test that created the stage; none of the methods visible in this
// file read it.
t *testing.T
// dropData, when true, makes Run consume input items without forwarding them.
dropData bool
// err, when non-nil, makes Run append it to the stage's error collector upon
// receiving the first input item and then stop. It is checked before dropData.
err error
}
// ID reports the identifier this stage was configured with.
func (s testStage) ID() string { return s.id }
// Run reads items from sp.Input() and writes each one to sp.Output() until the
// input channel is closed or ctx is done.
//
// Two fields alter that behaviour, checked in this order for every received
// item: a non-nil err is appended to sp.Error() and Run returns immediately;
// a true dropData discards the item and waits for the next one.
func (s testStage) Run(ctx context.Context, sp StageParams) {
	for {
		select {
		case <-ctx.Done():
			return
		case item, open := <-sp.Input():
			switch {
			case !open:
				// Input closed: nothing more will arrive.
				return
			case s.err != nil:
				sp.Error().Append(s.err)
				return
			case s.dropData:
				continue
			}

			// Forward the item, but give up if the context ends while the
			// send is still blocked.
			select {
			case <-ctx.Done():
				return
			case sp.Output() <- item:
			}
		}
	}
}
// stringData wraps a single string; pointers to it are stored as Data values
// by stringDataValues.
type stringData struct {
// val is the wrapped string, returned by String and copied by Clone.
val string
}
// Clone returns a freshly allocated stringData holding the same val as the
// receiver.
func (s *stringData) Clone() Data {
	dup := new(stringData)
	dup.val = s.val
	return dup
}
// String returns the wrapped string value.
func (s *stringData) String() string {
	return s.val
}
// stringDataValues returns a slice of num Data items. Element i is a
// *stringData whose value is the default formatting of i, i.e. "0", "1", ...
func stringDataValues(num int) []Data {
	values := make([]Data, num)
	for idx := range values {
		values[idx] = &stringData{val: fmt.Sprint(idx)}
	}
	return values
}