-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsympyTesting2.py
218 lines (182 loc) · 6.25 KB
/
sympyTesting2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sympy as sy
from blockops.problem import BlockProblem
# Dummy problem, only used to get symbolic block coefficients to play with
# NOTE(review): the meaning of the BlockProblem arguments is defined in
# blockops and is not visible from this script -- confirm before changing them
prob = BlockProblem(1, 1, 3, 1, 'BE')
prob.setApprox('BE')
prob.setCoarseLevel(1)
algo = prob.getBlockIteration('PFASST')
# Symbols of the block coefficients stored with keys (0, 0), (0, 1) and (1, 0)
B00 = algo.blockCoeffs[(0, 0)].symbol
B01 = algo.blockCoeffs[(0, 1)].symbol
B10 = algo.blockCoeffs[(1, 0)].symbol
# Non-commutative symbols on which the block coefficients are applied
u00, u01, u10 = sy.symbols('u_0^0, u_0^1, u_1^0', commutative=False)
# Set gathering the three symbols defined above
deps = {u00, u01, u10}
# Sum of each block coefficient multiplied (on the right) by its symbol
expr = B00*u00 + B01*u01 + B10*u10
# Expression from which we should extract the tasks (expanded form of expr)
e1 = expr.expand()
# When True, the result of each stage is printed
verbose = True
# -----------------------------------------------------------------------------
# Stage 1: factorize the expression into a dictionary
# -----------------------------------------------------------------------------
# Short aliases for the sympy classes used below to dispatch on expression type
Add, Mul, Pow, Symbol = sy.Add, sy.Mul, sy.Pow, sy.Symbol
def getLeadingTerm(expr: Mul):
    """
    Split a multiplication into its leading term and the rest.

    When the first factor is -1 and several factors follow, the sign is not
    used as leading term : the second factor is returned instead, and the -1
    is moved into the rest.

    Parameters
    ----------
    expr : Mul
        The multiplication to decompose.

    Returns
    -------
    leading : Expression
        First factor of the multiplication (-1 when the multiplication is
        just the negative of one single factor).
    rest : Expression
        Multiplication of the remaining factors.
    """
    factors = expr.args
    if factors[0] != -1:
        # No leading minus sign : simply split after the first factor
        return factors[0], Mul(*factors[1:])
    if len(factors) == 2:
        # Only -1 times one factor : the pair is returned unchanged
        minusOne, single = factors
        return minusOne, single
    # -1 times several factors : keep the sign with the rest
    return factors[1], Mul(-1, *factors[2:])
def decomposeAddition(expr: Add, dico: dict):
    """
    Decompose an addition into a dictionary with leading terms as keys.

    Each term of the addition is split into a leading term (the key) and a
    rest (the value). The rests of all the terms sharing the same leading
    term are summed up, whatever the order in which the terms are stored.

    Parameters
    ----------
    expr : Add
        The addition to decompose, each term being a Symbol or a Mul.
    dico : dict
        Dictionary to fill. It is modified in place, and entries it already
        contains are accumulated with the new ones.

    Returns
    -------
    dico : dict
        The dictionary given as argument, once updated.

    Raises
    ------
    ValueError
        If one term of the addition is neither a Symbol nor a Mul.
    """
    for term in expr.args:
        if isinstance(term, Symbol):
            # A lone symbol is its own leading term, with 1 as rest
            leading, rest = term, 1
        elif isinstance(term, Mul):
            leading, rest = getLeadingTerm(term)
        else:
            raise ValueError('got neither Symbol nor Mul')
        # Accumulate in both cases : assigning 1 directly for a lone symbol
        # would erase the rest already stored for the same leading term
        if leading in dico:
            dico[leading] += rest
        else:
            dico[leading] = rest
    return dico
def expandTree(dico: dict):
    """
    Expand in place an operation tree stored into a dictionary.

    Every value that is a multiplication or an addition is replaced by a
    sub-dictionary, itself expanded recursively, such that in the end each
    value of the tree is either 1, a Symbol or a dictionary.

    Parameters
    ----------
    dico : dict
        Dictionary with leading terms as keys, and 1, Symbol, Mul or Add
        as values (for instance the output of decomposeAddition).

    Raises
    ------
    ValueError
        If a value is neither 1, a Symbol, a Mul nor an Add.
    """
    for leading, rest in dico.items():
        if rest == 1 or isinstance(rest, Symbol):
            # Leaf of the tree, nothing to expand
            continue
        if isinstance(rest, Mul):
            subLeading, subRest = getLeadingTerm(rest)
            subDico = {subLeading: subRest}
        elif isinstance(rest, Add):
            subDico = decomposeAddition(rest, {})
        else:
            raise ValueError('got neither Add nor Mul')
        # Recurse for multiplications too : the rest of a multiplication can
        # itself be a multiplication, that must not be left as a raw value
        expandTree(subDico)
        # Only the value of an existing key is replaced here, so the size of
        # the dictionary does not change while iterating on it
        dico[leading] = subDico
