diff --git a/src/supremm/assets/mongo_setup.js b/src/supremm/assets/mongo_setup.js index 612c7d11..11b85197 100644 --- a/src/supremm/assets/mongo_setup.js +++ b/src/supremm/assets/mongo_setup.js @@ -430,7 +430,12 @@ var summarydef = { "documentation": "Categorization of the CPU utilization of the job as good, pinned, unpinned, or low. A job is good if every core is heavily utilized, and a job is low if the cores are barely utilized or not at all. A pinned job consists of a scenario where a subset of the cores does most of the work, while an unpinned job is when the work is spread unevenly across many cores.", "type": "", "unit": "" - } + }, + "cgroupmemcategories": { + "documentation": "Categorization of the cgroup memory usage of the job as constant, increasing, decreasing, or inconsistent. A job is constant if the memory usage stays approximately the same, increasing if the memory usage increases overall, decreasing if the memory usage decreases overall, or inconsistent if the memory usage does not fall under the previous three categories.", + "type": "", + "unit": "" + }, } }; diff --git a/src/supremm/plugins/CgroupMemCategories.py b/src/supremm/plugins/CgroupMemCategories.py new file mode 100644 index 00000000..4758804d --- /dev/null +++ b/src/supremm/plugins/CgroupMemCategories.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +""" Memory usage catogorization plugin """ + +import re +import numpy as np +from supremm.plugin import Plugin +from supremm.errors import ProcessingError, NotApplicableError + +class CgroupMemCategories(Plugin): + """ Cgroup memory categorization plugin """ + + name = property(lambda x: "process_memory_categories") + mode = property(lambda x: "all") + requiredMetrics = property(lambda x: ["cgroup.memory.usage"]) + optionalMetrics = property(lambda x: []) + derivedMetrics = property(lambda x: []) + + MIN_DATAPOINTS = 5 + + def __init__(self, job): + super(CgroupMemCategories, self).__init__(job) + self._data = {} + self._hostcounts = {} + if job.acct['resource_manager'] == 'pbs': + self._expectedcgroup = "/torque/{0}".format(job.job_id) + elif job.acct['resource_manager'] == 'slurm': + self._expectedcgroup = "/slurm/uid_{0}/job_{1}".format(job.acct['uid'], job.job_id) + else: + raise NotApplicableError + + def process(self, nodemeta, timestamp, data, description): + """ Categorize a job based on its Cgroup memory usage """ + + if len(data[0]) == 0: + return True + + if nodemeta.nodeindex not in self._data: + self._data[nodemeta.nodeindex] = [] + self._hostcounts[nodemeta.nodeindex] = {"present": 0, "missing": 0} + # First data point for the node is ignored + return True + + try: + dataidx = None + for idx, desc in enumerate(description[0][1]): + if re.match(r"^" + re.escape(self._expectedcgroup) + r"($|\.)", desc): + dataidx = idx + break + # No cgroup info at this datapoint + if dataidx is None: + return True + for i in xrange(len(self.requiredMetrics)): + if len(data[i]) < dataidx: + # Skip timesteps with incomplete information + raise ValueError + + self._hostcounts[nodemeta.nodeindex]["present"] += 1 + except ValueError: + self._hostcounts[nodemeta.nodeindex]["missing"] += 1 + # No cgroup info at this datapoint + return True + + self._data[nodemeta.nodeindex].append(data[0][dataidx]) + + return True + + def results(self): + if len(self._data) != self._job.nodecount: + return {"error": ProcessingError.INSUFFICIENT_HOSTDATA} + + for hoststat in self._hostcounts.itervalues(): + if hoststat['missing'] > hoststat['present']: + return {"error": ProcessingError.CPUSET_UNKNOWN} + + if len(self._data[0]) < self.MIN_DATAPOINTS: + return {"error": ProcessingError.INSUFFICIENT_DATA} + + # Classify the job's memory usage + total = np.sum(list(self._data.itervalues()), 0) + first, middle, last = np.array_split(total, 3) # pylint: disable=unbalanced-tuple-unpacking + first, middle, last = np.median(first), np.median(middle), np.median(last) + + # Threshold used to determine if the job is constant + threshold = middle / 2 + + if abs(middle - first) <= threshold and abs(last - middle) <= threshold: + category = "CONSTANT" + elif first < middle < last: + category = "INCREASING" + elif first > middle > last: + category = "DECREASING" + else: + category = "INCONSISTENT" + + return {"category": category} diff --git a/tests/integration_tests/5894431-1622570028/cpn-d14-02.0 b/tests/integration_tests/5894431-1622570028/cpn-d14-02.0 index 55082a6b..534a955a 100644 Binary files a/tests/integration_tests/5894431-1622570028/cpn-d14-02.0 and b/tests/integration_tests/5894431-1622570028/cpn-d14-02.0 differ diff --git a/tests/integration_tests/5894431-1622570028/cpn-d14-02.index b/tests/integration_tests/5894431-1622570028/cpn-d14-02.index index 7791b742..02fc119d 100644 Binary files a/tests/integration_tests/5894431-1622570028/cpn-d14-02.index and b/tests/integration_tests/5894431-1622570028/cpn-d14-02.index differ diff --git a/tests/integration_tests/5894431-1622570028/cpn-d14-02.meta b/tests/integration_tests/5894431-1622570028/cpn-d14-02.meta index 827b9d11..af339ed8 100644 Binary files a/tests/integration_tests/5894431-1622570028/cpn-d14-02.meta and b/tests/integration_tests/5894431-1622570028/cpn-d14-02.meta differ diff --git a/tests/integration_tests/integration_test.bash b/tests/integration_tests/integration_test.bash index 5031cc79..e4bb10ba 100755 --- a/tests/integration_tests/integration_test.bash +++ b/tests/integration_tests/integration_test.bash @@ -35,3 +35,7 @@ pytest tests/integration_tests/integration_plugin_api.py match=$(python src/supremm/supremm_testharness.py -i CpuCategories tests/integration_tests/5894431-1622570028/ | grep -q "GOOD"; echo $?) [[ $match -eq 0 ]] + +match=$(python src/supremm/supremm_testharness.py -i CgroupMemCategories --job-id=5894431 --acct-uid=545487 tests/integration_tests/5894431-1622570028/ | grep -q "INCONSISTENT"; echo $?) + +[[ $match -eq 0 ]]