Skip to content

feat(sysadvisor): report node cpu metrics to kcmas and add tags #802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package node
import (
"context"
"fmt"
"strconv"
"time"

v1 "k8s.io/api/core/v1"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/syncer"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/types"
sysadvisortypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
"github.com/kubewharf/katalyst-core/pkg/consts"
Expand All @@ -43,8 +45,10 @@ import (
// nodeRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher)
// to the standard metricName (used by custom-metric-api-server)
var nodeRawMetricNameMapping = map[string]string{
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,
consts.MetricCPUTotalSystem: apimetricnode.CustomMetricNodeCPUTotal,
consts.MetricCPUUsageSystem: apimetricnode.CustomMetricNodeCPUUsage,
consts.MetricCPUUsageRatioSystem: apimetricnode.CustomMetricNodeCPUUsageRatio,
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,

consts.MetricMemFreeSystem: apimetricnode.CustomMetricNodeMemoryFree,
consts.MetricMemAvailableSystem: apimetricnode.CustomMetricNodeMemoryAvailable,
Expand Down Expand Up @@ -174,5 +178,42 @@ func (n *MetricSyncerNode) generateMetricTag(ctx context.Context) (tags []metric
}
}

// append cpu codename info
cpuCodeNameInterface := n.metaServer.MetricsFetcher.GetByStringIndex(consts.MetricCPUCodeName)
cpuCodeName, ok := cpuCodeNameInterface.(string)
if !ok {
klog.Warningf("parse cpu code name %v failed", cpuCodeNameInterface)
cpuCodeName = ""
}

tags = append(tags, metrics.MetricTag{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "cpu_codename"),
Val: cpuCodeName,
})

// append vendor info
isVM := ""
isVMInterface := n.metaServer.MetricsFetcher.GetByStringIndex(consts.MetricInfoIsVM)
isVMBool, ok := isVMInterface.(bool)
if !ok {
klog.Warningf("parse is vm %v failed", isVMInterface)
} else {
isVM = strconv.FormatBool(isVMBool)
}

tags = append(tags, metrics.MetricTag{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "is_vm"),
Val: isVM,
})

// append node numa bit mask
numas := n.metaServer.KatalystMachineInfo.CPUDetails.NUMANodes()
numaBitMask := sysadvisortypes.NumaIDBitMask(numas.ToSliceInt())

tags = append(tags, metrics.MetricTag{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "numa_bit_mask"),
Val: fmt.Sprintf("%d", numaBitMask),
})

return tags
}
131 changes: 131 additions & 0 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"

internalfake "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/fake"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/client"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
metaconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
metrictypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/node"
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
metricutil "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// generateTestConfiguration returns the configuration produced by the default
// agent options. The calling test is failed immediately when the options
// cannot be converted or when the resulting configuration is nil.
func generateTestConfiguration(t *testing.T) *config.Configuration {
	// Mark this function as a helper so that a failure is attributed to the
	// line of the test that called it rather than to a line in here.
	t.Helper()

	testConfiguration, err := options.NewOptions().Config()
	require.NoError(t, err)
	require.NotNil(t, testConfiguration)
	return testConfiguration
}

// generateTestGenericClientSet assembles a GenericClientSet out of fake
// clients. kubeObjects seed the fake kube client; internalObjects seed both
// the fake internal client and the fake dynamic client, the latter being
// created with a fresh, empty scheme.
func generateTestGenericClientSet(kubeObjects, internalObjects []runtime.Object) *client.GenericClientSet {
	kubeClient := fake.NewSimpleClientset(kubeObjects...)
	internalClient := internalfake.NewSimpleClientset(internalObjects...)
	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), internalObjects...)

	clientSet := new(client.GenericClientSet)
	clientSet.KubeClient = kubeClient
	clientSet.InternalClient = internalClient
	clientSet.DynamicClient = dynamicClient
	return clientSet
}

// generateTestMetaServer builds a MetaServer for tests. Its MetaAgent carries
// a dummy CPU topology generated with the arguments (96, 2, 4), a fake metrics
// fetcher, and a remote node fetcher that reads from a fake kube client
// pre-populated with a single node named "test-node".
//
// An error is returned only when the dummy CPU topology cannot be generated.
func generateTestMetaServer() (*metaserver.MetaServer, error) {
	const testNodeName = "test-node"

	topology, topoErr := machine.GenerateDummyCPUTopology(96, 2, 4)
	if topoErr != nil {
		return nil, topoErr
	}

	// The fake kube client only knows about the one node the fetcher asks for.
	testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testNodeName}}
	clients := generateTestGenericClientSet([]runtime.Object{testNode}, nil)

	metaAgent := &agent.MetaAgent{
		KatalystMachineInfo: &machine.KatalystMachineInfo{CPUTopology: topology},
		MetricsFetcher:      metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}),
		NodeFetcher: node.NewRemoteNodeFetcher(
			&global.BaseConfiguration{NodeName: testNodeName},
			&metaconfig.NodeConfiguration{},
			clients.KubeClient.CoreV1().Nodes(),
		),
	}

	return &metaserver.MetaServer{MetaAgent: metaAgent}, nil
}

