Skip to content

Commit

Permalink
Support Protobuf in Components (fixes #198) (#259)
Browse files Browse the repository at this point in the history
Signed-off-by: Sotirios Mantziaris <[email protected]>
  • Loading branch information
mantzas authored Dec 21, 2018
1 parent c6a38f7 commit 07da69e
Show file tree
Hide file tree
Showing 31 changed files with 839 additions and 188 deletions.
5 changes: 4 additions & 1 deletion async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/mantzas/patron/encoding"
"github.com/mantzas/patron/encoding/json"
"github.com/mantzas/patron/encoding/protobuf"
"github.com/mantzas/patron/errors"
)

Expand Down Expand Up @@ -36,6 +37,8 @@ func DetermineDecoder(contentType string) (encoding.DecodeRawFunc, error) {
switch contentType {
case json.Type, json.TypeCharset:
return json.DecodeRaw, nil
case protobuf.Type, protobuf.TypeGoogle:
return protobuf.DecodeRaw, nil
}
return nil, errors.Errorf("accept header %s is unsupported", contentType)
return nil, errors.Errorf("content header %s is unsupported", contentType)
}
4 changes: 3 additions & 1 deletion async/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/mantzas/patron/encoding/json"
"github.com/mantzas/patron/encoding/protobuf"
"github.com/stretchr/testify/assert"
)

