Skip to content

Commit 25aadde

Browse files
authored
throttling pipe with ops per time interval (rate limit) (#55)
* throttling pipe with ops per time interval (rate limit) * update static check GitHub action to v1.3.1
1 parent 8db50f9 commit 25aadde

File tree

7 files changed

+146
-2
lines changed

7 files changed

+146
-2
lines changed

.github/workflows/check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
flag-name: ${{ matrix.module }}
4242
parallel: true
4343

44-
- uses: dominikh/[email protected].0
44+
- uses: dominikh/[email protected].1
4545
with:
4646
install-go: false
4747
working-directory: ${{ matrix.module }}

pipe/examples/throttling/main.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//
2+
// Copyright (C) 2022 - 2024 Dmitry Kolesnikov
3+
//
4+
// This file may be modified and distributed under the terms
5+
// of the MIT license. See the LICENSE file for details.
6+
// https://github.com/fogfish/golem
7+
//
8+
9+
package main
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"strconv"
15+
"time"
16+
17+
"github.com/fogfish/golem/pipe"
18+
)
19+
20+
const (
21+
fastProducer = 10000
22+
cap = 1
23+
)
24+
25+
func main() {
26+
ctx, close := context.WithCancel(context.Background())
27+
28+
// Generate sequence of integers
29+
fast := pipe.StdErr(pipe.Unfold(ctx, fastProducer, 0,
30+
func(x int) (int, error) { return x + 1, nil },
31+
))
32+
33+
// Throttle the "fast" pipe
34+
slow := pipe.Throttling(ctx, fast, 1, 100*time.Millisecond)
35+
36+
// Numbers to string
37+
vals := pipe.StdErr(pipe.Map(ctx, slow,
38+
func(x int) (string, error) { return strconv.Itoa(x), nil },
39+
))
40+
41+
// Output strings
42+
<-pipe.ForEach(ctx, vals,
43+
func(x string) {
44+
fmt.Printf("==> %s | %s\n", time.Now().Format(time.StampMilli), x)
45+
},
46+
)
47+
48+
close()
49+
}

pipe/fork/fork.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ func TakeWhile[A any](ctx context.Context, in <-chan A, f func(A) bool) <-chan A
282282
return pipe.TakeWhile(ctx, in, f)
283283
}
284284

285+
// Throttling the channel to ops per time interval
286+
func Throttling[A any](ctx context.Context, in <-chan A, ops int, interval time.Duration) <-chan A {
287+
return pipe.Throttling(ctx, in, ops, interval)
288+
}
289+
285290
// Lift sequence of values into channel
286291
func Seq[T any](xs ...T) <-chan T {
287292
return pipe.Seq(xs...)

pipe/fork/fork_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,25 @@ func TestTakeWhile(t *testing.T) {
246246

247247
close()
248248
}
249+
250+
func TestThrottling(t *testing.T) {
251+
ctx, close := context.WithCancel(context.Background())
252+
seq := fork.Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
253+
slowSeq := fork.Throttling(ctx, seq, 1, 100*time.Millisecond)
254+
out := fork.StdErr(fork.Map(ctx, par, slowSeq,
255+
func(_ int) (time.Time, error) {
256+
return time.Now(), nil
257+
},
258+
))
259+
wt := fork.ToSeq(out)
260+
for i := 1; i < len(wt); i++ {
261+
diff := wt[i].Sub(wt[i-1])
262+
263+
it.Then(t).Should(
264+
it.Less(diff, 110*time.Millisecond),
265+
it.Greater(diff, 99*time.Millisecond),
266+
)
267+
}
268+
269+
close()
270+
}

pipe/pipe.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,52 @@ func TakeWhile[A any](ctx context.Context, in <-chan A, f func(A) bool) <-chan A
325325
return out
326326
}
327327

328+
// Throttling the channel to ops per time interval.
329+
func Throttling[A any](ctx context.Context, in <-chan A, ops int, interval time.Duration) <-chan A {
330+
out := make(chan A, cap(in))
331+
ctl := make(chan struct{}, ops)
332+
333+
go func() {
334+
defer close(ctl)
335+
336+
for {
337+
for i := 0; i < ops; i++ {
338+
select {
339+
case ctl <- struct{}{}:
340+
case <-ctx.Done():
341+
return
342+
}
343+
}
344+
select {
345+
case <-time.After(interval):
346+
case <-ctx.Done():
347+
return
348+
}
349+
}
350+
}()
351+
352+
go func() {
353+
defer close(out)
354+
355+
var a A
356+
for a = range in {
357+
select {
358+
case <-ctl:
359+
case <-ctx.Done():
360+
return
361+
}
362+
363+
select {
364+
case out <- a:
365+
case <-ctx.Done():
366+
return
367+
}
368+
}
369+
}()
370+
371+
return out
372+
}
373+
328374
// Lift sequence of values into channel
329375
func Seq[T any](xs ...T) <-chan T {
330376
out := make(chan T, len(xs))

pipe/pipe_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,28 @@ func TestTakeWhile(t *testing.T) {
268268
close()
269269
}
270270

271+
func TestThrottling(t *testing.T) {
272+
ctx, close := context.WithCancel(context.Background())
273+
seq := pipe.Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
274+
slowSeq := pipe.Throttling(ctx, seq, 1, 100*time.Millisecond)
275+
out := pipe.StdErr(pipe.Map(ctx, slowSeq,
276+
func(_ int) (time.Time, error) {
277+
return time.Now(), nil
278+
},
279+
))
280+
wt := pipe.ToSeq(out)
281+
for i := 1; i < len(wt); i++ {
282+
diff := wt[i].Sub(wt[i-1])
283+
284+
it.Then(t).Should(
285+
it.Less(diff, 110*time.Millisecond),
286+
it.Greater(diff, 99*time.Millisecond),
287+
)
288+
}
289+
290+
close()
291+
}
292+
271293
func BenchmarkPipe(b *testing.B) {
272294
ctx, close := context.WithCancel(context.Background())
273295
in, eg := pipe.New[int](ctx, 0)

pipe/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88

99
package pipe
1010

11-
const Version = "pipe/v1.1.1"
11+
const Version = "pipe/v1.2.0"

0 commit comments

Comments
 (0)