-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path_helpers.py
211 lines (175 loc) · 6.86 KB
/
_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import hashlib
import os
import sys
from os.path import isfile, realpath, join as opj, sep as pathsep
from string import Template
from configparser import ConfigParser
def attempt_load_config():
    """
    tries to load config file from expected path in instances where neither a
    filepath or dict-like object is provided

    The project root is located by searching the real path of this file for a
    'cluster-tools-dartmouth' component; the config is then expected to be the
    only file in '<project root>/configs' that is neither hidden nor a
    template.

    :return config: (dict) result of calling parse_config on the file found
    :raises FileNotFoundError: if the project root is not part of this file's
            path, the configs directory is missing, or it does not contain
            exactly one candidate config file. The specific reason is chained
            as the exception's __cause__.
    """
    project_name = 'cluster-tools-dartmouth'
    this_file = realpath(__file__)
    path_parts = this_file.split(pathsep)
    try:
        try:
            # get path to project root directory
            root_idx = path_parts.index(project_name)
        except ValueError as e:
            # pass exceptions onto broad outer exception for function
            raise FileNotFoundError(
                f"{project_name} not found in path {this_file}"
            ) from e
        project_root = pathsep.join(path_parts[:root_idx + 1])
        config_dir = opj(project_root, 'configs')
        # filter out hidden files and the template config
        configs = [
            fname for fname in os.listdir(config_dir)
            if not fname.startswith(('template', '.'))
        ]
        if len(configs) != 1:
            # fail if multiple or no config files are found
            raise FileNotFoundError(
                "Unable to determine which config file to read from "
                f"{len(configs)} choices in {config_dir}"
            )
        return parse_config(opj(config_dir, configs[0]))
    except FileNotFoundError as e:
        # single, uniform error for callers; the detailed reason stays
        # available through explicit exception chaining
        raise FileNotFoundError(
            "Failed to load config file from expected location"
        ) from e
def fmt_remote_commands(commands):
    """
    Formats a list-like iterable of shell commands to be run in the SshShell
    instance. Necessary because underlying Python SSH client (Paramiko) won't
    run any state changes between commands. So we run them all at once.

    :param commands: (iterable of str, or a single str)
            shell commands to chain together. A bare string is treated as one
            command.
    :return: (list) ['bash', '-c', '<cmd1> && <cmd2> && ...']
    :raises TypeError: if commands is not iterable
    """
    if isinstance(commands, str):
        # a str is itself iterable, so joining it directly would splice
        # ' && ' between every character; wrap it as a single command
        commands = [commands]
    elif not hasattr(commands, "__iter__"):
        # explicit raise rather than assert, so the check survives `python -O`
        raise TypeError(
            "Commands passed to fmt_remote_commands must be as an iterable "
            "(i.e., list-like) object"
        )
    executable = ['bash', '-c']
    # TODO: switch to ; sep?
    return executable + [' && '.join(commands)]
def get_qstat(remote_shell, options=None):
    """
    Return the status of running "qstat" on the cluster, optionally with a
    filter for the job's status

    :param remote_shell: (spurplus.SshShell instance)
            only its check_output method is used here
    :param options: (str)
            options to run along with the "qstat" command. A leading "-" is
            added if missing; None, empty, or whitespace-only runs plain
            "qstat". For further information, run
            get_qstat(remote_shell, options='man') locally or "man qstat"
            from the cluster.
    :return qstat_output: (str) output of running command on the cluster
    """
    # surrounding whitespace would otherwise defeat the leading-dash check
    # (and an empty string would produce the malformed command "qstat -")
    flags = '' if options is None else options.strip()
    if not flags:
        cmd = ['qstat']
    elif flags == 'man':
        cmd = ['man qstat']
    elif flags.startswith('-'):
        cmd = ['qstat ' + flags]
    else:
        cmd = ['qstat -' + flags]
    cmds_fmt = fmt_remote_commands(cmd)
    return remote_shell.check_output(cmds_fmt)
def md5_checksum(filepath):
    """
    Return the hexadecimal MD5 digest of the local file at `filepath`, for
    comparison against a remote copy.

    NOTE: MD5 IS CONSIDERED CRYPTOGRAPHICALLY INSECURE
    (see https://en.wikipedia.org/wiki/MD5#Security)
    It is used here only to detect accidental corruption, where nobody is
    expected to be crafting collisions on purpose.

    :param filepath: (str) path to the file to hash
    :return: (str) hex digest of the file's contents
    """
    digest = hashlib.md5()
    with open(filepath, 'rb') as stream:
        # feed the hash in 4 KiB pieces so large files never have to be
        # held in memory all at once
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def parse_config(config_path):
    """
    Read the user-specific options from the [CONFIG] section of a config
    file and return them as a plain dict.

    All values are returned as strings (with inline "#" comments removed),
    except 'confirm_overwrite_on_upload' and 'confirm_resubmission', which
    are converted to booleans.

    :param config_path: (str) path to the config file
    :return config: (dict) option names mapped to their values
    :raises FileNotFoundError: if config_path does not point to a file
    """
    resolved = realpath(config_path)
    if not isfile(resolved):
        raise FileNotFoundError(f'Invalid path to config file: {resolved}')

    parser = ConfigParser(inline_comment_prefixes='#')
    with open(resolved, 'r') as handle:
        parser.read_file(handle)

    settings = dict(parser['CONFIG'])
    # these two options are yes/no switches, so replace their raw string
    # values with real booleans
    for flag in ('confirm_overwrite_on_upload', 'confirm_resubmission'):
        settings[flag] = parser.getboolean('CONFIG', flag)
    return settings
