Skip to content

Commit

Permalink
Kafka consumer metrics fixes #166 (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas authored Aug 7, 2018
1 parent c23ac55 commit df8f29a
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions async/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/Shopify/sarama"
"github.com/mantzas/patron/async"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/mantzas/patron/trace"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

type message struct {
Expand Down Expand Up @@ -49,6 +51,8 @@ const (
OffsetOldest Offset = -2
)

var topicPartitionOffsetDiff *prometheus.GaugeVec

// Consumer definition of a Kafka consumer.
type Consumer struct {
name string
Expand Down Expand Up @@ -104,6 +108,10 @@ func New(name, ct, topic string, brokers []string, oo ...OptionFunc) (*Consumer,
}
}

err = setupMetrics(name)
if err != nil {
return nil, err
}
return c, nil
}

Expand Down Expand Up @@ -132,6 +140,7 @@ func (c *Consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
chErr <- consumerError
case msg := <-consumer.Messages():
c.log.Debugf("data received from topic %s", msg.Topic)
topicPartitionOffsetDiffGaugeSet(msg.Topic, msg.Partition, consumer.HighWaterMarkOffset(), msg.Offset)
go func() {
sp, chCtx := trace.StartConsumerSpan(ctx, c.name, trace.KafkaConsumerComponent, mapHeader(msg.Headers))

Expand Down Expand Up @@ -220,3 +229,30 @@ func mapHeader(hh []*sarama.RecordHeader) map[string]string {
}
return mp
}

func setupMetrics(namespace string) error {
if topicPartitionOffsetDiff != nil {
return nil
}

topicPartitionOffsetDiff = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "kafka_consumer",
Name: "offset_diff",
Help: "Message offset difference with high watermark, classified by topic and partition",
},
[]string{"topic", "partition"},
)

if err := prometheus.Register(topicPartitionOffsetDiff); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
return errors.Wrap(err, "failed to register kafka consumer metrics")
}
}
return nil
}

func topicPartitionOffsetDiffGaugeSet(topic string, partition int32, high, offset int64) {
topicPartitionOffsetDiff.WithLabelValues(topic, strconv.FormatInt(int64(partition), 10)).Set(float64(high - offset))
}

0 comments on commit df8f29a

Please sign in to comment.