-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubmission.py
290 lines (241 loc) · 13 KB
/
submission.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import tarfile
from datetime import datetime, timedelta
from time import sleep
import os
from werkzeug.utils import secure_filename
from gevent.event import Event
from slivka_client import SlivkaClient
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from config import SESSIONS_FOLDER, SLIVKA_URL, EXPIRATION_DAYS
from logger_config import setup_logging
from session_db import insert_metadata, update_status, update_slivka_id
# Module-level logger dedicated to submission handling; all configuration
# (handlers, level, formatting) lives in logger_config.setup_logging.
custom_logger = setup_logging(name='submission')
class SubmissionHandler:
    """Handles FASTA file submissions and associated processing.

    Orchestrates the full lifecycle of one submission: saving the uploaded
    data to a per-submission directory, recording metadata in the session
    database, running the Slivka job via SlivkaProcessor, and reporting
    the final status.
    """

    def __init__(self, session_id, form, service_type, config=None, tar_upload=False):
        """Initialize a SubmissionHandler instance.

        Args:
            session_id (str): Unique identifier for the submission session.
            form (FlaskForm): Form object containing the submission details.
            service_type (str): Type of service to use for processing.
            config (dict): Optional configuration dictionary forwarded to Slivka.
            tar_upload (bool): Flag indicating if the upload is a tar file.
        """
        self.session_id = session_id
        self.form = form
        self.service_type = service_type
        self.config = config or {}
        self.tar_upload = tar_upload
        self.submission_time = datetime.now()
        self.session_directory = self.create_directory()
        self.submission_directory = self.create_submission_directory()
        self.filename = None
        self.file_path = None
        # Events let concurrent greenlets wait for metadata insertion and
        # for the Slivka job to actually be submitted.
        self.metadata_available = Event()
        self.slivka_job_triggered = Event()
        self.job_success = None

    def create_directory(self):
        """Create (idempotently) the directory for this submission session.

        Returns:
            str: The path to the created directory.
        """
        session_directory = os.path.join(SESSIONS_FOLDER, self.session_id)
        os.makedirs(session_directory, exist_ok=True)
        custom_logger.debug(f"Directory created for session {self.session_id}.")
        return session_directory

    def create_submission_directory(self):
        """Create a unique directory for each submission, named by timestamp.

        NOTE(review): second-resolution timestamps can collide if the same
        session submits twice within one second — confirm whether that is
        possible in practice.
        """
        timestamp = self.submission_time.strftime('%Y%m%d%H%M%S')
        submission_directory = os.path.join(self.session_directory, f"{timestamp}")
        os.makedirs(submission_directory, exist_ok=True)
        custom_logger.debug(f"Directory created for submission {self.session_id}/{timestamp}.")
        return submission_directory

    def save_submission_data(self):
        """Save the uploaded data, dispatching on the upload type."""
        if self.tar_upload:
            self.file_path = os.path.join(self.submission_directory, 'submission.tar.gz')
            self.filename = 'submission.tar.gz'
            self.save_and_tar_files()
        else:
            self.save_sequence()

    def save_and_tar_files(self):
        """Save the uploaded FASTA files and bundle them into one gzipped tar."""
        with tarfile.open(self.file_path, "w:gz") as tar:
            for file in self.form.files.data:
                filename = secure_filename(file.filename)
                file_path = os.path.join(self.submission_directory, filename)
                file.save(file_path)
                tar.add(file_path, arcname=filename)
        custom_logger.info(f"Uploaded files saved and tarred for session {self.session_id}.")

    def save_sequence(self):
        """Save the uploaded FASTA file or the pasted input sequence."""
        if self.form.fasta_file.data:
            # Sanitise the client-supplied filename to prevent path traversal
            # (consistent with save_and_tar_files). secure_filename can return
            # an empty string for pathological names, so fall back to a default.
            self.filename = secure_filename(self.form.fasta_file.data.filename) or 'sequence.fasta'
            self.file_path = os.path.join(self.submission_directory, self.filename)
            self.form.fasta_file.data.save(self.file_path)
        else:
            self.filename = 'sequence.fasta'
            self.file_path = os.path.join(self.submission_directory, self.filename)
            # Explicit encoding so saved sequences are platform-independent.
            with open(self.file_path, 'w', encoding='utf-8') as f:
                f.write(self.form.sequence.data)
        custom_logger.info(f"FASTA data saved for session {self.session_id}.")

    def store_submission_metadata(self):
        """Insert metadata related to the submission into the database."""
        expiration_time = (self.submission_time + timedelta(days=EXPIRATION_DAYS)).strftime('%Y-%m-%d %H:%M:%S')
        # TODO: output.fasta should be replaced with an actual output file name or some other meaningful result or ID
        self.entry_id = insert_metadata(self.session_id, self.filename, 'output.fasta', self.submission_time.strftime('%Y-%m-%d %H:%M:%S'), 'uploaded', expiration_time)
        self.metadata_available.set()  # Signal that metadata is available
        custom_logger.info(f"Metadata inserted into database for session {self.session_id}.")

    def read_cached_submission(self):
        """Read back the saved submission file.

        Tar archives are read as raw bytes; anything else is treated as text.

        Returns:
            str or bytes: The content of the submission file.
        """
        if self.file_path.endswith(('.tar', '.tar.gz', '.tgz')):
            with open(self.file_path, 'rb') as f:
                return f.read()
        with open(self.file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def process_and_save_results(self, fasta_content):
        """Run the Slivka job for this submission and record success/failure.

        Args:
            fasta_content: Currently unused — SlivkaProcessor reads the file
                from disk itself. Kept for interface compatibility with callers.
        """
        processor = SlivkaProcessor(SLIVKA_URL, service=self.service_type, session_id=self.session_id, filename=self.filename, entry_id=self.entry_id, config=self.config)
        output_file_path = os.path.join(self.submission_directory, 'output.fasta')
        self.job_success = processor.process_file(self.file_path, output_file_path, self.submission_directory, trigger_event=self.slivka_job_triggered)

    def update_db_status(self, status):
        """Update the processing status in the database."""
        update_status(self.entry_id, status)
        custom_logger.info(f"Status updated for session {self.session_id}.")

    def handle_submission(self):
        """Handle the submission by orchestrating the various steps.

        Returns:
            dict: Keys 'status' ('success'/'failed'), 'message', 'filename',
            and (on success) 'directory'.
        """
        result = {'status': 'failed', 'message': '', 'filename': None}
        try:
            self.save_submission_data()
            self.store_submission_metadata()
            fasta_content = self.read_cached_submission()
            self.process_and_save_results(fasta_content)
            # 'Ready'/'Failed' reflects the Slivka job outcome; the overall
            # handler still reports 'success' because the pipeline completed.
            status = 'Ready' if self.job_success else 'Failed'
            self.update_db_status(status)
            result.update({
                'status': 'success',
                'message': 'File processed successfully.',
                'directory': self.session_directory,
                'filename': self.filename
            })
        except Exception as e:
            # Top-level boundary: log and return a failure result rather than
            # propagating into the web layer.
            custom_logger.error(f"An error occurred while handling submission for session {self.session_id}: {str(e)}")
            result['message'] = str(e)
        return result
class SlivkaProcessor:
    """Handles the processing of FASTA files using Slivka.

    Wraps a SlivkaClient for one service and drives a single job through
    submit → poll → download, mirroring status changes into the session DB.
    """

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(min=1, max=10), retry=retry_if_exception_type((RequestException, HTTPError, ConnectionError, Timeout)))
    def __init__(self, slivka_url, service, session_id, filename, entry_id, config=None):
        """Create the client and resolve the service; retried on network errors.

        Args:
            slivka_url (str): Base URL of the Slivka server.
            service (str): Name of the Slivka service to use.
            session_id (str): Session identifier, used for logging.
            filename (str): Name of the submitted file.
            entry_id: Database entry id used for status updates.
            config (dict): Optional extra form parameters for the job.
        """
        self.client = SlivkaClient(slivka_url)
        self.service = self.client[service]
        self.session_id = session_id
        self.filename = filename
        self.entry_id = entry_id
        self.config = config or {}

    def process_file(self, input_file_path, output_file_path, submission_directory, trigger_event=None):
        """Process the given FASTA file using Slivka.

        Args:
            input_file_path (str): The path to the input FASTA file.
            output_file_path (str): The path where the output should be saved.
            submission_directory (str): Directory results are downloaded into.
            trigger_event (Event): Optional event set once the job is submitted.

        Returns:
            bool: True if processing was successful, False otherwise.
        """
        try:
            custom_logger.info(f"Starting file processing for session {self.session_id}.")
            custom_logger.debug(f"Input file path: {input_file_path}")
            custom_logger.debug(f"Output file path: {output_file_path}")
            custom_logger.debug(f"Submission directory: {submission_directory}")
            # Open the FASTA file and set the media type
            with open(input_file_path, 'rb') as file_object:
                media_type = 'application/fasta'
                # Submit the job to Slivka
                job = self.submit_job_to_slivka(file_object, media_type)
            if trigger_event:
                trigger_event.set()
            # Wait for the job to complete
            status = self.wait_for_job_completion(job)
            custom_logger.info(f"Job {job.id} completed with status: {job.status}")
            if status == 'FAILED':
                custom_logger.error(f"Job {job.id} failed.")
                return False
            # TODO: Download could be done on demand when the user requests the results...
            # Download the job results
            self.download_job_results(job, submission_directory)
            custom_logger.info(f"File processing completed successfully for session {self.session_id}.")
            return True
        except FileNotFoundError as e:
            custom_logger.error(f"File not found: {e.filename}")
            custom_logger.error(f"An error occurred while processing the submission: {str(e)}")
            return False
        except Exception as e:
            # Boundary: report failure to the caller instead of raising.
            custom_logger.error(f"An unexpected error occurred while processing the submission: {str(e)}")
            return False

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(min=1, max=10), retry=retry_if_exception_type((RequestException, HTTPError, ConnectionError, Timeout)))
    def submit_job_to_slivka(self, file_object, media_type):
        """Submit the given file to Slivka for processing with retries on failure.

        Args:
            file_object (file): The file object to submit (opened in binary mode).
            media_type (str): The media type of the file.

        Returns:
            SlivkaJob: The job object representing the submitted job.
        """
        # Work on a copy: pop() on self.config directly would mutate the
        # dict shared with the caller (and with any later submissions).
        data = dict(self.config)
        file_key = data.pop('file_key_override', 'input')
        # Rewind the stream — a failed attempt may have consumed it, and a
        # tenacity retry would otherwise upload an empty/truncated file.
        file_object.seek(0)
        # Create the 'files' dictionary with the correct format
        files = {
            file_key: (os.path.basename(file_object.name), file_object, media_type)
        }
        # Submit the job to Slivka
        job = self.service.submit_job(data=data, files=files)
        custom_logger.info(f"Job submitted: {job.id}")
        # Update the slivka_id in the database
        update_slivka_id(self.entry_id, job.id)
        return job

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(min=1, max=10), retry=retry_if_exception_type((RequestException, HTTPError, ConnectionError, Timeout)))
    def wait_for_job_completion(self, job):
        """Poll the given job until it reaches a terminal state.

        Args:
            job (SlivkaJob): The job object representing the submitted job.

        Returns:
            The terminal job status ('COMPLETED' or 'FAILED').
        """
        last_status = None
        # Read job.status once per iteration — each access may trigger a
        # remote request, so avoid querying it several times per loop.
        current_status = job.status
        while current_status not in ('COMPLETED', 'FAILED'):
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            custom_logger.info(f"Polling job status at {current_time}... (Status: {current_status})")
            if current_status != last_status:
                update_status(self.entry_id, current_status)
                last_status = current_status
            sleep(3)  # Polling interval
            current_status = job.status
        # Record the terminal status one last time after the loop ends
        update_status(self.entry_id, current_status)
        custom_logger.info(f"Completion Time: {job.completion_time}")
        return current_status

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(min=1, max=10), retry=retry_if_exception_type((RequestException, HTTPError, ConnectionError, Timeout)))
    def download_job_results(self, job, subbmission_directory):
        """Download the results of the given job to the specified directory.

        NOTE(review): the parameter name is misspelled ("subbmission") but is
        kept unchanged so any keyword-argument callers remain compatible.

        Args:
            job (SlivkaJob): The job object representing the completed job.
            subbmission_directory (str): The directory where the results should be saved.
        """
        # TODO: Ensure retries are handled at the correct level (i.e. per file?)
        # or ensure downloads are idempotent.
        # Download each file in the job results
        for file in job.files:
            try:
                # Strip the leading job-id component of file.id to build a
                # relative path under the submission directory.
                local_path = os.path.join(subbmission_directory, *(file.id.split('/')[1:]))
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                file.dump(local_path)
                custom_logger.info(f"File {file.id} downloaded to {local_path}")
            except Exception as e:
                custom_logger.error(f"An error occurred while downloading file {file.id}: {str(e)}")
                raise
    