Skip to content

Commit

Permalink
Improvements fixes #157 (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas authored Aug 4, 2018
1 parent 6164bef commit 44f336d
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 684 deletions.
34 changes: 0 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ The entry point of the framework is the `Service`. The `Service` uses `Component
- synchronous processing (HTTP)
- metrics and tracing
- logging
- configuration management

`Patron` provides same defaults for making the usage as simple as possible.

Expand Down Expand Up @@ -218,36 +217,3 @@ type Factory interface {
Two methods are supported:

- Create, which creates a logger with the specified fields (or nil)

## Config

The config package defines a interface that has to be implemented in order to be used inside the application.

```go
type Config interface {
Set(key string, value interface{}) error
Get(key string) (interface{}, error)
GetBool(key string) (bool, error)
GetInt64(key string) (int64, error)
GetString(key string) (string, error)
GetFloat64(key string) (float64, error)
}
```

After implementing the interface a instance has to be provided to the `Setup` method of the package in order to be used directly from the package eg `config.GetBool()`.

The following implementations are provided as sub-packages:

- env, support for env files and env vars

By default the service will use the `env` implementation and look for a `.env` file when starting up in order to set some env vars from a file. This is especially helpful for development.

### env

The env package supports getting env vars from the system. It allows further to provide a file that contain env vars, separated by a equal sign `=`, which are then set up on the environment. In order to setup config just do the following:

```go
c,err := env.New({reader to the config file})
// error checking
config.Setup(c)
```
40 changes: 0 additions & 40 deletions async/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/Shopify/sarama"
"github.com/mantzas/patron/async"
Expand All @@ -13,12 +12,6 @@ import (
"github.com/mantzas/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

var (
topicPartitionOffsetDiff *prometheus.GaugeVec
messagesConsumedCounter *prometheus.CounterVec
)

type message struct {
Expand Down Expand Up @@ -111,7 +104,6 @@ func New(name, ct, topic string, brokers []string, oo ...OptionFunc) (*Consumer,
}
}

initMetrics(name)
return c, nil
}

Expand Down Expand Up @@ -140,7 +132,6 @@ func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
chErr <- consumerError
case msg := <-consumer.Messages():
c.log.Debugf("data received from topic %s", msg.Topic)
reportStats(msg.Topic, msg.Partition, consumer.HighWaterMarkOffset(), msg.Offset)
go func() {
sp, chCtx := trace.StartConsumerSpan(ctx, c.name, trace.KafkaConsumerComponent, mapHeader(msg.Headers))

Expand Down Expand Up @@ -226,34 +217,3 @@ func mapHeader(hh []*sarama.RecordHeader) map[string]string {
}
return mp
}

func initMetrics(namespace string) {

topicPartitionOffsetDiff = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "kafka_consumer",
Name: "offset_diff",
Help: "Message offset classified by topic and partition",
},
[]string{"topic", "partition"},
)

messagesConsumedCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "kafka_consumer",
Name: "messages_consumed_total",
Help: "Total messages consumed, classified by topic and queue service",
},
[]string{"topic"},
)
prometheus.RegisterOrGet(messagesConsumedCounter)
prometheus.RegisterOrGet(topicPartitionOffsetDiff)
}

func reportStats(topic string, partition int32, highwaterMark, offset int64) {
messagesConsumedCounter.WithLabelValues(topic).Inc()
topicPartitionOffsetDiff.WithLabelValues(topic, strconv.FormatInt(int64(partition), 10)).
Set(float64(highwaterMark - offset))
}
4 changes: 0 additions & 4 deletions async/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,9 @@ func TestNew(t *testing.T) {
if tt.wantErr {
assert.Error(err)
assert.Nil(got)
assert.Nil(messagesConsumedCounter)
assert.Nil(topicPartitionOffsetDiff)
} else {
assert.NoError(err)
assert.NotNil(got)
assert.NotNil(messagesConsumedCounter)
assert.NotNil(topicPartitionOffsetDiff)
}
})
}
Expand Down
57 changes: 0 additions & 57 deletions config/config.go

This file was deleted.

133 changes: 0 additions & 133 deletions config/config_test.go

This file was deleted.

Loading

0 comments on commit 44f336d

Please sign in to comment.