From 80124b3f60476c4fb5f655e6c62395249d12d209 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Mon, 3 Jul 2023 12:45:48 -0400 Subject: [PATCH 1/6] Native nvidia gpu telemetry --- .../commons/flowcept_dataclasses/telemetry.py | 132 ++++++++++++++---- .../flowceptor/plugins/base_interceptor.py | 22 +++ .../plugins/dask/dask_interceptor.py | 6 +- flowcept/flowceptor/telemetry_capture.py | 123 +++++++++++----- requirements.txt | 1 + setup.py | 2 +- tests/telemetry_test.py | 7 +- 7 files changed, 231 insertions(+), 62 deletions(-) diff --git a/flowcept/commons/flowcept_dataclasses/telemetry.py b/flowcept/commons/flowcept_dataclasses/telemetry.py index 9e2891f4..0f51b095 100644 --- a/flowcept/commons/flowcept_dataclasses/telemetry.py +++ b/flowcept/commons/flowcept_dataclasses/telemetry.py @@ -1,28 +1,85 @@ from typing import List, Dict +from dataclasses import dataclass, asdict, field + + +def remove_none_values(_dict): + return {k: v for (k, v) in _dict if v is not None} class Telemetry: - class _CPU: - times: Dict[str, float] = None # this is an average of all cpus - percent: float = None + @dataclass(init=False) + class CPU: + @dataclass + class CPUMetrics: + user: float + nice: float + system: float + idle: float + + times_avg: CPUMetrics + percent_all: float = None - times_per_cpu: List[Dict[str, float]] = None + times_per_cpu: List[CPUMetrics] = None percent_per_cpu: List[float] = None - class _Memory: - virtual: Dict[str, float] - swap: Dict[str, float] + @dataclass(init=False) + class Memory: + @dataclass + class MemoryMetrics: + total: int = field(default=None) + used: int = field(default=None) + free: int = field(default=None) + percent: int = field(default=None) + sin: int = field(default=None) + sout: int = field(default=None) + available: int = field(default=None) + active: int = field(default=None) + inactive: int = field(default=None) + wired: int = field(default=None) + + virtual: MemoryMetrics = field(default=None) + swap: MemoryMetrics = 
field(default=None) + + @dataclass(init=False) + class Network: + @dataclass + class NetworkMetrics: + bytes_sent: int + bytes_recv: int + packets_sent: int + packets_recv: int + errin: int + errout: int + dropin: int + dropout: int - class _Network: - netio: Dict[str, int] - netio_per_interface: Dict[str, Dict[str, int]] + netio_sum: NetworkMetrics = None + netio_per_interface: Dict[str, NetworkMetrics] = None - class _Disk: - disk_usage: Dict[str, float] - io: Dict[str, float] - io_per_disk: Dict[str, Dict[str, float]] + @dataclass(init=False) + class Disk: + @dataclass + class DiskUsage: + total: int + used: int + free: int + percent: float - class _Process: + @dataclass + class DiskMetrics: + read_count: int + write_count: int + read_bytes: int + write_bytes: int + read_time: int + write_time: int + + disk_usage: DiskUsage + io_sum: DiskMetrics + io_per_disk: Dict[str, DiskMetrics] = field(default=None) + + # TODO: make it dataclass, like the others + class Process: pid: int cpu_number: int memory: Dict[str, float] @@ -38,22 +95,45 @@ class _Process: executable: str cmd_line: List[str] - cpu: _CPU = None - process: _Process = None - memory: _Memory = None - disk: _Disk = None - network: _Network = None + @dataclass(init=False) + class GPU: + @dataclass + class GPUMetrics: + total: int + free: int + used: int + + gpu_total: GPUMetrics + per_gpu: Dict[int, GPUMetrics] = None + + cpu: CPU = None + process: Process = None + memory: Memory = None + disk: Disk = None + network: Network = None + gpu: GPU = None def to_dict(self): ret = {} if self.cpu is not None: - ret["cpu"] = self.cpu.__dict__ - if self.process is not None: - ret["process"] = self.process.__dict__ + ret["cpu"] = asdict(self.cpu, dict_factory=remove_none_values) + if self.memory is not None: - ret["memory"] = self.memory.__dict__ + ret["memory"] = asdict( + self.memory, dict_factory=remove_none_values + ) if self.disk is not None: - ret["disk"] = self.disk.__dict__ + ret["disk"] = asdict(self.disk, 
dict_factory=remove_none_values) + if self.network is not None: - ret["network"] = self.network.__dict__ + ret["network"] = asdict( + self.network, dict_factory=remove_none_values + ) + + if self.gpu is not None: + ret["gpu"] = asdict(self.gpu, dict_factory=remove_none_values) + + if self.process is not None: + ret["process"] = self.process.__dict__ + return ret diff --git a/flowcept/flowceptor/plugins/base_interceptor.py b/flowcept/flowceptor/plugins/base_interceptor.py index accc4a9e..e343d95e 100644 --- a/flowcept/flowceptor/plugins/base_interceptor.py +++ b/flowcept/flowceptor/plugins/base_interceptor.py @@ -11,6 +11,7 @@ CAMPAIGN_ID, HOSTNAME, EXTRA_METADATA, + TELEMETRY_CAPTURE, ) from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.daos.mq_dao import MQDao @@ -73,6 +74,17 @@ def start(self) -> "BaseInterceptor": :return: """ self._mq_dao.start_time_based_flushing() + + if TELEMETRY_CAPTURE is not None: + if TELEMETRY_CAPTURE["gpu"]: + try: + from pynvml import nvmlInit + + nvmlInit() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) + return self def stop(self) -> bool: @@ -82,6 +94,16 @@ def stop(self) -> bool: """ self._mq_dao.stop() + if TELEMETRY_CAPTURE is not None: + if TELEMETRY_CAPTURE["gpu"]: + try: + from pynvml import nvmlShutdown + + nvmlShutdown() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) + def observe(self, *args, **kwargs): """ This method implements data observability over a data channel diff --git a/flowcept/flowceptor/plugins/dask/dask_interceptor.py b/flowcept/flowceptor/plugins/dask/dask_interceptor.py index 9c12d529..c0c4b612 100644 --- a/flowcept/flowceptor/plugins/dask/dask_interceptor.py +++ b/flowcept/flowceptor/plugins/dask/dask_interceptor.py @@ -148,7 +148,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): if ts.state == "executing": if TELEMETRY_CAPTURE is not None: - 
task_msg.telemetry_at_start = capture_telemetry() + task_msg.telemetry_at_start = capture_telemetry( + self.logger + ) task_msg.status = Status.RUNNING task_msg.address = self._worker.worker_address if self.settings.worker_create_timestamps: @@ -160,7 +162,7 @@ def callback(self, task_id, start, finish, *args, **kwargs): else: get_times_from_task_state(task_msg, ts) if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry() + task_msg.telemetry_at_end = capture_telemetry(self.logger) elif ts.state == "error": task_msg.status = Status.ERROR if self.settings.worker_create_timestamps: diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index 4c307d58..e047d5d5 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -1,81 +1,95 @@ +from logging import Logger from typing import Dict import psutil +from pynvml import ( + nvmlDeviceGetCount, + nvmlDeviceGetHandleByIndex, + nvmlDeviceGetMemoryInfo, +) from flowcept.configs import TELEMETRY_CAPTURE from flowcept.commons.flowcept_dataclasses.telemetry import Telemetry -from flowcept.commons.flowcept_logger import FlowceptLogger -def capture_telemetry() -> Telemetry: +def capture_telemetry(logger: Logger) -> Telemetry: conf = TELEMETRY_CAPTURE if conf is None: return None tel = Telemetry() - tel.cpu = _capture_cpu(conf) - tel.process = _capture_process_info(conf) - tel.memory = _capture_memory(conf) - tel.network = _capture_network(conf) - tel.disk = _capture_disk(conf) + tel.process = _capture_process_info(conf, logger) + tel.cpu = _capture_cpu(conf, logger) + tel.memory = _capture_memory(conf, logger) + tel.network = _capture_network(conf, logger) + tel.disk = _capture_disk(conf, logger) + tel.gpu = _capture_gpu(conf, logger) return tel -def _capture_disk(conf): +def _capture_disk(conf, logger): capt = conf.get("disk", False) if not capt: return None try: - disk = Telemetry._Disk() - disk.disk_usage = 
psutil.disk_usage("/")._asdict() - disk.io = psutil.disk_io_counters(perdisk=False)._asdict() + disk = Telemetry.Disk() + disk.disk_usage = disk.DiskUsage(**psutil.disk_usage("/")._asdict()) + disk.io_sum = disk.DiskMetrics( + **psutil.disk_io_counters(perdisk=False)._asdict() + ) io_perdisk = psutil.disk_io_counters(perdisk=True) if len(io_perdisk) > 1: disk.io_per_disk = {} for d in io_perdisk: - disk.io_per_disk[d] = io_perdisk[d]._asdict() + disk.io_per_disk[d] = disk.DiskMetrics( + **io_perdisk[d]._asdict() + ) return disk except Exception as e: - FlowceptLogger.get_logger().exception(e) + logger.exception(e) -def _capture_network(conf): +def _capture_network(conf, logger): capt = conf.get("network", False) if not capt: return None try: - net = Telemetry._Network() - net.netio = psutil.net_io_counters(pernic=False)._asdict() + net = Telemetry.Network() + net.netio_sum = net.NetworkMetrics( + **psutil.net_io_counters(pernic=False)._asdict() + ) pernic = psutil.net_io_counters(pernic=True) net.netio_per_interface = {} for ic in pernic: if pernic[ic].bytes_sent and pernic[ic].bytes_recv: - net.netio_per_interface[ic] = pernic[ic] + net.netio_per_interface[ic] = net.NetworkMetrics( + **pernic[ic]._asdict() + ) return net except Exception as e: - FlowceptLogger.get_logger().exception(e) + logger.exception(e) -def _capture_memory(conf): +def _capture_memory(conf, logger): capt = conf.get("mem", False) if not capt: return None try: - mem = Telemetry._Memory() - mem.virtual = psutil.virtual_memory()._asdict() - mem.swap = psutil.swap_memory()._asdict() + mem = Telemetry.Memory() + mem.virtual = mem.MemoryMetrics(**psutil.virtual_memory()._asdict()) + mem.swap = mem.MemoryMetrics(**psutil.swap_memory()._asdict()) return mem except Exception as e: - FlowceptLogger.get_logger().exception(e) + logger.exception(e) -def _capture_process_info(conf): +def _capture_process_info(conf, logger): capt = conf.get("process_info", False) if not capt: return None try: - p = 
Telemetry._Process() + p = Telemetry.Process() psutil_p = psutil.Process() with psutil_p.oneshot(): p.pid = psutil_p.pid @@ -85,6 +99,7 @@ def _capture_process_info(conf): pass p.memory = psutil_p.memory_full_info() p.memory_percent = psutil_p.memory_percent() + # TODO dataclass: p.cpu_times = psutil_p.cpu_times()._asdict() p.cpu_percent = psutil_p.cpu_percent() p.executable = psutil_p.exe() @@ -92,33 +107,77 @@ def _capture_process_info(conf): p.num_open_file_descriptors = psutil_p.num_fds() p.num_connections = len(psutil_p.connections()) try: + # TODO dataclass: p.io_counters = psutil_p.io_counters()._asdict() except: pass p.num_open_files = len(psutil_p.open_files()) p.num_threads = psutil_p.num_threads() + # TODO dataclass: p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() return p except Exception as e: - FlowceptLogger.get_logger().exception(e) + logger.exception(e) -def _capture_cpu(conf: Dict): +def _capture_cpu(conf: Dict, logger): capt_cpu = conf.get("cpu", False) capt_per_cpu = conf.get("per_cpu", False) if not (capt_cpu or capt_per_cpu): return None try: - cpu = Telemetry._CPU() + cpu = Telemetry.CPU() if conf.get("cpu", False): - cpu.times = psutil.cpu_times(percpu=False)._asdict() - cpu.percent = psutil.cpu_percent() + cpu.times_avg = cpu.CPUMetrics( + **psutil.cpu_times(percpu=False)._asdict() + ) + cpu.percent_all = psutil.cpu_percent() if conf.get("per_cpu", False): cpu.times_per_cpu = [ - c._asdict() for c in psutil.cpu_times(percpu=True) + cpu.CPUMetrics(**c._asdict()) + for c in psutil.cpu_times(percpu=True) ] cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) return cpu except Exception as e: - FlowceptLogger.get_logger().exception(e) + logger.exception(e) + return None + + +def _capture_gpu(conf: Dict, logger): + capt = conf.get("gpu", False) + if not capt: + return None + + try: + deviceCount = nvmlDeviceGetCount() + handle = nvmlDeviceGetHandleByIndex(0) + info = nvmlDeviceGetMemoryInfo(handle) + _this_gpu = { + "total": 
info.total, + "free": info.free, + "used": info.used, + } + gpu = Telemetry.GPU() + if len(deviceCount) == 0: + gpu.gpu_total = gpu.GPUMetrics(**_this_gpu) + else: + gpu.per_gpu = {0: gpu.GPUMetrics(**_this_gpu)} + sums = _this_gpu.copy() + for i in range(1, deviceCount): + handle = nvmlDeviceGetHandleByIndex(i) + info = nvmlDeviceGetMemoryInfo(handle) + sums["total"] += info.total + sums["free"] += info.free + sums["used"] += info.used + + gpu.per_gpu[i] = gpu.GPUMetrics( + total=info.total, free=info.free, used=info.used + ) + + gpu.gpu_total = gpu.GPUMetrics(**sums) + + return gpu + except Exception as e: + logger.exception(e) return None diff --git a/requirements.txt b/requirements.txt index b890e129..af4a753f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ PyYAML==6.0 redis==4.4.2 psutil==5.9.5 +nvidia-ml-py==11.525.131 diff --git a/setup.py b/setup.py index 2d11108d..d329309c 100644 --- a/setup.py +++ b/setup.py @@ -95,7 +95,7 @@ def get_requirements(file_path): setup( - name="flowcept", + name=PROJECT_NAME, version=version, license="MIT", author="Oak Ridge National Laboratory", diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 57640dea..d8241516 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -1,8 +1,13 @@ import unittest +import json + +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.telemetry_capture import capture_telemetry class TestTelemetry(unittest.TestCase): def test_telemetry(self): - telemetry = capture_telemetry() + self.logger = FlowceptLogger().get_logger() + telemetry = capture_telemetry(self.logger) assert telemetry.to_dict() + print(json.dumps(telemetry.to_dict(), indent=True)) From 8d7ef8d333a59032a101b659b2a3c96c3982eab2 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Mon, 3 Jul 2023 14:02:42 -0400 Subject: [PATCH 2/6] Removing dataclasses from telemetry --- .../commons/flowcept_dataclasses/telemetry.py | 102 +++++------------- 
.../plugins/dask/dask_interceptor.py | 2 +- flowcept/flowceptor/telemetry_capture.py | 32 ++---- 3 files changed, 36 insertions(+), 100 deletions(-) diff --git a/flowcept/commons/flowcept_dataclasses/telemetry.py b/flowcept/commons/flowcept_dataclasses/telemetry.py index 0f51b095..5a7f83f7 100644 --- a/flowcept/commons/flowcept_dataclasses/telemetry.py +++ b/flowcept/commons/flowcept_dataclasses/telemetry.py @@ -1,5 +1,5 @@ from typing import List, Dict -from dataclasses import dataclass, asdict, field +from dataclasses import dataclass, asdict def remove_none_values(_dict): @@ -7,78 +7,36 @@ def remove_none_values(_dict): class Telemetry: - @dataclass(init=False) - class CPU: - @dataclass - class CPUMetrics: - user: float - nice: float - system: float - idle: float + """ + Class representing telemetry information captured in the platform where + the experiment runs. - times_avg: CPUMetrics + We are using psutil and the data it can capture depends on the platform. + So, we won't use dataclasses because we can't list all possible info + to be captured in any platform.
+ + """ + + class CPU: + times_avg: Dict[str, float] = None percent_all: float = None - times_per_cpu: List[CPUMetrics] = None + times_per_cpu: List[Dict[str, float]] = None percent_per_cpu: List[float] = None - @dataclass(init=False) class Memory: - @dataclass - class MemoryMetrics: - total: int = field(default=None) - used: int = field(default=None) - free: int = field(default=None) - percent: int = field(default=None) - sin: int = field(default=None) - sout: int = field(default=None) - available: int = field(default=None) - active: int = field(default=None) - inactive: int = field(default=None) - wired: int = field(default=None) - - virtual: MemoryMetrics = field(default=None) - swap: MemoryMetrics = field(default=None) + virtual: Dict[str, float] + swap: Dict[str, float] - @dataclass(init=False) class Network: - @dataclass - class NetworkMetrics: - bytes_sent: int - bytes_recv: int - packets_sent: int - packets_recv: int - errin: int - errout: int - dropin: int - dropout: int - - netio_sum: NetworkMetrics = None - netio_per_interface: Dict[str, NetworkMetrics] = None + netio: Dict[str, int] + netio_per_interface: Dict[str, Dict[str, int]] - @dataclass(init=False) class Disk: - @dataclass - class DiskUsage: - total: int - used: int - free: int - percent: float + disk_usage: Dict[str, float] + io: Dict[str, float] + io_per_disk: Dict[str, Dict[str, float]] - @dataclass - class DiskMetrics: - read_count: int - write_count: int - read_bytes: int - write_bytes: int - read_time: int - write_time: int - - disk_usage: DiskUsage - io_sum: DiskMetrics - io_per_disk: Dict[str, DiskMetrics] = field(default=None) - - # TODO: make it dataclass, like the others class Process: pid: int cpu_number: int @@ -116,24 +74,16 @@ class GPUMetrics: def to_dict(self): ret = {} if self.cpu is not None: - ret["cpu"] = asdict(self.cpu, dict_factory=remove_none_values) - + ret["cpu"] = self.cpu.__dict__ + if self.process is not None: + ret["process"] = self.process.__dict__ if self.memory 
is not None: - ret["memory"] = asdict( - self.memory, dict_factory=remove_none_values - ) + ret["memory"] = self.memory.__dict__ if self.disk is not None: - ret["disk"] = asdict(self.disk, dict_factory=remove_none_values) - + ret["disk"] = self.disk.__dict__ if self.network is not None: - ret["network"] = asdict( - self.network, dict_factory=remove_none_values - ) - + ret["network"] = self.network.__dict__ if self.gpu is not None: ret["gpu"] = asdict(self.gpu, dict_factory=remove_none_values) - if self.process is not None: - ret["process"] = self.process.__dict__ - return ret diff --git a/flowcept/flowceptor/plugins/dask/dask_interceptor.py b/flowcept/flowceptor/plugins/dask/dask_interceptor.py index c0c4b612..5d3f20f6 100644 --- a/flowcept/flowceptor/plugins/dask/dask_interceptor.py +++ b/flowcept/flowceptor/plugins/dask/dask_interceptor.py @@ -174,7 +174,7 @@ def callback(self, task_id, start, finish, *args, **kwargs): "traceback": ts.traceback_text, } if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry() + task_msg.telemetry_at_end = capture_telemetry(self.logger) else: return diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index e047d5d5..a0c1ce9f 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -33,17 +33,13 @@ def _capture_disk(conf, logger): return None try: disk = Telemetry.Disk() - disk.disk_usage = disk.DiskUsage(**psutil.disk_usage("/")._asdict()) - disk.io_sum = disk.DiskMetrics( - **psutil.disk_io_counters(perdisk=False)._asdict() - ) + disk.disk_usage = psutil.disk_usage("/")._asdict() + disk.io_sum = psutil.disk_io_counters(perdisk=False)._asdict() io_perdisk = psutil.disk_io_counters(perdisk=True) if len(io_perdisk) > 1: disk.io_per_disk = {} for d in io_perdisk: - disk.io_per_disk[d] = disk.DiskMetrics( - **io_perdisk[d]._asdict() - ) + disk.io_per_disk[d] = io_perdisk[d]._asdict() return disk except Exception as e: 
@@ -56,16 +52,12 @@ def _capture_network(conf, logger): return None try: net = Telemetry.Network() - net.netio_sum = net.NetworkMetrics( - **psutil.net_io_counters(pernic=False)._asdict() - ) + net.netio_sum = psutil.net_io_counters(pernic=False)._asdict() pernic = psutil.net_io_counters(pernic=True) net.netio_per_interface = {} for ic in pernic: if pernic[ic].bytes_sent and pernic[ic].bytes_recv: - net.netio_per_interface[ic] = net.NetworkMetrics( - **pernic[ic]._asdict() - ) + net.netio_per_interface[ic] = pernic[ic]._asdict() return net except Exception as e: logger.exception(e) @@ -77,8 +69,8 @@ def _capture_memory(conf, logger): return None try: mem = Telemetry.Memory() - mem.virtual = mem.MemoryMetrics(**psutil.virtual_memory()._asdict()) - mem.swap = mem.MemoryMetrics(**psutil.swap_memory()._asdict()) + mem.virtual = psutil.virtual_memory()._asdict() + mem.swap = psutil.swap_memory()._asdict() return mem except Exception as e: logger.exception(e) @@ -99,7 +91,6 @@ def _capture_process_info(conf, logger): pass p.memory = psutil_p.memory_full_info() p.memory_percent = psutil_p.memory_percent() - # TODO dataclass: p.cpu_times = psutil_p.cpu_times()._asdict() p.cpu_percent = psutil_p.cpu_percent() p.executable = psutil_p.exe() @@ -107,13 +98,11 @@ def _capture_process_info(conf, logger): p.num_open_file_descriptors = psutil_p.num_fds() p.num_connections = len(psutil_p.connections()) try: - # TODO dataclass: p.io_counters = psutil_p.io_counters()._asdict() except: pass p.num_open_files = len(psutil_p.open_files()) p.num_threads = psutil_p.num_threads() - # TODO dataclass: p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() return p except Exception as e: @@ -128,14 +117,11 @@ def _capture_cpu(conf: Dict, logger): try: cpu = Telemetry.CPU() if conf.get("cpu", False): - cpu.times_avg = cpu.CPUMetrics( - **psutil.cpu_times(percpu=False)._asdict() - ) + cpu.times_avg = psutil.cpu_times(percpu=False)._asdict() cpu.percent_all = psutil.cpu_percent() if 
conf.get("per_cpu", False): cpu.times_per_cpu = [ - cpu.CPUMetrics(**c._asdict()) - for c in psutil.cpu_times(percpu=True) + c._asdict() for c in psutil.cpu_times(percpu=True) ] cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) return cpu From 388d0e98b6ec12a1e22618d9367b56bebe1e74d5 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Mon, 3 Jul 2023 14:50:01 -0400 Subject: [PATCH 3/6] Fixes in GPU telemetry capture --- .../commons/flowcept_dataclasses/telemetry.py | 1 + flowcept/flowcept_api/consumer_api.py | 5 --- .../flowceptor/plugins/base_interceptor.py | 29 ++++----------- flowcept/flowceptor/telemetry_capture.py | 35 ++++++++++++++++++- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/flowcept/commons/flowcept_dataclasses/telemetry.py b/flowcept/commons/flowcept_dataclasses/telemetry.py index 5a7f83f7..e0325d0e 100644 --- a/flowcept/commons/flowcept_dataclasses/telemetry.py +++ b/flowcept/commons/flowcept_dataclasses/telemetry.py @@ -60,6 +60,7 @@ class GPUMetrics: total: int free: int used: int + percent: float gpu_total: GPUMetrics per_gpu: Dict[int, GPUMetrics] = None diff --git a/flowcept/flowcept_api/consumer_api.py b/flowcept/flowcept_api/consumer_api.py index c1e49474..7caea82f 100644 --- a/flowcept/flowcept_api/consumer_api.py +++ b/flowcept/flowcept_api/consumer_api.py @@ -1,12 +1,7 @@ from typing import List, Union from time import sleep -import random from flowcept.commons.daos.mq_dao import MQDao -from flowcept.configs import ( - REDIS_INSERTION_BUFFER_TIME, - MONGO_INSERTION_BUFFER_TIME, -) from flowcept.flowceptor.consumers.document_inserter import DocumentInserter from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.flowceptor.plugins.base_interceptor import BaseInterceptor diff --git a/flowcept/flowceptor/plugins/base_interceptor.py b/flowcept/flowceptor/plugins/base_interceptor.py index e343d95e..46419ccb 100644 --- a/flowcept/flowceptor/plugins/base_interceptor.py +++ 
b/flowcept/flowceptor/plugins/base_interceptor.py @@ -11,13 +11,17 @@ CAMPAIGN_ID, HOSTNAME, EXTRA_METADATA, - TELEMETRY_CAPTURE, ) from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.commons.daos.mq_dao import MQDao from flowcept.commons.flowcept_dataclasses.task_message import TaskMessage from flowcept.flowceptor.plugins.settings_factory import get_settings +from flowcept.flowceptor.telemetry_capture import ( + init_gpu_telemetry, + shutdown_gpu_telemetry, +) + from flowcept.version import __version__ @@ -74,17 +78,7 @@ def start(self) -> "BaseInterceptor": :return: """ self._mq_dao.start_time_based_flushing() - - if TELEMETRY_CAPTURE is not None: - if TELEMETRY_CAPTURE["gpu"]: - try: - from pynvml import nvmlInit - - nvmlInit() - except Exception as e: - self.logger.error("NVIDIA GPU NOT FOUND!") - self.logger.exception(e) - + init_gpu_telemetry() return self def stop(self) -> bool: @@ -93,16 +87,7 @@ def stop(self) -> bool: :return: """ self._mq_dao.stop() - - if TELEMETRY_CAPTURE is not None: - if TELEMETRY_CAPTURE["gpu"]: - try: - from pynvml import nvmlShutdown - - nvmlShutdown() - except Exception as e: - self.logger.error("NVIDIA GPU NOT FOUND!") - self.logger.exception(e) + shutdown_gpu_telemetry() def observe(self, *args, **kwargs): """ diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index a0c1ce9f..e2ec3dee 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -5,6 +5,8 @@ nvmlDeviceGetCount, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, + nvmlInit, + nvmlShutdown, ) from flowcept.configs import TELEMETRY_CAPTURE @@ -143,6 +145,7 @@ def _capture_gpu(conf: Dict, logger): "total": info.total, "free": info.free, "used": info.used, + "percent": info.used / info.total * 100, } gpu = Telemetry.GPU() if len(deviceCount) == 0: @@ -158,12 +161,42 @@ def _capture_gpu(conf: Dict, logger): sums["used"] += info.used gpu.per_gpu[i] = 
gpu.GPUMetrics( - total=info.total, free=info.free, used=info.used + total=info.total, + free=info.free, + used=info.used, + percent=info.used / info.total * 100, ) + sums["percent"] = sums["used"] / sums["total"] * 100 gpu.gpu_total = gpu.GPUMetrics(**sums) return gpu except Exception as e: logger.exception(e) return None + + +def init_gpu_telemetry(logger: Logger): + conf = TELEMETRY_CAPTURE + if conf is None: + return None + + if TELEMETRY_CAPTURE.get("gpu", False): + try: + nvmlInit() + except Exception as e: + logger.error("NVIDIA GPU NOT FOUND!") + logger.exception(e) + + +def shutdown_gpu_telemetry(logger: Logger): + conf = TELEMETRY_CAPTURE + if conf is None: + return None + + if TELEMETRY_CAPTURE.get("gpu", False): + try: + nvmlShutdown() + except Exception as e: + logger.error("NVIDIA GPU NOT FOUND!") + logger.exception(e) From 5f34155eb690136eabee07b4ba5835bad94a7275 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Mon, 3 Jul 2023 15:36:13 -0400 Subject: [PATCH 4/6] Turning TelemetryCapture into a class --- .../commons/flowcept_dataclasses/telemetry.py | 6 +- .../flowceptor/plugins/base_interceptor.py | 10 +- .../plugins/dask/dask_interceptor.py | 13 +- flowcept/flowceptor/telemetry_capture.py | 372 +++++++++--------- tests/telemetry_test.py | 9 +- 5 files changed, 211 insertions(+), 199 deletions(-) diff --git a/flowcept/commons/flowcept_dataclasses/telemetry.py b/flowcept/commons/flowcept_dataclasses/telemetry.py index e0325d0e..0575d466 100644 --- a/flowcept/commons/flowcept_dataclasses/telemetry.py +++ b/flowcept/commons/flowcept_dataclasses/telemetry.py @@ -60,9 +60,11 @@ class GPUMetrics: total: int free: int used: int - percent: float + usage_percent: float + temperature: float + power_usage: float - gpu_total: GPUMetrics + gpu_sums: GPUMetrics per_gpu: Dict[int, GPUMetrics] = None cpu: CPU = None diff --git a/flowcept/flowceptor/plugins/base_interceptor.py b/flowcept/flowceptor/plugins/base_interceptor.py index 46419ccb..ecf65bde 100644 --- 
a/flowcept/flowceptor/plugins/base_interceptor.py +++ b/flowcept/flowceptor/plugins/base_interceptor.py @@ -17,10 +17,7 @@ from flowcept.commons.flowcept_dataclasses.task_message import TaskMessage from flowcept.flowceptor.plugins.settings_factory import get_settings -from flowcept.flowceptor.telemetry_capture import ( - init_gpu_telemetry, - shutdown_gpu_telemetry, -) +from flowcept.flowceptor.telemetry_capture import TelemetryCapture from flowcept.version import __version__ @@ -68,6 +65,7 @@ def __init__(self, plugin_key): self.logger = FlowceptLogger().get_logger() self.settings = get_settings(plugin_key) self._mq_dao = MQDao() + self.telemetry_capture = TelemetryCapture() def prepare_task_msg(self, *args, **kwargs) -> TaskMessage: raise NotImplementedError() @@ -78,7 +76,7 @@ def start(self) -> "BaseInterceptor": :return: """ self._mq_dao.start_time_based_flushing() - init_gpu_telemetry() + self.telemetry_capture.init_gpu_telemetry() return self def stop(self) -> bool: @@ -87,7 +85,7 @@ def stop(self) -> bool: :return: """ self._mq_dao.stop() - shutdown_gpu_telemetry() + self.telemetry_capture.shutdown_gpu_telemetry() def observe(self, *args, **kwargs): """ diff --git a/flowcept/flowceptor/plugins/dask/dask_interceptor.py b/flowcept/flowceptor/plugins/dask/dask_interceptor.py index 5d3f20f6..d65c964e 100644 --- a/flowcept/flowceptor/plugins/dask/dask_interceptor.py +++ b/flowcept/flowceptor/plugins/dask/dask_interceptor.py @@ -9,7 +9,6 @@ ) from flowcept.commons.utils import get_utc_now from flowcept.configs import TELEMETRY_CAPTURE -from flowcept.flowceptor.telemetry_capture import capture_telemetry def get_run_spec_data(task_msg: TaskMessage, run_spec): @@ -148,8 +147,8 @@ def callback(self, task_id, start, finish, *args, **kwargs): if ts.state == "executing": if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_start = capture_telemetry( - self.logger + task_msg.telemetry_at_start = ( + self.telemetry_capture.capture() ) task_msg.status = Status.RUNNING 
task_msg.address = self._worker.worker_address @@ -162,7 +161,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): else: get_times_from_task_state(task_msg, ts) if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry(self.logger) + task_msg.telemetry_at_end = ( + self.telemetry_capture.capture() + ) elif ts.state == "error": task_msg.status = Status.ERROR if self.settings.worker_create_timestamps: @@ -174,7 +175,9 @@ def callback(self, task_id, start, finish, *args, **kwargs): "traceback": ts.traceback_text, } if TELEMETRY_CAPTURE is not None: - task_msg.telemetry_at_end = capture_telemetry(self.logger) + task_msg.telemetry_at_end = ( + self.telemetry_capture.capture() + ) else: return diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index e2ec3dee..ce2c5db9 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -1,202 +1,210 @@ -from logging import Logger -from typing import Dict import psutil +import pynvml from pynvml import ( nvmlDeviceGetCount, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, nvmlInit, nvmlShutdown, + nvmlDeviceGetTemperature, ) +from flowcept.commons.flowcept_logger import FlowceptLogger from flowcept.configs import TELEMETRY_CAPTURE from flowcept.commons.flowcept_dataclasses.telemetry import Telemetry -def capture_telemetry(logger: Logger) -> Telemetry: - conf = TELEMETRY_CAPTURE - if conf is None: - return None - - tel = Telemetry() - tel.process = _capture_process_info(conf, logger) - tel.cpu = _capture_cpu(conf, logger) - tel.memory = _capture_memory(conf, logger) - tel.network = _capture_network(conf, logger) - tel.disk = _capture_disk(conf, logger) - tel.gpu = _capture_gpu(conf, logger) - - return tel - - -def _capture_disk(conf, logger): - capt = conf.get("disk", False) - if not capt: - return None - try: - disk = Telemetry.Disk() - disk.disk_usage = psutil.disk_usage("/")._asdict() - disk.io_sum = 
psutil.disk_io_counters(perdisk=False)._asdict() - io_perdisk = psutil.disk_io_counters(perdisk=True) - if len(io_perdisk) > 1: - disk.io_per_disk = {} - for d in io_perdisk: - disk.io_per_disk[d] = io_perdisk[d]._asdict() - - return disk - except Exception as e: - logger.exception(e) - - -def _capture_network(conf, logger): - capt = conf.get("network", False) - if not capt: - return None - try: - net = Telemetry.Network() - net.netio_sum = psutil.net_io_counters(pernic=False)._asdict() - pernic = psutil.net_io_counters(pernic=True) - net.netio_per_interface = {} - for ic in pernic: - if pernic[ic].bytes_sent and pernic[ic].bytes_recv: - net.netio_per_interface[ic] = pernic[ic]._asdict() - return net - except Exception as e: - logger.exception(e) - - -def _capture_memory(conf, logger): - capt = conf.get("mem", False) - if not capt: - return None - try: - mem = Telemetry.Memory() - mem.virtual = psutil.virtual_memory()._asdict() - mem.swap = psutil.swap_memory()._asdict() - return mem - except Exception as e: - logger.exception(e) - - -def _capture_process_info(conf, logger): - capt = conf.get("process_info", False) - if not capt: - return None - try: - p = Telemetry.Process() - psutil_p = psutil.Process() - with psutil_p.oneshot(): - p.pid = psutil_p.pid - try: - p.cpu_number = psutil_p.cpu_num() - except: - pass - p.memory = psutil_p.memory_full_info() - p.memory_percent = psutil_p.memory_percent() - p.cpu_times = psutil_p.cpu_times()._asdict() - p.cpu_percent = psutil_p.cpu_percent() - p.executable = psutil_p.exe() - p.cmd_line = psutil_p.cmdline() - p.num_open_file_descriptors = psutil_p.num_fds() - p.num_connections = len(psutil_p.connections()) - try: - p.io_counters = psutil_p.io_counters()._asdict() - except: - pass - p.num_open_files = len(psutil_p.open_files()) - p.num_threads = psutil_p.num_threads() - p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() - return p - except Exception as e: - logger.exception(e) - - -def _capture_cpu(conf: Dict, 
logger): - capt_cpu = conf.get("cpu", False) - capt_per_cpu = conf.get("per_cpu", False) - if not (capt_cpu or capt_per_cpu): - return None - try: - cpu = Telemetry.CPU() - if conf.get("cpu", False): - cpu.times_avg = psutil.cpu_times(percpu=False)._asdict() - cpu.percent_all = psutil.cpu_percent() - if conf.get("per_cpu", False): - cpu.times_per_cpu = [ - c._asdict() for c in psutil.cpu_times(percpu=True) - ] - cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) - return cpu - except Exception as e: - logger.exception(e) - return None - - -def _capture_gpu(conf: Dict, logger): - capt = conf.get("gpu", False) - if not capt: - return None - - try: - deviceCount = nvmlDeviceGetCount() - handle = nvmlDeviceGetHandleByIndex(0) - info = nvmlDeviceGetMemoryInfo(handle) - _this_gpu = { - "total": info.total, - "free": info.free, - "used": info.used, - "percent": info.used / info.total * 100, - } - gpu = Telemetry.GPU() - if len(deviceCount) == 0: - gpu.gpu_total = gpu.GPUMetrics(**_this_gpu) - else: - gpu.per_gpu = {0: gpu.GPUMetrics(**_this_gpu)} - sums = _this_gpu.copy() - for i in range(1, deviceCount): - handle = nvmlDeviceGetHandleByIndex(i) - info = nvmlDeviceGetMemoryInfo(handle) - sums["total"] += info.total - sums["free"] += info.free - sums["used"] += info.used - - gpu.per_gpu[i] = gpu.GPUMetrics( - total=info.total, - free=info.free, - used=info.used, - percent=info.used / info.total * 100, - ) - - sums["percent"] = sums["used"] / sums["total"] * 100 - gpu.gpu_total = gpu.GPUMetrics(**sums) - - return gpu - except Exception as e: - logger.exception(e) - return None - - -def init_gpu_telemetry(logger: Logger): - conf = TELEMETRY_CAPTURE - if conf is None: - return None - - if TELEMETRY_CAPTURE.get("gpu", False): +class TelemetryCapture: + def __init__(self, conf=TELEMETRY_CAPTURE): + self.conf = conf + self.logger = FlowceptLogger().get_logger() + + def capture(self) -> Telemetry: + if self.conf is None: + return None + + tel = Telemetry() + tel.process = 
self._capture_process_info() + tel.cpu = self._capture_cpu() + tel.memory = self._capture_memory() + tel.network = self._capture_network() + tel.disk = self._capture_disk() + tel.gpu = self._capture_gpu() + + return tel + + def _capture_disk(self): + capt = self.conf.get("disk", False) + if not capt: + return None try: - nvmlInit() + disk = Telemetry.Disk() + disk.disk_usage = psutil.disk_usage("/")._asdict() + disk.io_sum = psutil.disk_io_counters(perdisk=False)._asdict() + io_perdisk = psutil.disk_io_counters(perdisk=True) + if len(io_perdisk) > 1: + disk.io_per_disk = {} + for d in io_perdisk: + disk.io_per_disk[d] = io_perdisk[d]._asdict() + + return disk except Exception as e: - logger.error("NVIDIA GPU NOT FOUND!") - logger.exception(e) + self.logger.exception(e) + def _capture_network(self): + capt = self.conf.get("network", False) + if not capt: + return None + try: + net = Telemetry.Network() + net.netio_sum = psutil.net_io_counters(pernic=False)._asdict() + pernic = psutil.net_io_counters(pernic=True) + net.netio_per_interface = {} + for ic in pernic: + if pernic[ic].bytes_sent and pernic[ic].bytes_recv: + net.netio_per_interface[ic] = pernic[ic]._asdict() + return net + except Exception as e: + self.logger.exception(e) -def shutdown_gpu_telemetry(logger: Logger): - conf = TELEMETRY_CAPTURE - if conf is None: - return None + def _capture_memory(self): + capt = self.conf.get("mem", False) + if not capt: + return None + try: + mem = Telemetry.Memory() + mem.virtual = psutil.virtual_memory()._asdict() + mem.swap = psutil.swap_memory()._asdict() + return mem + except Exception as e: + self.logger.exception(e) - if TELEMETRY_CAPTURE.get("gpu", False): + def _capture_process_info(self): + capt = self.conf.get("process_info", False) + if not capt: + return None try: - nvmlShutdown() + p = Telemetry.Process() + psutil_p = psutil.Process() + with psutil_p.oneshot(): + p.pid = psutil_p.pid + try: + p.cpu_number = psutil_p.cpu_num() + except: + pass + p.memory = 
psutil_p.memory_full_info() + p.memory_percent = psutil_p.memory_percent() + p.cpu_times = psutil_p.cpu_times()._asdict() + p.cpu_percent = psutil_p.cpu_percent() + p.executable = psutil_p.exe() + p.cmd_line = psutil_p.cmdline() + p.num_open_file_descriptors = psutil_p.num_fds() + p.num_connections = len(psutil_p.connections()) + try: + p.io_counters = psutil_p.io_counters()._asdict() + except: + pass + p.num_open_files = len(psutil_p.open_files()) + p.num_threads = psutil_p.num_threads() + p.num_ctx_switches = psutil_p.num_ctx_switches()._asdict() + return p except Exception as e: - logger.error("NVIDIA GPU NOT FOUND!") - logger.exception(e) + self.logger.exception(e) + + def _capture_cpu(self): + capt_cpu = self.conf.get("cpu", False) + capt_per_cpu = self.conf.get("per_cpu", False) + if not (capt_cpu or capt_per_cpu): + return None + try: + cpu = Telemetry.CPU() + if capt_cpu: + cpu.times_avg = psutil.cpu_times(percpu=False)._asdict() + cpu.percent_all = psutil.cpu_percent() + if capt_per_cpu: + cpu.times_per_cpu = [ + c._asdict() for c in psutil.cpu_times(percpu=True) + ] + cpu.percent_per_cpu = psutil.cpu_percent(percpu=True) + return cpu + except Exception as e: + self.logger.exception(e) + return None + + def _capture_gpu(self): + capt = self.conf.get("gpu", False) + if not capt: + return None + + try: + deviceCount = nvmlDeviceGetCount() + handle = nvmlDeviceGetHandleByIndex(0) + info = nvmlDeviceGetMemoryInfo(handle) + _this_gpu = { + "total": info.total, + "free": info.free, + "used": info.used, + "usage_percent": info.used / info.total * 100, + "temperature": nvmlDeviceGetTemperature( + handle, pynvml.NVML_TEMPERATURE_GPU + ), + "power_usage": pynvml.nvmlDeviceGetPowerUsage(handle), + } + gpu = Telemetry.GPU() + if len(deviceCount) == 0: + gpu.gpu_sums = gpu.GPUMetrics(**_this_gpu) + else: + gpu.per_gpu = {0: gpu.GPUMetrics(**_this_gpu)} + sums = _this_gpu.copy() + for i in range(1, deviceCount): + handle = nvmlDeviceGetHandleByIndex(i) + info = 
nvmlDeviceGetMemoryInfo(handle) + _temp = nvmlDeviceGetTemperature( + handle, pynvml.NVML_TEMPERATURE_GPU + ) + _pow = pynvml.nvmlDeviceGetPowerUsage(handle) + + sums["total"] += info.total + sums["free"] += info.free + sums["used"] += info.used + sums["temperature"] += _temp + sums["power_usage"] += _pow + + gpu.per_gpu[i] = gpu.GPUMetrics( + total=info.total, + free=info.free, + used=info.used, + usage_percent=info.used / info.total * 100, + temperature=_temp, + power_usage=_pow, + ) + + sums["usage_percent"] = sums["used"] / sums["total"] * 100 + gpu.gpu_sums = gpu.GPUMetrics(**sums) + + return gpu + except Exception as e: + self.logger.exception(e) + return None + + def init_gpu_telemetry(self): + if self.conf is None: + return None + + if self.conf.get("gpu", False): + try: + nvmlInit() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) + + def shutdown_gpu_telemetry(self): + if self.conf is None: + return None + + if self.conf.get("gpu", False): + try: + nvmlShutdown() + except Exception as e: + self.logger.error("NVIDIA GPU NOT FOUND!") + self.logger.exception(e) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index d8241516..e0433ef5 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -1,13 +1,14 @@ import unittest import json -from flowcept.commons.flowcept_logger import FlowceptLogger -from flowcept.flowceptor.telemetry_capture import capture_telemetry +from flowcept.flowceptor.telemetry_capture import TelemetryCapture class TestTelemetry(unittest.TestCase): def test_telemetry(self): - self.logger = FlowceptLogger().get_logger() - telemetry = capture_telemetry(self.logger) + tele_capture = TelemetryCapture() + tele_capture.init_gpu_telemetry() + telemetry = tele_capture.capture() assert telemetry.to_dict() print(json.dumps(telemetry.to_dict(), indent=True)) + tele_capture.shutdown_gpu_telemetry() From 91387320419c0e76e4c0d1f68fe85a62609f4b3b Mon Sep 17 00:00:00 2001 From: Renan 
Souza Date: Mon, 3 Jul 2023 15:48:09 -0400 Subject: [PATCH 5/6] Minor fixes in gpu telemetry capture --- flowcept/flowceptor/telemetry_capture.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index ce2c5db9..1e0b8ff5 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -151,7 +151,7 @@ def _capture_gpu(self): "power_usage": pynvml.nvmlDeviceGetPowerUsage(handle), } gpu = Telemetry.GPU() - if len(deviceCount) == 0: + if deviceCount == 1: gpu.gpu_sums = gpu.GPUMetrics(**_this_gpu) else: gpu.per_gpu = {0: gpu.GPUMetrics(**_this_gpu)} From 07cb3b9ab350c130cdf28411e684e30bd7ffb493 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Mon, 3 Jul 2023 16:17:42 -0400 Subject: [PATCH 6/6] changing to memory_info --- flowcept/flowceptor/telemetry_capture.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py index 1e0b8ff5..893e82c5 100644 --- a/flowcept/flowceptor/telemetry_capture.py +++ b/flowcept/flowceptor/telemetry_capture.py @@ -92,7 +92,7 @@ def _capture_process_info(self): p.cpu_number = psutil_p.cpu_num() except: pass - p.memory = psutil_p.memory_full_info() + p.memory = psutil_p.memory_info() p.memory_percent = psutil_p.memory_percent() p.cpu_times = psutil_p.cpu_times()._asdict() p.cpu_percent = psutil_p.cpu_percent()