Skip to content

Commit f7f57e6

Browse files
authored
add channel version and common interface (#4)
* add channel version and common interface. refactor tests/benchmark to run both implementations * improve doc * avoid extra fragmentation/allocs * fix godoc
1 parent 73b0ef7 commit f7f57e6

File tree

3 files changed

+166
-19
lines changed

3 files changed

+166
-19
lines changed

cb/cb.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,45 @@
1-
// FIFO Queue with fixed capacity.
1+
// First In First Out (FIFO) [Queue] with fixed capacity.
22
// Circular Buffer implementation in Go with both
33
// pub/sub thread safe blocking API and pure FIFO queue with set capacity
4-
// unsynchronized base.
4+
// unsynchronized base. Two versions of the same [Queue] interface:
5+
// one actually a circular buffer [CircularBuffer], the other using a channel
6+
// [CircularBufferChan], created using [cb.New] or [cb.NewC] respectively.
57
package cb
68

79
import "sync"
810

11+
type Queue[T any] interface {
12+
Empty() bool
13+
Full() bool
14+
Size() int
15+
Capacity() int
16+
Push(item T) bool
17+
Pop() (value T, ok bool)
18+
PushBlocking(item T)
19+
PopBlocking() (value T)
20+
}
21+
22+
// FIFO [Queue] with fixed capacity. Fixed array implementation.
923
type CircularBuffer[T any] struct {
1024
buffer []T
1125
head int
1226
tail int
1327
size int
1428
mu sync.Mutex
15-
full *sync.Cond
16-
empty *sync.Cond
29+
full sync.Cond
30+
empty sync.Cond
1731
}
1832

33+
// New returns the fixed array version of 0 alloc fixed capacity (optionally blocking) [Queue].
1934
func New[T any](capacity int) *CircularBuffer[T] {
2035
cb := &CircularBuffer[T]{
2136
buffer: make([]T, capacity),
2237
head: 0,
2338
tail: 0,
2439
size: 0,
2540
}
26-
cb.full = sync.NewCond(&cb.mu)
27-
cb.empty = sync.NewCond(&cb.mu)
41+
cb.full.L = &cb.mu
42+
cb.empty.L = &cb.mu
2843
return cb
2944
}
3045

cb/cb_channel.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package cb
2+
3+
// FIFO [Queue] with fixed capacity.
4+
// Channel / go idiomatic version (blocking, multi thread safe),
5+
// use [CircularBufferChan] for low/no contention cases as it is faster.
6+
type CircularBufferChan[T any] struct {
7+
buffer chan T
8+
}
9+
10+
// NewC returns a channel ([CircularBufferChan]) version of 0 alloc pub/sub fixed capacity blocking queue.
11+
func NewC[T any](capacity int) *CircularBufferChan[T] {
12+
cb := &CircularBufferChan[T]{
13+
buffer: make(chan T, capacity),
14+
}
15+
return cb
16+
}
17+
18+
func (cb *CircularBufferChan[T]) Empty() bool {
19+
return len(cb.buffer) == 0
20+
}
21+
22+
func (cb *CircularBufferChan[T]) Full() bool {
23+
return len(cb.buffer) == cap(cb.buffer)
24+
}
25+
26+
func (cb *CircularBufferChan[T]) Size() int {
27+
return len(cb.buffer)
28+
}
29+
30+
func (cb *CircularBufferChan[T]) Capacity() int {
31+
return cap(cb.buffer)
32+
}
33+
34+
// Push adds an item to the queue. returns false if queue is full.
35+
// Note: might block and not return false at times. Use PushBlocking for
36+
// correct version.
37+
func (cb *CircularBufferChan[T]) Push(item T) bool {
38+
// Note: this is for equivalent with the array/conditional variable version
39+
// but isn't correct, as in Full can return false and yet the push can block
40+
// if another producer enqueued in the meanwhile.
41+
if cb.Full() {
42+
return false
43+
}
44+
cb.buffer <- item
45+
return true
46+
}
47+
48+
// Pop removes an item from the queue. returns false if queue is empty.
49+
// Note: might block and not return false at times. Use PopBlocking for
50+
// correct version.
51+
func (cb *CircularBufferChan[T]) Pop() (value T, ok bool) {
52+
if cb.Empty() {
53+
return
54+
}
55+
ok = true
56+
value = <-cb.buffer
57+
return
58+
}
59+
60+
// Thread safe blocking versions:
61+
62+
// Push adds an item to the queue. blocks if queue is full.
63+
func (cb *CircularBufferChan[T]) PushBlocking(item T) {
64+
cb.buffer <- item
65+
}
66+
67+
// Pop removes an item from the queue. blocks if queue is empty.
68+
func (cb *CircularBufferChan[T]) PopBlocking() T {
69+
return <-cb.buffer
70+
}

cb/cb_test.go

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,24 @@ import (
77
"fortio.org/memstore/cb"
88
)
99

10+
const capacity = 5
11+
1012
func TestBoundaryConditions(t *testing.T) {
11-
const capacity = 5
12-
buffer := cb.New[int](capacity)
13+
testCases := []struct {
14+
name string
15+
cb cb.Queue[int]
16+
}{
17+
{name: "CircBuffer", cb: cb.New[int](capacity)},
18+
{name: "Channel", cb: cb.NewC[int](capacity)},
19+
}
20+
for _, tc := range testCases {
21+
t.Run(tc.name, func(tt *testing.T) {
22+
testBoundaryConditions(tt, tc.cb)
23+
})
24+
}
25+
}
1326

27+
func testBoundaryConditions(t *testing.T, buffer cb.Queue[int]) {
1428
// Test empty buffer
1529
if !buffer.Empty() {
1630
t.Error("Buffer should be empty")
@@ -91,7 +105,22 @@ func TestBoundaryConditions(t *testing.T) {
91105
}
92106

93107
func BenchmarkCircularBuffer(b *testing.B) {
94-
c := cb.New[int](100)
108+
capacity := 100
109+
testCases := []struct {
110+
name string
111+
cb cb.Queue[int]
112+
}{
113+
{name: "CircBuffer", cb: cb.New[int](capacity)},
114+
{name: "Channel", cb: cb.NewC[int](capacity)},
115+
}
116+
for _, tc := range testCases {
117+
b.Run(tc.name, func(bb *testing.B) {
118+
benchmarkCircularBuffer(bb, tc.cb)
119+
})
120+
}
121+
}
122+
123+
func benchmarkCircularBuffer(b *testing.B, c cb.Queue[int]) {
95124
var x int
96125
var ok bool
97126
for i := 0; i < b.N; i++ {
@@ -105,8 +134,20 @@ func BenchmarkCircularBuffer(b *testing.B) {
105134
}
106135

107136
func TestProducerConsumerScenario(t *testing.T) {
108-
buffer := cb.New[int](10)
109-
137+
testCases := []struct {
138+
name string
139+
cb cb.Queue[int]
140+
}{
141+
{name: "CircBuffer", cb: cb.New[int](10)},
142+
{name: "Channel", cb: cb.NewC[int](10)},
143+
}
144+
for _, tc := range testCases {
145+
t.Run(tc.name, func(tt *testing.T) {
146+
testProducerConsumerScenario(tt, tc.cb)
147+
})
148+
}
149+
}
150+
func testProducerConsumerScenario(t *testing.T, buffer cb.Queue[int]) {
110151
var wg sync.WaitGroup
111152
wg.Add(11) // 10 producers + 1 consumer
112153

@@ -143,7 +184,21 @@ func TestProducerConsumerScenario(t *testing.T) {
143184
}
144185

145186
func BenchmarkCircularBufferBlocking(b *testing.B) {
146-
c := cb.New[int](100)
187+
capacity := 100
188+
testCases := []struct {
189+
name string
190+
cb cb.Queue[int]
191+
}{
192+
{name: "CircBuffer", cb: cb.New[int](capacity)},
193+
{name: "Channel", cb: cb.NewC[int](capacity)},
194+
}
195+
for _, tc := range testCases {
196+
b.Run(tc.name, func(bb *testing.B) {
197+
benchmarkCircularBufferBlocking(bb, tc.cb)
198+
})
199+
}
200+
}
201+
func benchmarkCircularBufferBlocking(b *testing.B, c cb.Queue[int]) {
147202
var x int
148203
for i := 0; i < b.N; i++ {
149204
c.PushBlocking(i)
@@ -155,9 +210,7 @@ func BenchmarkCircularBufferBlocking(b *testing.B) {
155210
b.Logf("x=%d", x)
156211
}
157212

158-
func benchmarkPushBlocking(b *testing.B, numProducers, numConsumers int) {
159-
buffer := cb.New[int](20) // small queue, higher contention
160-
213+
func benchmarkPushBlocking(b *testing.B, buffer cb.Queue[int], numProducers, numConsumers int) {
161214
var wg sync.WaitGroup
162215
wg.Add(numProducers + numConsumers)
163216
prodN := b.N * numConsumers
@@ -191,11 +244,20 @@ func benchmarkPushBlocking(b *testing.B, numProducers, numConsumers int) {
191244
wg.Wait()
192245
}
193246

194-
func BenchmarkPushBlocking(b *testing.B) {
247+
func BenchmarkHighContention(b *testing.B) {
195248
numProducers := 13
196249
numConsumers := 7
197250

198-
b.Run("BenchmarkPushBlocking 7,5", func(b *testing.B) {
199-
benchmarkPushBlocking(b, numProducers, numConsumers)
200-
})
251+
testCases := []struct {
252+
name string
253+
cb cb.Queue[int]
254+
}{
255+
{name: "CircBuffer", cb: cb.New[int](capacity)},
256+
{name: "Channel", cb: cb.NewC[int](capacity)},
257+
}
258+
for _, tc := range testCases {
259+
b.Run(tc.name, func(bb *testing.B) {
260+
benchmarkPushBlocking(bb, tc.cb, numProducers, numConsumers)
261+
})
262+
}
201263
}

0 commit comments

Comments
 (0)