File tree Expand file tree Collapse file tree 1 file changed +34
-0
lines changed Expand file tree Collapse file tree 1 file changed +34
-0
lines changed Original file line number Diff line number Diff line change 11package kgo
22
33import (
4+ "context"
5+ "fmt"
6+ "sync/atomic"
47 "testing"
8+ "time"
59)
610
11+ func TestPromises (t * testing.T ) {
12+ ctx := context .Background ()
13+ cl , err := NewClient ()
14+ if err != nil {
15+ panic (err )
16+ }
17+
18+ var promised atomic.Int64
19+ var finished atomic.Int64
20+
21+ p := & cl .producer
22+ for range 8 {
23+ go func () {
24+ for {
25+ promise := func (* Record , error ) {
26+ finished .Add (1 )
27+ }
28+ r := new (Record )
29+ p .promiseRecordBeforeBuf (promisedRec {ctx , promise , r }, ErrMaxBuffered )
30+ promised .Add (1 )
31+ }
32+ }()
33+ }
34+
35+ for {
36+ fmt .Println ("promised" , promised .Load (), "finished" , finished .Load ())
37+ time .Sleep (time .Second )
38+ }
39+ }
40+
741func TestRing (t * testing.T ) {
842 t .Run ("push multiple elements and then drop them" , func (t * testing.T ) {
943 r := & ring [int ]{}
You can’t perform that action at this time.
0 commit comments