Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plugin for categorizing memory usage #247

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/supremm/assets/mongo_setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,12 @@ var summarydef = {
"documentation": "Categorization of the CPU utilization of the job as good, pinned, unpinned, or low. A job is good if every core is heavily utilized, and a job is low if the cores are barely utilized or not at all. A pinned job consists of a scenario where a subset of the cores does most of the work, while an unpinned job is when the work is spread unevenly across many cores.",
"type": "",
"unit": ""
}
},
"cgroupmemcategories": {
"documentation": "Categorization of the cgroup memory usage of the job as constant, increasing, decreasing, or inconsistent. A job is constant if the memory usage stays approximately the same, increasing if the memory usage increases overall, decreasing if the memory usage decreases overall, or inconsistent if the memory usage does not fall under the previous three categories.",
"type": "",
"unit": ""
},
}
};

Expand Down
95 changes: 95 additions & 0 deletions src/supremm/plugins/CgroupMemCategories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env python
""" Memory usage catogorization plugin """

import re
import numpy as np
from supremm.plugin import Plugin
from supremm.errors import ProcessingError, NotApplicableError

class CgroupMemCategories(Plugin):
""" Cgroup memory categorization plugin """

name = property(lambda x: "process_memory_categories")
mode = property(lambda x: "all")
requiredMetrics = property(lambda x: ["cgroup.memory.usage"])
optionalMetrics = property(lambda x: [])
derivedMetrics = property(lambda x: [])

MIN_DATAPOINTS = 5

def __init__(self, job):
super(CgroupMemCategories, self).__init__(job)
self._data = {}
self._hostcounts = {}
if job.acct['resource_manager'] == 'pbs':
self._expectedcgroup = "/torque/{0}".format(job.job_id)
elif job.acct['resource_manager'] == 'slurm':
self._expectedcgroup = "/slurm/uid_{0}/job_{1}".format(job.acct['uid'], job.job_id)
else:
raise NotApplicableError

def process(self, nodemeta, timestamp, data, description):
""" Categorize a job based on its Cgroup memory usage """

if len(data[0]) == 0:
return True

if nodemeta.nodeindex not in self._data:
self._data[nodemeta.nodeindex] = []
self._hostcounts[nodemeta.nodeindex] = {"present": 0, "missing": 0}
# First data point for the node is ignored
return True

try:
dataidx = None
for idx, desc in enumerate(description[0][1]):
if re.match(r"^" + re.escape(self._expectedcgroup) + r"($|\.)", desc):
dataidx = idx
break
# No cgroup info at this datapoint
if dataidx is None:
return True
for i in xrange(len(self.requiredMetrics)):
if len(data[i]) < dataidx:
# Skip timesteps with incomplete information
raise ValueError

self._hostcounts[nodemeta.nodeindex]["present"] += 1
except ValueError:
self._hostcounts[nodemeta.nodeindex]["missing"] += 1
# No cgroup info at this datapoint
return True

self._data[nodemeta.nodeindex].append(data[0][dataidx])

return True

def results(self):
if len(self._data) != self._job.nodecount:
return {"error": ProcessingError.INSUFFICIENT_HOSTDATA}

for hoststat in self._hostcounts.itervalues():
if hoststat['missing'] > hoststat['present']:
return {"error": ProcessingError.CPUSET_UNKNOWN}

if len(self._data[0]) < self.MIN_DATAPOINTS:
return {"error": ProcessingError.INSUFFICIENT_DATA}

# Classify the job's memory usage
total = np.sum(list(self._data.itervalues()), 0)
first, middle, last = np.array_split(total, 3) # pylint: disable=unbalanced-tuple-unpacking
first, middle, last = np.median(first), np.median(middle), np.median(last)

# Threshold used to determine if the job is constant
threshold = middle / 2

if abs(middle - first) <= threshold and abs(last - middle) <= threshold:
category = "CONSTANT"
elif first < middle < last:
category = "INCREASING"
elif first > middle > last:
category = "DECREASING"
else:
category = "INCONSISTENT"

return {"category": category}
Binary file modified tests/integration_tests/5894431-1622570028/cpn-d14-02.0
Binary file not shown.
Binary file modified tests/integration_tests/5894431-1622570028/cpn-d14-02.index
Binary file not shown.
Binary file modified tests/integration_tests/5894431-1622570028/cpn-d14-02.meta
Binary file not shown.
4 changes: 4 additions & 0 deletions tests/integration_tests/integration_test.bash
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ pytest tests/integration_tests/integration_plugin_api.py
match=$(python src/supremm/supremm_testharness.py -i CpuCategories tests/integration_tests/5894431-1622570028/ | grep -q "GOOD"; echo $?)

[[ $match -eq 0 ]]

match=$(python src/supremm/supremm_testharness.py -i CgroupMemCategories --job-id=5894431 --acct-uid=545487 tests/integration_tests/5894431-1622570028/ | grep -q "INCONSISTENT"; echo $?)

[[ $match -eq 0 ]]