Skip to content

Commit 5742b97

Browse files
author
Diogo Behrens
committed
move codec to main package and remove key
1 parent 9ce25e2 commit 5742b97

File tree

14 files changed

+74
-77
lines changed

14 files changed

+74
-77
lines changed

codec.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package goka
2+
3+
// Codec decodes and encodes from and to []byte
4+
type Codec interface {
5+
Encode(value interface{}) (data []byte, err error)
6+
Decode(data []byte) (value interface{}, err error)
7+
}

codec/codec.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,11 @@ import (
55
"strconv"
66
)
77

8-
// Codec decodes and encodes from and to []byte
9-
type Codec interface {
10-
Encode(key string, value interface{}) (data []byte, err error)
11-
Decode(key string, data []byte) (value interface{}, err error)
12-
}
13-
148
// Bytes codec is
159
type Bytes struct{}
1610

1711
// Encode does a type conversion into []byte
18-
func (d *Bytes) Encode(key string, value interface{}) ([]byte, error) {
12+
func (d *Bytes) Encode(value interface{}) ([]byte, error) {
1913
var err error
2014
data, isByte := value.([]byte)
2115
if !isByte {
@@ -25,15 +19,15 @@ func (d *Bytes) Encode(key string, value interface{}) ([]byte, error) {
2519
}
2620

2721
// Decode of defaultCodec simply returns the data
28-
func (d *Bytes) Decode(key string, data []byte) (interface{}, error) {
22+
func (d *Bytes) Decode(data []byte) (interface{}, error) {
2923
return data, nil
3024
}
3125

3226
// String is a commonly used codec to encode and decode string <-> []byte
3327
type String struct{}
3428

3529
// Encode encodes from string to []byte
36-
func (c *String) Encode(key string, value interface{}) ([]byte, error) {
30+
func (c *String) Encode(value interface{}) ([]byte, error) {
3731
stringVal, isString := value.(string)
3832
if !isString {
3933
return nil, fmt.Errorf("String: value to encode is not of type string but %T", value)
@@ -42,15 +36,15 @@ func (c *String) Encode(key string, value interface{}) ([]byte, error) {
4236
}
4337

4438
// Decode decodes from []byte to string
45-
func (c *String) Decode(key string, data []byte) (interface{}, error) {
39+
func (c *String) Decode(data []byte) (interface{}, error) {
4640
return string(data), nil
4741
}
4842

4943
// Int64 is a commonly used codec to encode and decode string <-> []byte
5044
type Int64 struct{}
5145

5246
// Encode encodes from string to []byte
53-
func (c *Int64) Encode(key string, value interface{}) ([]byte, error) {
47+
func (c *Int64) Encode(value interface{}) ([]byte, error) {
5448
intVal, isInt := value.(int64)
5549
if !isInt {
5650
return nil, fmt.Errorf("Int64: value to encode is not of type int64")
@@ -59,7 +53,7 @@ func (c *Int64) Encode(key string, value interface{}) ([]byte, error) {
5953
}
6054

6155
// Decode decodes from []byte to string
62-
func (c *Int64) Decode(key string, data []byte) (interface{}, error) {
56+
func (c *Int64) Decode(data []byte) (interface{}, error) {
6357
intVal, err := strconv.ParseInt(string(data), 10, 64)
6458
if err != nil {
6559
return 0, fmt.Errorf("Error parsing data from string %d: %v", intVal, err)

context.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"sync"
66

7-
"github.com/lovoo/goka/codec"
87
"github.com/lovoo/goka/kafka"
98
"github.com/lovoo/goka/storage"
109
)
@@ -54,7 +53,7 @@ type context struct {
5453
failer func(err error)
5554

5655
storage storage.Storage
57-
codec codec.Codec
56+
codec Codec
5857
views map[string]*partition
5958

6059
msg *message
@@ -86,7 +85,7 @@ func (ctx *context) Loopback(key string, value interface{}) error {
8685
return fmt.Errorf("No loop topic configured")
8786
}
8887

89-
data, err := ctx.loopTopic.codec.Encode(key, value)
88+
data, err := ctx.loopTopic.codec.Encode(value)
9089
if err != nil {
9190
return fmt.Errorf("Error encoding message for key %s: %v", key, err)
9291
}
@@ -173,7 +172,7 @@ func (ctx *context) setValueForKey(key string, value interface{}) error {
173172
return fmt.Errorf("Error storing value: %v", err)
174173
}
175174

176-
encodedValue, err := ctx.codec.Encode(key, value)
175+
encodedValue, err := ctx.codec.Encode(value)
177176
if err != nil {
178177
return fmt.Errorf("Error encoding value: %v", err)
179178
}

kafkamock.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte)) {
114114
// )
115115
func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
116116
return []ProcessorOption{
117-
WithStorageBuilder(func(topic string, partition int32, c codec.Codec, reg metrics.Registry) (storage.Storage, error) {
117+
WithStorageBuilder(func(topic string, partition int32, c Codec, reg metrics.Registry) (storage.Storage, error) {
118118
return km.storage, nil
119119
}),
120120
WithConsumer(km.consumerMock),

options.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@ import (
55
"path/filepath"
66
"time"
77

8-
metrics "github.com/rcrowley/go-metrics"
9-
"github.com/lovoo/goka/codec"
108
"github.com/lovoo/goka/kafka"
119
"github.com/lovoo/goka/storage"
10+
metrics "github.com/rcrowley/go-metrics"
1211
)
1312

1413
// UpdateCallback is invoked upon arrival of a message for a table partition.
@@ -17,7 +16,7 @@ type UpdateCallback func(s storage.Storage, partition int32, key string, value [
1716

1817
// StorageBuilder creates a local storage (a persistent cache) for a topic
1918
// table. StorageBuilder creates one storage for each partition of the topic.
20-
type StorageBuilder func(topic string, partition int32, codec codec.Codec, reg metrics.Registry) (storage.Storage, error)
19+
type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error)
2120

2221
///////////////////////////////////////////////////////////////////////////////
2322
// default values
@@ -62,7 +61,7 @@ type poptions struct {
6261
clientID string
6362

6463
tableEnabled bool
65-
tableCodec codec.Codec
64+
tableCodec Codec
6665
updateCallback UpdateCallback
6766
storagePath string
6867
storageSnapshotInterval time.Duration
@@ -80,7 +79,7 @@ type poptions struct {
8079
}
8180

8281
// WithGroupTable enables the group table and defines a codec.
83-
func WithGroupTable(codec codec.Codec) ProcessorOption {
82+
func WithGroupTable(codec Codec) ProcessorOption {
8483
return func(o *poptions) {
8584
o.tableEnabled = true
8685
o.tableCodec = codec
@@ -227,7 +226,7 @@ func (opt *poptions) tableTopic(group string) Subscription {
227226
return Subscription{Name: tableName(group)}
228227
}
229228

230-
func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec codec.Codec, reg metrics.Registry) (storage.Storage, error) {
229+
func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
231230
return storage.New(opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
232231
}
233232

@@ -239,7 +238,7 @@ func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec
239238
type ViewOption func(*voptions)
240239

241240
type voptions struct {
242-
tableCodec codec.Codec
241+
tableCodec Codec
243242
updateCallback UpdateCallback
244243
storagePath string
245244
storageSnapshotInterval time.Duration
@@ -369,7 +368,7 @@ func (opt *voptions) tableTopic(group string) Subscription {
369368
return Subscription{Name: tableName(group)}
370369
}
371370

372-
func (opt *voptions) defaultStorageBuilder(topic string, partition int32, codec codec.Codec, reg metrics.Registry) (storage.Storage, error) {
371+
func (opt *voptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
373372
return storage.New(opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
374373
}
375374

@@ -385,7 +384,7 @@ type proptions struct {
385384
clientID string
386385

387386
registry metrics.Registry
388-
codec codec.Codec
387+
codec Codec
389388

390389
builders struct {
391390
topicmgr topicmgrBuilder

processor.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"runtime/debug"
99
"sync"
1010

11-
"github.com/lovoo/goka/codec"
1211
"github.com/lovoo/goka/kafka"
1312
"github.com/lovoo/goka/storage"
1413

@@ -253,11 +252,11 @@ func (g *Processor) Get(key string) (interface{}, error) {
253252
}
254253

255254
// make a deep copy of the object to make it read only.
256-
data, err := g.opts.tableCodec.Encode(key, val)
255+
data, err := g.opts.tableCodec.Encode(val)
257256
if err != nil {
258257
return nil, err
259258
}
260-
return g.opts.tableCodec.Decode(key, data)
259+
return g.opts.tableCodec.Decode(data)
261260
}
262261

263262
func (g *Processor) find(key string) (storage.Storage, error) {
@@ -479,7 +478,7 @@ func (s storageProxy) Stateless() bool {
479478
return s.stateless
480479
}
481480

482-
func (g *Processor) newStorage(topic string, id int32, codec codec.Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
481+
func (g *Processor) newStorage(topic string, id int32, codec Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
483482
if g.isStateless() {
484483
return &storageProxy{
485484
Storage: storage.NewMock(codec),
@@ -669,7 +668,7 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup
669668
return fmt.Errorf("cannot handle topic %s", msg.Topic)
670669
}
671670
// decode message
672-
m, err := stream.codec.Decode(msg.Key, msg.Data)
671+
m, err := stream.codec.Decode(msg.Data)
673672
if err != nil {
674673
wg.Done()
675674
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)

processor_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ func TestProcessor_StartWithErrorBeforeRebalance(t *testing.T) {
424424
err error
425425
consumer = mock.NewMockConsumer(ctrl)
426426
st = mock.NewMockStorage(ctrl)
427-
sb = func(topic string, par int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
427+
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
428428
return st, nil
429429
}
430430
final = make(chan bool)
@@ -464,7 +464,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
464464
err error
465465
consumer = mock.NewMockConsumer(ctrl)
466466
st = mock.NewMockStorage(ctrl)
467-
sb = func(topic string, par int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
467+
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
468468
return st, nil
469469
}
470470
final = make(chan bool)
@@ -543,7 +543,7 @@ func TestProcessor_Start(t *testing.T) {
543543
err error
544544
consumer = mock.NewMockConsumer(ctrl)
545545
st = mock.NewMockStorage(ctrl)
546-
sb = func(topic string, par int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
546+
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
547547
return st, nil
548548
}
549549
final = make(chan bool)
@@ -693,7 +693,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
693693
err error
694694
consumer = mock.NewMockConsumer(ctrl)
695695
st = mock.NewMockStorage(ctrl)
696-
sb = func(topic string, par int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
696+
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
697697
return st, nil
698698
}
699699
final = make(chan bool)
@@ -805,7 +805,7 @@ func TestProcessor_rebalanceError(t *testing.T) {
805805
wait = make(chan bool)
806806
ch = make(chan kafka.Event)
807807
p = createProcessor(ctrl, consumer, 1,
808-
func(topic string, partition int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
808+
func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
809809
return nil, errors.New("some error")
810810
})
811811
)
@@ -839,7 +839,7 @@ func TestProcessor_HasGet(t *testing.T) {
839839

840840
var (
841841
st = mock.NewMockStorage(ctrl)
842-
sb = func(topic string, partition int32, c codec.Codec, r metrics.Registry) (storage.Storage, error) {
842+
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
843843
return st, nil
844844
}
845845
consumer = mock.NewMockConsumer(ctrl)

producer.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ import (
44
"fmt"
55
"sync"
66

7-
"github.com/lovoo/goka/codec"
87
"github.com/lovoo/goka/kafka"
98
)
109

1110
type Producer struct {
12-
codec codec.Codec
11+
codec Codec
1312
producer kafka.Producer
1413

1514
topic string
@@ -18,7 +17,7 @@ type Producer struct {
1817
}
1918

2019
// NewProducer creates a new producer using passed brokers, topic, codec and possibly options
21-
func NewProducer(brokers []string, topic string, codec codec.Codec, options ...ProducerOption) (*Producer, error) {
20+
func NewProducer(brokers []string, topic string, codec Codec, options ...ProducerOption) (*Producer, error) {
2221
options = append(
2322
// default options comes first
2423
[]ProducerOption{},
@@ -54,7 +53,7 @@ func (p *Producer) Produce(key string, msg interface{}) (*kafka.Promise, error)
5453
)
5554

5655
if msg != nil {
57-
data, err = p.codec.Encode(key, msg)
56+
data, err = p.codec.Encode(msg)
5857
if err != nil {
5958
return nil, fmt.Errorf("Error encoding value for key %s in topic %s: %v", key, p.topic, err)
6059
}

storage/codec.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package storage
2+
3+
// Codec decodes and encodes from and to []byte
4+
type Codec interface {
5+
Encode(value interface{}) (data []byte, err error)
6+
Decode(data []byte) (value interface{}, err error)
7+
}

0 commit comments

Comments
 (0)