Support cgroup v2 (#21)
* Process scanning that should work for cgroups v1 and v2

* Don't roll our own code for environment

* Adapt statistic gathering that works with both versions of cgroups

* Make gpu detection work with cgroup v2

* Make string into strings

* .Proc() -> .Process()

* Don't shadow

* consistent cgroups

* Make sure paths are valid

* Adding Cgroup v1 vs v2 in the README.md

---------

Co-authored-by: Simon Guilbault <[email protected]>
abergeron and guilbaults authored Jan 22, 2024
1 parent aaa8ebb commit 556499f
Showing 3 changed files with 194 additions and 148 deletions.
6 changes: 6 additions & 0 deletions README.md
@@ -24,6 +24,12 @@ optional arguments:
--port PORT Collector http port, default is 9798
```

## Cgroup v1 vs v2
[Slurm currently supports cgroup v1 and v2](https://slurm.schedmd.com/cgroup_v2.html), but there are some limitations with v2; the following metrics are not currently available in this exporter:

* `memory.max_usage_in_bytes` becomes `memory.peak`, but this is not yet in any released kernel ([torvalds/linux@8e20d4b](https://github.com/torvalds/linux/commit/8e20d4b332660a32e842e20c34cfc3b3456bc6dc))
* `cpuacct.usage_percpu` is exposed via eBPF in kernel 6.6+, but not through cgroupfs, although it might be in a future kernel.
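As a rough guide to the renaming, here is a minimal sketch (the `read_memory_metrics` helper is hypothetical, not part of this exporter) of reading the same job metrics under both versions, using the file-name mapping introduced by this commit (`memory.usage_in_bytes` → `memory.current`, `memory.max_usage_in_bytes` → `memory.peak`):

```python
# Hypothetical helper, for illustration only: pick the metric file
# name that matches the cgroup version mounted on this node.
import os

def read_memory_metrics(job_dir):
    # The exporter detects v1 by the presence of the v1 memory controller
    v1 = os.path.exists("/sys/fs/cgroup/memory")
    usage_file = "memory.usage_in_bytes" if v1 else "memory.current"
    peak_file = "memory.max_usage_in_bytes" if v1 else "memory.peak"
    with open(os.path.join(job_dir, usage_file)) as f:
        usage = int(f.read())
    try:
        with open(os.path.join(job_dir, peak_file)) as f:
            peak = int(f.read())
    except FileNotFoundError:
        peak = None  # memory.peak requires kernel 6.8+ on cgroup v2
    return usage, peak
```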

## Sample
```
# HELP slurm_job_memory_usage Memory used by a job
5 changes: 2 additions & 3 deletions get_gpus.sh
@@ -2,7 +2,6 @@
# instead of cgexec that is deprecated in rhel9, we simply bind this bash
# process to the cgroup, then nvidia-smi will be able to see the GPU
# of the cgroup of the running job
-uid=$1
-job=$2
-echo $$ >> /sys/fs/cgroup/devices/slurm/uid_${uid}/job_${job}/tasks
+task_file=$1
+echo $$ >> "$task_file"
nvidia-smi -L
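The caller is now responsible for choosing the task file. A hedged sketch of that calling convention (the `list_job_gpus` helper name is hypothetical; it mirrors `cgroup_gpus()` in the Python diff below): under v1 the job's devices cgroup already exposes a `tasks` file, while under v2 processes can only be attached to leaf cgroups, so a temporary child cgroup is created and removed once it is empty.

```python
import os
import subprocess

def list_job_gpus(job_dir, cgroups):
    # Sketch only: prepare the task file argument for get_gpus.sh
    if cgroups == 1:
        # v1: bind the probe directly to the job's devices cgroup
        return subprocess.check_output(
            ["get_gpus.sh", os.path.join(job_dir, "tasks")]).decode()
    probe = os.path.join(job_dir, "gpu_probe")
    os.mkdir(probe)  # v2: create a leaf cgroup under the job
    try:
        return subprocess.check_output(
            ["get_gpus.sh", os.path.join(probe, "cgroup.procs")]).decode()
    finally:
        os.rmdir(probe)  # an empty cgroup can be removed
```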
331 changes: 186 additions & 145 deletions slurm-job-exporter.py
Original file line number Diff line number Diff line change
@@ -23,27 +23,25 @@ def get_username(uid):
return subprocess.check_output(command).strip().decode()


-def cgroup_processes(uid, job):
+def cgroup_processes(job_dir):
     """
-    Find all the PIDs for a cgroup of a user+job
+    Find all the PIDs for a cgroup of a job
     """
     procs = []
-    step_g = '/sys/fs/cgroup/memory/slurm/uid_{}/job_{}/step_*'
-    for step in glob.glob(step_g.format(uid, job)):
-        cgroup = '/sys/fs/cgroup/memory/slurm/uid_{}/job_{}/{}/task_*'.format(
-            uid, job, step.split('/')[-1])
-        for process_file in glob.glob(cgroup):
-            with open(process_file + '/cgroup.procs', 'r') as stats:
-                for proc in stats.readlines():
-                    # check if process is not running as root
-                    # a long sleep running as root can be found in step_extern
-                    try:
-                        ps = psutil.Process(int(proc))
-                        if ps.username() != 'root':
-                            procs.append(int(proc))
-                    except psutil.NoSuchProcess:
-                        pass
-    return procs
+    res_uid = -1
+    for (path, _, _) in os.walk(job_dir):
+        with open(os.path.join(path, "cgroup.procs"), 'r') as fprocs:
+            for proc in fprocs.readlines():
+                pid = int(proc)
+                try:
+                    ps = psutil.Process(pid)
+                    uid = ps.uids().real
+                    if uid != 0:
+                        res_uid = uid
+                        procs.append(pid)
+                except psutil.NoSuchProcess:
+                    pass
+    return res_uid, procs
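The rewritten function no longer needs to know the hierarchy layout: it walks whatever job directory it is handed and also reports the owning non-root uid. A usage sketch, with illustrative paths only (uid and job id are made up; `collect()` below globs the real ones per version):

```python
# Illustrative job cgroup paths for each version
job_dir_v1 = "/sys/fs/cgroup/memory/slurm/uid_1000/job_12345"
job_dir_v2 = "/sys/fs/cgroup/system.slice/slurmstepd.scope/job_12345"

uid, procs = cgroup_processes(job_dir_v2)  # same call works for either path
if procs:
    print("job owned by uid", uid, "with pids", procs)
```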


def split_range(range_str):
@@ -66,25 +64,32 @@ def get_env(pid):
"""
Return the environment variables of a process
"""
-    environments = {}
     try:
-        with open('/proc/{}/environ'.format(pid), 'r', encoding='utf-8') as env_f:
-            for env in env_f.read().split('\000'):
-                r_env = re.match(r'(.*)=(.*)', env)
-                if r_env:
-                    environments[r_env.group(1)] = r_env.group(2)
-    except FileNotFoundError:
-        raise ValueError('Process {} environment does not exist'.format(pid))
-    return environments
-
-
-def cgroup_gpus(uid, job):
+        ps = psutil.Process(pid)
+        return ps.environ()
+    except psutil.NoSuchProcess:
+        raise ValueError("Could not get environment for {}".format(pid))
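Delegating to psutil keeps the caller simple too. A sketch of the lookup `collect()` performs with it (the `find_account` helper is hypothetical):

```python
import psutil

def find_account(procs):
    # Scan the job's PIDs until one still exposes SLURM_JOB_ACCOUNT
    for pid in procs:
        try:
            envs = psutil.Process(pid).environ()  # the call get_env() wraps
        except psutil.NoSuchProcess:
            continue  # process exited between listing and reading
        if 'SLURM_JOB_ACCOUNT' in envs:
            return envs['SLURM_JOB_ACCOUNT']
    return "error"  # slurm_adopt sessions only carry the job id
```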


+def cgroup_gpus(job_dir, cgroups):
+    if cgroups == 1:
+        task_file = os.path.join(job_dir, "tasks")
+    else:
+        cgroup_path = os.path.join(job_dir, "gpu_probe")
+        # This will create a new cgroup under the root of the job.
+        # This is required for v2 since we can only add tasks to leaf cgroups
+        os.mkdir(cgroup_path)
+        task_file = os.path.join(cgroup_path, "cgroup.procs")
     try:
-        command = ["get_gpus.sh", uid, job]
-        res = subprocess.check_output(command).strip().decode()
+        res = subprocess.check_output(["get_gpus.sh", task_file]).strip().decode()
     except FileNotFoundError:
-        # This is most likely because cgexec or nvidia-smi are not on the machine
+        # This is most likely because nvidia-smi is not on the machine
         return []
+    finally:
+        if cgroups == 2:
+            # We can remove a cgroup if no tasks are remaining inside
+            os.rmdir(cgroup_path)

gpus = []

mig = 'MIG' in res
@@ -307,134 +312,170 @@ def collect(self):
'slurm_job_pcie_gpu', 'PCIe tx/rx bytes per second',
labels=['user', 'account', 'slurmjobid', 'gpu', 'gpu_type', 'direction'])

for uid_dir in glob.glob("/sys/fs/cgroup/memory/slurm/uid_*"):
uid = uid_dir.split('/')[-1].split('_')[1]
job_path = "/sys/fs/cgroup/memory/slurm/uid_{}/job_*".format(uid)
for job_dir in glob.glob(job_path):
job = job_dir.split('/')[-1].split('_')[1]
mem_path = '/sys/fs/cgroup/memory/slurm/uid_{}/job_{}/'.format(
uid, job)
procs = cgroup_processes(uid, job)
if len(procs) == 0:
continue

# Job is alive, we can get the stats
user = get_username(uid)
gpu_set = set()
if self.MONITOR_PYNVML or self.MONITOR_DCGM:
gpu_set.update(cgroup_gpus(uid, job))
if os.path.exists("/sys/fs/cgroup/memory"):
cgroups = 1 # we are running cgroups v1
else:
cgroups = 2 # we are running cgroups v2

for proc in procs:
# get the SLURM_JOB_ACCOUNT
try:
envs = get_env(proc)
except ValueError:
# Process does not have an environment, it's probably gone
continue
if 'SLURM_JOB_ACCOUNT' in envs:
account = envs['SLURM_JOB_ACCOUNT']
break
if cgroups == 1:
jobs_glob = "/sys/fs/cgroup/memory/slurm/uid_*/job_*"
else:
jobs_glob = "/sys/fs/cgroup/system.slice/slurmstepd.scope/job_*"
for job_dir in glob.glob(jobs_glob):
job = job_dir.split('/')[-1].split('_')[1]
uid, procs = cgroup_processes(job_dir)
if len(procs) == 0:
continue

# Job is alive, we can get the stats
user = get_username(uid)
gpu_set = set()
if self.MONITOR_PYNVML or self.MONITOR_DCGM:
if cgroups == 1:
gpu_dir = "/sys/fs/cgroup/devices/slurm/uid_{}/job_{}".format(uid, job)
else:
# Could not find the env variables, slurm_adopt only fills the jobid
account = "error"
gpu_dir = job_dir
gpu_set.update(cgroup_gpus(gpu_dir, cgroups))

with open(mem_path + 'memory.usage_in_bytes', 'r') as f_usage:
gauge_memory_usage.add_metric([user, account, job], int(f_usage.read()))
with open(mem_path + 'memory.max_usage_in_bytes', 'r') as f_max:
for proc in procs:
# get the SLURM_JOB_ACCOUNT
try:
envs = get_env(proc)
except ValueError:
# Process does not have an environment, it's probably gone
continue
if 'SLURM_JOB_ACCOUNT' in envs:
account = envs['SLURM_JOB_ACCOUNT']
break
else:
# Could not find the env variables, slurm_adopt only fills the jobid
account = "error"

with open(os.path.join(job_dir, ('memory.usage_in_bytes' if cgroups == 1 else 'memory.current')), 'r') as f_usage:
gauge_memory_usage.add_metric([user, account, job], int(f_usage.read()))
try:
with open(os.path.join(job_dir, ('memory.max_usage_in_bytes' if cgroups == 1 else 'memory.peak')), 'r') as f_max:
gauge_memory_max.add_metric([user, account, job], int(f_max.read()))
with open(mem_path + 'memory.limit_in_bytes', 'r') as f_limit:
gauge_memory_limit.add_metric([user, account, job], int(f_limit.read()))

with open(mem_path + 'memory.stat', 'r') as f_stats:
for line in f_stats.readlines():
data = line.split()
if data[0] == 'total_cache':
gauge_memory_cache.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_rss':
gauge_memory_rss.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_rss_huge':
gauge_memory_rss_huge.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_mapped_file':
gauge_memory_mapped_file.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_active_file':
gauge_memory_active_file.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_inactive_file':
gauge_memory_inactive_file.add_metric([user, account, job], int(data[1]))
elif data[0] == 'total_unevictable':
gauge_memory_unevictable.add_metric([user, account, job], int(data[1]))

# get the allocated cores
with open('/sys/fs/cgroup/cpuset/slurm/uid_{}/job_{}/\
cpuset.effective_cpus'.format(uid, job), 'r') as f_cores:
cores = split_range(f_cores.read())
with open('/sys/fs/cgroup/cpu,cpuacct/slurm/uid_{}/job_{}/\
cpuacct.usage_percpu'.format(uid, job), 'r') as f_usage:
except FileNotFoundError:
# 'memory.peak' is only available in kernel 6.8+
pass

with open(os.path.join(job_dir, ('memory.limit_in_bytes' if cgroups == 1 else 'memory.max')), 'r') as f_limit:
gauge_memory_limit.add_metric([user, account, job], int(f_limit.read()))

with open(os.path.join(job_dir, 'memory.stat'), 'r') as f_stats:
stats = dict(line.split() for line in f_stats.readlines())
if cgroups == 1:
gauge_memory_cache.add_metric(
[user, account, job], int(stats['total_cache']))
gauge_memory_rss.add_metric(
[user, account, job], int(stats['total_rss']))
gauge_memory_rss_huge.add_metric(
[user, account, job], int(stats['total_rss_huge']))
gauge_memory_mapped_file.add_metric(
[user, account, job], int(stats['total_mapped_file']))
gauge_memory_active_file.add_metric(
[user, account, job], int(stats['total_active_file']))
gauge_memory_inactive_file.add_metric(
[user, account, job], int(stats['total_inactive_file']))
gauge_memory_unevictable.add_metric(
[user, account, job], int(stats['total_unevictable']))
else:
gauge_memory_cache.add_metric(
[user, account, job], int(stats['file']))
gauge_memory_rss.add_metric(
[user, account, job],
int(stats['anon']) + int(stats['swapcached']))
gauge_memory_rss_huge.add_metric(
[user, account, job], int(stats['anon_thp']))
gauge_memory_mapped_file.add_metric(
[user, account, job],
int(stats['file_mapped']) + int(stats['shmem']))
gauge_memory_active_file.add_metric(
[user, account, job], int(stats['active_file']))
gauge_memory_inactive_file.add_metric(
[user, account, job], int(stats['inactive_file']))
gauge_memory_unevictable.add_metric(
[user, account, job], int(stats['unevictable']))

# get the allocated cores
if cgroups == 1:
cpuset_path = '/sys/fs/cgroup/cpuset/slurm/uid_{}/job_{}/cpuset.effective_cpus'.format(uid, job)
else:
cpuset_path = os.path.join(job_dir, 'cpuset.cpus.effective')

with open(cpuset_path, 'r') as f_cores:
cores = split_range(f_cores.read())

if cgroups == 1:
# There is no equivalent to this in cgroups v2
with open('/sys/fs/cgroup/cpu,cpuacct/slurm/uid_{}/job_{}/cpuacct.usage_percpu'.format(uid, job), 'r') as f_usage:
cpu_usages = f_usage.read().split()
for core in cores:
counter_core_usage.add_metric([user, account, job, str(core)],
int(cpu_usages[core]))

processes = 0
tasks_state = {}
for proc in procs:
try:
p = psutil.Process(proc)
cmdline = p.cmdline()
except psutil.NoSuchProcess:
continue
if len(cmdline) == 0:
# sometimes the cmdline is empty, we don't want to count it
continue
if cmdline[0] == '/bin/bash':
if len(cmdline) > 1:
if '/var/spool' in cmdline[1] and 'slurm_script' in cmdline[1]:
# This is the bash script of the job, we don't want to count it
continue
processes += 1

for t in p.threads():
try:
pt = psutil.Process(t.id)
except psutil.NoSuchProcess:
# The thread disappeared between the time we got the list and now
processes = 0
tasks_state = {}
for proc in procs:
try:
p = psutil.Process(proc)
cmdline = p.cmdline()
except psutil.NoSuchProcess:
continue
if len(cmdline) == 0:
# sometimes the cmdline is empty, we don't want to count it
continue
if cmdline[0] == '/bin/bash':
if len(cmdline) > 1:
if '/var/spool' in cmdline[1] and 'slurm_script' in cmdline[1]:
# This is the bash script of the job, we don't want to count it
continue
pt_status = pt.status()
if pt_status in tasks_state:
tasks_state[pt_status] += 1
else:
tasks_state[pt_status] = 1

for status in tasks_state.keys():
gauge_threads_count.add_metric([user, account, job, status], tasks_state[status])
gauge_process_count.add_metric([user, account, job], processes)
processes += 1

processes_sum = {}
for proc in procs:
# get the counter_process_usage data
for t in p.threads():
try:
p = psutil.Process(proc)
with p.oneshot():
exe = p.exe()
if os.path.basename(exe) in ['ssh', 'sshd', 'bash', 'srun']:
# We don't want to count them
continue
else:
t = p.cpu_times().user + p.cpu_times().system + p.cpu_times().children_user + p.cpu_times().children_system
if exe in processes_sum:
processes_sum[exe] += t
else:
processes_sum[exe] = t
pt = psutil.Process(t.id)
except psutil.NoSuchProcess:
# The thread disappeared between the time we got the list and now
continue
pt_status = pt.status()
if pt_status in tasks_state:
tasks_state[pt_status] += 1
else:
tasks_state[pt_status] = 1

for status in tasks_state.keys():
gauge_threads_count.add_metric([user, account, job, status], tasks_state[status])
gauge_process_count.add_metric([user, account, job], processes)

processes_sum = {}
for proc in procs:
# get the counter_process_usage data
try:
p = psutil.Process(proc)
with p.oneshot():
exe = p.exe()
if os.path.basename(exe) in ['ssh', 'sshd', 'bash', 'srun']:
# We don't want to count them
continue
else:
t = p.cpu_times().user + p.cpu_times().system + p.cpu_times().children_user + p.cpu_times().children_system
if exe in processes_sum:
processes_sum[exe] += t
else:
processes_sum[exe] = t
except psutil.NoSuchProcess:
continue

# we only count the processes that used more than 60 seconds of CPU
processes_sum_filtered = processes_sum.copy()
for exe in processes_sum.keys():
if processes_sum[exe] < 60:
del processes_sum_filtered[exe]
# we only count the processes that used more than 60 seconds of CPU
processes_sum_filtered = processes_sum.copy()
for exe in processes_sum.keys():
if processes_sum[exe] < 60:
del processes_sum_filtered[exe]

for exe in processes_sum_filtered.keys():
counter_process_usage.add_metric([user, account, job, exe], processes_sum_filtered[exe])
for exe in processes_sum_filtered.keys():
counter_process_usage.add_metric([user, account, job, exe], processes_sum_filtered[exe])

if self.MONITOR_PYNVML:
for gpu in gpu_set:
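For context, the core list read from `cpuset.effective_cpus` (v1) or `cpuset.cpus.effective` (v2) is parsed by `split_range()`, whose body is collapsed in this view. A minimal sketch of such a parser, assuming the usual kernel list format (e.g. `0-3,7`):

```python
def split_range(range_str):
    # Sketch only; the real implementation is collapsed in the diff above.
    # Parse a kernel cpuset list like "0-3,7" into [0, 1, 2, 3, 7].
    cores = []
    for part in range_str.strip().split(','):
        if '-' in part:
            start, end = part.split('-')
            cores.extend(range(int(start), int(end) + 1))
        else:
            cores.append(int(part))
    return cores
```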
