Skip to content

Commit 160b4d8

Browse files
authored
abstract pipe api via F[A, B] (#68)
* abstract pipe api via F[A, B] The library defines a Go channel morphism F[A, B] with support of different error handling strategies Pure, Lift, Try and also FMap equivalents * update ci/cd to go1.24 * eliminate false positive with static check
1 parent 5614e10 commit 160b4d8

File tree

15 files changed

+628
-192
lines changed

15 files changed

+628
-192
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
steps:
2020
- uses: actions/setup-go@v5
2121
with:
22-
go-version: "1.22"
22+
go-version: "1.24"
2323

2424
- uses: actions/checkout@v4
2525

.github/workflows/check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
steps:
2020
- uses: actions/setup-go@v5
2121
with:
22-
go-version: "1.22"
22+
go-version: "1.24"
2323

2424
- uses: actions/checkout@v4
2525

pipe/README.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Type Safe Channels (`pipe`)
22

3-
Go's concurrency features simplify the creation of streaming data pipelines that effectively utilize I/O and multiple CPUs. [Go Concurrency Patterns: Pipelines and cancellation](https://go.dev/blog/pipelines) explains the topic in-depth. Despite the simplicity, the boilerplate code is required to build channels and establish "chrome" for its management. This module offers consistent operation over channels to form a processing pipelines in a clean and effective manner.
3+
Go's concurrency features simplify the creation of streaming data pipelines that effectively utilize I/O and multiple CPUs. [Go Concurrency Patterns: Pipelines and cancellation](https://go.dev/blog/pipelines) explains the topic in-depth. Despite the simplicity, the boilerplate code is required to build channels and establish "chrome" for its management. This module offers consistent operation over channels to form a processing pipelines in a clean and type safe manner.
44

55
The module support sequential `pipe` and parallel `fork` type safe channels for building pipelines. The module consider channels as "sequential data structure" trying to derive semantic from [stream interface](http://srfi.schemers.org/srfi-41/srfi-41.html)
66

@@ -19,27 +19,32 @@ func main() {
1919

2020
// Generate sequence of integers
2121
ints := pipe.StdErr(pipe.Unfold(ctx, cap, 0,
22-
func(x int) (int, error) { return x + 1, nil },
22+
pipe.Pure(func(x int) int { return x + 1 }),
2323
))
2424

2525
// Limit sequence of integers
2626
ints10 := pipe.TakeWhile(ctx, ints,
27-
func(x int) bool { return x <= 10 },
27+
pipe.Pure(func(x int) bool { return x <= 10 }),
2828
)
2929

3030
// Calculate squares
3131
sqrt := pipe.StdErr(pipe.Map(ctx, ints10,
32-
func(x int) (int, error) { return x * x, nil },
32+
pipe.Pure(func(x int) int { return x * x }),
3333
))
3434

3535
// Numbers to string
3636
vals := pipe.StdErr(pipe.Map(ctx, sqrt,
37-
func(x int) (string, error) { return strconv.Itoa(x), nil },
37+
pipe.Pure(strconv.Itoa),
3838
))
3939

4040
// Output strings
4141
<-pipe.ForEach(ctx, vals,
42-
func(x string) { fmt.Printf("==> %s\n", x) },
42+
pipe.Pure(
43+
func(x string) string {
44+
fmt.Printf("==> %s\n", x)
45+
return x
46+
},
47+
),
4348
)
4449

4550
close()

pipe/examples/files/main.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"runtime"
2020
"strings"
2121

22-
"github.com/fogfish/golem/pipe/fork"
22+
"github.com/fogfish/golem/pipe/v2/fork"
2323
)
2424

2525
var (
@@ -33,10 +33,15 @@ func main() {
3333

3434
// Parallel SHA1 digest
3535
seq, errf := walk(ctx)
36-
sha, errh := fork.Map(ctx, threads, seq, digest)
37-
str, errs := fork.Map(ctx, threads, sha, stringify)
36+
sha, errh := fork.Map(ctx, threads, seq, fork.Lift(digest))
37+
str, errs := fork.Map(ctx, threads, sha, fork.Lift(stringify))
3838
<-fork.ForEach(ctx, threads, str,
39-
func(x string) { fmt.Printf("==> %s\n", x) },
39+
fork.Pure(
40+
func(x string) string {
41+
fmt.Printf("==> %s\n", x)
42+
return x
43+
},
44+
),
4045
)
4146

4247
if err := <-fork.Join(ctx, errf, errh, errs); err != nil {

pipe/examples/numbers/main.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"fmt"
1414
"strconv"
1515

16-
"github.com/fogfish/golem/pipe"
16+
"github.com/fogfish/golem/pipe/v2"
1717
)
1818

1919
const (
@@ -25,27 +25,32 @@ func main() {
2525

2626
// Generate sequence of integers
2727
ints := pipe.StdErr(pipe.Unfold(ctx, cap, 0,
28-
func(x int) (int, error) { return x + 1, nil },
28+
pipe.Pure(func(x int) int { return x + 1 }),
2929
))
3030

3131
// Limit sequence of integers
3232
ints10 := pipe.TakeWhile(ctx, ints,
33-
func(x int) bool { return x <= 10 },
33+
pipe.Pure(func(x int) bool { return x <= 10 }),
3434
)
3535

3636
// Calculate squares
3737
sqrt := pipe.StdErr(pipe.Map(ctx, ints10,
38-
func(x int) (int, error) { return x * x, nil },
38+
pipe.Pure(func(x int) int { return x * x }),
3939
))
4040

4141
// Numbers to string
4242
vals := pipe.StdErr(pipe.Map(ctx, sqrt,
43-
func(x int) (string, error) { return strconv.Itoa(x), nil },
43+
pipe.Pure(strconv.Itoa),
4444
))
4545

4646
// Output strings
4747
<-pipe.ForEach(ctx, vals,
48-
func(x string) { fmt.Printf("==> %s\n", x) },
48+
pipe.Pure(
49+
func(x string) string {
50+
fmt.Printf("==> %s\n", x)
51+
return x
52+
},
53+
),
4954
)
5055

5156
close()

pipe/examples/throttling/main.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"strconv"
1515
"time"
1616

17-
"github.com/fogfish/golem/pipe"
17+
"github.com/fogfish/golem/pipe/v2"
1818
)
1919

2020
const (
@@ -27,22 +27,25 @@ func main() {
2727

2828
// Generate sequence of integers
2929
fast := pipe.StdErr(pipe.Unfold(ctx, fastProducer, 0,
30-
func(x int) (int, error) { return x + 1, nil },
30+
pipe.Pure(func(x int) int { return x + 1 }),
3131
))
3232

3333
// Throttle the "fast" pipe
3434
slow := pipe.Throttling(ctx, fast, 1, 100*time.Millisecond)
3535

3636
// Numbers to string
3737
vals := pipe.StdErr(pipe.Map(ctx, slow,
38-
func(x int) (string, error) { return strconv.Itoa(x), nil },
38+
pipe.Pure(strconv.Itoa),
3939
))
4040

4141
// Output strings
4242
<-pipe.ForEach(ctx, vals,
43-
func(x string) {
44-
fmt.Printf("==> %s | %s\n", time.Now().Format(time.StampMilli), x)
45-
},
43+
pipe.Pure(
44+
func(x string) string {
45+
fmt.Printf("==> %s | %s\n", time.Now().Format(time.StampMilli), x)
46+
return x
47+
},
48+
),
4649
)
4750

4851
close()

pipe/fork/fork.go

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ import (
1515
"sync"
1616
"time"
1717

18-
"github.com/fogfish/golem/pipe"
18+
"github.com/fogfish/golem/pipe/v2"
1919
"github.com/fogfish/golem/pure/monoid"
2020
)
2121

2222
// Emit creates a channel and takes a function that emits data at a specified frequency.
23-
func Emit[T any](ctx context.Context, cap int, frequency time.Duration, emit func() (T, error)) (<-chan T, <-chan error) {
24-
return pipe.Emit(ctx, cap, frequency, emit)
23+
func Emit[T any](ctx context.Context, cap int, frequency time.Duration, emit F[int, T]) (<-chan T, <-chan error) {
24+
return pipe.Emit(ctx, cap, frequency, emit.pipef())
2525
}
2626

2727
// Filter returns a newly-allocated channel that contains only those elements x
2828
// of the input channel for which predicate is true.
29-
func Filter[A any](ctx context.Context, par int, in <-chan A, f func(A) bool) <-chan A {
29+
func Filter[A any](ctx context.Context, par int, in <-chan A, f F[A, bool]) <-chan A {
3030
var wg sync.WaitGroup
3131
out := make(chan A, par)
3232

@@ -35,7 +35,7 @@ func Filter[A any](ctx context.Context, par int, in <-chan A, f func(A) bool) <-
3535

3636
var a A
3737
for a = range in {
38-
if f(a) {
38+
if take, err := f.apply(a); take && err == nil {
3939
select {
4040
case out <- a:
4141
case <-ctx.Done():
@@ -59,7 +59,7 @@ func Filter[A any](ctx context.Context, par int, in <-chan A, f func(A) bool) <-
5959
}
6060

6161
// ForEach applies function for each message in the channel
62-
func ForEach[A any](ctx context.Context, par int, in <-chan A, f func(A)) <-chan struct{} {
62+
func ForEach[A any](ctx context.Context, par int, in <-chan A, f F[A, A]) <-chan struct{} {
6363
var wg sync.WaitGroup
6464
done := make(chan struct{})
6565

@@ -68,7 +68,7 @@ func ForEach[A any](ctx context.Context, par int, in <-chan A, f func(A)) <-chan
6868

6969
var a A
7070
for a = range in {
71-
f(a)
71+
f.apply(a)
7272
select {
7373
case <-ctx.Done():
7474
return
@@ -92,7 +92,7 @@ func ForEach[A any](ctx context.Context, par int, in <-chan A, f func(A)) <-chan
9292

9393
// FMap applies function over channel messages, flatten the output channel and
9494
// emits it result to new channel.
95-
func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap func(context.Context, A, chan<- B) error) (<-chan B, <-chan error) {
95+
func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap FF[A, B]) (<-chan B, <-chan error) {
9696
var wg sync.WaitGroup
9797
out := make(chan B, par)
9898
exx := make(chan error, par)
@@ -102,9 +102,11 @@ func FMap[A, B any](ctx context.Context, par int, in <-chan A, fmap func(context
102102

103103
var a A
104104
for a = range in {
105-
if err := fmap(ctx, a, out); err != nil {
106-
exx <- err
107-
return
105+
if err := fmap.apply(ctx, a, out); err != nil {
106+
if !fmap.catch(ctx, err, exx) {
107+
return
108+
}
109+
continue
108110
}
109111

110112
select {
@@ -176,7 +178,7 @@ func Fold[A any](ctx context.Context, par int, in <-chan A, m monoid.Monoid[A])
176178
}
177179

178180
// Map applies function over channel messages, emits result to new channel
179-
func Map[A, B any](ctx context.Context, par int, in <-chan A, fmap func(A) (B, error)) (<-chan B, <-chan error) {
181+
func Map[A, B any](ctx context.Context, par int, in <-chan A, f F[A, B]) (<-chan B, <-chan error) {
180182
var wg sync.WaitGroup
181183
out := make(chan B, par)
182184
exx := make(chan error, par)
@@ -191,10 +193,12 @@ func Map[A, B any](ctx context.Context, par int, in <-chan A, fmap func(A) (B, e
191193
)
192194

193195
for a = range in {
194-
val, err = fmap(a)
196+
val, err = f.apply(a)
195197
if err != nil {
196-
exx <- err
197-
return
198+
if !f.catch(ctx, err, exx) {
199+
return
200+
}
201+
continue
198202
}
199203

200204
select {
@@ -220,16 +224,16 @@ func Map[A, B any](ctx context.Context, par int, in <-chan A, fmap func(A) (B, e
220224
}
221225

222226
// Partition channel into two channels according to predicate
223-
func Partition[A any](ctx context.Context, par int, in <-chan A, f func(A) bool) (<-chan A, <-chan A) {
227+
func Partition[A any](ctx context.Context, par int, in <-chan A, f F[A, bool]) (<-chan A, <-chan A) {
224228
var wg sync.WaitGroup
225229
lout := make(chan A, par)
226230
rout := make(chan A, par)
227231

228232
pf := func() {
229233
defer wg.Done()
230234

231-
sel := func(x bool) chan<- A {
232-
if x {
235+
sel := func(x bool, err error) chan<- A {
236+
if x && err == nil {
233237
return lout
234238
}
235239
return rout
@@ -238,7 +242,7 @@ func Partition[A any](ctx context.Context, par int, in <-chan A, f func(A) bool)
238242
var a A
239243
for a = range in {
240244
select {
241-
case sel(f(a)) <- a:
245+
case sel(f.apply(a)) <- a:
242246
case <-ctx.Done():
243247
return
244248
}
@@ -261,8 +265,8 @@ func Partition[A any](ctx context.Context, par int, in <-chan A, f func(A) bool)
261265

262266
// Unfold is the fundamental recursive constructor, it applies a function to
263267
// each previous seed element in turn to determine the next element.
264-
func Unfold[A any](ctx context.Context, cap int, seed A, f func(A) (A, error)) (<-chan A, <-chan error) {
265-
return pipe.Unfold(ctx, cap, seed, f)
268+
func Unfold[A any](ctx context.Context, cap int, seed A, f F[A, A]) (<-chan A, <-chan error) {
269+
return pipe.Unfold(ctx, cap, seed, f.pipef())
266270
}
267271

268272
// Join concatenate channels, returns newly-allocated channel composed of
@@ -278,8 +282,8 @@ func Take[A any](ctx context.Context, in <-chan A, n int) <-chan A {
278282

279283
// Filter returns a newly-allocated channel that contains only those elements x
280284
// of the input channel for which predicate is true.
281-
func TakeWhile[A any](ctx context.Context, in <-chan A, f func(A) bool) <-chan A {
282-
return pipe.TakeWhile(ctx, in, f)
285+
func TakeWhile[A any](ctx context.Context, in <-chan A, f F[A, bool]) <-chan A {
286+
return pipe.TakeWhile(ctx, in, f.pipef())
283287
}
284288

285289
// Throttling the channel to ops per time interval

0 commit comments

Comments
 (0)