Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock mechanism for scratch_dir creation, small refactoring #713

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import glob
import string
import random
import fcntl

from flask import jsonify, send_from_directory, make_response
from flask import request, g
Expand Down Expand Up @@ -886,9 +887,13 @@
return request_files_dir.path

def set_scratch_dir(self, session_id, job_id=None, verbose=False):
if verbose == True:
print('SETSCRATCH ---->', session_id,
type(session_id), job_id, type(job_id))
lock_file = f".lock_{self.job_id}"
scratch_dir_retry_attempts = 5
scratch_dir_retry_delay = 0.2
scratch_dir_created = True

if verbose:
print('SETSCRATCH ---->', session_id, type(session_id), job_id, type(job_id))

Check warning on line 896 in cdci_data_analysis/flask_app/dispatcher_query.py

View check run for this annotation

Codecov / codecov/patch

cdci_data_analysis/flask_app/dispatcher_query.py#L896

Added line #L896 was not covered by tests

wd = 'scratch'

Expand All @@ -898,14 +903,28 @@
if job_id is not None:
wd += '_jid_'+job_id

alias_workdir = self.get_existing_job_ID_path(
wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd+'_aliased'

wd = FilePath(file_dir=wd)
wd.mkdir()
self.scratch_dir = wd.path
for attempt in range(scratch_dir_retry_attempts):
try:
with open(lock_file, 'w') as lock:
fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
alias_workdir = self.get_existing_job_ID_path(wd=FilePath(file_dir=wd).path)
if alias_workdir is not None:
wd = wd + '_aliased'

wd_path_obj = FilePath(file_dir=wd)
wd_path_obj.mkdir()
self.scratch_dir = wd_path_obj.path
scratch_dir_created = True
break
except (OSError, IOError) as io_e:
scratch_dir_created = False
self.logger.warning(f'Failed to acquire lock for the scratch directory creation, attempt number {attempt + 1} ({scratch_dir_retry_attempts - (attempt + 1)} left), sleeping {scratch_dir_retry_delay} seconds until retry.\nError: {str(io_e)}')
time.sleep(scratch_dir_retry_delay)

if not scratch_dir_created:
dir_list = glob.glob(f"*_jid_{job_id}*")
sentry.capture_message(f"Failed to acquire lock for directory creation after multiple attempts.\njob_id: {self.job_id}\ndir_list: {dir_list}")
raise InternalError(f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts.", status_code=500)

def set_temp_dir(self, session_id, job_id=None, verbose=False):
if verbose:
Expand Down Expand Up @@ -1659,9 +1678,7 @@
def get_existing_job_ID_path(self, wd):
# exist same job_ID, different session ID
dir_list = glob.glob(f'*_jid_{self.job_id}')
# print('dirs',dir_list)
if dir_list:
dir_list = [d for d in dir_list if 'aliased' not in d]
dir_list = [d for d in dir_list if 'aliased' not in d]

if len(dir_list) == 1:
if dir_list[0] != wd:
Expand All @@ -1670,9 +1687,8 @@
alias_dir = None

elif len(dir_list) > 1:
sentry.capture_message(f'Found two non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two non aliased identical job_id, dir_list: {dir_list}')

sentry.capture_message(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
self.logger.warning(f'Found two or more non aliased identical job_id, dir_list: {dir_list}')
raise InternalError("We have encountered an internal error! "
"Our team is notified and is working on it. We are sorry! "
"When we find a solution we will try to reach you",
Expand All @@ -1683,6 +1699,7 @@

return alias_dir


def get_file_mtime(self, file):
return os.path.getmtime(file)

Expand Down
51 changes: 51 additions & 0 deletions tests/test_server_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import urllib
import io

import requests
import time
import uuid
Expand All @@ -11,6 +12,7 @@
import jwt
import glob
import pytest
import fcntl
from datetime import datetime, timedelta
from dateutil import parser, tz
from functools import reduce
Expand Down Expand Up @@ -320,6 +322,55 @@ def test_error_two_scratch_dir_same_job_id(dispatcher_live_fixture):
os.rmdir(fake_scratch_dir)


@pytest.mark.not_safe_parallel
@pytest.mark.fast
def test_scratch_dir_creation_lock_error(dispatcher_live_fixture):
DispatcherJobState.remove_scratch_folders()
server = dispatcher_live_fixture
logger.info("constructed server: %s", server)

encoded_token = jwt.encode(default_token_payload, secret_key, algorithm='HS256')
# issuing a request each, with the same set of parameters
params = dict(
query_status="new",
query_type="Real",
instrument="empty-async",
product_type="dummy",
token=encoded_token
)
DataServerQuery.set_status('submitted')
# let's generate a fake scratch dir
jdata = ask(server,
params,
expected_query_status=["submitted"],
max_time_s=50,
)

job_id = jdata['job_monitor']['job_id']
session_id = jdata['session_id']
fake_scratch_dir = f'scratch_sid_01234567890_jid_{job_id}'
os.makedirs(fake_scratch_dir)

params['job_id'] = job_id
params['session_id'] = session_id

lock_file = f".lock_{job_id}"

with open(lock_file, 'w') as f_lock:
fcntl.flock(f_lock, fcntl.LOCK_EX)

jdata = ask(server,
params,
expected_status_code=500,
expected_query_status=None,
)
scratch_dir_retry_attempts = 5
assert jdata['error'] == f"InternalError():Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
assert jdata['error_message'] == f"Failed to acquire lock for directory creation after {scratch_dir_retry_attempts} attempts."
os.rmdir(fake_scratch_dir)
os.remove(lock_file)


@pytest.mark.fast
def test_same_request_different_users(dispatcher_live_fixture):
server = dispatcher_live_fixture
Expand Down
Loading