Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify metrics processor #76

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 67 additions & 31 deletions openshift_metrics/metrics_processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import List, Dict
from collections import namedtuple

GPU_UNKNOWN_TYPE = "GPU_UNKNOWN_TYPE"
GPUInfo = namedtuple("GPUInfo", ["gpu_type", "gpu_resource", "node_model"])


class MetricsProcessor:
Expand All @@ -18,28 +20,20 @@ def merge_metrics(self, metric_name, metric_list):
namespace = metric["metric"]["namespace"]
node = metric["metric"].get("node")

gpu_type = None
gpu_resource = None
node_model = None

self.merged_data.setdefault(namespace, {})
self.merged_data[namespace].setdefault(pod, {"metrics": {}})

if metric_name == "gpu_request":
gpu_type = metric["metric"].get(
"label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE
)
gpu_resource = metric["metric"].get("resource")
node_model = metric["metric"].get("label_nvidia_com_gpu_machine")
gpu_type, gpu_resource, node_model = self._extract_gpu_info(
metric_name, metric
)

for value in metric["values"]:
epoch_time = value[0]
for epoch_time, metric_value in metric["values"]:

self.merged_data[namespace][pod]["metrics"].setdefault(epoch_time, {})

self.merged_data[namespace][pod]["metrics"][epoch_time][
metric_name
] = value[1]
] = metric_value
if gpu_type:
self.merged_data[namespace][pod]["metrics"][epoch_time][
"gpu_type"
Expand All @@ -57,6 +51,22 @@ def merge_metrics(self, metric_name, metric_list):
"node"
] = node

@staticmethod
def _extract_gpu_info(metric_name: str, metric: Dict) -> GPUInfo:
"""Extract GPU related info"""
gpu_type = None
gpu_resource = None
node_model = None

if metric_name == "gpu_request":
gpu_type = metric["metric"].get(
"label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE
)
gpu_resource = metric["metric"].get("resource")
node_model = metric["metric"].get("label_nvidia_com_gpu_machine")

return GPUInfo(gpu_type, gpu_resource, node_model)

def condense_metrics(self, metrics_to_check: List[str]) -> Dict:
"""
Checks if the value of metrics is the same, and removes redundant
Expand All @@ -80,33 +90,59 @@ def condense_metrics(self, metrics_to_check: List[str]) -> Dict:

start_metric_dict = metrics_dict[start_epoch_time].copy()

for i in range(len(epoch_times_list)):
epoch_time = epoch_times_list[i]
same_metrics = True
continuous_metrics = True
for metric in metrics_to_check:
# If either cpu, memory or gpu request is diferent.
if metrics_dict[start_epoch_time].get(metric, 0) != metrics_dict[epoch_time].get(metric, 0): # fmt: skip
same_metrics = False

if i != 0 and epoch_time - epoch_times_list[i - 1] > interval:
# i.e. if the difference between 2 consecutive timestamps
# is more than the expected frequency then the pod was stopped
continuous_metrics = False

if not same_metrics or not continuous_metrics:
duration = epoch_times_list[i - 1] - start_epoch_time + interval
for i in range(1, len(epoch_times_list)):
current_time = epoch_times_list[i]
previous_time = epoch_times_list[i - 1]

metrics_changed = self._are_metrics_different(
metrics_dict[start_epoch_time],
metrics_dict[current_time],
metrics_to_check,
)

pod_was_stopped = self._was_pod_stopped(
current_time=current_time,
previous_time=previous_time,
interval=interval,
)

if metrics_changed or pod_was_stopped:
duration = previous_time - start_epoch_time + interval
start_metric_dict["duration"] = duration
new_metrics_dict[start_epoch_time] = start_metric_dict
start_epoch_time = epoch_time

# Reset start_epoch_time and start_metric_dict
start_epoch_time = current_time
start_metric_dict = metrics_dict[start_epoch_time].copy()

duration = epoch_time - start_epoch_time + interval
# Final block after the loop
duration = epoch_times_list[-1] - start_epoch_time + interval
start_metric_dict["duration"] = duration
new_metrics_dict[start_epoch_time] = start_metric_dict

# Update the pod dict with the condensed data
new_pod_dict = pod_dict.copy()
new_pod_dict["metrics"] = new_metrics_dict
condensed_dict[namespace][pod] = new_pod_dict

return condensed_dict

@staticmethod
def _are_metrics_different(
metrics_a: Dict, metrics_b: Dict, metrics_to_check: List[str]
) -> bool:
"""Method that compares all the metrics in metrics_to_check are different in
metrics_a and metrics_b
"""
return any(
metrics_a.get(metric, 0) != metrics_b.get(metric, 0)
for metric in metrics_to_check
)

@staticmethod
def _was_pod_stopped(current_time: int, previous_time: int, interval: int) -> bool:
"""
A pod is assumed to be stopped if the the gap between two consecutive timestamps
is more than the frequency of our metric collection
"""
return (current_time - previous_time) > interval
Loading