Skip to content

Commit e06eeb9

Browse files
mrahsAnas Sulaiman
authored andcommitted
Simplify the headers type
1 parent d32ef29 commit e06eeb9

16 files changed

+129
-281
lines changed

context.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/lovoo/goka/multierr"
1212
)
1313

14-
type emitter func(topic string, key string, value []byte, headers *Headers) *Promise
14+
type emitter func(topic string, key string, value []byte, headers Headers) *Promise
1515

1616
// Context provides access to the processor's table and emit capabilities to
1717
// arbitrary topics in kafka.
@@ -62,7 +62,7 @@ type Context interface {
6262
Value() interface{}
6363

6464
// Headers returns the headers of the input message
65-
Headers() *Headers
65+
Headers() Headers
6666

6767
// SetValue updates the value of the key in the group table.
6868
// It stores the value in the local cache and sends the
@@ -148,7 +148,7 @@ type cbContext struct {
148148

149149
// Headers as passed from sarama. Note that this field will be filled
150150
// lazily after the first call to Headers
151-
headers *Headers
151+
headers Headers
152152

153153
table *PartitionTable
154154
// joins
@@ -223,7 +223,7 @@ func (ctx *cbContext) Loopback(key string, value interface{}, options ...Context
223223
ctx.emit(l.Topic(), key, data, opts.emitHeaders)
224224
}
225225

226-
func (ctx *cbContext) emit(topic string, key string, value []byte, headers *Headers) {
226+
func (ctx *cbContext) emit(topic string, key string, value []byte, headers Headers) {
227227
ctx.counters.emits++
228228
ctx.emitter(topic, key, value, ctx.emitterDefaultHeaders.Merged(headers)).Then(func(err error) {
229229
if err != nil {
@@ -285,7 +285,7 @@ func (ctx *cbContext) Partition() int32 {
285285
return ctx.msg.Partition
286286
}
287287

288-
func (ctx *cbContext) Headers() *Headers {
288+
func (ctx *cbContext) Headers() Headers {
289289
if ctx.headers == nil {
290290
ctx.headers = HeadersFromSarama(ctx.msg.Headers)
291291
}
@@ -349,7 +349,7 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
349349
return value, nil
350350
}
351351

352-
func (ctx *cbContext) deleteKey(key string, headers *Headers) error {
352+
func (ctx *cbContext) deleteKey(key string, headers Headers) error {
353353
if ctx.graph.GroupTable() == nil {
354354
return fmt.Errorf("Cannot access state in stateless processor")
355355
}
@@ -368,7 +368,7 @@ func (ctx *cbContext) deleteKey(key string, headers *Headers) error {
368368
}
369369

370370
// setValueForKey sets a value for a key in the processor state.
371-
func (ctx *cbContext) setValueForKey(key string, value interface{}, headers *Headers) error {
371+
func (ctx *cbContext) setValueForKey(key string, value interface{}, headers Headers) error {
372372
if ctx.graph.GroupTable() == nil {
373373
return fmt.Errorf("Cannot access state in stateless processor")
374374
}

context_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
func newEmitter(err error, done func(err error)) emitter {
20-
return func(topic string, key string, value []byte, headers *Headers) *Promise {
20+
return func(topic string, key string, value []byte, headers Headers) *Promise {
2121
p := NewPromise()
2222
if done != nil {
2323
p.Then(done)
@@ -27,7 +27,7 @@ func newEmitter(err error, done func(err error)) emitter {
2727
}
2828

2929
func newEmitterW(wg *sync.WaitGroup, err error, done func(err error)) emitter {
30-
return func(topic string, key string, value []byte, headers *Headers) *Promise {
30+
return func(topic string, key string, value []byte, headers Headers) *Promise {
3131
wg.Add(1)
3232
p := NewPromise()
3333
if done != nil {
@@ -369,14 +369,14 @@ func TestContext_GetSetStateful(t *testing.T) {
369369
defer ctrl.Finish()
370370

371371
var (
372-
group Group = "some-group"
373-
key = "key"
374-
value = "value"
375-
headers, _ = NewHeaders("key", "headerValue")
376-
offset = int64(123)
377-
wg = new(sync.WaitGroup)
378-
st = NewMockStorage(ctrl)
379-
pt = &PartitionTable{
372+
group Group = "some-group"
373+
key = "key"
374+
value = "value"
375+
headers = Headers{"key": []byte("headerValue")}
376+
offset = int64(123)
377+
wg = new(sync.WaitGroup)
378+
st = NewMockStorage(ctrl)
379+
pt = &PartitionTable{
380380
st: &storageProxy{
381381
Storage: st,
382382
},
@@ -397,7 +397,7 @@ func TestContext_GetSetStateful(t *testing.T) {
397397
graph: graph,
398398
trackOutputStats: func(ctx context.Context, topic string, size int) {},
399399
msg: &sarama.ConsumerMessage{Key: []byte(key), Offset: offset},
400-
emitter: func(tp string, k string, v []byte, h *Headers) *Promise {
400+
emitter: func(tp string, k string, v []byte, h Headers) *Promise {
401401
wg.Add(1)
402402
test.AssertEqual(t, tp, graph.GroupTable().Topic())
403403
test.AssertEqual(t, string(k), key)
@@ -493,18 +493,18 @@ func TestContext_Loopback(t *testing.T) {
493493
defer ctrl.Finish()
494494

495495
var (
496-
key = "key"
497-
value = "value"
498-
headers, _ = NewHeaders("key", "headerValue")
499-
cnt = 0
496+
key = "key"
497+
value = "value"
498+
headers = Headers{"key": []byte("headerValue")}
499+
cnt = 0
500500
)
501501

502502
graph := DefineGroup("group", Persist(c), Loop(c, cb))
503503
ctx := &cbContext{
504504
graph: graph,
505505
msg: &sarama.ConsumerMessage{},
506506
trackOutputStats: func(ctx context.Context, topic string, size int) {},
507-
emitter: func(tp string, k string, v []byte, h *Headers) *Promise {
507+
emitter: func(tp string, k string, v []byte, h Headers) *Promise {
508508
cnt++
509509
test.AssertEqual(t, tp, graph.LoopStream().Topic())
510510
test.AssertEqual(t, string(k), key)
@@ -632,7 +632,7 @@ func TestContext_Headers(t *testing.T) {
632632
}
633633
headers := ctx.Headers()
634634
test.AssertNotNil(t, headers)
635-
test.AssertEqual(t, headers.Len(), 0)
635+
test.AssertEqual(t, len(headers), 0)
636636

637637
ctx = &cbContext{
638638
msg: &sarama.ConsumerMessage{Key: []byte("key"), Headers: []*sarama.RecordHeader{
@@ -643,7 +643,7 @@ func TestContext_Headers(t *testing.T) {
643643
}},
644644
}
645645
headers = ctx.Headers()
646-
test.AssertEqual(t, headers.Val("key"), []byte("value"))
646+
test.AssertEqual(t, headers["key"], []byte("value"))
647647
}
648648

649649
func TestContext_Fail(t *testing.T) {

emitter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Emitter struct {
1717
producer Producer
1818

1919
topic string
20-
defaultHeaders *Headers
20+
defaultHeaders Headers
2121

2222
wg sync.WaitGroup
2323
mu sync.RWMutex
@@ -57,7 +57,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO
5757
func (e *Emitter) emitDone(err error) { e.wg.Done() }
5858

5959
// EmitWithHeaders sends a message with the given headers for the passed key using the emitter's codec.
60-
func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers *Headers) (*Promise, error) {
60+
func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers Headers) (*Promise, error) {
6161
var (
6262
err error
6363
data []byte
@@ -95,7 +95,7 @@ func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error) {
9595
}
9696

9797
// EmitSyncWithHeaders sends a message with the given headers to passed topic and key.
98-
func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers *Headers) error {
98+
func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers Headers) error {
9999
var (
100100
err error
101101
promise *Promise

emitter_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,7 @@ func TestEmitter_Emit(t *testing.T) {
118118
})
119119
t.Run("default_headers", func(t *testing.T) {
120120
emitter, bm, ctrl := createEmitter(t)
121-
defaultHeaders, err := NewHeaders("header-key", "header-val")
122-
test.AssertNil(t, err)
123-
emitter.defaultHeaders = defaultHeaders
121+
emitter.defaultHeaders = Headers{"header-key": []byte("header-val")}
124122
defer ctrl.Finish()
125123

126124
var (

header_test.go

Lines changed: 16 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,81 +2,37 @@ package goka
22

33
import (
44
"testing"
5-
6-
"github.com/Shopify/sarama"
75
)
86

97
func TestHeaders_Merged(t *testing.T) {
10-
h1, _ := NewHeaders()
11-
h1.Set("key1", []byte("val1"))
12-
h2, _ := NewHeaders()
13-
h2.Set("key1", []byte("val1b"))
14-
h2.Set("key2", []byte("val2"))
8+
h1 := Headers{
9+
"key1": []byte("val1"),
10+
}
11+
h2 := Headers{
12+
"key1": []byte("val1b"),
13+
"key2": []byte("val2"),
14+
}
1515
merged := h1.Merged(h2)
1616

17-
if h1.Len() != 1 || h1.StrVal("key1") != "val1" {
18-
t.Errorf("Merged failed: reciver was modified")
17+
if len(h1) != 1 || string(h1["key1"]) != "val1" {
18+
t.Errorf("Merged failed: receiver was modified")
1919
}
2020

21-
if h2.Len() != 2 || h2.StrVal("key1") != "val1b" || h2.StrVal("key2") != "val2" {
21+
if len(h2) != 2 || string(h2["key1"]) != "val1b" || string(h2["key2"]) != "val2" {
2222
t.Errorf("Merged failed: argument was modified")
2323
}
2424

25-
if merged.Len() != 2 {
26-
t.Errorf("Merged failed: expected %d keys, but found %d", 2, merged.Len())
25+
if len(merged) != 2 {
26+
t.Errorf("Merged failed: expected %d keys, but found %d", 2, len(merged))
2727
}
2828

29-
if merged.StrVal("key1") != "val1b" {
29+
if string(merged["key1"]) != "val1b" {
3030
t.Errorf("Merged failed: expected %q for key %q, but found %q",
31-
"val1b", "key1", merged.StrVal("key1"))
31+
"val1b", "key1", string(merged["key1"]))
3232
}
3333

34-
if merged.StrVal("key2") != "val2" {
34+
if string(merged["key2"]) != "val2" {
3535
t.Errorf("Merged failed: expected %q for key %q, but found %q",
36-
"val2", "key2", merged.StrVal("key2"))
37-
}
38-
}
39-
40-
func TestHeaders_Lazy(t *testing.T) {
41-
h := HeadersFromSarama([]*sarama.RecordHeader{{Key: []byte("key"), Value: []byte("value")}})
42-
if len(h.sarama) == 0 {
43-
t.Fatalf("Sarama headers should be set")
44-
}
45-
if len(h.goka) > 0 {
46-
t.Fatalf("Goka headers should not be set")
47-
}
48-
49-
if h.Len() != 1 {
50-
t.Fatalf("Unexpected header count: expected 1, but found %d", h.Len())
51-
}
52-
53-
if len(h.sarama) > 0 {
54-
t.Fatalf("Sarama headers should not be set")
55-
}
56-
if len(h.goka) == 0 {
57-
t.Fatalf("Goka headers should be set")
58-
}
59-
}
60-
61-
func TestHeaders_New(t *testing.T) {
62-
h, err := NewHeaders("k1", "v1", "k2", "v2")
63-
if err != nil {
64-
t.Fatalf("Unexpected error: %v", err)
65-
}
66-
67-
if h.Len() != 2 {
68-
t.Fatalf("Unexpected header count: expected 2, but found %d", h.Len())
69-
}
70-
71-
if h.StrVal("k1") != "v1" {
72-
t.Fatalf("Unexpected value for k1: %q", h.StrVal("k1"))
73-
}
74-
if h.StrVal("k2") != "v2" {
75-
t.Fatalf("Unexpected value for k2: %q", h.StrVal("k2"))
76-
}
77-
78-
h, err = NewHeaders("k")
79-
if err == nil {
80-
t.Fatalf("Expected an error for a key without value")
36+
"val2", "key2", string(merged["key2"]))
8137
}
8238
}

0 commit comments

Comments
 (0)