Skip to content

Commit bb32fc7

Browse files
authored
feat: adding lo.BufferWithContext (#580)
* feat: adding lo.BufferWithContext * Add BufferWithContext to README table of contents
1 parent 677bfd1 commit bb32fc7

File tree

3 files changed

+77
-9
lines changed

3 files changed

+77
-9
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ Supported helpers for channels:
191191
- [SliceToChannel](#slicetochannel)
192192
- [Generator](#generator)
193193
- [Buffer](#buffer)
194+
- [BufferWithContext](#bufferwithcontext)
194195
- [BufferWithTimeout](#bufferwithtimeout)
195196
- [FanIn](#fanin)
196197
- [FanOut](#fanout)
@@ -1928,6 +1929,32 @@ for {
19281929
}
19291930
```
19301931

1932+
### BufferWithContext
1933+
1934+
Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).
1935+
1936+
```go
1937+
ctx, cancel := context.WithCancel(context.TODO())
1938+
go func() {
1939+
ch <- 0
1940+
time.Sleep(10*time.Millisecond)
1941+
ch <- 1
1942+
time.Sleep(10*time.Millisecond)
1943+
ch <- 2
1944+
time.Sleep(10*time.Millisecond)
1945+
ch <- 3
1946+
time.Sleep(10*time.Millisecond)
1947+
ch <- 4
1948+
time.Sleep(10*time.Millisecond)
1949+
cancel()
1950+
}()
1951+
1952+
items1, length1, duration1, ok1 := lo.BufferWithContext(ctx, ch, 3)
1953+
// []int{0, 1, 2}, 3, 20ms, true
1954+
items2, length2, duration2, ok2 := lo.BufferWithContext(ctx, ch, 3)
1955+
// []int{3, 4}, 2, 30ms, false
1956+
```
1957+
19311958
### BufferWithTimeout
19321959

19331960
Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).

channel.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package lo
22

33
import (
4+
"context"
45
"sync"
56
"time"
67

@@ -220,17 +221,13 @@ func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime t
220221
return Buffer(ch, size)
221222
}
222223

223-
// BufferWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
224+
// BufferWithContext creates a slice of n elements from a channel, with context. Returns the slice and the slice length.
224225
// @TODO: we should probably provide an helper that reuse the same buffer.
225-
func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
226-
expire := time.NewTimer(timeout)
227-
defer expire.Stop()
228-
226+
func BufferWithContext[T any](ctx context.Context, ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
229227
buffer := make([]T, 0, size)
230-
index := 0
231228
now := time.Now()
232229

233-
for ; index < size; index++ {
230+
for index := 0; index < size; index++ {
234231
select {
235232
case item, ok := <-ch:
236233
if !ok {
@@ -239,12 +236,19 @@ func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (col
239236

240237
buffer = append(buffer, item)
241238

242-
case <-expire.C:
239+
case <-ctx.Done():
243240
return buffer, index, time.Since(now), true
244241
}
245242
}
246243

247-
return buffer, index, time.Since(now), true
244+
return buffer, size, time.Since(now), true
245+
}
246+
247+
// BufferWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
248+
func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
249+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
250+
defer cancel()
251+
return BufferWithContext(ctx, ch, size)
248252
}
249253

250254
// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.

channel_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package lo
22

33
import (
4+
"context"
45
"math/rand"
56
"testing"
67
"time"
@@ -279,6 +280,42 @@ func TestBuffer(t *testing.T) {
279280
is.False(ok3)
280281
}
281282

283+
func TestBufferWithContext(t *testing.T) {
284+
t.Parallel()
285+
testWithTimeout(t, 200*time.Millisecond)
286+
is := assert.New(t)
287+
288+
ch1 := make(chan int, 10)
289+
ctx, cancel := context.WithCancel(context.Background())
290+
go func() {
291+
ch1 <- 0
292+
ch1 <- 1
293+
ch1 <- 2
294+
time.Sleep(5 * time.Millisecond)
295+
cancel()
296+
ch1 <- 3
297+
ch1 <- 4
298+
ch1 <- 5
299+
close(ch1)
300+
}()
301+
items1, length1, _, ok1 := BufferWithContext(ctx, ch1, 20)
302+
is.Equal([]int{0, 1, 2}, items1)
303+
is.Equal(3, length1)
304+
is.True(ok1)
305+
306+
ch2 := make(chan int, 10)
307+
ctx, cancel = context.WithCancel(context.Background())
308+
defer cancel()
309+
defer close(ch2)
310+
for i := 0; i < 10; i++ {
311+
ch2 <- i
312+
}
313+
items2, length2, _, ok2 := BufferWithContext(ctx, ch2, 5)
314+
is.Equal([]int{0, 1, 2, 3, 4}, items2)
315+
is.Equal(5, length2)
316+
is.True(ok2)
317+
}
318+
282319
func TestBufferWithTimeout(t *testing.T) {
283320
t.Parallel()
284321
testWithTimeout(t, 200*time.Millisecond)

0 commit comments

Comments
 (0)