def prompt_input(question, default=None):
    """
    given a question, prompts user for command line input
    returns True for 'yes'/'y' and False for 'no'/'n' responses

    Keeps asking until a valid response is given. Responses are
    case-insensitive and surrounding whitespace is ignored.

    :param question: (str) text shown to the user above the [y/n] prompt
    :param default: ('yes', 'no', or None)
            response assumed if the user hits return without typing; with
            None, an explicit response is required
    :return: (bool) True for an affirmative response, False for a negative
    :raises ValueError: if default is not 'yes', 'no', or None
    """
    # explicit raise rather than assert, so the check survives `python -O`
    if default not in ('yes', 'no', None):
        raise ValueError(
            "Default response must be either 'yes', 'no', or None"
        )
    valid_responses = {
        'yes': True,
        'y': True,
        'no': False,
        'n': False,
    }
    if default is None:
        prompt = "[y/n]"
    elif default == 'yes':
        prompt = "[Y/n]"
    else:
        prompt = "[y/N]"
    while True:
        sys.stdout.write(f"{question}\n{prompt}")
        # strip so that stray spaces (e.g. "yes ") don't invalidate a response
        response = input().strip().lower()
        # if user hits return without typing, return default response
        if default is not None and not response:
            return valid_responses[default]
        if response in valid_responses:
            return valid_responses[response]
        sys.stdout.write(
            "Please respond with either 'yes' (or 'y') or 'no' (or 'n')\n"
        )
def write_remote_submitter(remote_shell, job_config, env_activate_cmd, env_deactivate_cmd, submitter_walltime='12:00:00'):
    """
    Writes the PBS job submission script ('submit_jobs.sh') into the job's
    remote working directory

    :param remote_shell: (spurplus.SshShell instance)
            its is_dir and write_text methods are used here
    :param job_config: (dict-like)
            must provide the keys 'workingdir', 'jobname', 'modules',
            'env_name' and 'cmd_wrapper'
    :param env_activate_cmd: (str)
            command placed before the environment name to activate it
    :param env_deactivate_cmd: (str)
            command run as the last line of the script
    :param submitter_walltime: (str)
            value for the script's "#PBS -l walltime=" directive
    :return submitter_fpath: (str) remote path of the script that was written
    :raises ValueError: if the working directory path is not a directory
    :raises FileNotFoundError: if checking the working directory raises
            FileNotFoundError (reported as the directory not existing)
    """
    remote_dir = job_config['workingdir']
    # TODO: ability to handle custom-named submission script
    submitter_fpath = opj(remote_dir, 'submit_jobs.sh')
    try:
        is_directory = remote_shell.is_dir(remote_dir)
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Can't create job submission script in dir: {remote_dir}. "
            "Intended directory does not exist."
        ) from e
    # explicit check rather than assert, so it is not skipped under
    # `python -O`
    if not is_directory:
        raise ValueError(
            f"Can't create job submission script in dir: {remote_dir}. "
            "Intended directory is an existing file."
        )
    template_vals = {
        'jobname': job_config['jobname'],
        'walltime': submitter_walltime,
        'modules': job_config['modules'],
        'activate_cmd': env_activate_cmd,
        'deactivate_cmd': env_deactivate_cmd,
        'env_name': job_config['env_name'],
        'cmd_wrapper': job_config['cmd_wrapper'],
        'submitter_script': submitter_fpath,
    }
    # built line-by-line so the script text is independent of this
    # function's indentation; no trailing newline
    template = Template("\n".join((
        "#!/bin/bash -l",
        "#PBS -N ${jobname}-submitter",
        "#PBS -q default",
        "#PBS -l nodes=1:ppn=1",
        "#PBS -l walltime=${walltime}",
        "#PBS -m bea",
        "module load $modules",
        "$activate_cmd $env_name",
        "$cmd_wrapper $submitter_script",
        "$deactivate_cmd",
    )))
    content = template.substitute(template_vals)
    remote_shell.write_text(submitter_fpath, content)
    return submitter_fpath