# comparison.py
# Runs the DRF, SRTF, FIFO, Tetris and Optimus scheduler environments on the
# same job traces and logs their average JCT, makespan and reward.
import multiprocessing
import os
import shlex
import time
from typing import List, Tuple

import optimus_env
import tetris_env
# NOTE(review): `trace` is used as trace.Trace(None).get_trace(), which the
# stdlib module of the same name does not offer — presumably a project-local
# module shadowing it; confirm.
import trace
import drf_env
import srtf_env
import fifo_env
import parameters as pm
import log
def drf(job_trace=None):
    """Drive a ``drf_env.DRF_Env`` over one job trace until it reports the end.

    Args:
        job_trace: Trace handed to the environment. When ``None``, one is
            obtained from ``trace.Trace(None).get_trace()``.

    Returns:
        A two-item list: ``env.get_results()`` followed by a list of the
        values of ``env.get_job_jcts()``.
    """
    workload = trace.Trace(None).get_trace() if job_trace is None else job_trace
    env = drf_env.DRF_Env("DRF", workload, None)
    while True:
        if env.end:
            break
        env.step()
    print("progress: drf finished ")
    results = env.get_results()
    jcts = list(env.get_job_jcts().values())
    return [results, jcts]
def srtf(job_trace=None):
    """Drive a ``srtf_env.SRTF_Env`` over one job trace until it reports the end.

    Args:
        job_trace: Trace handed to the environment. When ``None``, one is
            obtained from ``trace.Trace(None).get_trace()``.

    Returns:
        A two-item list: ``env.get_results()`` followed by a list of the
        values of ``env.get_job_jcts()``.
    """
    workload = trace.Trace(None).get_trace() if job_trace is None else job_trace
    env = srtf_env.SRTF_Env("SRTF", workload, None)
    while True:
        if env.end:
            break
        env.step()
    print("progress: srtf finished ")
    results = env.get_results()
    jcts = list(env.get_job_jcts().values())
    return [results, jcts]
def fifo(job_trace=None):
    """Drive a ``fifo_env.FIFO_Env`` over one job trace until it reports the end.

    Args:
        job_trace: Trace handed to the environment. When ``None``, one is
            obtained from ``trace.Trace(None).get_trace()``.

    Returns:
        A two-item list: ``env.get_results()`` followed by a list of the
        values of ``env.get_job_jcts()``.
    """
    workload = trace.Trace(None).get_trace() if job_trace is None else job_trace
    env = fifo_env.FIFO_Env("FIFO", workload, None)
    while True:
        if env.end:
            break
        env.step()
    print("progress: fifo finished ")
    results = env.get_results()
    jcts = list(env.get_job_jcts().values())
    return [results, jcts]
def tetris(job_trace=None):
    """Drive a ``tetris_env.Tetris_Env`` over one job trace until it reports the end.

    Args:
        job_trace: Trace handed to the environment. When ``None``, one is
            obtained from ``trace.Trace(None).get_trace()``.

    Returns:
        A two-item list: ``env.get_results()`` followed by a list of the
        values of ``env.get_job_jcts()``.
    """
    workload = trace.Trace(None).get_trace() if job_trace is None else job_trace
    env = tetris_env.Tetris_Env("Tetris", workload, None)
    while True:
        if env.end:
            break
        env.step()
    results = env.get_results()
    jcts = list(env.get_job_jcts().values())
    return [results, jcts]
def optimus(job_trace=None):
    """Drive an ``optimus_env.Optimus_Env`` over one job trace until it reports the end.

    Args:
        job_trace: Trace handed to the environment. When ``None``, one is
            obtained from ``trace.Trace(None).get_trace()``.

    Returns:
        A two-item list: ``env.get_results()`` followed by a list of the
        values of ``env.get_job_jcts()``.
    """
    workload = trace.Trace(None).get_trace() if job_trace is None else job_trace
    env = optimus_env.Optimus_Env("Optimus", workload, None)
    while True:
        if env.end:
            break
        env.step()
    results = env.get_results()
    jcts = list(env.get_job_jcts().values())
    return [results, jcts]
def print_error(value):
    """Custom ``error_callback``: print the reported error value.

    Args:
        value: The error object (or any value) handed to the callback.
    """
    parts = ("error: ", value)
    print(*parts)
def _mean(values):
    """Return the arithmetic mean of *values* as a float."""
    return 1.0 * sum(values) / len(values)


def _format_averages(label, names, samples):
    """Build one '<label>: <name> <mean> <name> <mean> ...' line (3 decimals)."""
    return label + ": " + " ".join(
        "%s %.3f" % (name, _mean(values)) for name, values in zip(names, samples)
    )


