-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue_buffer_test.go
67 lines (62 loc) · 1.24 KB
/
queue_buffer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package flyline
import (
"context"
"runtime"
"sync"
"testing"
"time"
)
func TestNewQueueBuffer(t *testing.T) {
buf := NewQueueBuffer()
t.Logf("new buffer: %v", buf)
sendErr := buf.Send(time.Now())
if sendErr != nil {
t.Errorf("send failed, %v", sendErr)
t.FailNow()
}
t.Logf("send ok, len = %v", buf.Len())
v, ok := buf.Recv()
recvTime := time.Time{}
ValueScan(v, &recvTime)
t.Logf("recv: [%v:%v] %v", ok, buf.Len(), recvTime)
buf.Close()
buf.Sync(context.Background())
}
func TestQueueBuffer_Sample(t *testing.T) {
runtime.GOMAXPROCS(8)
buf := NewQueueBuffer()
t.Logf("new buffer: %v", buf)
wg := new(sync.WaitGroup)
wg.Add(1)
// send
go func(buf Buffer, wg *sync.WaitGroup) {
for i := 0; i < 10; i++ {
sendErr := buf.Send(time.Now())
if sendErr != nil {
t.Errorf("send failed, %v", sendErr)
t.FailNow()
}
t.Logf("send ok, len = %v", buf.Len())
}
buf.Close()
t.Logf("buf closed.")
buf.Sync(context.Background())
t.Logf("buf sync.")
wg.Done()
}(buf, wg)
// recv
go func(buf Buffer) {
for {
v, ok := buf.Recv()
if !ok {
t.Logf("recve: %v", ok)
break
}
tt := time.Time{}
ValueScan(v, &tt)
t.Logf("recv: %v, %v", ok, tt)
}
}(buf)
wg.Wait()
time.Sleep(1 * time.Second)
}