Skip to content

Commit 6f79b21

Browse files
authored
Merge pull request #802 from sun-yuliang/dev/report-node-agg-metrics-to-kcmas
feat(sysadvisor): report node cpu metrics to kcmas and add tags
2 parents 5733818 + dd0c2ba commit 6f79b21

File tree

12 files changed

+316
-14
lines changed

12 files changed

+316
-14
lines changed

pkg/agent/sysadvisor/plugin/metric-emitter/syncer/node/node.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package node
1919
import (
2020
"context"
2121
"fmt"
22+
"strconv"
2223
"time"
2324

2425
v1 "k8s.io/api/core/v1"
@@ -29,6 +30,7 @@ import (
2930
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
3031
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/syncer"
3132
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/types"
33+
sysadvisortypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
3234
"github.com/kubewharf/katalyst-core/pkg/config"
3335
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
3436
"github.com/kubewharf/katalyst-core/pkg/consts"
@@ -43,8 +45,10 @@ import (
4345
// nodeRawMetricNameMapping maps the raw metricName (collected from agent.MetricsFetcher)
4446
// to the standard metricName (used by custom-metric-api-server)
4547
var nodeRawMetricNameMapping = map[string]string{
46-
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,
48+
consts.MetricCPUTotalSystem: apimetricnode.CustomMetricNodeCPUTotal,
49+
consts.MetricCPUUsageSystem: apimetricnode.CustomMetricNodeCPUUsage,
4750
consts.MetricCPUUsageRatioSystem: apimetricnode.CustomMetricNodeCPUUsageRatio,
51+
consts.MetricLoad1MinSystem: apimetricnode.CustomMetricNodeCPULoad1Min,
4852

4953
consts.MetricMemFreeSystem: apimetricnode.CustomMetricNodeMemoryFree,
5054
consts.MetricMemAvailableSystem: apimetricnode.CustomMetricNodeMemoryAvailable,
@@ -174,5 +178,42 @@ func (n *MetricSyncerNode) generateMetricTag(ctx context.Context) (tags []metric
174178
}
175179
}
176180

181+
// append cpu codename info
182+
cpuCodeNameInterface := n.metaServer.MetricsFetcher.GetByStringIndex(consts.MetricCPUCodeName)
183+
cpuCodeName, ok := cpuCodeNameInterface.(string)
184+
if !ok {
185+
klog.Warningf("parse cpu code name %v failed", cpuCodeNameInterface)
186+
cpuCodeName = ""
187+
}
188+
189+
tags = append(tags, metrics.MetricTag{
190+
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "cpu_codename"),
191+
Val: cpuCodeName,
192+
})
193+
194+
// append vendor info
195+
isVM := ""
196+
isVMInterface := n.metaServer.MetricsFetcher.GetByStringIndex(consts.MetricInfoIsVM)
197+
isVMBool, ok := isVMInterface.(bool)
198+
if !ok {
199+
klog.Warningf("parse is vm %v failed", isVMInterface)
200+
} else {
201+
isVM = strconv.FormatBool(isVMBool)
202+
}
203+
204+
tags = append(tags, metrics.MetricTag{
205+
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "is_vm"),
206+
Val: isVM,
207+
})
208+
209+
// append node numa bit mask
210+
numas := n.metaServer.KatalystMachineInfo.CPUDetails.NUMANodes()
211+
numaBitMask := sysadvisortypes.NumaIDBitMask(numas.ToSliceInt())
212+
213+
tags = append(tags, metrics.MetricTag{
214+
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "numa_bit_mask"),
215+
Val: fmt.Sprintf("%d", numaBitMask),
216+
})
217+
177218
return tags
178219
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright 2022 The Katalyst Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package node
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
v1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/runtime"
29+
dynamicfake "k8s.io/client-go/dynamic/fake"
30+
"k8s.io/client-go/kubernetes/fake"
31+
32+
internalfake "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned/fake"
33+
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options"
34+
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
35+
"github.com/kubewharf/katalyst-core/pkg/client"
36+
"github.com/kubewharf/katalyst-core/pkg/config"
37+
"github.com/kubewharf/katalyst-core/pkg/config/agent/global"
38+
metaconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
39+
"github.com/kubewharf/katalyst-core/pkg/consts"
40+
"github.com/kubewharf/katalyst-core/pkg/metaserver"
41+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
42+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
43+
metrictypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
44+
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/node"
45+
"github.com/kubewharf/katalyst-core/pkg/metrics"
46+
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
47+
"github.com/kubewharf/katalyst-core/pkg/util/machine"
48+
metricutil "github.com/kubewharf/katalyst-core/pkg/util/metric"
49+
)
50+
51+
func generateTestConfiguration(t *testing.T) *config.Configuration {
52+
testConfiguration, err := options.NewOptions().Config()
53+
require.NoError(t, err)
54+
require.NotNil(t, testConfiguration)
55+
return testConfiguration
56+
}
57+
58+
func generateTestGenericClientSet(kubeObjects, internalObjects []runtime.Object) *client.GenericClientSet {
59+
return &client.GenericClientSet{
60+
KubeClient: fake.NewSimpleClientset(kubeObjects...),
61+
InternalClient: internalfake.NewSimpleClientset(internalObjects...),
62+
DynamicClient: dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), internalObjects...),
63+
}
64+
}
65+
66+
func generateTestMetaServer() (*metaserver.MetaServer, error) {
67+
nodeName := "test-node"
68+
69+
cpuTopology, err := machine.GenerateDummyCPUTopology(96, 2, 4)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
clientSet := generateTestGenericClientSet([]runtime.Object{&v1.Node{
75+
ObjectMeta: metav1.ObjectMeta{
76+
Name: nodeName,
77+
},
78+
}}, nil)
79+
80+
metaServer := &metaserver.MetaServer{
81+
MetaAgent: &agent.MetaAgent{
82+
KatalystMachineInfo: &machine.KatalystMachineInfo{
83+
CPUTopology: cpuTopology,
84+
},
85+
MetricsFetcher: metric.NewFakeMetricsFetcher(metrics.DummyMetrics{}),
86+
NodeFetcher: node.NewRemoteNodeFetcher(&global.BaseConfiguration{NodeName: nodeName}, &metaconfig.NodeConfiguration{}, clientSet.KubeClient.CoreV1().Nodes()),
87+
},
88+
}
89+
90+
return metaServer, nil
91+
}
92+
93+
func TestReceiveRawNode(t *testing.T) {
94+
t.Parallel()
95+
96+
conf := generateTestConfiguration(t)
97+
98+
metaServer, err := generateTestMetaServer()
99+
assert.NoError(t, err)
100+
101+
metaServer.MetricsFetcher.RegisterExternalMetric(func(store *metricutil.MetricStore) {
102+
store.SetByStringIndex(consts.MetricCPUCodeName, "test-codename")
103+
store.SetByStringIndex(consts.MetricInfoIsVM, false)
104+
})
105+
metaServer.MetricsFetcher.Run(context.Background())
106+
107+
si, err := NewMetricSyncerNode(conf, struct{}{}, metrics.DummyMetrics{}, metricspool.DummyMetricsEmitterPool{}, metaServer, metacache.NewDummyMetaCacheImp())
108+
assert.NoError(t, err)
109+
110+
s := si.(*MetricSyncerNode)
111+
ctx, cancel := context.WithCancel(context.Background())
112+
rChan := make(chan metrictypes.NotifiedResponse, 20)
113+
114+
go func() {
115+
now := time.Now()
116+
notifiedResponse := metrictypes.NotifiedResponse{
117+
Req: metrictypes.NotifiedRequest{
118+
MetricName: consts.MetricCPUUsageSystem,
119+
},
120+
MetricData: metricutil.MetricData{
121+
Value: 0.6,
122+
Time: &now,
123+
},
124+
}
125+
rChan <- notifiedResponse
126+
time.Sleep(time.Second)
127+
cancel()
128+
}()
129+
130+
s.receiveRawNode(ctx, rChan)
131+
}

pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/model.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,6 @@ func (p *MetricSyncerPod) emitBorweinLatencyRegression() {
9696
nodeName = pod.Spec.NodeName
9797
}
9898

99-
qosLevel, err := p.qosConf.GetQoSLevelForPod(pod)
100-
if err != nil {
101-
klog.Warningf("get pod %v qos level error: %v", pod.Name, err)
102-
qosLevel = ""
103-
}
104-
10599
tags := p.generateMetricTag(pod)
106100

107101
for containerName, latencyRegression := range containerData {
@@ -122,10 +116,6 @@ func (p *MetricSyncerPod) emitBorweinLatencyRegression() {
122116
Key: fmt.Sprintf("%scontainer", data.CustomMetricLabelSelectorPrefixKey),
123117
Val: containerName,
124118
},
125-
metrics.MetricTag{
126-
Key: fmt.Sprintf("%s", qosLevelTag),
127-
Val: qosLevel,
128-
},
129119
)...)
130120
}
131121
}

pkg/agent/sysadvisor/plugin/metric-emitter/syncer/pod/pod.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ import (
2424
"k8s.io/apimachinery/pkg/util/sets"
2525
"k8s.io/apimachinery/pkg/util/wait"
2626
"k8s.io/klog/v2"
27+
"k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
2728

2829
apimetricpod "github.com/kubewharf/katalyst-api/pkg/metric/pod"
2930
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
3031
borweinconsts "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/inference/models/borwein/consts"
3132
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/syncer"
3233
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/metric-emitter/types"
34+
sysadvisortypes "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
3335
"github.com/kubewharf/katalyst-core/pkg/config"
3436
metricemitter "github.com/kubewharf/katalyst-core/pkg/config/agent/sysadvisor/metric-emitter"
3537
"github.com/kubewharf/katalyst-core/pkg/config/generic"
@@ -40,12 +42,12 @@ import (
4042
"github.com/kubewharf/katalyst-core/pkg/metrics"
4143
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
4244
"github.com/kubewharf/katalyst-core/pkg/util/general"
45+
"github.com/kubewharf/katalyst-core/pkg/util/machine"
4346
"github.com/kubewharf/katalyst-core/pkg/util/native"
4447
)
4548

4649
const (
4750
podMetricLabelSelectorNodeName = "node_name"
48-
qosLevelTag = "qos_level"
4951
podTrainingThroughputInferenceResultBorwein = "pod_borwein_training_throughput_inference_result"
5052
podLatencyRegressionInferenceResultBorwein = "pod_borwein_latency_regression_inference_result"
5153
nodeLatencyRegressionInferenceResultBorwein = "node_borwein_latency_regression_inference_result"
@@ -278,7 +280,36 @@ func (p *MetricSyncerPod) generateMetricTag(pod *v1.Pod) (tags []metrics.MetricT
278280
}
279281
}
280282

281-
return
283+
// append qos level tag
284+
qosLevel, err := p.qosConf.GetQoSLevelForPod(pod)
285+
if err != nil {
286+
klog.Warningf("get pod %v qos level error: %v", pod.Name, err)
287+
qosLevel = ""
288+
}
289+
290+
tags = append(tags, metrics.MetricTag{
291+
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "qos_level"),
292+
Val: qosLevel,
293+
})
294+
295+
// append main container numa bit mask
296+
numaBitMask := int(0)
297+
containerInfos, ok := p.metaReader.GetContainerEntries(string(pod.UID))
298+
if ok {
299+
for _, containerInfo := range containerInfos {
300+
if containerInfo.ContainerType == v1alpha1.ContainerType_MAIN {
301+
cpuset := machine.GetCPUAssignmentNUMAs(containerInfo.TopologyAwareAssignments)
302+
numaBitMask = sysadvisortypes.NumaIDBitMask(cpuset.ToSliceInt())
303+
}
304+
}
305+
}
306+
307+
tags = append(tags, metrics.MetricTag{
308+
Key: fmt.Sprintf("%s%s", data.CustomMetricLabelSelectorPrefixKey, "numa_bit_mask"),
309+
Val: fmt.Sprintf("%d", numaBitMask),
310+
})
311+
312+
return tags
282313
}
283314

