From 28ce7fffb6b7c2bf401edf96ca1084a493570b2f Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 29 Oct 2024 21:43:15 -0500 Subject: [PATCH] updates --- workflows/balsam/.gitignore | 3 +- .../globus_compute/1_register_function.py | 2 +- workflows/globus_compute/README.md | 33 +-- workflows/globus_compute/polaris_config.yaml | 4 - workflows/parsl/5_mpi_app_example.py | 61 +++++ workflows/parsl/README.md | 248 +++++++++++++++++- 6 files changed, 328 insertions(+), 23 deletions(-) create mode 100644 workflows/parsl/5_mpi_app_example.py diff --git a/workflows/balsam/.gitignore b/workflows/balsam/.gitignore index 7b7c6b0..a7e386f 100644 --- a/workflows/balsam/.gitignore +++ b/workflows/balsam/.gitignore @@ -1,2 +1,3 @@ *~ -*.png \ No newline at end of file +*.png +polaris_tutorial \ No newline at end of file diff --git a/workflows/globus_compute/1_register_function.py b/workflows/globus_compute/1_register_function.py index 445c70d..c0dfa12 100644 --- a/workflows/globus_compute/1_register_function.py +++ b/workflows/globus_compute/1_register_function.py @@ -13,7 +13,7 @@ def hello_affinity(run_directory): os.chdir(os.path.expandvars(run_directory)) # This is the command that calls the compiled executable - command = "/eagle/fallwkshp23/workflows/affinity_gpu/hello_affinity" + command = "/grand/alcf_training/workflows_2024/GettingStarted/Examples/Polaris/affinity_gpu/hello_affinity" # This runs the application command res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/workflows/globus_compute/README.md b/workflows/globus_compute/README.md index 9cffd73..e34f0ef 100644 --- a/workflows/globus_compute/README.md +++ b/workflows/globus_compute/README.md @@ -22,7 +22,7 @@ On Polaris, you will need to create a python virtual environment or a conda envi For the workshop, you can use the workshop python virtual environment: ```bash -source /eagle/fallwkshp23/workflows/env/bin/activate +source /grand/alcf_training/workflows_2024/_env/bin/activate ``` To create your own environment: @@ -60,7 +60,7 @@ git clone git@github.com:argonne-lcf/ALCF_Hands_on_HPC_Workshop.git cd ALCF_Hands_on_HPC_Workshop/workflows/globus_compute # If you haven't already, activate the environment -source /eagle/fallwkshp23/workflows/env/bin/activate +source /grand/alcf_training/workflows_2024/_env/bin/activate ``` Use the sample config [polaris_config.yaml](polaris_config.yaml) provided to configure and start your endpoint. 
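For example, one way to configure and start the endpoint from this directory (a sketch; `workshop-endpoint` is the endpoint name assumed by the `worker_init` line in the config below):

```bash
# Create the endpoint, copy in the sample config, and start it
globus-compute-endpoint configure workshop-endpoint
cp polaris_config.yaml ~/.globus_compute/workshop-endpoint/config.yaml
globus-compute-endpoint start workshop-endpoint
```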
The sample config has similar features to the Parsl config and looks like this: @@ -70,16 +70,17 @@ engine: type: GlobusComputeEngine available_accelerators: 4 # Assign one worker per GPU - cpu_affinity: block-reverse # Assigns cpus in reverse sequential order - prefetch_capacity: 0 # Increase if you have many more tasks than workers - - address: - type: address_by_interface - ifname: bond0 + max_workers_per_node: 4 + + cpu_affinity: "list:24-31,56-63:16-23,48-55:8-15,40-47:0-7,32-39" + + prefetch_capacity: 0 # Increase if you have many more tasks than workers + max_retries_on_system_failure: 2 - strategy: - type: SimpleStrategy + strategy: simple + job_status_kwargs: max_idletime: 300 + strategy_period: 60 provider: type: PBSProProvider @@ -90,18 +91,18 @@ engine: bind_cmd: --cpu-bind overrides: --ppn 1 - account: fallwkshp23 - queue: fallws23single + account: alcf_training + queue: HandsOnHPC cpus_per_node: 64 select_options: ngpus=4 # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" - scheduler_options: "#PBS -l filesystems=home:eagle" + scheduler_options: "#PBS -l filesystems=home:eagle:grand" # Node setup: activate necessary conda environment and such - worker_init: "source /eagle/fallwkshp23/workflows/env/bin/activate; module load PrgEnv-nvhpc; cd $HOME/.globus_compute/workshop-endpoint" + worker_init: "source /grand/alcf_training/workflows_2024/_env/bin/activate; module load PrgEnv-nvhpc; cd $HOME/.globus_compute/workshop-endpoint" - walltime: 00:05:00 + walltime: 00:30:00 nodes_per_block: 1 init_blocks: 0 min_blocks: 0 @@ -180,7 +181,7 @@ def hello_affinity(run_directory): os.chdir(os.path.expandvars(run_directory)) # This is the command that calls the compiled executable - command = f"/eagle/fallwkshp23/workflows/affinity_gpu/hello_affinity" + command = f"/grand/alcf_training/workflows_2024/GettingStarted/Examples/Polaris/affinity_gpu/hello_affinity" # This runs the application command res = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/workflows/globus_compute/polaris_config.yaml b/workflows/globus_compute/polaris_config.yaml index b37ac3b..f7ba82c 100644 --- a/workflows/globus_compute/polaris_config.yaml +++ b/workflows/globus_compute/polaris_config.yaml @@ -10,10 +10,6 @@ engine: prefetch_capacity: 0 # Increase if you have many more tasks than workers max_retries_on_system_failure: 2 - address: - type: address_by_interface - ifname: bond0 - strategy: simple job_status_kwargs: max_idletime: 300 diff --git a/workflows/parsl/5_mpi_app_example.py b/workflows/parsl/5_mpi_app_example.py new file mode 100644 index 0000000..74734a6 --- /dev/null +++ b/workflows/parsl/5_mpi_app_example.py @@ -0,0 +1,61 @@ +import parsl +import os +from parsl.config import Config +from parsl import bash_app +# PBSPro is the right provider for polaris: +from parsl.providers import PBSProProvider +# The MPIExecutor is for running MPI applications: +from parsl.executors import MPIExecutor +# Use the Simple launcher +from parsl.launchers import SimpleLauncher + +# We will save outputs in the current working directory +working_directory = os.getcwd() + +config = Config( + executors=[ + MPIExecutor( + max_workers_per_block=2, # Assuming 2 nodes per task + provider=PBSProProvider( + account="alcf_training", + worker_init=f"""source /grand/alcf_training/workflows_2024/_env/bin/activate; \ + cd {working_directory}""", + walltime="1:00:00", + queue="debug-scaling", + scheduler_options="#PBS -l filesystems=home:eagle:grand", + launcher=SimpleLauncher(), + 
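                # Note: SimpleLauncher starts the Parsl workers without wrapping
                # them in mpiexec; the MPIExecutor instead builds an mpiexec
                # command per task and exposes it as $PARSL_MPI_PREFIX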
select_options="ngpus=4", + nodes_per_block=4, + max_blocks=1, + cpus_per_node=64, + ), + ), + ] +) + +resource_specification = { + 'num_nodes': 2, # Number of nodes required for the application instance + 'ranks_per_node': 4, # Number of ranks / application elements to be launched per node + 'num_ranks': 8, # Number of ranks in total +} + +@bash_app +def mpi_hello_affinity(parsl_resource_specification, depth=8, stdout='mpi_hello.stdout', stderr='mpi_hello.stderr'): + # PARSL_MPI_PREFIX will resolve to `mpiexec -n 8 -ppn 4 -hosts NODE001,NODE002` + APP_DIR = "/grand/alcf_training/workflows_2024/GettingStarted/Examples/Polaris/affinity_gpu" + return f"$PARSL_MPI_PREFIX --cpu-bind depth --depth={depth} \ + {APP_DIR}/set_affinity_gpu_polaris.sh {APP_DIR}/hello_affinity" + +with parsl.load(config): + tasks = [] + for i in range(4): + tasks.append(mpi_hello_affinity(parsl_resource_specification=resource_specification, + stdout=f"{working_directory}/mpi_output/hello_{i}.stdout", + stderr=f"{working_directory}/mpi_output/hello_{i}.stderr")) + + # Wait on futures to return, and print results + for i, t in enumerate(tasks): + t.result() + with open(f"{working_directory}/mpi_output/hello_{i}.stdout", "r") as f: + print(f"Stdout of task {i}:") + print(f.read()) \ No newline at end of file diff --git a/workflows/parsl/README.md b/workflows/parsl/README.md index 3ead657..611eb7d 100644 --- a/workflows/parsl/README.md +++ b/workflows/parsl/README.md @@ -82,7 +82,33 @@ with open('hello-stdout', 'r') as f: To run many tasks in parallel, create futures that call your app. Once all futures have been created, only then wait on the results. ```python +import parsl +from parsl import python_app + +# Scripts adapted from Parsl docs +# https://parsl.readthedocs.io/en/stable/1-parsl-introduction.html + + +# App that generates a random number after a delay +@python_app +def generate(limit, delay): + from random import randint + import time + time.sleep(delay) + return randint(1, limit) + + +with parsl.load(): + # Generate 5 random numbers between 1 and 10 + rand_nums = [] + for i in range(5): + rand_nums.append(generate(10, i)) + + # Wait for all apps to finish and collect the results + outputs = [i.result() for i in rand_nums] + # Print results + print(outputs) ``` ## Running tasks with sequential dependencies (2_sequential_workflow.py) @@ -90,7 +116,40 @@ To run many tasks in parallel, create futures that call your app. Once all futu To create dependencies between tasks, make the creation of one future dependent on the result of another future. 
```python
import parsl
from parsl import python_app, bash_app
from parsl.data_provider.files import File

# Scripts adapted from Parsl docs
# https://parsl.readthedocs.io/en/stable/1-parsl-introduction.html


# App that generates a random number
@python_app
def generate(limit):
    from random import randint
    return randint(1, limit)


# App that writes a variable to a file
@bash_app
def save(variable, outputs=[]):
    return 'echo %s &> %s' % (variable, outputs[0])


with parsl.load():
    # Generate a random number between 1 and 10
    random = generate(10)

    # This call will make the script wait before continuing
    print(f"Random number: {random.result()}")

    # Now that random has returned, save the random number to a file
    saved = save(random, outputs=[File("sequential-output.txt")])

# Print the output file
with open(saved.outputs[0].result(), 'r') as f:
    print('File contents: %s' % f.read())
```

## Creating tasks within tasks (3_dynamic_workflow.py)
@@ -98,7 +157,38 @@
Sometimes it is advantageous for an app to call another app. Apps that call other apps have a special type called a `join_app` (they join the results of other apps). Here is an example:

```python
import parsl
from parsl.app.app import join_app, python_app

# Scripts adapted from Parsl docs
# https://parsl.readthedocs.io/en/stable/1-parsl-introduction.html


@python_app
def add(*args):
    """Add all of the arguments together. If no arguments, then
    zero is returned (the neutral element of +)
    """
    accumulator = 0
    for v in args:
        accumulator += v
    return accumulator


@join_app
def fibonacci(n):
    if n == 0:
        return add()
    elif n == 1:
        return add(1)
    else:
        return add(fibonacci(n - 1), fibonacci(n - 2))


with parsl.load():
    fib_series = fibonacci(10)
    print(fib_series.result())
```

# Parsl Configuration and Running on Polaris
@@ -124,7 +214,66 @@ On Polaris, for cases where you wish to run one task per gpu (a common use case)
The Config object below will run 4 workers at a time. These workers will be run on one Polaris node and each will access 1 GPU.
```python
import os
from parsl.config import Config

# PBSPro is the right provider for polaris:
from parsl.providers import PBSProProvider
# The high throughput executor is for scaling to HPC systems:
from parsl.executors import HighThroughputExecutor
# Use the MPI launcher
from parsl.launchers import MpiExecLauncher

# These options will run work in 1-node batch jobs, one job at a time
nodes_per_job = 1
max_num_jobs = 1

# The config will launch workers from this directory
execute_dir = os.getcwd()

polaris_config = Config(
    executors=[
        HighThroughputExecutor(
            # Ensures one worker per GPU
            available_accelerators=4,
            max_workers_per_node=4,
            # Distributes threads to workers/GPUs in a way optimized for Polaris
            cpu_affinity="list:24-31,56-63:16-23,48-55:8-15,40-47:0-7,32-39",
            # Increase if you have many more tasks than workers
            prefetch_capacity=0,
            provider=PBSProProvider(
                # Project name
                account="alcf_training",
                # Submission queue
                queue="HandsOnHPC",
                # Commands run before workers launched
                # Make sure to activate your environment where Parsl is installed
                worker_init=f'''source /grand/alcf_training/workflows_2024/_env/bin/activate;
                cd {execute_dir}''',
                # Wall time for batch jobs
                walltime="0:05:00",
                # Change if data/modules located on other filesystem
                scheduler_options="#PBS -l filesystems=home:eagle:grand",
                # Ensures 1 manager per node and allows it to divide work among all 64 threads
                launcher=MpiExecLauncher(bind_cmd="--cpu-bind", overrides="--ppn 1"),
                # Options added to #PBS -l select aside from ncpus
                select_options="ngpus=4",
                # Number of nodes per batch job
                nodes_per_block=nodes_per_job,
                # Minimum number of batch jobs running workflow
                min_blocks=0,
                # Maximum number of batch jobs running workflow
                max_blocks=max_num_jobs,
                # Threads per node
                cpus_per_node=64,
            ),
        ),
    ],
    # How many times to retry failed tasks;
    # increase this if your tasks can be interrupted by a batch job ending
    retries=0,
)
```

## Example: Run hello_affinity on Polaris compute nodes (4_hello_polaris.py)
@@ -132,9 +281,106 @@
This script runs the application hello_affinity from our [GettingStarted](https://github.com/argonne-lcf/GettingStarted/tree/master/Examples/Polaris/affinity_gpu) repo. It reports GPU and CPU affinities. This script will run 4 instances of hello_affinity, one on each GPU of a Polaris compute node, in parallel. It will create a batch job and block until the task futures are fulfilled by workers on the compute node.
```python
import parsl
import os
from parsl import bash_app
from config import polaris_config

# We will save outputs in the current working directory
working_directory = os.getcwd()

# Application that reports the CPU and GPU affinity of the worker that runs it
@bash_app
def hello_affinity(stdout='hello.stdout', stderr='hello.stderr'):
    return '/grand/alcf_training/workflows_2024/GettingStarted/Examples/Polaris/affinity_gpu/hello_affinity'

# Load config for polaris
with parsl.load(polaris_config):

    # Create futures calling 'hello_affinity', store them in list 'tasks'
    tasks = []
    for i in range(4):
        tasks.append(hello_affinity(stdout=f"{working_directory}/output/hello_{i}.stdout",
                                    stderr=f"{working_directory}/output/hello_{i}.stderr"))

    # Wait on futures to return, and print results
    for i, t in enumerate(tasks):
        t.result()
        with open(f"{working_directory}/output/hello_{i}.stdout", "r") as f:
            print(f"Stdout of task {i}:")
            print(f.read())

    # Workflow complete!
    print("Hello tasks completed")
```

## Example: Run hello_affinity as an MPI application (5_mpi_app_example.py)

In the previous example, `mpiexec` was used to launch the Parsl workers, not to run the application itself. Running MPI applications that span multiple nodes requires a different setup: use the `SimpleLauncher` together with the `MPIExecutor`, which builds an `mpiexec` command for each task. Note that the configuration has to set `max_workers_per_block` to align with the resource needs of the application: here each batch job has `nodes_per_block=4` and each task needs 2 nodes, so at most 4 / 2 = 2 tasks can run at a time. To run applications that need a different number of nodes per task, a different `Config` object is needed.

```python
import parsl
import os
from parsl.config import Config
from parsl import bash_app
# PBSPro is the right provider for polaris:
from parsl.providers import PBSProProvider
# The MPIExecutor is for running MPI applications:
from parsl.executors import MPIExecutor
# Use the Simple launcher
from parsl.launchers import SimpleLauncher

# We will save outputs in the current working directory
working_directory = os.getcwd()

config = Config(
    executors=[
        MPIExecutor(
            max_workers_per_block=2,  # Assuming 2 nodes per task
            provider=PBSProProvider(
                account="alcf_training",
                worker_init=f"""source /grand/alcf_training/workflows_2024/_env/bin/activate; \
                cd {working_directory}""",
                walltime="1:00:00",
                queue="HandsOnHPCScale",
                scheduler_options="#PBS -l filesystems=home:eagle:grand",
                launcher=SimpleLauncher(),
                select_options="ngpus=4",
                nodes_per_block=4,
                max_blocks=1,
                cpus_per_node=64,
            ),
        ),
    ]
)

resource_specification = {
    'num_nodes': 2,  # Number of nodes required for the application instance
    'ranks_per_node': 4,  # Number of ranks / application elements to be launched per node
    'num_ranks': 8,  # Number of ranks in total
}

@bash_app
def mpi_hello_affinity(parsl_resource_specification, depth=8, stdout='mpi_hello.stdout', stderr='mpi_hello.stderr'):
    # PARSL_MPI_PREFIX will resolve to `mpiexec -n 8 -ppn 4 -hosts NODE001,NODE002`
    APP_DIR = "/grand/alcf_training/workflows_2024/GettingStarted/Examples/Polaris/affinity_gpu"
    return f"$PARSL_MPI_PREFIX --cpu-bind depth --depth={depth} \
    {APP_DIR}/set_affinity_gpu_polaris.sh {APP_DIR}/hello_affinity"

with parsl.load(config):
    tasks = []
    for i in range(4):
        tasks.append(mpi_hello_affinity(parsl_resource_specification=resource_specification,
                     stdout=f"{working_directory}/mpi_output/hello_{i}.stdout",
                     stderr=f"{working_directory}/mpi_output/hello_{i}.stderr"))

    # Wait on futures to return, and print results
    for i, t in enumerate(tasks):
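        # result() blocks until this task's mpiexec run has completed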
        t.result()
        with open(f"{working_directory}/mpi_output/hello_{i}.stdout", "r") as f:
            print(f"Stdout of task {i}:")
            print(f.read())
```

# A Note on Running a Parsl-enabled script

As you have seen, a Parsl script will not return until all of its futures have been fulfilled. How long that takes depends on queue times and the overall runtime of the workflow; combined, these can amount to many hours. It is therefore recommended to run Parsl scripts in a [screen](https://linuxize.com/post/how-to-use-linux-screen/) session or with [NoMachine](https://www.nomachine.com).
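For example, with `screen` (a sketch; the session name and the script being run are placeholders):

```bash
# Start a named session on a login node and launch the workflow inside it
screen -S parsl-workflow
python 4_hello_polaris.py

# Detach with Ctrl-a d; the workflow keeps running. Reattach later with:
screen -r parsl-workflow
```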