Skip to content

Commit

Permalink
metric for metric on len k8s meta
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Feb 25, 2025
1 parent df13d20 commit c9e9693
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
13 changes: 12 additions & 1 deletion plugin/input/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/decoder"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/file"
"github.com/ozontech/file.d/plugin/input/k8s/meta"
"github.com/prometheus/client_golang/prometheus"

"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -50,6 +52,9 @@ type Plugin struct {
params *pipeline.InputPluginParams

fp *file.Plugin

// plugin metrics
metaObjectsCount prometheus.Gauge
}

type Config struct {
Expand Down Expand Up @@ -154,11 +159,13 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
p.params = params
p.config = config.(*Config)

p.registerMetrics(params.MetricCtl)

startCounter := startCounter.Inc()

if startCounter == 1 {
meta.DeletedPodsCacheSize = p.config.DeletedPodsCacheSize
meta.EnableGatherer(p.logger)
meta.EnableGatherer(p.logger, p.metaObjectsCount)
}

if meta.CriType == "docker" {
Expand Down Expand Up @@ -192,6 +199,10 @@ func setBuiltInMeta(metaConfig cfg.MetaTemplates) {
metaConfig["k8s_container_id"] = "{{ .container_id }}"
}

func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.metaObjectsCount = ctl.RegisterGauge("meta_objects_count", "")
}

/*{ meta-params
**`pod_name`** - string
Expand Down
12 changes: 8 additions & 4 deletions plugin/input/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/k8s/meta"
"github.com/ozontech/file.d/plugin/output/devnull"
"github.com/ozontech/file.d/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -101,7 +103,8 @@ func getTestMeta() cfg.MetaTemplates {

func TestAllowedLabels(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil, false))
meta.EnableGatherer(logger.Instance)
ctl := metric.NewCtl("test", prometheus.NewRegistry())
meta.EnableGatherer(logger.Instance, ctl.RegisterGauge("metadata_cache_size", ""))

wg := &sync.WaitGroup{}
wg.Add(2)
Expand Down Expand Up @@ -143,7 +146,8 @@ func TestAllowedLabels(t *testing.T) {

func TestK8SJoin(t *testing.T) {
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil, false))
meta.EnableGatherer(logger.Instance)
ctl := metric.NewCtl("test", prometheus.NewRegistry())
meta.EnableGatherer(logger.Instance, ctl.RegisterGauge("metadata_cache_size", ""))
wg := &sync.WaitGroup{}
wg.Add(4)

Expand Down Expand Up @@ -216,8 +220,8 @@ func TestK8SJoin(t *testing.T) {

func TestCleanUp(t *testing.T) {
p, _, _ := test.NewPipelineMock(test.NewActionPluginStaticInfo(MultilineActionFactory, config(), pipeline.MatchModeAnd, nil, false))

meta.EnableGatherer(logger.Instance)
ctl := metric.NewCtl("test", prometheus.NewRegistry())
meta.EnableGatherer(logger.Instance, ctl.RegisterGauge("metadata_cache_size", ""))

meta.PutMeta(getPodInfo(&meta.MetaItem{
Namespace: "sre",
Expand Down
17 changes: 15 additions & 2 deletions plugin/input/k8s/meta/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -76,13 +77,16 @@ var (

SelfNodeName string

localLogger *zap.SugaredLogger
localLogger *zap.SugaredLogger
metaObjectsCountMetric prometheus.Gauge
)

func EnableGatherer(l *zap.SugaredLogger) {
func EnableGatherer(l *zap.SugaredLogger, metaObjectsCount prometheus.Gauge) {
localLogger = l
localLogger.Info("enabling k8s meta gatherer")

metaObjectsCountMetric = metaObjectsCount

var err error
deletedPodsCache, err = lru.New[PodName, bool](DeletedPodsCacheSize)
if err != nil {
Expand Down Expand Up @@ -265,6 +269,10 @@ func cleanUpItems(items []*MetaItem) {
delete(MetaData, item.Namespace)
}
}

if len(items) > 0 {
metaObjectsCountMetric.Sub(float64(len(items)))
}
}

func GetPodMeta(ns Namespace, pod PodName, cid ContainerID) (bool, *podMeta) {
Expand Down Expand Up @@ -378,6 +386,11 @@ func putContainerMeta(ns Namespace, pod PodName, fullContainerID string, podInfo
}

metaDataMu.Lock()
_, exists := MetaData[ns][pod][containerID]
if !exists {
metaObjectsCountMetric.Add(1)
}

MetaData[ns][pod][containerID] = meta
metaDataMu.Unlock()
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/input/k8s/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestPipeline(t *testing.T) {
k8sContainerID = strings.Clone(e.Root.Dig("k8s_container_id").AsString())
wg.Done()
})
p.Start()

item := &meta.MetaItem{
Namespace: "sre",
Expand All @@ -51,7 +52,6 @@ func TestPipeline(t *testing.T) {
meta.PutMeta(getPodInfo(item, true))
filename := getLogFilename(dir, item)

p.Start()
file, err := os.Create(filename)
if err != nil {
logger.Fatalf("Error creating file: %s", err.Error())
Expand Down

0 comments on commit c9e9693

Please sign in to comment.