Skip to content

Commit

Permalink
Fix the metrics timestamp (#31)
Browse files Browse the repository at this point in the history
Signed-off-by: sarabala1979 <[email protected]>
  • Loading branch information
sarabala1979 committed Nov 17, 2023
1 parent bb818d1 commit 3e31473
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
36 changes: 23 additions & 13 deletions prometheus-pusher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ const (
)

type prometheusSink struct {
logger *zap.SugaredLogger
skipFailed bool
labels map[string]string
metrics *MetricsPublisher
logger *zap.SugaredLogger
skipFailed bool
labels map[string]string
metrics *MetricsPublisher
ignoreMetricsTs bool
}

type myCollector struct {
Expand Down Expand Up @@ -63,12 +64,20 @@ func (p *prometheusSink) push(msgPayloads []Payload) error {
switch payload.Type {
case "Gauge":
p.logger.Debugf("Creating Collector %s", payload.Name)
pusher = pusher.Collector(&myCollector{
metric: prometheus.NewDesc(payload.Name, "", nil, nil),
metricType: prometheus.GaugeValue,
value: payload.Value,
ts: time.UnixMilli(payload.TimestampMs),
})
if p.ignoreMetricsTs {
pusher = pusher.Collector(&myCollector{
metric: prometheus.NewDesc(payload.Name, "", nil, nil),
metricType: prometheus.GaugeValue,
value: payload.Value,
})
} else {
pusher = pusher.Collector(&myCollector{
metric: prometheus.NewDesc(payload.Name, "", nil, nil),
metricType: prometheus.GaugeValue,
value: payload.Value,
ts: time.UnixMilli(payload.TimestampMs),
})
}

for key, value := range payload.Labels {
pusher.Grouping(key, value)
Expand Down Expand Up @@ -159,8 +168,10 @@ func main() {
skipFailedStr := os.Getenv(SKIP_VALIDATION_FAILED)
labels := parseStringToMap(os.Getenv(METRICS_LABELS))
var metricPort int
var ignoreMetricsTs bool
meticslabels := numaflag.MapFlag{}

flag.BoolVar(&ignoreMetricsTs, "ignoreMetricsTs", true, "Ignore Metrics Timestamp")
flag.IntVar(&metricPort, "udsinkMetricsPort", 9090, "Metrics Port")
flag.Var(&meticslabels, "udsinkMetricsLabels", "Sink Metrics Labels E.g: label=val1,label1=val2")
// Parse the flag
Expand All @@ -174,12 +185,11 @@ func main() {
}
}

ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels}
ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, ignoreMetricsTs: ignoreMetricsTs}
ps.metrics = NewMetricsServer(labels)
go ps.metrics.startMetricServer(metricPort)
ps.logger.Infof("Metrics publisher initialized with port=%d", metricPort)

err = sinksdk.NewServer(&prometheusSink{}).Start(context.Background())
err = sinksdk.NewServer(&ps).Start(context.Background())
if err != nil {
log.Panic("Failed to start sink function server: ", err)
}
Expand Down
3 changes: 1 addition & 2 deletions prometheus-pusher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,5 @@ func NewMetricsServer(labels map[string]string) *MetricsPublisher {
func (mp *MetricsPublisher) startMetricServer(port int) error {
address := fmt.Sprintf(":%d", port)
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(address, nil)
return nil
return http.ListenAndServe(address, nil)
}

0 comments on commit 3e31473

Please sign in to comment.