// TestReceiveRawNode feeds a single cpu-usage notification into
// receiveRawNode and lets it run for one second before cancelling the
// context. It checks that the syncer can be built and that processing the
// notification neither panics nor blocks past cancellation.
func TestReceiveRawNode(t *testing.T) {
	t.Parallel()

	conf := generateTestConfiguration(t)

	// Setup failures must stop the test: everything below dereferences
	// metaServer, so continuing after an error would only produce a nil panic.
	metaServer, err := generateTestMetaServer()
	require.NoError(t, err)

	// Seed the values that the node tag generation reads from the metric store.
	metaServer.MetricsFetcher.RegisterExternalMetric(func(store *metricutil.MetricStore) {
		store.SetByStringIndex(consts.MetricCPUCodeName, "test-codename")
		store.SetByStringIndex(consts.MetricInfoIsVM, false)
	})
	metaServer.MetricsFetcher.Run(context.Background())

	si, err := NewMetricSyncerNode(conf, struct{}{}, metrics.DummyMetrics{}, metricspool.DummyMetricsEmitterPool{}, metaServer, metacache.NewDummyMetaCacheImp())
	require.NoError(t, err)

	// Checked assertion: report an unexpected concrete type as a test failure
	// instead of a panic.
	s, ok := si.(*MetricSyncerNode)
	require.True(t, ok, "NewMetricSyncerNode returned %T, want *MetricSyncerNode", si)

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path; calling cancel twice is harmless.
	defer cancel()
	rChan := make(chan metrictypes.NotifiedResponse, 20)

	go func() {
		now := time.Now()
		notifiedResponse := metrictypes.NotifiedResponse{
			Req: metrictypes.NotifiedRequest{
				MetricName: consts.MetricCPUUsageSystem,
			},
			MetricData: metricutil.MetricData{
				Value: 0.6,
				Time:  &now,
			},
		}
		rChan <- notifiedResponse
		// Give the receiver time to consume the response before stopping it.
		time.Sleep(time.Second)
		cancel()
	}()

	s.receiveRawNode(ctx, rChan)
}
10 changes: 0 additions & 10 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (p *MetricSyncerPod) emitBorweinLatencyRegression() {
nodeName = pod.Spec.NodeName
}

qosLevel, err := p.qosConf.GetQoSLevelForPod(pod)
if err != nil {
klog.Warningf("get pod %v qos level error: %v", pod.Name, err)
qosLevel = ""
}

tags := p.generateMetricTag(pod)

for containerName, latencyRegression := range containerData {
Expand All @@ -122,10 +116,6 @@ func (p *MetricSyncerPod) emitBorweinLatencyRegression() {
Key: fmt.Sprintf("%scontainer", data.CustomMetricLabelSelectorPrefixKey),
Val: containerName,
},
metrics.MetricTag{
Key: fmt.Sprintf("%s", qosLevelTag),
Val: qosLevel,
},
)...)
}
}
Expand Down
35 changes: 33 additions & 2 deletions pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apimetricpod "github.com/kubewharf/katalyst-api/pkg/metric/pod"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/syncer"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/types"
sysadvisortypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
Expand All @@ -40,12 +42,12 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

const (
podMetricLabelSelectorNodeName = "node_name"
qosLevelTag = "qos_level"
podTrainingThroughputInferenceResultBorwein = "pod_borwein_training_throughput_inference_result"
podLatencyRegressionInferenceResultBorwein = "pod_borwein_latency_regression_inference_result"
nodeLatencyRegressionInferenceResultBorwein = "node_borwein_latency_regression_inference_result"
Expand Down Expand Up @@ -278,7 +280,36 @@ func (p *MetricSyncerPod) generateMetricTag(pod *v1.Pod) (tags []metrics.MetricT
}
}

return
// append qos level tag
qosLevel, err := p.qosConf.GetQoSLevelForPod(pod)
if err != nil {
klog.Warningf("get pod %v qos level error: %v", pod.Name, err)
qosLevel = ""
}

tags = append(tags, metrics.MetricTag{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "qos_level"),
Val: qosLevel,
})