def compare(traces, logger, debug=False, num_processes=40) -> List[Tuple]:
    """Run all five schedulers on every trace in a process pool and summarize.

    Each (trace, scheduler) pair is submitted to a ``multiprocessing.Pool``.
    Once all tasks have finished, the per-trace JCT lists of the DRF runs are
    written to ``DRF_JCTs.txt`` (one space-separated line per trace), the
    per-scheduler averages are logged, and, when ``pm.EXPERIMENT_NAME`` is
    set, the working directory's ``*.py``/``*.txt`` files are copied into
    ``./<EXPERIMENT_NAME>/`` together with an ``rl_validation.txt`` line for
    the scheduler named by ``pm.HEURISTIC``.

    Args:
        traces: Sequence of job traces; each one is passed to every scheduler.
        logger: Object with an ``info(str)`` method used for the summary.
        debug: When true, every scheduler is first run once in this process
            on ``traces[0]`` (presumably so failures are easier to inspect).
        num_processes: Size of the worker pool (defaults to the previously
            hard-coded 40).

    Returns:
        One ``(avg_jct, avg_makespan, avg_reward)`` tuple per scheduler, in
        the order DRF, SRTF, FIFO, Tetris, Optimus.

    Raises:
        ZeroDivisionError: If ``traces`` is empty (there is nothing to
            average).
        Exception: Whatever a worker task raised is re-raised when its
            result is collected.
    """
    schedulers = (
        ("DRF", drf),
        ("SRTF", srtf),
        ("FIFO", fifo),
        ("Tetris", tetris),
        ("Optimus", optimus),
    )
    names = [name for name, _ in schedulers]

    if debug:
        for _, run in schedulers:
            run(traces[0])

    # Start from an empty DRF_JCTs.txt; the DRF results are appended below.
    with open("DRF_JCTs.txt", 'w'):
        pass

    tic = time.time()
    pending = [[] for _ in schedulers]  # pending[i] holds scheduler i's tasks
    # The context manager terminates the pool even if submission or join
    # raises, so worker processes are not left behind.
    with multiprocessing.Pool(processes=num_processes) as pool:
        print("pool ")
        for job_trace in traces:  # one example takes about 10s
            for tasks, (_, run) in zip(pending, schedulers):
                tasks.append(
                    pool.apply_async(run, args=(job_trace,), error_callback=print_error)
                )
        pool.close()
        print("closed pool")
        pool.join()
        print("finished all task")

    jct_list = [[] for _ in schedulers]
    makespan_list = [[] for _ in schedulers]
    reward_list = [[] for _ in schedulers]
    # Open the DRF output once instead of re-opening it for every result.
    with open("DRF_JCTs.txt", 'a') as drf_file:
        for index, tasks in enumerate(pending):
            for task in tasks:
                result, jcts = task.get()
                print(jcts)
                if index == 0:  # DRF
                    drf_file.write(" ".join(str(u) for u in jcts) + '\n')
                _, jct, makespan, reward = result  # first field (num_jobs) is unused
                jct_list[index].append(jct)
                makespan_list[index].append(makespan)
                reward_list[index].append(reward)
    toc = time.time()

    logger.info("---------------------------------------------------------------")
    logger.info("progress: finish testing " + str(len(traces)) + " traces within " + str(int(toc - tic)) + " seconds")
    logger.info(_format_averages("Average JCT", names, jct_list))
    logger.info(_format_averages("Average Makespan", names, makespan_list))
    logger.info(_format_averages("Average Reward", names, reward_list))

    stats = [
        (_mean(jcts), _mean(makespans), _mean(rewards))
        for jcts, makespans, rewards in zip(jct_list, makespan_list, reward_list)
    ]

    if pm.EXPERIMENT_NAME is not None:
        log_dir = "./" + pm.EXPERIMENT_NAME + "/"
        # Quote the path so names containing spaces or shell metacharacters
        # cannot split or alter the commands.
        quoted_dir = shlex.quote(log_dir)
        os.system("rm -rf " + quoted_dir)
        os.system("mkdir -p " + quoted_dir + "; cp *.py *.txt " + quoted_dir)
        with open(log_dir + "rl_validation.txt", 'a') as f:
            for name, (jct, makespan, reward) in zip(names, stats):
                if pm.HEURISTIC == name:
                    f.write(pm.HEURISTIC + " 0: " + str(jct) + " " + str(makespan) + " " + str(reward) + "\n")

    return stats
def main():
    """Remove old ``*.log`` files, obtain ten job traces and compare on them.

    Each trace comes from ``trace.Trace(None).get_trace()``; the comparison
    is run with ``debug`` disabled.
    """
    os.system("rm -f *.log")
    logger = log.getLogger(name="comparison", level="DEBUG")
    num_traces = 10
    traces = [trace.Trace(None).get_trace() for _ in range(num_traces)]
    compare(traces, logger, False)


if __name__ == '__main__':
    main()
'''
Sample log output from an earlier run (it has no Optimus column, so it was
presumably recorded before that scheduler was added):
comparison.py:74 INFO: Average JCT: DRF 5.900 SRTF 8.132 FIFO 8.203 Tetris 9.606
comparison.py:78 INFO: Average Makespan: DRF 29.207 SRTF 36.991 FIFO 37.221 Tetris 36.204
comparison.py:82 INFO: Average Reward: DRF 2.063 SRTF 1.633 FIFO 1.623 Tetris 1.668
'''