Skip to content

Commit c73c3eb

Browse files
authored
add module purge, improve envs, and upload OSS path with random address. (#155)
* add envs, module purge and Lebesgue context * fix format * fix backup * add module_purge to test * fix test case * refactor backup * fix unexpected Lebesgue job status * change logger method to exception * fix dlog * fix type
1 parent e3e4513 commit c73c3eb

File tree

7 files changed

+92
-12
lines changed

7 files changed

+92
-12
lines changed

dpdispatcher/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@
4646
from .pbs import Torque
4747
from .shell import Shell
4848
from .lsf import LSF
49-
from .dp_cloud_server import DpCloudServer
49+
from .dp_cloud_server import DpCloudServer, Lebesgue
5050
from .distributed_shell import DistributedShell
5151
from .machine import Machine
5252

5353
from .lazy_local_context import LazyLocalContext
5454
from .local_context import LocalContext
5555
from .ssh_context import SSHContext
56-
from .dp_cloud_server_context import DpCloudServerContext
56+
from .dp_cloud_server_context import DpCloudServerContext, LebesgueContext
5757
from .hdfs_context import HDFSContext
5858

5959
def info():

dpdispatcher/distributed_shell.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ def gen_script_env(self, job):
4646
source_files_part = ""
4747

4848
module_unload_part = ""
49+
module_purge = job.resources.module_purge
50+
if module_purge:
51+
module_unload_part += "module purge\n"
4952
module_unload_list = job.resources.module_unload_list
5053
for ii in module_unload_list:
5154
module_unload_part += f"module unload {ii}\n"
@@ -63,7 +66,11 @@ def gen_script_env(self, job):
6366
export_envs_part = ""
6467
envs = job.resources.envs
6568
for k, v in envs.items():
66-
export_envs_part += f"export {k}={v}\n"
69+
if isinstance(v, list):
70+
for each_value in v:
71+
export_envs_part += f"export {k}={each_value}\n"
72+
else:
73+
export_envs_part += f"export {k}={v}\n"
6774

6875
flag_if_job_task_fail = job.job_hash + '_flag_if_job_task_fail'
6976

dpdispatcher/dp_cloud_server.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import time
77
import warnings
88
import os
9+
import uuid
910

1011
shell_script_header_template="""
1112
#!/bin/bash -l
@@ -60,10 +61,24 @@ def _gen_backward_files_list(self, job):
6061
result_file_list.extend([ os.path.join(task.task_work_path,b_f) for b_f in task.backward_files])
6162
return result_file_list
6263

64+
def _gen_oss_path(self, job, zip_filename):
65+
if hasattr(job, 'upload_path') and job.upload_path:
66+
return job.upload_path
67+
else:
68+
program_id = self.context.remote_profile.get('program_id')
69+
if program_id is None:
70+
dlog.info("can not find program id in remote profile, upload to default program id.")
71+
program_id = 0
72+
uid = uuid.uuid4()
73+
path = os.path.join("program", str(program_id), str(uid), zip_filename)
74+
setattr(job, 'upload_path', path)
75+
return path
76+
6377
def do_submit(self, job):
6478
self.gen_local_script(job)
6579
zip_filename = job.job_hash + '.zip'
66-
oss_task_zip = 'indicate/' + job.job_hash + '/' + zip_filename
80+
# oss_task_zip = 'indicate/' + job.job_hash + '/' + zip_filename
81+
oss_task_zip = self._gen_oss_path(job, zip_filename)
6782
job_resources = ALI_OSS_BUCKET_URL + oss_task_zip
6883

6984
input_data = self.input_data.copy()
@@ -167,3 +182,6 @@ def map_dp_job_state(status):
167182
# job_tag_finished = job.job_hash + '_job_tag_finished'
168183
# return self.context.check_file_exists(job_tag_finished)
169184

185+
186+
class Lebesgue(DpCloudServer):
187+
pass

dpdispatcher/dp_cloud_server_context.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
#!/usr/bin/env python
22
# coding: utf-8
33
# %%
4+
import uuid
5+
46
from dargs.dargs import Argument
57
from dpdispatcher.base_context import BaseContext
68
from typing import List
79
import os
8-
# from dpdispatcher import dlog
10+
from dpdispatcher import dlog
911
# from dpdispatcher.submission import Machine
12+
# from . import dlog
1013
from .dpcloudserver.api import API
1114
from .dpcloudserver import zip_file
15+
import shutil
1216
# from zip_file import zip_files
1317
DP_CLOUD_SERVER_HOME_DIR = os.path.join(
1418
os.path.expanduser('~'),
@@ -66,6 +70,18 @@ def bind_submission(self, submission):
6670
# file_uuid = uuid.uuid1().hex
6771
# oss_task_dir = os.path.join()
6872

73+
def _gen_oss_path(self, job, zip_filename):
74+
if hasattr(job, 'upload_path') and job.upload_path:
75+
return job.upload_path
76+
else:
77+
program_id = self.remote_profile.get('program_id')
78+
if program_id is None:
79+
program_id = 0
80+
uid = uuid.uuid4()
81+
path = os.path.join("program", str(program_id), str(uid), zip_filename)
82+
setattr(job, 'upload_path', path)
83+
return path
84+
6985
def upload(self, submission):
7086
# oss_task_dir = os.path.join('%s/%s/%s.zip' % ('indicate', file_uuid, file_uuid))
7187
# zip_filename = submission.submission_hash + '.zip'
@@ -77,7 +93,7 @@ def upload(self, submission):
7793
for job in submission.belonging_jobs:
7894
self.machine.gen_local_script(job)
7995
zip_filename = job.job_hash + '.zip'
80-
oss_task_zip = 'indicate/' + job.job_hash + '/' + zip_filename
96+
oss_task_zip = self._gen_oss_path(job, zip_filename)
8197
zip_task_file = os.path.join(self.local_root, zip_filename)
8298

8399
upload_file_list = [job.script_file_name, ]
@@ -97,6 +113,7 @@ def upload(self, submission):
97113
file_list=upload_file_list
98114
)
99115
result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
116+
self._backup(self.local_root, upload_zip, keep_backup=self.remote_profile.get('keep_backup', True))
100117
return result
101118
# return oss_task_zip
102119
# api.upload(self.oss_task_dir, zip_task_file)
@@ -110,23 +127,42 @@ def download(self, submission):
110127
if isinstance(job.job_id, str) and ':job_group_id:' in job.job_id:
111128
ids = job.job_id.split(":job_group_id:")
112129
jid, gid = int(ids[0]), int(ids[1])
113-
job_hashs[jid] = job.job_hash
130+
job_hashs[jid] = job.job_hash
114131
group_id = gid
115132
else:
116-
job_infos[job.job_hash] = self.get_tasks(job.job_id)[0]
133+
job_infos[job.job_hash] = self.api.get_tasks(job.job_id)[0]
117134
if group_id is not None:
118135
job_result = self.api.get_tasks_v2_list(group_id)
119136
for each in job_result:
120137
if 'result_url' in each and each['result_url'] != '' and each['status'] == 2:
121-
job_hash = job_hashs[each['task_id']]
138+
job_hash = ''
139+
if each['task_id'] not in job_hashs:
140+
dlog.info(f"find unexpect job_hash, but task {each['task_id']} still been download.")
141+
dlog.debug(str(job_hashs))
142+
job_hash = str(each['task_id'])
143+
else:
144+
job_hash = job_hashs[each['task_id']]
122145
job_infos[job_hash] = each
123-
for hash, info in job_infos.items():
124-
result_filename = hash + '_back.zip'
146+
for job_hash, info in job_infos.items():
147+
result_filename = job_hash + '_back.zip'
125148
target_result_zip = os.path.join(self.local_root, result_filename)
126149
self.api.download_from_url(info['result_url'], target_result_zip)
127150
zip_file.unzip_file(target_result_zip, out_dir=self.local_root)
151+
self._backup(self.local_root, target_result_zip, keep_backup=self.remote_profile.get('keep_backup', True))
128152
return True
129153

154+
def _backup(self, local_root, target, keep_backup=True):
155+
try:
156+
if keep_backup:
157+
# move to backup directory
158+
os.makedirs(os.path.join(local_root, 'backup'), exist_ok=True)
159+
shutil.move(target,
160+
os.path.join(local_root, 'backup', os.path.split(target)[1]))
161+
else:
162+
os.remove(target)
163+
except (OSError, shutil.Error) as e:
164+
dlog.exception("unable to backup file, " + str(e))
165+
130166
def write_file(self, fname, write_str):
131167
result = self.write_home_file(fname, write_str)
132168
return result
@@ -214,4 +250,9 @@ def machine_subfields(cls) -> List[Argument]:
214250
'None or empty.')
215251
], optional=False, doc="Configuration of job"),
216252
], doc=doc_remote_profile)]
253+
254+
255+
class LebesgueContext(DpCloudServerContext):
256+
pass
257+
217258
#%%

dpdispatcher/machine.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ def gen_script_env(self, job):
201201
source_files_part = ""
202202

203203
module_unload_part = ""
204+
module_purge = job.resources.module_purge
205+
if module_purge:
206+
module_unload_part += "module purge\n"
204207
module_unload_list = job.resources.module_unload_list
205208
for ii in module_unload_list:
206209
module_unload_part += f"module unload {ii}\n"
@@ -218,7 +221,11 @@ def gen_script_env(self, job):
218221
export_envs_part = ""
219222
envs = job.resources.envs
220223
for k,v in envs.items():
221-
export_envs_part += f"export {k}={v}\n"
224+
if isinstance(v, list):
225+
for each_value in v:
226+
export_envs_part += f"export {k}={each_value}\n"
227+
else:
228+
export_envs_part += f"export {k}={v}\n"
222229

223230
flag_if_job_task_fail = job.job_hash + '_flag_if_job_task_fail'
224231

dpdispatcher/submission.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,7 @@ def __init__(self,
659659
strategy=default_strategy,
660660
para_deg=1,
661661
module_unload_list=[],
662+
module_purge=False,
662663
module_list=[],
663664
source_list=[],
664665
envs={},
@@ -673,6 +674,7 @@ def __init__(self,
673674
self.custom_flags = custom_flags
674675
self.strategy = strategy
675676
self.para_deg = para_deg
677+
self.module_purge = module_purge
676678
self.module_unload_list = module_unload_list
677679
self.module_list = module_list
678680
self.source_list = source_list
@@ -707,6 +709,7 @@ def serialize(self):
707709
resources_dict['custom_flags'] = self.custom_flags
708710
resources_dict['strategy'] = self.strategy
709711
resources_dict['para_deg'] = self.para_deg
712+
resources_dict['module_purge'] = self.module_purge
710713
resources_dict['module_unload_list'] = self.module_unload_list
711714
resources_dict['module_list'] = self.module_list
712715
resources_dict['source_list'] = self.source_list
@@ -725,6 +728,7 @@ def deserialize(cls, resources_dict):
725728
custom_flags=resources_dict.get('custom_flags', []),
726729
strategy=resources_dict.get('strategy', default_strategy),
727730
para_deg=resources_dict.get('para_deg', 1),
731+
module_purge=resources_dict.get('module_purge', False),
728732
module_unload_list=resources_dict.get('module_unload_list', []),
729733
module_list=resources_dict.get('module_list', []),
730734
source_list=resources_dict.get('source_list', []),
@@ -757,6 +761,7 @@ def arginfo():
757761
doc_custom_flags = 'The extra lines pass to job submitting script header'
758762
doc_para_deg = 'Decide how many tasks will be run in parallel.'
759763
doc_source_list = 'The env file to be sourced before the command execution.'
764+
doc_module_purge = 'Remove all modules on HPC system before module load (module_list)'
760765
doc_module_unload_list = 'The modules to be unloaded on HPC system before submitting jobs'
761766
doc_module_list = 'The modules to be loaded on HPC system before submitting jobs'
762767
doc_envs = 'The environment variables to be exported on before submitting jobs'
@@ -779,6 +784,7 @@ def arginfo():
779784
strategy_format,
780785
Argument("para_deg", int, optional=True, doc=doc_para_deg, default=1),
781786
Argument("source_list", list, optional=True, doc=doc_source_list, default=[]),
787+
Argument("module_purge", bool, optional=True, doc=doc_module_purge, default=False),
782788
Argument("module_unload_list", list, optional=True, doc=doc_module_unload_list, default=[]),
783789
Argument("module_list", list, optional=True, doc=doc_module_list, default=[]),
784790
Argument("envs", dict, optional=True, doc=doc_envs, default={}),

tests/sample_class.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def get_sample_resources_dict(cls):
4242
'custom_flags':[],
4343
'strategy':{'if_cuda_multi_devices': False},
4444
'para_deg':1,
45+
'module_purge':False,
4546
'module_unload_list':[],
4647
'module_list':[],
4748
'source_list':[],

0 commit comments

Comments
 (0)