# Stage 1 driver -- note : this supposes that e1 is an addition
dico = decomposeAddition(e1, {})
expandTree(dico)
# --> end of stage 1

if verbose:

    def printFacto(dico: dict, tab=0):
        """Print a factorized dictionary, nested levels being indented"""
        indent = " "*(3*tab) + "(+)"
        for key, val in dico.items():
            if type(val) != dict:
                # Leaf : key and value are printed on the same line
                print(f'{indent} {key} (x) {val}')
                continue
            # Sub-dictionary : print the key, then the nested level below
            print(f'{indent} {key} (x)')
            printFacto(val, tab+1)

    print('Factorized dictionnary :')
    print(' -- note : (+) are additions, (x) are multiplications')
    printFacto(dico)
# -----------------------------------------------------------------------------
# Stage 2: generate the tasks from the factorized dictionary
# -----------------------------------------------------------------------------
class Counter:
    """
    Minimal integer counter.

    Attributes
    ----------
    n : int
        Current value of the counter, starting from zero.
    """

    def __init__(self):
        self.n = 0

    def increment(self):
        """Add one to the counter value"""
        self.n = self.n + 1

    def __str__(self):
        """Counter value as a string"""
        return f'{self.n}'
class TasksPool:
    """
    Container for the description of the tasks.

    Attributes
    ----------
    counter : Counter
        Counter incremented each time a task is created, used to name them.
    tasks : dict
        Associates each task symbol to its (operator, input, dependency).
    results : dict
        Associates the full result expression of each task to its symbol.
    """

    def __init__(self):
        self.counter = Counter()
        self.tasks = {}
        self.results = {}

    def addTask(self, ope, inp, dep):
        """
        Register the task applying one operator on one input.

        No new task is created when the result of the task (or the negative
        of this result) is already the result of a task in the pool.

        Parameters
        ----------
        ope : Symbol
            The operator used for this task.
        inp : Expression
            The input of this task (full expression).
        dep : sympy expression
            The dependencies for this task, in function of other tasks or
            block variables.

        Returns
        -------
        task : Symbol
            The symbol for this task (T0, T1, ...), or the negative of an
            existing task symbol when that task produces the opposite result.
        res : Expression
            The result of this task (full expression).
        """
        res = (ope*inp).simplify()
        known = self.results

        # Same result already produced by a task of the pool
        if res in known:
            return known[res], res

        # Opposite result already produced by a task of the pool
        opposite = -res
        if opposite in known:
            return -known[opposite], res

        # Unknown result : create a new task and store it
        task = sy.symbols(f'T{self.counter}', commutative=False)
        self.tasks[task] = (ope, inp, dep)
        known[res] = task
        self.counter.increment()
        return task, res
# Module-level pool storing the tasks (createTasks adds its tasks to it)
pool = TasksPool()
def createTasks(dico: dict):
    """
    Extract the tasks from a factorized dictionary and add them to the pool.

    Each (key, value) pair of the dictionary is one term of a sum, the key
    being the operator and the value its input. Nested dictionaries are
    processed recursively, their result being the input of the parent task.

    Parameters
    ----------
    dico : dict
        Dictionary containing a factorized expression, each value being
        1, a Symbol or a sub-dictionary.

    Returns
    -------
    res : Expression
        Full expression for the result of the dictionary.
    dep : Expression
        Expression for the result in function of tasks.

    Raises
    ------
    ValueError
        If one value is neither 1, a Symbol nor a dictionary.
    """
    res = 0
    dep = 0
    for ope, inp in dico.items():
        if inp == 1:
            # Term used as is (no task) : it contributes to both sums.
            # dep must be accumulated from dep, not from res, else it gets
            # the full expression plus the term counted twice
            res = res + ope
            dep = dep + ope
            continue
        if isinstance(inp, dict):
            # Input given by sub-tasks, created first
            subRes, subDep = createTasks(inp)
            t, r = pool.addTask(ope, subRes, subDep)
        elif isinstance(inp, Symbol):
            # Input is one single symbol, that is also the only dependency
            t, r = pool.addTask(ope, inp, inp)
        else:
            raise ValueError('AAAAAAAAAAAAAAAAHHHHHH')
        res = res + r
        dep = dep + t
    return res, dep
# Stage 2 driver : fill the pool (the returned expressions are not used here)
createTasks(dico)
# --> end of stage 2

if verbose:
    print('List of tasks :')
    for task, descr in pool.tasks.items():
        # Each description is the (operator, input, dependency) of the task
        ope, inp, dep = descr
        print(' --', task)
        print(' -- operator :', ope)
        print(' -- input :', inp)
        print(' -- dependency :', dep)
        print(' -- result :', ope*inp)