Skip to content

Commit

Permalink
Refactor for integration fixes #104, fixes #106, fixes #107 (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas authored Jun 22, 2018
1 parent 3d9ddbf commit 6bf4941
Show file tree
Hide file tree
Showing 42 changed files with 718 additions and 715 deletions.
5 changes: 5 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Lines starting with '#' are comments.
# Each line is a file pattern followed by one or more owners.

# These owners will be the default owners for everything in the repo.
* @mantzas
1 change: 0 additions & 1 deletion CODEOWNERS

This file was deleted.

6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

197 changes: 0 additions & 197 deletions Gopkg.lock.orig

This file was deleted.

16 changes: 8 additions & 8 deletions async/amqp/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ type Component struct {
name string
url string
queue string
p async.Processor
proc async.ProcessorFunc
tag string
ch *amqp.Channel
conn *amqp.Connection
}

// New returns a new client
func New(name, url, queue string, p async.Processor) (*Component, error) {
func New(name, url, queue string, p async.ProcessorFunc) (*Component, error) {

if name == "" {
return nil, errors.New("name is required")
Expand All @@ -44,7 +44,7 @@ func New(name, url, queue string, p async.Processor) (*Component, error) {
return nil, errors.New("work processor is required")
}

return &Component{name, url, queue, p, "", nil, nil}, nil
return &Component{name: name, url: url, queue: queue, proc: p, tag: "", ch: nil, conn: nil}, nil
}

// Run starts the async processing.
Expand Down Expand Up @@ -85,22 +85,22 @@ func (c *Component) Run(ctx context.Context) error {
dec, err := async.DetermineDecoder(d.ContentType)
if err != nil {
handlerMessageError(d, a, err, fmt.Sprintf("failed to determine encoding %s. Sending NACK", d.ContentType))
trace.FinishConsumerSpan(sp, true)
trace.FinishSpan(sp, true)
return
}
err = c.p.Process(ctx, async.NewMessage(d.Body, dec))
err = c.proc(ctx, async.NewMessage(d.Body, dec))
if err != nil {
handlerMessageError(d, a, err, fmt.Sprintf("failed to process message %s. Sending NACK", d.MessageId))
trace.FinishConsumerSpan(sp, true)
trace.FinishSpan(sp, true)
return
}
err = d.Ack(false)
if err != nil {
a.Append(errors.Wrapf(err, "failed to ACK message %s", d.MessageId))
trace.FinishConsumerSpan(sp, true)
trace.FinishSpan(sp, true)
return
}
trace.FinishConsumerSpan(sp, false)
trace.FinishSpan(sp, false)
}(&d, agr)

if agr.Count() > 0 {
Expand Down
14 changes: 7 additions & 7 deletions async/amqp/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ func TestNew(t *testing.T) {
name string
url string
queue string
p async.Processor
proc async.ProcessorFunc
}
tests := []struct {
name string
args args
wantErr bool
}{
{"success", args{"test", "url", "queue", &async.MockProcessor{}}, false},
{"failed with invalid name", args{"", "url", "queue", &async.MockProcessor{}}, true},
{"failed with invalid url", args{"test", "", "queue", &async.MockProcessor{}}, true},
{"failed with invalid queue name", args{"test", "url", "", &async.MockProcessor{}}, true},
{"failed with invalid processor", args{"test", "url", "queue", nil}, true},
{"success", args{name: "test", url: "url", queue: "queue", proc: async.MockProcessor{}.Process}, false},
{"failed with invalid name", args{name: "", url: "url", queue: "queue", proc: async.MockProcessor{}.Process}, true},
{"failed with invalid url", args{name: "test", url: "", queue: "queue", proc: async.MockProcessor{}.Process}, true},
{"failed with invalid queue name", args{name: "test", url: "url", queue: "", proc: async.MockProcessor{}.Process}, true},
{"failed with invalid processor", args{name: "test", url: "url", queue: "queue", proc: nil}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(tt.args.name, tt.args.url, tt.args.queue, tt.args.p)
got, err := New(tt.args.name, tt.args.url, tt.args.queue, tt.args.proc)
if tt.wantErr {
assert.Error(err)
assert.Nil(got)
Expand Down
12 changes: 5 additions & 7 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,23 @@ import (
"github.com/pkg/errors"
)

// Processor interface for implementing processing of messages
type Processor interface {
Process(context.Context, *Message) error
}
// ProcessorFunc definition of a async processor.
type ProcessorFunc func(context.Context, *Message) error

// Message definition of a async message.
type Message struct {
data []byte
Data []byte
decode encoding.DecodeRaw
}

// NewMessage creates a new message.
func NewMessage(d []byte, dec encoding.DecodeRaw) *Message {
return &Message{d, dec}
return &Message{Data: d, decode: dec}
}

// Decode a the raw message into the given value.
func (m *Message) Decode(v interface{}) error {
return m.decode(m.data, v)
return m.decode(m.Data, v)
}

// DetermineDecoder determines the decoder based on the content type.
Expand Down
4 changes: 2 additions & 2 deletions async/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestDetermineDecoder(t *testing.T) {
args args
wantErr bool
}{
{"success", args{json.ContentType}, false},
{"failure", args{"XXX"}, true},
{"success", args{contentType: json.ContentType}, false},
{"failure", args{contentType: "XXX"}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit 6bf4941

Please sign in to comment.