284315
// metricPod filter out pods that won't be needed by custom metrics apiserver

pkg/agent/sysadvisor/types/helper.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,3 +339,11 @@ func (cl *ContainerInfoList) GetSource(index int) interface{} {
339339
func (cl *ContainerInfoList) SetSource(index int, p interface{}) {
340340
cl.containers[index] = p.(*ContainerInfo)
341341
}
342+
343+
func NumaIDBitMask(numaIDs []int) int {
344+
ret := int(0)
345+
for _, id := range numaIDs {
346+
ret += 1 << id
347+
}
348+
return ret
349+
}

pkg/consts/metric.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ const (
3030

3131
// System compute metrics
3232
const (
33+
MetricCPUTotalSystem = "cpu.total.system"
34+
35+
MetricCPUUsageSystem = "cpu.usage.system"
3336
MetricCPUUsageRatioSystem = "cpu.usage.ratio.system"
3437

3538
MetricLoad1MinSystem = "cpu.load.1min.system"
@@ -174,6 +177,12 @@ const (
174177
MetricCPUUsageNuma = "cpu.usage.numa"
175178
)
176179

180+
// System info metrics
181+
const (
182+
// MetricInfoIsVM is not normal metric, only used to store is virtual machine info into metric store
183+
MetricInfoIsVM = "info.is.vm"
184+
)
185+
177186
// System cpu compute metrics
178187
const (
179188
MetricCPUSchedwait = "cpu.schedwait.cpu"

pkg/metaserver/agent/metric/provisioner/malachite/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const (
3535
SystemNetResource = "system/network"
3636
SystemMemoryResource = "system/memory"
3737
SystemComputeResource = "system/compute"
38+
SystemInfoResource = "system/info"
3839

3940
RealtimePowerResource = "realtime/power"
4041
)
@@ -54,7 +55,8 @@ const (
5455
type SystemResourceKind int
5556

5657
const (
57-
Compute SystemResourceKind = iota
58+
Info SystemResourceKind = iota
59+
Compute
5860
Memory
5961
IO
6062
Net
@@ -74,6 +76,7 @@ func NewMalachiteClient(fetcher pod.PodFetcher, emitter metrics.MetricEmitter) *
7476
urls := make(map[string]string)
7577
for _, path := range []string{
7678
CgroupResource,
79+
SystemInfoResource,
7780
SystemIOResource,
7881
SystemNetResource,
7982
SystemComputeResource,

pkg/metaserver/agent/metric/provisioner/malachite/client/client_system.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,25 @@ import (
2828
"github.com/kubewharf/katalyst-core/pkg/util/general"
2929
)
3030

31+
func (c *MalachiteClient) GetSystemInfoStats() (*types.SystemInfoData, error) {
32+
statsData, err := c.getSystemStats(Info)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
rsp := &types.MalachiteSystemInfoResponse{}
38+
if err := json.Unmarshal(statsData, rsp); err != nil {
39+
return nil, fmt.Errorf("failed to unmarshal system info stats raw data, err %s", err)
40+
}
41+
42+
if rsp.Status != 0 {
43+
return nil, fmt.Errorf("system info stats status is not ok, %d", rsp.Status)
44+
}
45+
46+
c.checkSystemStatsOutOfDate("info", UpdateTimeout, rsp.Data.UpdateTime)
47+
return &rsp.Data, nil
48+
}
49+
3150
func (c *MalachiteClient) GetSystemComputeStats() (*types.SystemComputeData, error) {
3251
statsData, err := c.getSystemStats(Compute)
3352
if err != nil {
@@ -110,6 +129,8 @@ func (c *MalachiteClient) getSystemStats(kind SystemResourceKind) ([]byte, error
110129

111130
resource := ""
112131
switch kind {
132+
case Info:
133+
resource = SystemInfoResource
113134
case Compute:
114135
resource = SystemComputeResource
115136
case Memory:

0 commit comments

Comments
 (0)