Expand All @@ -16,7 +17,8 @@ func TestDetermineDecoder(t *testing.T) {
args args
wantErr bool
}{
{"success", args{contentType: json.Type}, false},
{"success json", args{contentType: json.Type}, false},
{"success protobuf", args{contentType: protobuf.Type}, false},
{"failure", args{contentType: "XXX"}, true},
}
for _, tt := range tests {
Expand Down
34 changes: 34 additions & 0 deletions encoding/protobuf/protobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package protobuf

import (
"io"
"io/ioutil"

"github.com/golang/protobuf/proto"
)

const (
// Type definition.
Type string = "application/x-protobuf"
// TypeGoogle definition.
TypeGoogle string = "application/x-google-protobuf"
)

// Decode a protobuf input in the form of a reader.
func Decode(data io.Reader, v interface{}) error {
b, err := ioutil.ReadAll(data)
if err != nil {
return err
}
return DecodeRaw(b, v)
}

// DecodeRaw a protobuf input in the form of a byte slice.
func DecodeRaw(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}

// Encode a model to protobuf.
func Encode(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
71 changes: 71 additions & 0 deletions encoding/protobuf/protobuf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package protobuf

import (
"bytes"
"errors"
"testing"

"github.com/stretchr/testify/assert"

"github.com/golang/protobuf/proto"
)

func TestEncodeDecode(t *testing.T) {
test := Test{
Label: proto.String("hello"),
Type: proto.Int32(17),
Reps: []int64{1, 2, 3},
}
test2 := Test{}
test3 := Test{}

b, err := Encode(&test)
assert.NoError(t, err)
err = DecodeRaw(b, &test2)
assert.NoError(t, err)
assert.Equal(t, test.GetLabel(), test2.GetLabel())
assert.Equal(t, test.GetType(), test2.GetType())
assert.Equal(t, test.GetReps(), test2.GetReps())

r := bytes.NewReader(b)
err = Decode(r, &test3)
assert.NoError(t, err)
assert.Equal(t, test.GetLabel(), test3.GetLabel())
assert.Equal(t, test.GetType(), test3.GetType())
assert.Equal(t, test.GetReps(), test3.GetReps())
}

func TestDecodeError(t *testing.T) {
test := Test{}
err := Decode(errReader(0), &test)
assert.Error(t, err)
}

func TestProtobuf(t *testing.T) {
test := Test{
Label: proto.String("hello"),
Type: proto.Int32(17),
Reps: []int64{1, 2, 3},
}

test1 := Test{
Type: nil,
}

assert.Equal(t, "", test1.GetLabel())
assert.Equal(t, int32(0), test1.GetType())
assert.Equal(t, []int64([]int64(nil)), test1.GetReps())

test.XXX_DiscardUnknown()
test.XXX_Merge(&test1)
assert.Equal(t, "label:\"hello\" type:17 reps:1 reps:2 reps:3 ", test.String())
b, c := test.Descriptor()
assert.NotEmpty(t, b)
assert.Len(t, c, 1)
}

type errReader int

func (errReader) Read(p []byte) (n int, err error) {
return 0, errors.New("test error")
}
94 changes: 94 additions & 0 deletions encoding/protobuf/test.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions encoding/protobuf/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto2";
package protobuf;

message Test {
required string label = 1;
optional int32 type = 2;
repeated int64 reps = 3;
}
19 changes: 14 additions & 5 deletions examples/first/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"time"

"github.com/mantzas/patron"
"github.com/mantzas/patron/encoding/json"
"github.com/mantzas/patron/encoding/protobuf"
"github.com/mantzas/patron/examples"
"github.com/mantzas/patron/log"
"github.com/mantzas/patron/sync"
patronhttp "github.com/mantzas/patron/sync/http"
Expand Down Expand Up @@ -61,7 +62,15 @@ func main() {
}

func first(ctx context.Context, req *sync.Request) (*sync.Response, error) {
b, err := json.Encode("patron")

var u examples.User

err := req.Decode(&u)
if err != nil {
return nil, errors.Wrap(err, "failed to decode request")
}

b, err := protobuf.Encode(&u)
if err != nil {
return nil, errors.Wrap(err, "failed create request")
}
Expand All @@ -70,8 +79,8 @@ func first(ctx context.Context, req *sync.Request) (*sync.Response, error) {
if err != nil {
return nil, errors.Wrap(err, "failed create request")
}
secondRouteReq.Header.Add("Content-Type", "application/json")
secondRouteReq.Header.Add("Accept", "application/json")
secondRouteReq.Header.Add("Content-Type", protobuf.Type)
secondRouteReq.Header.Add("Accept", protobuf.Type)
secondRouteReq.Header.Add("Authorization", "Apikey 123456")
cl, err := tracehttp.New(tracehttp.Timeout(5 * time.Second))
if err != nil {
Expand All @@ -82,6 +91,6 @@ func first(ctx context.Context, req *sync.Request) (*sync.Response, error) {
return nil, errors.Wrap(err, "failed to post to second service")
}

log.FromContext(ctx).Infof("request processed")
log.FromContext(ctx).Infof("request processed: %s %s", u.GetFirstname(), u.GetLastname())
return sync.NewResponse(fmt.Sprintf("got %s from second HTTP route", rsp.Status)), nil
}
7 changes: 4 additions & 3 deletions examples/fourth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/mantzas/patron"
"github.com/mantzas/patron/async"
"github.com/mantzas/patron/async/amqp"
"github.com/mantzas/patron/examples"
"github.com/mantzas/patron/log"
)

Expand Down Expand Up @@ -88,13 +89,13 @@ func newAmqpComponent(url, queue, exchange string) (*amqpComponent, error) {
}

func (ac *amqpComponent) Process(msg async.Message) error {
var m string
var u examples.User

err := msg.Decode(&m)
err := msg.Decode(&u)
if err != nil {
return err
}

log.FromContext(msg.Context()).Infof("request processed: %s", m)
log.FromContext(msg.Context()).Infof("request processed: %s %s", u.GetFirstname(), u.GetLastname())
return nil
}
15 changes: 8 additions & 7 deletions examples/second/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/mantzas/patron"
"github.com/mantzas/patron/examples"
"github.com/mantzas/patron/log"
"github.com/mantzas/patron/sync"
patronhttp "github.com/mantzas/patron/sync/http"
Expand Down Expand Up @@ -96,26 +97,26 @@ func newHTTPComponent(kafkaBroker, topic, url string) (*httpComponent, error) {

func (hc *httpComponent) second(ctx context.Context, req *sync.Request) (*sync.Response, error) {

var m string
err := req.Decode(&m)
var u examples.User
err := req.Decode(&u)
if err != nil {
return nil, errors.Wrap(err, "failed to decode message")
}

googleReq, err := http.NewRequest("GET", "https://www.google.com", nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create requestfor www.google.com")
return nil, errors.Wrap(err, "failed to create request for www.google.com")
}
cl, err := tracehttp.New(tracehttp.Timeout(5 * time.Second))
if err != nil {
return nil, err
}
rsp, err := cl.Do(ctx, googleReq)
_, err = cl.Do(ctx, googleReq)
if err != nil {
return nil, errors.Wrap(err, "failed to get www.google.com")
}

kafkaMsg, err := kafka.NewJSONMessage(hc.topic, m)
kafkaMsg, err := kafka.NewJSONMessage(hc.topic, &u)
if err != nil {
return nil, err
}
Expand All @@ -125,8 +126,8 @@ func (hc *httpComponent) second(ctx context.Context, req *sync.Request) (*sync.R
return nil, err
}

log.FromContext(ctx).Infof("request processed: %s", m)
return sync.NewResponse(fmt.Sprintf("got %s from google", rsp.Status)), nil
log.FromContext(ctx).Infof("request processed: %s %s", u.GetFirstname(), u.GetLastname())
return nil, nil
}

type apiKeyValidator struct {
Expand Down
2 changes: 1 addition & 1 deletion examples/start_processing.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

curl -i -X POST http://localhost:50000
curl -i -H "Content-Type: application/json" -X POST http://localhost:50000 --data '{"firstname":"John","lastname":"Doe"}'
Loading

0 comments on commit 07da69e

Please sign in to comment.