Merge pull request #151 from rosswhitfield/srun_openmp
Add task_cpp option in launch_task for srun, set OpenMP environment variables
rosswhitfield authored Dec 22, 2021
2 parents f4f41cd + b686c77 commit 4988459
Showing 9 changed files with 612 additions and 61 deletions.
11 changes: 7 additions & 4 deletions doc/development.rst
@@ -144,7 +144,11 @@ There are some tests that only run on Cori at NERSC and these are not
run as part of the :ref:`CI <continuous integration>` and must be run
manually. To run those tests you need to add the option ``--runcori``
to the ``pytest`` command. There are tests for the :ref:`shifter
functionality<dask_shifter>` that are Cori specific. There are also
tests for the srun commands built with different ``task_ppn`` and
``task_cpp`` options in
:meth:`~ipsframework.services.ServicesProxy.launch_task`.


An example batch script for running the unit tests is:

@@ -153,8 +157,6 @@ An example batch script for running the unit tests is:
#!/bin/bash
#SBATCH -p debug
#SBATCH --nodes=1
#SBATCH -t 00:10:00
#SBATCH -C haswell
#SBATCH -J pytest
@@ -164,7 +166,8 @@ An example batch script for running the unit tests is:
module load python/3.8-anaconda-2020.11
python -m pytest --runcori
Then check the output in ``pytest.out`` to see that all the tests
passed.

Writing Tests
~~~~~~~~~~~~~
123 changes: 122 additions & 1 deletion doc/user_guides/advanced_guide.rst
@@ -291,7 +291,128 @@ Component invocation in the IPS means one component is calling another component
Task Launch
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The task launch interface allows components to launch and manage the
execution of (parallel) executables. Similar to the component
invocation interface, the behavior of
:py:meth:`~ipsframework.services.ServicesProxy.launch_task` and the
:py:meth:`~ipsframework.services.ServicesProxy.wait_task` variants are
controlled using the *block* keyword argument and different interfaces
to *wait_task*.
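
For example, a component can launch an executable and wait for it to
finish (a minimal sketch; ``cwd`` and ``my_binary`` are placeholders
for a real working directory and executable):

.. code-block:: python

    # block=True (the default) retries until enough resources are free
    task_id = self.services.launch_task(8, cwd, "my_binary")
    # ... other work can be done here while the task runs ...
    retval = self.services.wait_task(task_id)  # wait for completion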

The ``task_ppn`` and ``task_cpp`` options allow greater control over
how the launch commands are constructed. ``task_ppn`` will limit the
number of tasks per node, while ``task_cpp`` will limit the number of
cores assigned to each process; ``task_cpp`` is only used when
``MPIRUN=srun``, and if it is not set it will be calculated
automatically.
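
When calculated automatically, the cores per process is the number of
cores on a node divided by the processes per node, as in the
``resourceManager.py`` change below (a sketch assuming a 32-core
node):

.. code-block:: python

    cores_per_node = 32          # e.g. a Cori Haswell node
    ppn = 8                      # processes per node for this task
    cpp = cores_per_node // ppn  # -> 4, passed to srun as -c 4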

~~~~~~~~~~~~~~
Slurm examples
~~~~~~~~~~~~~~

The following examples show the behavior if you are running on `Cori
<https://docs.nersc.gov/systems/cori>`_ with 32 cores per node.

Using the `check-mpi.gnu.cori
<https://docs.nersc.gov/jobs/affinity/#use-nersc-prebuilt-binaries>`_
binary provided on Cori with ``nproc=8`` without specifying other
options:

.. code-block:: python

    self.services.launch_task(8, cwd, "check-mpi.gnu.cori")

the ``srun`` command created will be ``srun -N 1 -n 8 -c
4 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori``, along
with setting the OpenMP environment variables ``OMP_PLACES=threads
OMP_PROC_BIND=spread OMP_NUM_THREADS=4``. The resulting core affinity
is:

.. code-block:: text

    Hello from rank 0, on nid00025. (core affinity = 0-3)
    Hello from rank 1, on nid00025. (core affinity = 16-19)
    Hello from rank 2, on nid00025. (core affinity = 4-7)
    Hello from rank 3, on nid00025. (core affinity = 20-23)
    Hello from rank 4, on nid00025. (core affinity = 8-11)
    Hello from rank 5, on nid00025. (core affinity = 24-27)
    Hello from rank 6, on nid00025. (core affinity = 12-15)
    Hello from rank 7, on nid00025. (core affinity = 28-31)

If you also include the option ``task_ppn=4``:

.. code-block:: python

    self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4)

then the command created will be ``srun -N 2 -n 8 -c
8 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori``, along
with setting the OpenMP environment variables ``OMP_PLACES=threads
OMP_PROC_BIND=spread OMP_NUM_THREADS=8``. The resulting core affinity
is:

.. code-block:: text

    Hello from rank 0, on nid00025. (core affinity = 0-7)
    Hello from rank 1, on nid00025. (core affinity = 16-23)
    Hello from rank 2, on nid00025. (core affinity = 8-15)
    Hello from rank 3, on nid00025. (core affinity = 24-31)
    Hello from rank 4, on nid00026. (core affinity = 0-7)
    Hello from rank 5, on nid00026. (core affinity = 16-23)
    Hello from rank 6, on nid00026. (core affinity = 8-15)
    Hello from rank 7, on nid00026. (core affinity = 24-31)

You can limit the ``--cpus-per-task`` of the ``srun`` command by
setting ``task_cpp``; adding ``task_cpp=2``

.. code-block:: python

    self.services.launch_task(8, cwd, "check-mpi.gnu.cori", task_ppn=4, task_cpp=2)

will create the command ``srun -N 2 -n 8 -c
2 --threads-per-core=1 --cpu-bind=cores check-mpi.gnu.cori`` and set
``OMP_PLACES=threads OMP_PROC_BIND=spread OMP_NUM_THREADS=2``. This
will result in under-utilizing the nodes, which may be needed if your
task is memory bound. The resulting core affinity is:

.. code-block:: text

    Hello from rank 0, on nid00025. (core affinity = 0,1)
    Hello from rank 1, on nid00025. (core affinity = 16,17)
    Hello from rank 2, on nid00025. (core affinity = 2,3)
    Hello from rank 3, on nid00025. (core affinity = 18,19)
    Hello from rank 4, on nid00026. (core affinity = 0,1)
    Hello from rank 5, on nid00026. (core affinity = 16,17)
    Hello from rank 6, on nid00026. (core affinity = 2,3)
    Hello from rank 7, on nid00026. (core affinity = 18,19)

Using the `check-hybrid.gnu.cori
<https://docs.nersc.gov/jobs/affinity/#use-nersc-prebuilt-binaries>`_
binary with the same options:

.. code-block:: python

    self.services.launch_task(8, cwd, "check-hybrid.gnu.cori", task_ppn=4, task_cpp=2)

the resulting core affinity of the OpenMP threads is:

.. code-block:: text

    Hello from rank 0, thread 0, on nid00025. (core affinity = 0)
    Hello from rank 0, thread 1, on nid00025. (core affinity = 1)
    Hello from rank 1, thread 0, on nid00025. (core affinity = 16)
    Hello from rank 1, thread 1, on nid00025. (core affinity = 17)
    Hello from rank 2, thread 0, on nid00025. (core affinity = 2)
    Hello from rank 2, thread 1, on nid00025. (core affinity = 3)
    Hello from rank 3, thread 0, on nid00025. (core affinity = 18)
    Hello from rank 3, thread 1, on nid00025. (core affinity = 19)
    Hello from rank 4, thread 0, on nid00026. (core affinity = 0)
    Hello from rank 4, thread 1, on nid00026. (core affinity = 1)
    Hello from rank 5, thread 0, on nid00026. (core affinity = 16)
    Hello from rank 5, thread 1, on nid00026. (core affinity = 17)
    Hello from rank 6, thread 0, on nid00026. (core affinity = 2)
    Hello from rank 6, thread 1, on nid00026. (core affinity = 3)
    Hello from rank 7, thread 0, on nid00026. (core affinity = 18)
    Hello from rank 7, thread 1, on nid00026. (core affinity = 19)

.. automethod:: ipsframework.services.ServicesProxy.launch_task
   :noindex:
18 changes: 16 additions & 2 deletions ipsframework/resourceManager.py
@@ -256,7 +256,7 @@ def add_nodes(self, listOfNodes):
# RM getAllocation
# pylint: disable=inconsistent-return-statements
def get_allocation(self, comp_id, nproc, task_id,
whole_nodes, whole_socks, task_ppn=0, task_cpp=0):
"""
Traverse available nodes to return:
@@ -316,6 +316,20 @@ def get_allocation(self, comp_id, nproc, task_id,
allocation_possible = False
if whole_nodes:
allocation_possible, nodes = self.check_whole_node_cap(nproc, ppn)

if allocation_possible:
num_cores = self.cores_per_node
max_cpp = num_cores//ppn
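# use the requested task_cpp if given, but cap it at max_cpp so the allocation still fits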
if task_cpp > 0:
if task_cpp > max_cpp:
self.fwk.warning(f"task cpp ({task_cpp}) exceeds maximum possible for {ppn} procs per node "
f"with {num_cores} cores per node, using {max_cpp} cpus per proc instead")
cpp = max_cpp
else:
cpp = task_cpp
else:
cpp = max_cpp

elif whole_socks:
allocation_possible, nodes = self.check_whole_sock_cap(nproc, ppn)
else:
@@ -429,7 +443,7 @@ def get_allocation(self, comp_id, nproc, task_id,

if whole_nodes:
self.report_RM_status("allocation for task %d using whole nodes" % task_id)
return not whole_nodes, nodes, ppn, self.max_ppn, cpp, self.accurateNodes
else:
self.report_RM_status("allocation for task %d using partial nodes" % task_id)
return not whole_nodes, nodes, node_file_entries, ppn, self.max_ppn, self.accurateNodes
33 changes: 25 additions & 8 deletions ipsframework/services.py
@@ -181,6 +181,7 @@ def __init__(self, fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name):
self.sub_flows = {}
self.binary_fullpath_cache = {}
self.ppn = 0
self.cpp = 0
self.shared_nodes = False

def __initialize__(self, component_ref):
@@ -241,6 +242,11 @@ def __initialize__(self, component_ref):
except Exception:
self.ppn = 0

try:
self.cpp = int(conf['CPUS_PER_PROC'])
except Exception:
self.cpp = 0

if self.sim_conf['SIMULATION_MODE'] == 'RESTART':
if self.sim_conf['RESTART_TIME'] == 'LATEST':
chkpts = glob.glob(os.path.join(self.sim_conf['RESTART_ROOT'], 'restart', '*'))
@@ -548,6 +554,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):
manage how the binary is launched. Keywords may be the following:
* *task_ppn* : the processes per node value for this task
* *task_cpp* : the cores per process, only used for ``MPIRUN=srun`` commands
* *block* : specifies that this task will block (or raise an
exception) if not enough resources are available to run
immediately. If ``True``, the task will be retried until it
@@ -611,6 +618,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):
self.binary_fullpath_cache[binary] = binary_fullpath

task_ppn = keywords.get('task_ppn', self.ppn)
task_cpp = keywords.get('task_cpp', self.cpp)
block = keywords.get('block', True)
tag = keywords.get('tag', 'None')

@@ -622,14 +630,18 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords):
msg_id = self._invoke_service(self.fwk.component_id,
'init_task', nproc, binary_fullpath,
working_dir, task_ppn, block,
whole_nodes, whole_socks, task_cpp, *args)
(task_id, command, env_update) = self._get_service_response(msg_id, block=True)
except Exception:
raise

task_id = self._launch_task(nproc, working_dir, task_id, command, env_update, tag, keywords)

if env_update:
self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}, env = {env_update}')
else:
self._send_monitor_event('IPS_LAUNCH_TASK', f'task_id = {task_id} , Tag = {tag} , nproc = {nproc} , Target = {command}')

return task_id

def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, keywords):
@@ -662,7 +674,7 @@ def _launch_task(self, nproc, working_dir, task_id, command, env_update, tag, ke
try:
self.debug('Launching command : %s', command)
if env_update:
new_env = os.environ.copy()
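# update a copy so per-task changes don't leak into the framework's own environment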
new_env.update(env_update)
process = subprocess.Popen(cmd_lst, stdout=task_stdout,
stderr=task_stderr,
@@ -707,9 +719,10 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0):
task_ppn = task.keywords.get('task_ppn', self.ppn)
wnodes = task.keywords.get('whole_nodes', not self.shared_nodes)
wsocks = task.keywords.get('whole_sockets', not self.shared_nodes)
task_cpp = task.keywords.get('task_cpp', self.cpp)
submit_dict[task_name] = (task.nproc, task.working_dir,
task.binary, task.args,
task_ppn, wnodes, wsocks, task_cpp)

try:
msg_id = self._invoke_service(self.fwk.component_id,
@@ -729,9 +742,13 @@

active_tasks[task_name] = self._launch_task(task.nproc, task.working_dir, task_id, command, env_update, tag, task.keywords)

if env_update:
self._send_monitor_event('IPS_LAUNCH_TASK_POOL',
f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}'
f', env = {env_update}')
else:
self._send_monitor_event('IPS_LAUNCH_TASK_POOL',
f'task_id = {task_id} , Tag = {tag} , nproc = {task.nproc} , Target = {command} , task_name = {task_name}')

return active_tasks

38 changes: 27 additions & 11 deletions ipsframework/taskManager.py
@@ -222,12 +222,13 @@ def init_task(self, init_task_msg):
block = init_task_msg.args[4] # Block waiting for available resources
wnodes = init_task_msg.args[5]
wsocks = init_task_msg.args[6]
tcpp = init_task_msg.args[7]

# SIMYAN: increased arguments
cmd_args = init_task_msg.args[8:]

try:
return self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args)
except InsufficientResourcesException:
if block:
raise BlockedMessageException(init_task_msg, '***%s waiting for %d resources' %
@@ -245,7 +246,7 @@ def init_task(self, init_task_msg):
except Exception:
raise

def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args):
# handle for task related things
task_id = self.get_task_id()

@@ -254,13 +255,14 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks
task_id,
wnodes,
wsocks,
task_ppn=tppn,
task_cpp=tcpp)
self.fwk.debug('RM: get_allocation() returned %s', str(retval))
partial_node = retval[0]
if partial_node:
(nodelist, corelist, ppn, max_ppn, accurateNodes) = retval[1:]
else:
(nodelist, ppn, max_ppn, cpp, accurateNodes) = retval[1:]

if partial_node:
nodes = ','.join(nodelist)
@@ -279,7 +281,8 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks
working_dir, ppn,
max_ppn, nodes,
accurateNodes,
False, task_id,
cpp)

self.curr_task_table[task_id] = {'component': caller_id,
'status': 'init_task',
@@ -293,7 +296,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, wnodes, wsocks

def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
max_ppn, nodes, accurateNodes, partial_nodes,
task_id, cpp=0, core_list=''):
"""
Construct task launch command to be executed by the component.
@@ -504,8 +507,21 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
nproc_flag = '-n'
nnodes_flag = '-N'
num_nodes = len(nodes.split(','))
if partial_nodes:
cmd = ' '.join([self.task_launch_cmd,
nnodes_flag, str(num_nodes),
nproc_flag, str(nproc)])
else:
cpuptask_flag = '-c'
cpubind_flag = '--threads-per-core=1 --cpu-bind=cores'
cmd = ' '.join([self.task_launch_cmd,
nnodes_flag, str(num_nodes),
nproc_flag, str(nproc),
cpuptask_flag, str(cpp),
cpubind_flag])
env_update = {'OMP_PLACES': 'threads',
'OMP_PROC_BIND': 'spread',
'OMP_NUM_THREADS': str(cpp)}
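# e.g. num_nodes=2, nproc=8, cpp=2 yields
#   srun -N 2 -n 8 -c 2 --threads-per-core=1 --cpu-bind=cores
# with OMP_NUM_THREADS=2 set for the task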
else:
self.fwk.error("invalid task launch command.")
raise RuntimeError("invalid task launch command.")
@@ -532,10 +548,10 @@ def init_task_pool(self, init_task_msg):
ret_dict = {}
for task_name in task_dict:
# handle for task related things
(nproc, working_dir, binary, cmd_args, tppn, wnodes, wsocks, tcpp) = task_dict[task_name]

try:
ret_dict[task_name] = self._init_task(caller_id, nproc, binary, working_dir, tppn, tcpp, wnodes, wsocks, cmd_args)
except InsufficientResourcesException:
continue
except BadResourceRequestException as e: