-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparallel.py
executable file
·130 lines (96 loc) · 3.18 KB
/
parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
:mod:`parallel` -- base parallel processing functions
=====================================================
.. module:: parallel
:synopsis: Provides a couple of base functions that set up a
parallel processing environment. There are only a
small number of parallel processing functions
required for TCRM, so we only need to ensure
those functions are available, either as
the real thing or, if the required modules are
not available, dummy functions that pass straight
through.
We base our parallel processing on :term:`mpi4py`
.. moduleauthor: Craig Arthur, <[email protected]>
"""
from functools import wraps
class DummyStatus(object):
    """
    A dummy `Status` class that provides a placeholder
    for the methods that are used to control processing
    in parallel implementation.

    The dummy MPI object built by :func:`attemptParallel` stores an
    *instance* of this class as its ``Status`` attribute. mpi4py code
    normally creates a status object by calling ``MPI.Status()``, so
    instances are callable and return a fresh :class:`DummyStatus`;
    this keeps that idiom working when running without mpi4py.

    Attributes (all plain integers, set in ``__init__``):

    * ``source`` -- initialised to 0
    * ``tag`` -- initialised to -1
    * ``error`` -- initialised to 0
    """

    def __init__(self):
        self.source = 0
        self.tag = -1
        self.error = 0

    def __call__(self):
        """
        Return a new status object with default field values.

        :returns: A freshly initialised instance of the same class.
        """
        # Use type(self) so that subclasses produce their own type.
        return type(self)()

    def Get_source(self):
        """
        :returns: The current value of the ``source`` attribute.
        """
        return self.source

    def Get_tag(self):
        """
        :returns: The current value of the ``tag`` attribute.
        """
        return self.tag

    def Get_error(self):
        """
        :returns: The current value of the ``error`` attribute.
        """
        return self.error
class DummyCommWorld(object):
    """
    Minimal single-process substitute for a COMM_WORLD communicator.

    It always reports a rank of 0 and a size of 1, and its
    synchronisation methods do nothing. Only the handful of members
    needed for basic task distribution are provided.

    This is returned only if mpi4py raises an ImportError or
    ModuleNotFoundError.
    """

    def __init__(self):
        # Fixed values describing a lone process.
        self._name = 'DummyCommWorld'
        self._size = 1
        self._rank = 0

    # -- read-only attribute access -----------------------------------

    @property
    def rank(self):
        """Rank of this process (0 unless `_rank` is reassigned)."""
        return self._rank

    @property
    def size(self):
        """Number of processes (1 unless `_size` is reassigned)."""
        return self._size

    @property
    def name(self):
        """Name of this communicator."""
        return self._name

    # -- method-style accessors ---------------------------------------

    def Get_rank(self):
        """
        :returns: The same value as the `rank` property.
        """
        return self._rank

    def Get_size(self):
        """
        :returns: The same value as the `size` property.
        """
        return self._size

    # -- no-op operations ---------------------------------------------

    def barrier(self):
        """Do nothing and return `None`."""

    def finalize(self):
        """Do nothing and return `None`."""
def attemptParallel():
    """
    Bind the module-level name `MPI` and return the bound object.

    The function first tries to import `MPI` from `mpi4py`. If that
    import fails, a small stand-in object is created instead. In both
    cases the result is stored in this module's global `MPI` (which
    other functions in this module read) and returned to the caller.

    The stand-in provides:

    * ``COMM_WORLD`` -- a :class:`DummyCommWorld` instance
    * ``Status`` -- a :class:`DummyStatus` instance
    * ``ANY_SOURCE`` -- the integer -1
    * ``Init()`` and ``Finalize()`` -- methods that do nothing

    :returns: `mpi4py.MPI` if it can be imported, otherwise the
              stand-in object described above.
    """
    global MPI

    try:
        # With the `global` declaration above, this import binds the
        # module-level name rather than a local one.
        from mpi4py import MPI
    except (ImportError, ModuleNotFoundError):

        class DummyMPI(object):
            """Stand-in exposing the few MPI names used in serial runs."""

            def __init__(self):
                self.COMM_WORLD = DummyCommWorld()
                self.Status = DummyStatus()
                self.ANY_SOURCE = -1

            def Finalize(self):
                """Do nothing."""

            def Init(self):
                """Do nothing."""

        MPI = DummyMPI()

    return MPI
def disableOnWorkers(f):
    """
    Decorator that restricts execution of `f` to the rank 0 process.

    When more than one process is running and the calling process has
    a rank greater than 0, the wrapped function is not called and
    `None` is returned. Otherwise `f` is called with the supplied
    arguments and its result is returned.

    The wrapper reads the module-level `MPI` name each time it is
    called, so `MPI` must already be bound (:func:`attemptParallel`
    does this) before a decorated function is invoked.

    :param f: Function to be wrapped
    :type f: function

    :returns: The wrapping function, carrying the metadata of `f`.
    """
    @wraps(f)
    def _rank_zero_only(*args, **kwargs):
        comm = MPI.COMM_WORLD
        # Serial run, or this is the rank 0 process: do the work.
        if comm.size <= 1 or comm.rank <= 0:
            return f(*args, **kwargs)
        # A worker in a multi-process run: skip the call.
        return None

    return _rank_zero_only