Update examples #16

Open · wants to merge 1 commit into base: master
65 changes: 44 additions & 21 deletions docs/src/examples/pmapreduce.md
@@ -1,5 +1,7 @@
# Example of the use of pmapreduce

## Using [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl)

The function `pmapreduce` performs a parallel `mapreduce`. This is primarily useful when the function has to perform an expensive calculation, that is, when the evaluation time per core exceeds the setup and communication time. It is also useful when each worker needs to generate or operate on arrays that collectively would not fit into the memory of a single node, as is often the case on a cluster.
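
As a quick illustration of the interface, `pmapreduce` may be used as a drop-in replacement for `mapreduce`; a minimal sketch with a toy function (the function and range here are illustrative assumptions, not part of the example that follows):

```julia
using ParallelUtilities

# serial sum of squares
mapreduce(x -> x^2, +, 1:100)   # 338350

# the same reduction, with the map evaluated on the available workers
pmapreduce(x -> x^2, +, 1:100)  # 338350
```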

We walk through an example where we initialize and concatenate arrays in serial and in parallel.
@@ -11,13 +13,15 @@ using ParallelUtilities
using Distributed
```

We define the function that performs the initialization on each core. This step is embarrassingly parallel, as no communication happens between workers. We simulate an expensive calculation by adding a sleep interval for each index.
We define the function that performs the initialization on each core. This step is embarrassingly parallel, as no communication happens between workers.

```julia
function initialize(sleeptime)
A = Array{Int}(undef, 20, 20)
function initialize(x, n)
inds = 1:n
d, r = divrem(length(inds), nworkers())
ninds_local = d + (x <= r)
A = zeros(Int, 50, ninds_local)
for ind in eachindex(A)
sleep(sleeptime)
A[ind] = ind
end
return A
@@ -27,48 +31,67 @@ end
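
The `divrem`-based splitting above distributes the `n` columns as evenly as possible across the workers, with the first `r` workers receiving one extra column each. A small worked sketch (the numbers are illustrative assumptions):

```julia
# assume n = 10 columns are to be split across nworkers() = 4 workers
d, r = divrem(10, 4)                        # d = 2, r = 2
ninds_local = [d + (x <= r) for x in 1:4]   # [3, 3, 2, 2]
sum(ninds_local) == 10                      # the local widths add up to n
```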
Next we define the function that calls `pmapreduce`:

```julia
function main_pmapreduce(sleeptime)
pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
function mapreduce_parallel(n)
pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
end
```
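
Each worker returns a `50 × ninds_local` block, and the local widths add up to `n`, so the `hcat` reduction should produce a `50 × n` matrix. A quick sanity check (a sketch, assuming workers have already been added and the module is loaded on them):

```julia
B = mapreduce_parallel(10)
size(B) == (50, 10)
```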

We also define a function that carries out a serial mapreduce:

```julia
function main_mapreduce(sleeptime)
mapreduce(x -> initialize(sleeptime), hcat, 1:20)
function mapreduce_serial(n)
mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
end
```

We compare the performance of the serial and parallel evaluations using 20 cores on one node:
We compare the performance of the serial `mapreduce` and the parallel `pmapreduce` using `3` nodes with `28` cores on each node.

We first define a caller function:

```julia
function compare_with_serial()
# precompile
main_mapreduce(0)
main_pmapreduce(0)
mapreduce_serial(1)
mapreduce_parallel(nworkers())

# time
println("Tesing serial")
A = @time main_mapreduce(5e-6)
println("Tesing parallel")
B = @time main_pmapreduce(5e-6)
n = 2_000_000
println("Tesing serial mapreduce")
A = @time mapreduce_serial(n)
println("Tesing pmapreduce")
B = @time mapreduce_parallel(n)

# check results
println("Results match : ", A == B)
end
```
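
The workers themselves are added through `ClusterManagers.jl` before the comparison is run. A minimal sketch mirroring the `pmapreducejobscript.jl` script included in this PR (the worker count of `78` is taken from the accompanying Slurm script and is otherwise an assumption):

```julia
using ClusterManagers, Distributed

# request 78 Slurm tasks as Julia workers
addprocs_slurm(78, exeflags = ["--startup=no"])

@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
PMapReduceTiming.compare_with_serial()
```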

We run this caller on the cluster:
```julia
julia> compare_with_serial()
Testing serial
9.457601 seconds (40.14 k allocations: 1.934 MiB)
Testing parallel
0.894611 seconds (23.16 k allocations: 1.355 MiB, 2.56% compilation time)
```console
Testing serial mapreduce
23.986976 seconds (8.26 k allocations: 30.166 GiB, 11.71% gc time, 0.02% compilation time)
Testing pmapreduce
7.465366 seconds (29.55 k allocations: 764.166 MiB)
Results match : true
```

In this case the overall gain is only around a factor of `3`. In general a parallel mapreduce is advantageous if the time required to evaluate the function far exceeds that required to communicate across workers.

The time required by a `@distributed` `for` loop is unfortunately too high for it to be practical here.

The full script may be found in the examples directory.

## Using [MPIClusterManagers.jl](https://github.com/JuliaParallel/MPIClusterManagers.jl)

The same script may also be run by initializing an MPI cluster (in this case with 77 workers + 1 master process).
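
A minimal launch sketch, mirroring the `mpiclustermanager_mpitransport.jl` script included in this PR:

```julia
using MPIClusterManagers, Distributed

# start the workers over MPI; all communication uses the MPI transport
mgr = MPIClusterManagers.start_main_loop(MPI_TRANSPORT_ALL)

@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
PMapReduceTiming.compare_with_serial()

MPIClusterManagers.stop_main_loop(mgr)
```

This leads to the following timings: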

```console
Using MPI_TRANSPORT_ALL
Testing serial mapreduce
22.263389 seconds (8.07 k allocations: 29.793 GiB, 11.70% gc time, 0.02% compilation time)
Testing pmapreduce
11.374551 seconds (65.92 k allocations: 2.237 GiB, 0.46% gc time)
Results match : true
```

The performance is worse in this case than that obtained using `ClusterManagers.jl`.
2 changes: 1 addition & 1 deletion docs/src/examples/sharedarrays.md
@@ -12,7 +12,7 @@ using SharedArrays
using Distributed
```

We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words, we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices`.
We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a `sleep` period. In other words, we assume that the individual elements of the array are expensive to evaluate. We obtain the local indices of the `SharedArray` through the function `localindices` to split the load among workers.

```julia
function initialize_localpart(s, sleeptime)
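
The body of `initialize_localpart` is collapsed in this diff; a hedged sketch of the pattern described above (the exact body is an assumption based on the surrounding prose):

```julia
using Distributed, SharedArrays

function initialize_localpart(s, sleeptime)
    # each worker fills only the indices of the SharedArray assigned to it
    for ind in localindices(s)
        sleep(sleeptime)
        s[ind] = ind
    end
end
```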
72 changes: 31 additions & 41 deletions docs/src/examples/threads.md
@@ -13,7 +13,7 @@ We create a function to initialize the local part on each worker. In this case w

```julia
function initializenode_threads(sleeptime)
s = zeros(Int, 2_000)
s = zeros(Int, 5_000)
Threads.@threads for ind in eachindex(s)
sleep(sleeptime)
s[ind] = ind
@@ -22,37 +22,21 @@ function initializenode_threads(sleeptime)
end
```

We create a main function that runs on the calling process and launches the array initialization task on each node. This is run on a `WorkerPool` consisting of one worker per node which acts as the root process. We may obtain such a pool through the function `ParallelUtilities.workerpool_nodes()`. The array creation step on each node is followed by an eventual concatenation.
We create a main function that runs on the calling process and launches the array initialization task on each node. The array creation step on each node is eventually followed by a concatenation of the results.

```julia
function main_threads(sleeptime)
# obtain the workerpool with one process on each node
pool = ParallelUtilities.workerpool_nodes()

# obtain the number of workers in the pool.
nw_nodes = nworkers(pool)

# Evaluate the parallel mapreduce
pmapreduce(x -> initializenode_threads(sleeptime), hcat, pool, 1:nw_nodes)
function pmapreduce_threads(sleeptime)
pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
end
```

We compare the results with a serial execution that uses a similar workflow, except we use `mapreduce` instead of `pmapreduce` and do not use threads.
We compare the results with:
* a `mapreduce` that uses a similar workflow, except that the operation takes place entirely on one node
* a `@distributed` mapreduce, where the evaluation is spread across nodes (a sketch of this variant follows the next code block).

```julia
function initialize_serial(sleeptime)
s = zeros(Int, 2_000)
for ind in eachindex(s)
sleep(sleeptime)
s[ind] = ind
end
return s
end

function main_serial(sleeptime)
pool = ParallelUtilities.workerpool_nodes()
nw_nodes = nworkers(pool)
mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
function mapreduce_threads(sleeptime)
mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
end
```
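
The `@distributed` variant mentioned in the list above, as defined in `examples/threads.jl` in this PR:

```julia
function mapreduce_distributed_threads(sleeptime)
    @distributed hcat for _ in 1:nworkers()
        initializenode_threads(sleeptime)
    end
end
```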

@@ -61,28 +45,34 @@ We create a function to compare the performance of the two. We start with a prec
```julia
function compare_with_serial()
# precompile
main_serial(0)
main_threads(0)

mapreduce_threads(0)
mapreduce_distributed_threads(0)
pmapreduce_threads(0)
# time
println("Testing serial")
A = @time main_serial(5e-3);
println("Testing threads")
B = @time main_threads(5e-3);

println("Results match : ", A == B)
sleeptime = 1e-2
println("Testing threaded mapreduce")
A = @time mapreduce_threads(sleeptime);
println("Testing threaded+distributed mapreduce")
B = @time mapreduce_distributed_threads(sleeptime);
println("Testing threaded pmapreduce")
C = @time pmapreduce_threads(sleeptime);

println("Results match : ", A == B == C)
end
```

We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:

```julia
julia> compare_with_serial()
Testing serial
24.601593 seconds (22.49 k allocations: 808.266 KiB)
Testing threads
0.666256 seconds (3.71 k allocations: 201.703 KiB)
```console
Testing threaded mapreduce
4.161118 seconds (66.27 k allocations: 2.552 MiB, 0.95% compilation time)
Testing threaded+distributed mapreduce
2.232924 seconds (48.64 k allocations: 2.745 MiB, 3.20% compilation time)
Testing threaded pmapreduce
2.432104 seconds (6.79 k allocations: 463.788 KiB, 0.44% compilation time)
Results match : true
```

The full script may be found in the examples directory.
We see that there is little difference in evaluation times between the `@distributed` reduction and `pmapreduce`, both of which are roughly twice as fast as the single-node evaluation.

The full script along with the Slurm jobscript may be found in the examples directory.
12 changes: 12 additions & 0 deletions examples/mpiclustermanager_mpitransport.jl
@@ -0,0 +1,12 @@
using MPIClusterManagers
import MPI
using Distributed

# This uses MPI to communicate with the workers
mgr = MPIClusterManagers.start_main_loop(MPI_TRANSPORT_ALL)

@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
println("Using MPI_TRANSPORT_ALL")
PMapReduceTiming.compare_with_serial()

MPIClusterManagers.stop_main_loop(mgr)
10 changes: 10 additions & 0 deletions examples/mpijobscript.slurm
@@ -0,0 +1,10 @@
#!/bin/bash

#SBATCH -n 78
#SBATCH --job-name=mpitest
#SBATCH --time=00:05:00
#SBATCH -e mpitest.err
#SBATCH -o mpitest.out

juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
mpirun $juliaexe --startup=no mpiclustermanager_mpitransport.jl
29 changes: 16 additions & 13 deletions examples/pmapreduce.jl
@@ -3,33 +3,36 @@ module PMapReduceTiming
using ParallelUtilities
using Distributed

function initialize(sleeptime)
A = Array{Int}(undef, 20, 20)
function initialize(x, n)
inds = 1:n
d, r = divrem(length(inds), nworkers())
ninds_local = d + (x <= r)
A = zeros(Int, 50, ninds_local)
for ind in eachindex(A)
sleep(sleeptime)
A[ind] = ind
end
return A
end

function main_mapreduce(sleeptime)
mapreduce(x -> initialize(sleeptime), hcat, 1:20)
function mapreduce_serial(n)
mapreduce(x -> initialize(x, n), hcat, 1:nworkers())
end

function main_pmapreduce(sleeptime)
pmapreduce(x -> initialize(sleeptime), hcat, 1:20)
function mapreduce_parallel(n)
pmapreduce(x -> initialize(x, n), hcat, 1:nworkers())
end

function compare_with_serial()
# precompile
main_mapreduce(0)
main_pmapreduce(0)
mapreduce_serial(1)
mapreduce_parallel(nworkers())

# time
println("Tesing serial")
A = @time main_mapreduce(5e-6)
println("Tesing parallel")
B = @time main_pmapreduce(5e-6)
n = 2_000_000
println("Tesing serial mapreduce")
A = @time mapreduce_serial(n)
println("Tesing pmapreduce")
B = @time mapreduce_parallel(n)

# check results
println("Results match : ", A == B)
6 changes: 6 additions & 0 deletions examples/pmapreducejobscript.jl
@@ -0,0 +1,6 @@
using ClusterManagers
job_file_loc = mktempdir(@__DIR__)
addprocs_slurm(78, exeflags=["--startup=no"], job_file_loc = job_file_loc)
using Distributed
@everywhere include(joinpath(@__DIR__, "pmapreduce.jl"))
PMapReduceTiming.compare_with_serial()
10 changes: 10 additions & 0 deletions examples/pmapreducejobscript.slurm
@@ -0,0 +1,10 @@
#!/bin/bash

#SBATCH -n 78
#SBATCH --job-name=pmapreducetest
#SBATCH --time=00:10:00
#SBATCH -e pmapreducetest.err
#SBATCH -o pmapreducetest.out

juliaexe=$SCRATCH/julia/julia-1.7.0-rc2/bin/julia
$juliaexe --startup=no pmapreducejobscript.jl
49 changes: 23 additions & 26 deletions examples/threads.jl
@@ -3,47 +3,44 @@ module ThreadsTiming
using ParallelUtilities
using Distributed

function initialize_serial(sleeptime)
s = zeros(Int, 2_000)
for ind in eachindex(s)
sleep(sleeptime)
s[ind] = ind
end
return s
end

function initializenode_threads(sleeptime)
s = zeros(Int, 2_000)
s = zeros(Int, 5_000)
Threads.@threads for ind in eachindex(s)
sleep(sleeptime)
s[ind] = ind
end
return s
end

function main_threads(sleeptime)
workers_node_pool = ParallelUtilities.workerpool_nodes()
nw_nodes = nworkers(workers_node_pool)
pmapreduce(x -> initializenode_threads(sleeptime), hcat, workers_node_pool, 1:nw_nodes)
function mapreduce_threads(sleeptime)
mapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
end

function main_serial(sleeptime)
workers_node_pool = ParallelUtilities.workerpool_nodes()
nw_nodes = nworkers(workers_node_pool)
mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
function mapreduce_distributed_threads(sleeptime)
@distributed hcat for _ in 1:nworkers()
initializenode_threads(sleeptime)
end
end

function pmapreduce_threads(sleeptime)
pmapreduce(x -> initializenode_threads(sleeptime), hcat, 1:nworkers())
end

function compare_with_serial()
# precompile
main_serial(0)
main_threads(0)
mapreduce_threads(0)
mapreduce_distributed_threads(0)
pmapreduce_threads(0)
# time
println("Testing serial")
A = @time main_serial(5e-3);
println("Testing threads")
B = @time main_threads(5e-3);

println("Results match : ", A == B)
sleeptime = 1e-2
println("Testing threaded mapreduce")
A = @time mapreduce_threads(sleeptime);
println("Testing threaded+distributed mapreduce")
B = @time mapreduce_distributed_threads(sleeptime);
println("Testing threaded pmapreduce")
C = @time pmapreduce_threads(sleeptime);

println("Results match : ", A == B == C)
end

end
6 changes: 6 additions & 0 deletions examples/threadsjobscript.jl
@@ -0,0 +1,6 @@
using ClusterManagers
job_file_loc = mktempdir(@__DIR__)
addprocs_slurm(2, exeflags=["-t 28", "--startup=no"], job_file_loc = job_file_loc)
using Distributed
@everywhere include(joinpath(@__DIR__, "threads.jl"))
ThreadsTiming.compare_with_serial()