Skip to content

Commit 3cda12d

Browse files
feat(runner/metrics): Add metric broker (#1496)
* Revert "Revert "chore(deps): update frontend dependencies" (#1477)" This reverts commit ca0034a. * feat(runner): implement runner registration with gocast * fix: lint protofile * feat(runner): implement heartbeat notification * chore: lint imports/code * feat(runner): get streaming to work * feat(runner/stream): rework streaming logic * feat(runner/metrics): add broker to collect metrics * chore(runner/metrics): Add documentation and tests * Revert "Revert "Revert "chore(deps): update frontend dependencies" (#1477)"" This reverts commit fda4d5e. * fix: re-add comment for runner model
1 parent dcdead1 commit 3cda12d

File tree

10 files changed

+134
-11
lines changed

10 files changed

+134
-11
lines changed

runner/cmd/runner/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func main() {
2020
shouldShutdown := false // set to true once we receive a shutdown signal
2121

2222
currentCount := 0
23+
2324
go func() {
2425
for {
2526
currentCount += <-r.JobCount // count Job start/stop

runner/pkg/actions/actions.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"log/slog"
66

7+
"github.com/tum-dev/gocast/runner/pkg/metrics"
78
"github.com/tum-dev/gocast/runner/protobuf"
89
)
910

@@ -13,4 +14,4 @@ import (
1314
// Actions should use log for logging and notify for sending messages like their progress to gocast.
1415
// d contains data passed to the action and is used to pass data to the next actions.
1516
// Any error, the action returns will be logged. If that error is an AbortingError, the subsequent actions will be skipped.
16-
type Action func(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notification, d map[string]any) error
17+
type Action func(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notification, d map[string]any, metrics *metrics.Broker) error

runner/pkg/actions/stream.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import (
1313
"time"
1414

1515
"github.com/tum-dev/gocast/runner/config"
16+
"github.com/tum-dev/gocast/runner/pkg/metrics"
1617
"github.com/tum-dev/gocast/runner/pkg/ptr"
1718
"github.com/tum-dev/gocast/runner/protobuf"
1819
)
1920

20-
func Stream(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notification, d map[string]any) error {
21+
func Stream(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notification, d map[string]any, metrics *metrics.Broker) error {
2122
if ctx.Err() != nil {
2223
return AbortingError(ctx.Err())
2324
}
@@ -51,6 +52,11 @@ func Stream(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notific
5152
}
5253
log = log.With("stream_id", streamID)
5354

55+
metrics.Streams.With(metrics.With().Stream(streamID).Source(input).L()).Inc()
56+
defer func() {
57+
metrics.Streams.With(metrics.With().Stream(streamID).Source(input).L()).Dec()
58+
}()
59+
5460
// e.g. ./storage/live/1/STREAM_VERSION_COMBINED
5561
liveRecDir := path.Join(config.Config.SegmentPath, fmt.Sprintf("%d", streamID), streamVersion)
5662
err := os.MkdirAll(liveRecDir, os.ModePerm)
@@ -96,6 +102,7 @@ func Stream(ctx context.Context, log *slog.Logger, notify chan *protobuf.Notific
96102
go logCmdPipe(log, stdo, []any{"stream", streamID, "logStream", "stdout"})
97103
err = command.Run()
98104
if err != nil {
105+
metrics.StreamErrors.With(metrics.With().Stream(streamID).Source(input).L()).Inc()
99106
return err
100107
}
101108
return nil

runner/pkg/actions/stream_end.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ import (
55
"fmt"
66
"log/slog"
77

8+
"github.com/tum-dev/gocast/runner/pkg/metrics"
89
"github.com/tum-dev/gocast/runner/protobuf"
910
)
1011

1112
// StreamEnd is an action who's sole purpose is to notify gocast about the end of a stream.
1213
// the only reason it is a separate action is to avoid sending unnecessary
1314
// stream_end notifications if Stream errors.
14-
func StreamEnd(_ context.Context, _ *slog.Logger, notify chan *protobuf.Notification, d map[string]any) error {
15+
func StreamEnd(_ context.Context, _ *slog.Logger, notify chan *protobuf.Notification, d map[string]any, metrics *metrics.Broker) error {
1516
streamID, ok := d["streamID"].(uint64)
1617
if !ok {
1718
return AbortingError(fmt.Errorf("no stream id in context"))

runner/pkg/metrics/broker.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Package metrics provides Prometheus compatible monitoring.
2+
package metrics
3+
4+
import (
5+
"fmt"
6+
"log/slog"
7+
"net/http"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
12+
)
13+
14+
// Broker manages Prometheus metrics.
15+
type Broker struct {
16+
port int
17+
Streams *prometheus.GaugeVec
18+
StreamErrors *prometheus.CounterVec
19+
}
20+
21+
// Option represents a functional option for configuring a Broker.
22+
type Option func(broker *Broker)
23+
24+
// NewBroker initializes a new Broker with optional configurations.
25+
// It sets up Prometheus all available metrics.
26+
//
27+
// Example usage:
28+
//
29+
// b := metrics.NewBroker(metrics.WithPort(8080))
30+
// go b.Run()
31+
//
32+
// b.Streams.With(b.With().Stream(123).Input("rtmp://1.2.3.4/src")).Set(5) // Set active streams
33+
func NewBroker(options ...Option) *Broker {
34+
b := &Broker{
35+
port: 9947,
36+
Streams: promauto.NewGaugeVec(prometheus.GaugeOpts{
37+
Namespace: "runner",
38+
Subsystem: "stream",
39+
Name: "n_streams",
40+
Help: "Number of active streams",
41+
}, []string{"stream_id", "input"}),
42+
43+
StreamErrors: promauto.NewCounterVec(prometheus.CounterOpts{
44+
Namespace: "runner",
45+
Subsystem: "stream",
46+
Name: "n_errors",
47+
Help: "Number of stream ffmpeg errors",
48+
}, []string{"stream_id", "input"}),
49+
}
50+
for _, option := range options {
51+
option(b)
52+
}
53+
return b
54+
}
55+
56+
// WithPort configures the Broker to listen on a custom port.
57+
func WithPort(port int) Option {
58+
return func(broker *Broker) {
59+
broker.port = port
60+
}
61+
}
62+
63+
// Run starts an HTTP server that exposes the metrics.
64+
func (b *Broker) Run() {
65+
http.Handle("/metrics", promhttp.Handler())
66+
slog.Error("Serving metrics", "err", http.ListenAndServe(fmt.Sprintf(":%d", b.port), nil))
67+
}
68+
69+
// LabelBuilder helps construct Prometheus labels dynamically.
70+
type LabelBuilder prometheus.Labels
71+
72+
// L converts LabelBuilder to a Prometheus Labels map.
73+
func (b LabelBuilder) L() prometheus.Labels {
74+
return prometheus.Labels(b)
75+
}
76+
77+
// With initializes a new LabelBuilder.
78+
func (b *Broker) With() LabelBuilder {
79+
return LabelBuilder{}
80+
}
81+
82+
// Stream adds a "stream_id" label to LabelBuilder.
83+
func (b LabelBuilder) Stream(streamID uint64) LabelBuilder {
84+
b["stream_id"] = fmt.Sprintf("%d", streamID)
85+
return b
86+
}
87+
88+
// Source adds a "source" label to LabelBuilder.
89+
func (b LabelBuilder) Source(source string) LabelBuilder {
90+
b["source"] = source
91+
return b
92+
}

runner/pkg/metrics/broker_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package metrics
2+
3+
import "testing"
4+
5+
func TestLabelBuilder(t *testing.T) {
6+
b := NewBroker()
7+
if b == nil {
8+
t.Errorf("broker unexpectedly nil")
9+
return
10+
}
11+
if b.StreamErrors == nil {
12+
t.Errorf("broker metric not initialized")
13+
}
14+
labels := b.With().Stream(123).Source("test").L()
15+
if labels["stream_id"] != "123" || labels["source"] != "test" {
16+
t.Errorf("Unexpected labels: %+v", labels)
17+
}
18+
}

runner/protobuf/notifications.pb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runner/protobuf/runner.pb.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runner/protobuf/runner_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runner/runner.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/tum-dev/gocast/runner/config"
2020
"github.com/tum-dev/gocast/runner/pkg/actions"
21+
"github.com/tum-dev/gocast/runner/pkg/metrics"
2122
"github.com/tum-dev/gocast/runner/pkg/netutil"
2223
"github.com/tum-dev/gocast/runner/pkg/ptr"
2324
"github.com/tum-dev/gocast/runner/pkg/vmstat"
@@ -51,6 +52,7 @@ type Runner struct {
5152
protobuf.UnimplementedRunnerServiceServer
5253

5354
notifications chan *protobuf.Notification
55+
Metrics *metrics.Broker
5456
}
5557

5658
func NewRunner(v string) *Runner {
@@ -70,6 +72,7 @@ func NewRunner(v string) *Runner {
7072
stats: vmstats,
7173
StartTime: start,
7274
notifications: make(chan *protobuf.Notification),
75+
Metrics: metrics.NewBroker(),
7376
}
7477
}
7578

@@ -85,7 +88,8 @@ func (r *Runner) Run() {
8588
config.Config.Port = p
8689
}
8790
r.log.Info("using port", "port", config.Config.Port)
88-
91+
92+
go r.Metrics.Run()
8993
go r.handleNotifications()
9094
go r.InitApiGrpc()
9195
go func() {
@@ -147,7 +151,6 @@ func (r *Runner) InitApiGrpc() {
147151
r.log.Error("failed to serve", "error", err)
148152
os.Exit(1)
149153
}
150-
151154
}
152155

153156
func (r *Runner) RunAction(a []actions.Action, data map[string]any) string {
@@ -165,7 +168,7 @@ func (r *Runner) RunAction(a []actions.Action, data map[string]any) string {
165168
for _, action := range a {
166169
for {
167170
log := r.log.With("action", getFunctionName(action)).With("job", job)
168-
err := action(c, log, r.notifications, data)
171+
err := action(c, log, r.notifications, data, r.Metrics)
169172
if err != nil {
170173
log.Error("action error", "error", err) // use action specific logger
171174
if actions.IsAbortingError(err) {

0 commit comments

Comments
 (0)