// append main container numa bit mask
numaBitMask := int(0)
containerInfos, ok := p.metaReader.GetContainerEntries(string(pod.UID))
if ok {
for _, containerInfo := range containerInfos {
if containerInfo.ContainerType == v1alpha1.ContainerType_MAIN {
cpuset := machine.GetCPUAssignmentNUMAs(containerInfo.TopologyAwareAssignments)
numaBitMask = sysadvisortypes.NumaIDBitMask(cpuset.ToSliceInt())
}
}
}

tags = append(tags, metrics.MetricTag{
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "numa_bit_mask"),
Val: fmt.Sprintf("%d", numaBitMask),
})

return tags
}

// metricPod filter out pods that won't be needed by custom metrics apiserver
Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/sysadvisor/types/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,11 @@ func (cl *ContainerInfoList) GetSource(index int) interface{} {
// SetSource stores p at position index of the list's containers.
// p must hold a *ContainerInfo; any other dynamic type makes the type
// assertion panic.
func (cl *ContainerInfoList) SetSource(index int, p interface{}) {
	info := p.(*ContainerInfo)
	cl.containers[index] = info
}

// NumaIDBitMask folds the given NUMA node IDs into a bit mask in which bit i
// is set when i appears in numaIDs. A nil or empty slice yields 0.
//
// Bits are combined with OR rather than addition, so an ID that appears more
// than once sets its bit once instead of carrying into the neighbouring bit.
// Negative IDs are skipped because a negative shift count panics at run time.
// IDs at or beyond the width of int shift the bit out and contribute nothing.
func NumaIDBitMask(numaIDs []int) int {
	mask := 0
	for _, id := range numaIDs {
		if id < 0 {
			continue
		}
		mask |= 1 << id
	}
	return mask
}
9 changes: 9 additions & 0 deletions pkg/consts/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (

// System compute metrics
const (
MetricCPUTotalSystem = "cpu.total.system"

MetricCPUUsageSystem = "cpu.usage.system"
MetricCPUUsageRatioSystem = "cpu.usage.ratio.system"

MetricLoad1MinSystem = "cpu.load.1min.system"
Expand Down Expand Up @@ -174,6 +177,12 @@ const (
MetricCPUUsageNuma = "cpu.usage.numa"
)

// System info metrics
const (
// MetricInfoIsVM is not a regular metric: it is only the metric-store key
// under which the "is virtual machine" info is stored.
MetricInfoIsVM = "info.is.vm"
)

// System cpu compute metrics
const (
MetricCPUSchedwait = "cpu.schedwait.cpu"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
SystemNetResource = "system/network"
SystemMemoryResource = "system/memory"
SystemComputeResource = "system/compute"
SystemInfoResource = "system/info"

RealtimePowerResource = "realtime/power"
)
Expand All @@ -54,7 +55,8 @@ const (
type SystemResourceKind int

const (
Compute SystemResourceKind = iota
Info SystemResourceKind = iota
Compute
Memory
IO
Net
Expand All @@ -74,6 +76,7 @@ func NewMalachiteClient(fetcher pod.PodFetcher, emitter metrics.MetricEmitter) *
urls := make(map[string]string)
for _, path := range []string{
CgroupResource,
SystemInfoResource,
SystemIOResource,
SystemNetResource,
SystemComputeResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

// GetSystemInfoStats fetches the raw stats for the Info resource kind and
// decodes them into a SystemInfoData.
//
// It returns an error when the raw stats cannot be fetched, when the payload
// is not valid JSON for MalachiteSystemInfoResponse, or when the response
// carries a non-zero Status. On success the returned pointer refers to the
// Data field of the decoded response.
func (c *MalachiteClient) GetSystemInfoStats() (*types.SystemInfoData, error) {
	statsData, err := c.getSystemStats(Info)
	if err != nil {
		return nil, err
	}

	rsp := &types.MalachiteSystemInfoResponse{}
	if err := json.Unmarshal(statsData, rsp); err != nil {
		// %w renders the same message as %s but keeps the decoding error
		// reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("failed to unmarshal system info stats raw data, err %w", err)
	}

	if rsp.Status != 0 {
		return nil, fmt.Errorf("system info stats status is not ok, %d", rsp.Status)
	}

	// NOTE(review): presumably flags data older than UpdateTimeout; nothing is
	// read back from the call here, so it does not change what is returned.
	c.checkSystemStatsOutOfDate("info", UpdateTimeout, rsp.Data.UpdateTime)
	return &rsp.Data, nil
}

func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, error) {
statsData, err := c.getSystemStats(Compute)
if err != nil {
Expand Down Expand Up @@ -110,6 +129,8 @@ func (c *MalachiteClient) getSystemStats(kind SystemResourceKind) ([]byte, error

resource := ""
switch kind {
case Info:
resource = SystemInfoResource
case Compute:
resource = SystemComputeResource
case Memory:
Expand Down
Loading