Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Commit

Permalink
fix(mqtt): feedback the internal error
Browse files Browse the repository at this point in the history
  • Loading branch information
Frank Mai authored and guangbochen committed Sep 25, 2020
1 parent 0ba39be commit b9b2408
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 32 deletions.
27 changes: 16 additions & 11 deletions adaptors/mqtt/pkg/adaptor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,23 @@ func (s *Service) Connect(server api.Connection_ConnectServer) error {
var logger = log.WithValues("mqtt device", deviceName)

// creates handler for syncing to limb
var toLimb = func(in *v1alpha1.MQTTDevice) error {
// send device by {name, namespace, status} tuple
var resp = &v1alpha1.MQTTDevice{}
resp.Namespace = in.Namespace
resp.Name = in.Name
resp.Status = in.Status

// convert device to json bytes
var respBytes = s.toJSON(resp)

var toLimb = func(in *v1alpha1.MQTTDevice, internalError error) error {
var resp *api.ConnectResponse
if internalError != nil {
// feedback error message
resp = &api.ConnectResponse{ErrorMessage: internalError.Error()}
} else {
// send device by {name, namespace, status} tuple
var device = &v1alpha1.MQTTDevice{}
device.Namespace = in.Namespace
device.Name = in.Name
device.Status = in.Status
// convert device to json bytes
var deviceBytes = s.toJSON(device)
resp = &api.ConnectResponse{Device: deviceBytes}
}
// send device to limb
if err := server.Send(&api.ConnectResponse{Device: respBytes}); err != nil {
if err := server.Send(resp); err != nil {
return status.Errorf(codes.Unknown, "failed to send device to limb, %v", err)
}
return nil
Expand Down
31 changes: 29 additions & 2 deletions adaptors/mqtt/pkg/physical/device.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package physical

import (
"io"
"reflect"
"sync"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
Expand Down Expand Up @@ -80,7 +82,32 @@ func (d *mqttDevice) Configure(references api.ReferencesHandler, configuration i
d.log.V(1).Info("Disconnected stale connection")
}

var cli, err = mqtt.NewClient(newSpec.Protocol.MQTTOptions, object.GetControlledOwnerObjectReference(device), references)
var clientBuilder = mqtt.NewClientBuilder(newSpec.Protocol.MQTTOptions, object.GetControlledOwnerObjectReference(device))
clientBuilder.Render(references)
clientBuilder.ConfigureOptions(func(options *MQTT.ClientOptions) error {
var autoReconnect = options.AutoReconnect
options.SetConnectionLostHandler(func(_ MQTT.Client, cerr error) {
if autoReconnect {
d.log.Error(cerr, "MQTT broker connection is closed, please turn off the AutoReconnect if want to know this at the first time")
return
}

// NB(thxCode) feedbacks the EOF of MQTT broker connection if turn off the auto reconnection.
var feedbackErr error
if cerr != io.EOF {
feedbackErr = errors.Wrapf(cerr, "error for MQTT broker connection")
} else {
feedbackErr = errors.New("MQTT broker connection is closed")
}
if d.toLimb != nil {
if err := d.toLimb(nil, feedbackErr); err != nil {
d.log.Error(err, "failed to feedback the lost error of MQTT broker connection")
}
}
})
return nil
})
var cli, err = clientBuilder.Build()
if err != nil {
return errors.Wrap(err, "failed to create MQTT client")
}
Expand Down Expand Up @@ -298,7 +325,7 @@ func (d *mqttDevice) refreshAsAttributedTopic(staleSpecPropsIndex map[string]v1a
// sync combines all synchronization operations.
func (d *mqttDevice) sync() error {
if d.toLimb != nil {
if err := d.toLimb(d.instance); err != nil {
if err := d.toLimb(d.instance, nil); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion adaptors/mqtt/pkg/physical/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import (
)

// MQTTDeviceLimbSyncer is used to sync mqtt device to limb.
type MQTTDeviceLimbSyncer func(in *v1alpha1.MQTTDevice) error
type MQTTDeviceLimbSyncer func(in *v1alpha1.MQTTDevice, internalError error) error
8 changes: 3 additions & 5 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,7 @@ func (c *client) Publish(message PublishMessage) error {

// NewClient creates the MQTT client with expected options.
func NewClient(spec api.MQTTOptions, ref corev1.ObjectReference, handler adaptorapi.ReferencesHandler) (Client, error) {
var cb = NewClientBuilder(spec, ref)
if err := cb.Render(handler); err != nil {
return nil, err
}
return cb.Build(), nil
var clientBuilder = NewClientBuilder(spec, ref)
clientBuilder.Render(handler)
return clientBuilder.Build()
}
41 changes: 29 additions & 12 deletions pkg/mqtt/client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (
"github.com/rancher/octopus/pkg/util/uuid"
)

type CustomMQTTOptionFunc func(options *mqtt.ClientOptions) error

type ClientBuilder struct {
ref corev1.ObjectReference
spec *api.MQTTOptions
status *mqtt.ClientOptions
err error
}

// Render renders the MQTT client options with expected options.
func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) {
var ref = b.ref
var clientSpec = b.spec.Client
var messageSpec = b.spec.Message
Expand All @@ -39,7 +42,8 @@ func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
username = basicAuthSpec.Username
} else if ref := basicAuthSpec.UsernameRef; ref != nil {
if handler == nil {
return errors.Errorf("references handler is nil")
b.err = errors.Errorf("references handler is nil")
return
}
username = converter.UnsafeBytesToString(handler.GetData(ref.Name, ref.Item))
}
Expand All @@ -49,13 +53,15 @@ func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
password = basicAuthSpec.Password
} else if ref := basicAuthSpec.PasswordRef; ref != nil {
if handler == nil {
return errors.Errorf("references handler is nil")
b.err = errors.Errorf("references handler is nil")
return
}
password = converter.UnsafeBytesToString(handler.GetData(ref.Name, ref.Item))
}

if username == "" || password == "" {
return errors.Errorf("illegal basic auth account as blank username or password")
b.err = errors.Errorf("illegal basic auth account as blank username or password")
return
}
status.SetUsername(username).SetPassword(password)
}
Expand Down Expand Up @@ -83,7 +89,8 @@ func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
return caPool, nil
}()
if caPoolErr != nil {
return caPoolErr
b.err = caPoolErr
return
}

var certs, certsErr = func() ([]tls.Certificate, error) {
Expand Down Expand Up @@ -122,7 +129,8 @@ func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
return certs, nil
}()
if certsErr != nil {
return certsErr
b.err = certsErr
return
}

status.SetTLSConfig(&tls.Config{
Expand Down Expand Up @@ -210,17 +218,26 @@ func (b *ClientBuilder) Render(handler adaptorapi.ReferencesHandler) error {
if len(clientSpec.HTTPHeaders) != 0 {
status.SetHTTPHeaders(clientSpec.HTTPHeaders)
}

return nil
}

// GetOptions returns the MQTT client options, call it after called `Render`.
func (b *ClientBuilder) GetOptions() *mqtt.ClientOptions {
return b.status
}

// ConfigureOptions allows to customize the MQTT client options.
func (b *ClientBuilder) ConfigureOptions(customFunc CustomMQTTOptionFunc) {
if b.err == nil && customFunc != nil {
b.err = customFunc(b.status)
}
}

// Build returns a MQTT client wrapper.
func (b *ClientBuilder) Build() Client {
func (b *ClientBuilder) Build() (Client, error) {
if b.err != nil {
return nil, b.err
}

var ref = b.ref
var clientSpec = b.spec.Client
var messageSpec = b.spec.Message
Expand Down Expand Up @@ -259,10 +276,10 @@ func (b *ClientBuilder) Build() Client {
),
)

return cli
return cli, nil
}

// NewClientBuilder creates the MQTT client builder.
func NewClientBuilder(spec api.MQTTOptions, ref corev1.ObjectReference) ClientBuilder {
return ClientBuilder{ref: ref, spec: &spec, status: mqtt.NewClientOptions()}
func NewClientBuilder(spec api.MQTTOptions, ref corev1.ObjectReference) *ClientBuilder {
return &ClientBuilder{ref: ref, spec: &spec, status: mqtt.NewClientOptions()}
}
3 changes: 2 additions & 1 deletion pkg/mqtt/client_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ func TestClientBuilder_Render(t *testing.T) {

for _, tc := range testCases {
var cb = NewClientBuilder(tc.given.spec, tc.given.ref)
var err = cb.Render(tc.given.handler)
cb.Render(tc.given.handler)
var _, err = cb.Build()
if assert.Nil(t, err, "case %q", tc.name) {
var actual = cb.GetOptions()
assert.Equal(t, cleanFuncs(tc.expected.ret), cleanFuncs(actual), "case %q", tc.name)
Expand Down

0 comments on commit b9b2408

Please sign in to comment.