Skip to content

Commit a5cc3b3

Browse files
authored
pipe function to consume stream to void (#71)
1 parent f1eca00 commit a5cc3b3

File tree

5 files changed

+66
-1
lines changed

5 files changed

+66
-1
lines changed

pipe/fork/fork.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,36 @@ func ForEach[A any](ctx context.Context, par int, in <-chan A, f F[A, A]) <-chan
9090
return done
9191
}
9292

93+
// Void applies nothing for each message in the channel, making channel empty
94+
func Void[A any](ctx context.Context, par int, in <-chan A) <-chan struct{} {
95+
var wg sync.WaitGroup
96+
done := make(chan struct{})
97+
98+
fmap := func() {
99+
defer wg.Done()
100+
101+
for range in {
102+
select {
103+
case <-ctx.Done():
104+
return
105+
default:
106+
}
107+
}
108+
}
109+
110+
wg.Add(par)
111+
for i := 1; i <= par; i++ {
112+
go fmap()
113+
}
114+
115+
go func() {
116+
wg.Wait()
117+
close(done)
118+
}()
119+
120+
return done
121+
}
122+
93123
// FMap applies function over channel messages, flatten the output channel and
94124
// emits it result to new channel.
95125
func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap FF[A, B]) (<-chan B, <-chan error) {

pipe/fork/fork_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ func TestForEach(t *testing.T) {
101101
close()
102102
}
103103

104+
func TestVoid(t *testing.T) {
105+
ctx, close := context.WithCancel(context.Background())
106+
seq := fork.Seq(1, 2, 3, 4, 5)
107+
<-fork.Void(ctx, par, seq)
108+
109+
close()
110+
}
111+
104112
func TestFMap(t *testing.T) {
105113
t.Run("FMap", func(t *testing.T) {
106114
fun := fork.LiftF(

pipe/pipe.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,25 @@ func ForEach[A any](ctx context.Context, in <-chan A, f F[A, A]) <-chan struct{}
9999
return done
100100
}
101101

102+
// Void applies nothing for each message in the channel, making channel empty
103+
func Void[A any](ctx context.Context, in <-chan A) <-chan struct{} {
104+
done := make(chan struct{})
105+
106+
go func() {
107+
defer close(done)
108+
109+
for range in {
110+
select {
111+
case <-ctx.Done():
112+
return
113+
default:
114+
}
115+
}
116+
}()
117+
118+
return done
119+
}
120+
102121
// FMap applies function over channel messages, flatten the output channel and
103122
// emits it result to new channel.
104123
func FMap[A, B any](ctx context.Context, in <-chan A, fmap FF[A, B]) (<-chan B, <-chan error) {

pipe/pipe_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ func TestForEach(t *testing.T) {
108108
close()
109109
}
110110

111+
func TestVoid(t *testing.T) {
112+
ctx, close := context.WithCancel(context.Background())
113+
seq := pipe.Seq(1, 2, 3, 4, 5)
114+
<-pipe.Void(ctx, seq)
115+
116+
close()
117+
}
118+
111119
func TestFMap(t *testing.T) {
112120
t.Run("FMap", func(t *testing.T) {
113121
fun := pipe.LiftF(

pipe/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88

99
package pipe
1010

11-
const Version = "pipe/v2.0.1"
11+
const Version = "pipe/v2.1.0"

0 commit comments

Comments
 (0)