Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: enforce max resource limits per-association #562

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ noinst_HEADERS = \
fairness/writer/data_writer_db.hpp \
fairness/writer/data_writer_stdout.hpp \
plugins/accounting.hpp \
plugins/job.hpp \
plugins/jj.hpp

fairness_libweighted_tree_la_SOURCES = \
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp jj.cpp job.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
42 changes: 42 additions & 0 deletions src/plugins/job.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/************************************************************\
* Copyright 2025 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#include "job.hpp"

int count_resources (Job &job, json_t *jobspec)
{
struct jj_counts counts;
if (jj_get_counts_json (jobspec, &counts) < 0)
return -1;

job.nnodes = counts.nnodes;
job.ncores = counts.nslots * counts.slot_size;
return 0;
}


bool contains_dep (const Job &job, const std::string &dep)
{
const auto &job_deps = job.deps;
return std::find (job_deps.begin (),
job_deps.end (),
dep) != job_deps.end ();
}


void remove_dep (Job &job, const std::string &dep)
{
auto &job_deps = job.deps;
job_deps.erase (
std::remove(job_deps.begin (), job_deps.end (), dep),
job_deps.end ()
);
}

57 changes: 57 additions & 0 deletions src/plugins/job.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/************************************************************\
* Copyright 2025 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

// header file for the Accounting class
extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <flux/core.h>
#include <flux/jobtap.h>
#include <jansson.h>
}

#ifndef JOB_H
#define JOB_H

#include <vector>
#include <string>
#include <map>
#include <iterator>
#include <sstream>
#include <algorithm>

// custom job resource counting file
#include "jj.hpp"

// all attributes are per-user/bank
class Job {
public:
// attributes
long int id = 0; // the ID of the job
std::vector<std::string> deps; // any dependencies on job
int nnodes = 0; // the number of nodes requested
int ncores = 0; // the number of cores requested

// constructor
Job () = default;
Job (long int id_) : id (id_), nnodes (0), ncores (0) {}
};

// count the resources requested for a job
int count_resources (Job &job, json_t *jobspec);

// determine if a job contains a certain dependency
bool contains_dep (const Job &job, const std::string &dep);

// remove a job dependency from a job's list of dependencies
void remove_dep (Job &job, const std::string &dep);

#endif // JOB_H
138 changes: 125 additions & 13 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ extern "C" {

// custom bank_info class file
#include "accounting.hpp"
// custom job class file
#include "job.hpp"
// custom job resource counting file
#include "jj.hpp"

Expand All @@ -50,6 +52,9 @@ std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;
std::vector<std::string> projects;
std::map<std::string, int> priority_weights;
// map to keep track of which flux-accounting dependencies are
// associated with a held job
std::map<long int, Job> held_jobs;

/******************************************************************************
* *
Expand Down Expand Up @@ -869,12 +874,16 @@ static int depend_cb (flux_plugin_t *p,
int userid;
long int id;
Association *b;
json_t *jobspec = NULL;
Job job;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:I}",
"userid", &userid, "id", &id) < 0) {
"{s:i, s:I, s:o}",
"userid", &userid,
"id", &id,
"jobspec", &jobspec) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
Expand All @@ -895,6 +904,27 @@ static int depend_cb (flux_plugin_t *p,
return -1;
}

if (jobspec == NULL) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.depend: failed to unpack " \
"jobspec");
return -1;
} else {
// count resources requested for the job
if (count_resources (job, jobspec) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.depend: unable to " \
"unpack jobspec");
return -1;
}
}

// if user has already hit their max running jobs count, add a job
// dependency to hold job until an already running job has finished
if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) {
Expand All @@ -908,6 +938,40 @@ static int depend_cb (flux_plugin_t *p,
return -1;
}
b->held_jobs.push_back (id);
// add job to plugin's internal map of held jobs, initialize the
// Job object with the job ID and resources
held_jobs[id].id = id;
held_jobs[id].nnodes = job.nnodes;
held_jobs[id].ncores = job.ncores;
held_jobs[id].deps.push_back ("max-running-jobs-user-limit");
}

if ((b->max_nodes > 0 && b->max_cores > 0) &&
(((b->cur_nodes + job.nnodes) > b->max_nodes) ||
((b->cur_cores + job.ncores) > b->max_cores))) {
// the job would put the association over either their max cores or
// max nodes limit(s); add a dependency on the job
if (flux_jobtap_dependency_add (p,
id,
"max-resource-user-limit") < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"failed to add job dependency");

return -1;
}
held_jobs[id].id = id;
held_jobs[id].nnodes = job.nnodes;
held_jobs[id].ncores = job.ncores;
held_jobs[id].deps.push_back ("max-resource-user-limit");
if (std::find (b->held_jobs.begin (),
b->held_jobs.end (),
id) == b->held_jobs.end ()) {
// add held job to Association object
b->held_jobs.push_back (id);
}
}

return 0;
Expand Down Expand Up @@ -1225,21 +1289,69 @@ static int inactive_cb (flux_plugin_t *p,
}
}

// if the user/bank combo has any currently held jobs and the user is now
// under their max jobs limit, remove the dependency from first held job
if ((b->held_jobs.size () > 0) && (b->cur_run_jobs < b->max_run_jobs)) {
if (b->held_jobs.size () > 0) {
// association has at least one held job; check to see if it fits all
// requirements in order to be released
long int jobid = b->held_jobs.front ();

if (flux_jobtap_dependency_remove (p,
jobid,
"max-running-jobs-user-limit") < 0)
{
flux_jobtap_raise_exception (p, jobid, "mf_priority",
0, "failed to remove job dependency");
if (held_jobs.find (jobid) == held_jobs.end ()) {
// held job cannot be found in plugin's internal map; raise error
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority",
0,
"job.state.inactive: failed to " \
"locate held job in priority plugin");
return -1;
}

b->held_jobs.erase (b->held_jobs.begin ());
if (b->cur_run_jobs < b->max_run_jobs) {
// association is under their max running jobs limit;
// see if the job has this dependency and remove it
if (contains_dep (held_jobs[jobid],
"max-running-jobs-user-limit")) {
if (flux_jobtap_dependency_remove (
p,
jobid,
"max-running-jobs-user-limit") < 0) {
flux_jobtap_raise_exception (p,
jobid,
"mf_priority",
0,
"failed to remove " \
"running jobs " \
"dependency");
}
// remove dependency from job ID's entry in plugin's
// internal map of held jobs
remove_dep (held_jobs[jobid], "max-running-jobs-user-limit");
}
}

if (((b->cur_nodes + held_jobs[jobid].nnodes) <= b->max_nodes) &&
((b->cur_cores + held_jobs[jobid].ncores) <= b->max_cores)) {
// association is under their max resources limit;
// see if the job has this dependency and remove it
if (contains_dep (held_jobs[jobid], "max-resource-user-limit")) {
if (flux_jobtap_dependency_remove (
p,
jobid,
"max-resource-user-limit") < 0) {
flux_jobtap_raise_exception (p,
jobid,
"mf_priority",
0,
"failed to remove " \
"max resources " \
"dependency");
}
}
remove_dep (held_jobs[jobid], "max-resource-user-limit");
}
if (held_jobs[jobid].deps.size () == 0)
// job no longer has any dependencies on it and was removed
// from the plugin's internal map of held jobs; also remove
// it from the Association object
b->held_jobs.erase (b->held_jobs.begin ());
}

return 0;
Expand Down
24 changes: 18 additions & 6 deletions t/t1005-max-jobs-limits.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ test_expect_success 'create fake_user.json' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
},
{
"userid": 5011,
Expand All @@ -54,7 +56,9 @@ test_expect_success 'create fake_user.json' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
}
]
}
Expand Down Expand Up @@ -152,7 +156,9 @@ test_expect_success 'increase the max jobs count of the user' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
}
]
}
Expand Down Expand Up @@ -204,7 +210,9 @@ test_expect_success 'update max_active_jobs limit' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
}
]
}
Expand Down Expand Up @@ -264,7 +272,9 @@ test_expect_success 'create another user with the same limits in multiple banks'
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
},
{
"userid": 5012,
Expand All @@ -276,7 +286,9 @@ test_expect_success 'create another user with the same limits in multiple banks'
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
}
]
}
Expand Down
8 changes: 6 additions & 2 deletions t/t1012-mf-priority-load.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ test_expect_success 'create fake_payload.py' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
},
{
"userid": userid,
Expand All @@ -54,7 +56,9 @@ test_expect_success 'create fake_payload.py' '
"queues": "",
"active": 1,
"projects": "*",
"def_project": "*"
"def_project": "*",
"max_nodes": 2147483647,
"max_cores": 2147483647
}
]
}
Expand Down
Loading
Loading