Commit 50c55a5

Remove Lebesgue version 1 API support, add progress bars, and fix duplicate uploads on resume. (#162)

* add envs, module purge, and Lebesgue context
* fix format
* fix backup
* add module_purge to test
* fix test case
* refactor backup
* fix unexpected Lebesgue job status
* change logger method to exception
* fix dlog
* fix type
* remove version 1 and add tqdm
* newline
* rm newline
* change exception to dlog
* add tqdm
* fix uploading to Lebesgue
* add more log
* improve upload
* fix raise
* more info
* fix check exist
* remove some log
* fix rmdir
* fix keep backup
* remove
Parent: d792862

File tree: 5 files changed, +124 −82 lines

dpdispatcher/dp_cloud_server.py

Lines changed: 40 additions & 34 deletions
@@ -1,5 +1,8 @@
+import shutil
+
 from dpdispatcher.JobStatus import JobStatus
 from dpdispatcher import dlog
+from dpdispatcher.dpcloudserver import zip_file
 from dpdispatcher.machine import Machine
 from dpdispatcher.dpcloudserver.api import API
 from dpdispatcher.dpcloudserver.config import ALI_OSS_BUCKET_URL
@@ -18,9 +21,9 @@ def __init__(self, context):
         self.input_data = context.remote_profile['input_data'].copy()
         self.api_version = 2
         if 'api_version' in self.input_data:
-            self.api_version = self.input_data.get('api_version')
+            self.api_version = self.input_data.get('api_version', 2)
         if 'lebesgue_version' in self.input_data:
-            self.api_version = self.input_data.get('lebesgue_version')
+            self.api_version = self.input_data.get('lebesgue_version', 2)
         self.grouped = self.input_data.get('grouped', False)
         email = context.remote_profile.get("email", None)
         username = context.remote_profile.get('username', None)
@@ -33,7 +36,7 @@ def __init__(self, context):
         if password is None:
             raise ValueError("can not find password in remote_profile, please check your machine file.")
         if self.api_version == 1:
-            warnings.warn('api version 1 is deprecated and will be removed in a future version. Use version 2 instead.', DeprecationWarning)
+            raise DeprecationWarning('api version 1 is deprecated. Use version 2 instead.')
         self.api = API(email, password)
         self.group_id = None
 
@@ -88,26 +91,17 @@ def do_submit(self, job):
         # input_data['backward_files'] = self._gen_backward_files_list(job)
         if self.context.remote_profile.get('program_id') is None:
             warnings.warn('program_id will be compulsory in the future.')
-        job_id = None
-        if self.api_version == 2:
-            job_id, group_id = self.api.job_create_v2(
-                job_type=input_data['job_type'],
-                oss_path=input_data['job_resources'],
-                input_data=input_data,
-                program_id=self.context.remote_profile.get('program_id', None),
-                group_id=self.group_id
-            )
-            if self.grouped:
-                self.group_id = group_id
-            job.job_id = str(job_id) + ':job_group_id:' + str(group_id)
-            job_id = job.job_id
-        else:
-            job_id = self.api.job_create(
-                job_type=input_data['job_type'],
-                oss_path=input_data['job_resources'],
-                input_data=input_data,
-                program_id=self.context.remote_profile.get('program_id', None)
-            )
+        job_id, group_id = self.api.job_create(
+            job_type=input_data['job_type'],
+            oss_path=input_data['job_resources'],
+            input_data=input_data,
+            program_id=self.context.remote_profile.get('program_id', None),
+            group_id=self.group_id
+        )
+        if self.grouped:
+            self.group_id = group_id
+        job.job_id = str(job_id) + ':job_group_id:' + str(group_id)
+        job_id = job.job_id
         job.job_state = JobStatus.waiting
         return job_id
 
@@ -126,28 +120,40 @@ def check_status(self, job):
         dlog.debug(f"debug: check_status; job.job_id:{job_id}; job.job_hash:{job.job_hash}")
         check_return = None
         # print("api",self.api_version,self.input_data.get('job_group_id'),job.job_id)
-        if self.api_version == 2:
-            check_return = self.api.get_tasks_v2(job_id, group_id)
-        else:
-            check_return = self.api.get_tasks(job_id)
+        check_return = self.api.get_tasks(job_id, group_id)
         try:
-            dp_job_status = check_return[0]["status"]
+            dp_job_status = check_return["status"]
         except IndexError as e:
             dlog.error(f"cannot find job information in check_return. job {job.job_id}. check_return:{check_return}; retry one more time after 60 seconds")
             time.sleep(60)
-            retry_return = None
-            if self.api_version == 2:
-                retry_return = self.api.get_tasks_v2(job_id, group_id)
-            else:
-                retry_return = self.api.get_tasks(job_id)
+            retry_return = self.api.get_tasks(job_id, group_id)
             try:
-                dp_job_status = retry_return[0]["status"]
+                dp_job_status = retry_return["status"]
             except IndexError as e:
                 raise RuntimeError(f"cannot find job information in dpcloudserver's database for job {job.job_id} {check_return} {retry_return}")
 
         job_state = self.map_dp_job_state(dp_job_status)
+        if job_state == JobStatus.finished:
+            self._download_job(job)
         return job_state
 
+
+    def _download_job(self, job):
+        job_url = self.api.get_job_result_url(job.job_id)
+        if not job_url:
+            return
+        job_hash = job.job_hash
+        result_filename = job_hash + '_back.zip'
+        target_result_zip = os.path.join(self.context.local_root, result_filename)
+        self.api.download_from_url(job_url, target_result_zip)
+        zip_file.unzip_file(target_result_zip, out_dir=self.context.local_root)
+        try:
+            os.makedirs(os.path.join(self.context.local_root, 'backup'), exist_ok=True)
+            shutil.move(target_result_zip,
+                        os.path.join(self.context.local_root, 'backup', os.path.split(target_result_zip)[1]))
+        except (OSError, shutil.Error) as e:
+            dlog.exception("unable to backup file, " + str(e))
+
     def check_finish_tag(self, job):
         job_tag_finished = job.job_hash + '_job_tag_finished'
         dlog.info('check if job finished: ',job.job_id, job_tag_finished)
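Note on the id format above: every job submitted through the unified job_create now carries a composite id of the form <job_id>:job_group_id:<group_id>, which download() and the new API helpers split back apart. A minimal sketch of that round trip (the helper name is ours, for illustration only; it is not part of the codebase):

    # Hypothetical helper mirroring the ':job_group_id:' format built in do_submit.
    def split_composite_job_id(composite_id):
        jid, gid = composite_id.split(":job_group_id:")
        return int(jid), int(gid)

    composite = str(12345) + ':job_group_id:' + str(678)  # as constructed in do_submit
    assert split_composite_job_id(composite) == (12345, 678)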

dpdispatcher/dp_cloud_server_context.py

Lines changed: 41 additions & 20 deletions
@@ -13,6 +13,7 @@
 from .dpcloudserver.api import API
 from .dpcloudserver import zip_file
 import shutil
+import tqdm
 # from zip_file import zip_files
 DP_CLOUD_SERVER_HOME_DIR = os.path.join(
     os.path.expanduser('~'),
@@ -89,8 +90,17 @@ def upload(self, submission):
 
         # zip_path = "/home/felix/workplace/22_dpdispatcher/dpdispatcher-yfb/dpdispatcher/dpcloudserver/t.txt"
         # zip_path = self.local_root
-
+        bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
+        job_to_be_uploaded = []
+        result = None
+        dlog.info("checking all job has been uploaded")
         for job in submission.belonging_jobs:
+            if not self.api.check_job_has_uploaded(job.job_id):
+                job_to_be_uploaded.append(job)
+        if len(job_to_be_uploaded) == 0:
+            dlog.info("all job has been uploaded, continue")
+            return result
+        for job in tqdm.tqdm(job_to_be_uploaded, desc="Uploading to Lebesgue", bar_format=bar_format):
             self.machine.gen_local_script(job)
             zip_filename = job.job_hash + '.zip'
             oss_task_zip = self._gen_oss_path(job, zip_filename)
@@ -113,7 +123,7 @@ def upload(self, submission):
                 file_list=upload_file_list
             )
             result = self.api.upload(oss_task_zip, upload_zip, ENDPOINT, BUCKET_NAME)
-            self._backup(self.local_root, upload_zip, keep_backup=self.remote_profile.get('keep_backup', True))
+            self._backup(self.local_root, upload_zip)
         return result
         # return oss_task_zip
         # api.upload(self.oss_task_dir, zip_task_file)
@@ -124,15 +134,12 @@ def download(self, submission):
         group_id = None
         job_infos = {}
         for job in jobs:
-            if isinstance(job.job_id, str) and ':job_group_id:' in job.job_id:
-                ids = job.job_id.split(":job_group_id:")
-                jid, gid = int(ids[0]), int(ids[1])
-                job_hashs[jid] = job.job_hash
-                group_id = gid
-            else:
-                job_infos[job.job_hash] = self.api.get_tasks(job.job_id)[0]
+            ids = job.job_id.split(":job_group_id:")
+            jid, gid = int(ids[0]), int(ids[1])
+            job_hashs[jid] = job.job_hash
+            group_id = gid
         if group_id is not None:
-            job_result = self.api.get_tasks_v2_list(group_id)
+            job_result = self.api.get_tasks_list(group_id)
             for each in job_result:
                 if 'result_url' in each and each['result_url'] != '' and each['status'] == 2:
                     job_hash = ''
@@ -143,26 +150,40 @@ def download(self, submission):
                     else:
                         job_hash = job_hashs[each['task_id']]
                     job_infos[job_hash] = each
-        for job_hash, info in job_infos.items():
+        bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
+        for job_hash, info in tqdm.tqdm(job_infos.items(), desc="Validating download file from Lebesgue", bar_format=bar_format):
             result_filename = job_hash + '_back.zip'
             target_result_zip = os.path.join(self.local_root, result_filename)
+            if self._check_if_job_has_already_downloaded(target_result_zip, self.local_root):
+                continue
             self.api.download_from_url(info['result_url'], target_result_zip)
             zip_file.unzip_file(target_result_zip, out_dir=self.local_root)
-            self._backup(self.local_root, target_result_zip, keep_backup=self.remote_profile.get('keep_backup', True))
+            self._backup(self.local_root, target_result_zip)
+        self._clean_backup(self.local_root, keep_backup=self.remote_profile.get('keep_backup', True))
         return True
 
-    def _backup(self, local_root, target, keep_backup=True):
+    def _check_if_job_has_already_downloaded(self, target, local_root):
+        backup_file_location = os.path.join(local_root, 'backup', os.path.split(target)[1])
+        if os.path.exists(backup_file_location):
+            return True
+        else:
+            return False
+
+    def _backup(self, local_root, target):
         try:
-            if keep_backup:
-                # move to backup directory
-                os.makedirs(os.path.join(local_root, 'backup'), exist_ok=True)
-                shutil.move(target,
-                            os.path.join(local_root, 'backup', os.path.split(target)[1]))
-            else:
-                os.remove(target)
+            # move to backup directory
+            os.makedirs(os.path.join(local_root, 'backup'), exist_ok=True)
+            shutil.move(target,
+                        os.path.join(local_root, 'backup', os.path.split(target)[1]))
         except (OSError, shutil.Error) as e:
             dlog.exception("unable to backup file, " + str(e))
 
+    def _clean_backup(self, local_root, keep_backup=True):
+        if not keep_backup:
+            dir_to_be_removed = os.path.join(local_root, 'backup')
+            if os.path.exists(dir_to_be_removed):
+                shutil.rmtree(dir_to_be_removed)
+
     def write_file(self, fname, write_str):
         result = self.write_home_file(fname, write_str)
         return result
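The two new tqdm loops in this file share the same bar_format string. A self-contained sketch of how that progress bar is wired up, assuming nothing beyond tqdm itself (range(5) and the sleep are placeholders standing in for the per-job zip-and-upload work):

    import time
    import tqdm

    # Same bar_format as in upload()/download() above.
    bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
    for _job in tqdm.tqdm(range(5), desc="Uploading to Lebesgue", bar_format=bar_format):
        time.sleep(0.1)  # placeholder for gen_local_script + zip + upload of one job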

dpdispatcher/dpcloudserver/api.py

Lines changed: 40 additions & 26 deletions
@@ -170,18 +170,8 @@ def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         # print('debug:upload_result:', result, dir())
         return result
 
-    def job_create(self, job_type, oss_path, input_data, program_id=None):
-        post_data = {
-            'job_type': job_type,
-            'oss_path': oss_path,
-            'input_data': input_data,
-        }
-        if program_id is not None:
-            post_data["program_id"] = program_id
-        ret = self.post('/data/insert_job', post_data)
-        return ret['job_id']
 
-    def job_create_v2(self, job_type, oss_path, input_data, program_id=None, group_id=None):
+    def job_create(self, job_type, oss_path, input_data, program_id=None, group_id=None):
         post_data = {
             'job_type': job_type,
             'oss_path': oss_path,
@@ -210,17 +200,7 @@ def get_jobs(self, page=1, per_page=10):
         )
         return ret['items']
 
-    def get_tasks(self, job_id, page=1, per_page=10):
-        ret = self.get(
-            f'data/job/{job_id}/tasks',
-            {
-                'page': page,
-                'per_page': per_page,
-            }
-        )
-        return ret['items']
-
-    def get_tasks_v2(self, job_id, group_id, page=1, per_page=10):
+    def get_tasks(self, job_id, group_id, page=1, per_page=10):
         ret = self.get(
             f'data/job/{group_id}/tasks',
             {
@@ -230,12 +210,12 @@ def get_tasks_v2(self, job_id, group_id, page=1, per_page=10):
         )
         for each in ret['items']:
             if job_id == each["task_id"]:
-                return [each]
+                return each
         if len(ret['items']) != 0:
-            return self.get_tasks_v2(job_id, group_id, page=page + 1)
-        return []
+            return self.get_tasks(job_id, group_id, page=page + 1)
+        return None
 
-    def get_tasks_v2_list(self, group_id, per_page=30):
+    def get_tasks_list(self, group_id, per_page=30):
         result = []
         page = 0
         while True:
@@ -253,4 +233,38 @@ def get_tasks_v2_list(self, group_id, per_page=30):
             page += 1
         return result
 
+    def check_job_has_uploaded(self, job_id):
+        try:
+            if not job_id:
+                return False
+            if 'job_group_id' in job_id:
+                ids = job_id.split(":job_group_id:")
+                job_id, _ = int(ids[0]), int(ids[1])
+            ret = self.get(f'data/job/{job_id}', {})
+            if len(ret) == 0:
+                return False
+            if ret.get('input_data'):
+                return True
+            else:
+                return False
+        except ValueError as e:
+            dlog.error(e)
+            return False
+
+    def get_job_result_url(self, job_id):
+        try:
+            if not job_id:
+                return None
+            if 'job_group_id' in job_id:
+                ids = job_id.split(":job_group_id:")
+                job_id, _ = int(ids[0]), int(ids[1])
+            ret = self.get(f'data/job/{job_id}', {})
+            if 'result_url' in ret and len(ret['result_url']) != 0:
+                return ret.get('result_url')
+            else:
+                return None
+        except ValueError as e:
+            dlog.error(e)
+            return None
+
 # %%
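The resume fix hinges on check_job_has_uploaded: a job whose server-side record already contains input_data is treated as uploaded, so upload() skips it. A sketch of that filtering step with a stubbed API object (StubAPI and the ids here are invented for illustration; only check_job_has_uploaded mirrors the real method):

    class StubAPI:
        # Pretend the server already holds input_data for the first job only.
        def check_job_has_uploaded(self, job_id):
            return job_id == "101:job_group_id:7"

    api = StubAPI()
    job_ids = ["101:job_group_id:7", "102:job_group_id:7"]
    to_upload = [j for j in job_ids if not api.check_job_has_uploaded(j)]
    print(to_upload)  # ['102:job_group_id:7'] -- only the missing job is re-uploaded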

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 paramiko
 dargs>=0.2.6
 oss2
+tqdm

setup.py

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@
 with open(path.join(NAME, '_date.py'), 'w') as fp :
     fp.write('date = \'%s\'' % today)
 
-install_requires=['paramiko', 'dargs>=0.2.6', 'requests']
+install_requires=['paramiko', 'dargs>=0.2.6', 'requests', 'tqdm']
 
 setuptools.setup(
     name=NAME,
@@ -41,7 +41,7 @@
     install_requires=install_requires,
     extras_require={
         'docs': ['sphinx', 'recommonmark', 'sphinx_rtd_theme>=1.0.0rc1', 'numpydoc'],
-        "cloudserver": ["oss2"],
+        "cloudserver": ["oss2", "tqdm"],
         ":python_version<'3.7'": ["typing_extensions"],
     },
     entry_points={
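With tqdm added to both install_requires and the cloudserver extra, a plain `pip install dpdispatcher` now pulls in the progress-bar dependency, and `pip install dpdispatcher[cloudserver]` continues to cover the cloud-server feature set.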
