From a83f2e866bcb2578185313bd36089fdfb90c3dab Mon Sep 17 00:00:00 2001 From: burnout87 Date: Tue, 9 Jan 2024 14:50:55 +0100 Subject: [PATCH 01/17] group_by_job option inspect_state --- oda_api/api.py | 4 ++-- oda_api/cli.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/oda_api/api.py b/oda_api/api.py index f5b5d6d6..ae16ef11 100644 --- a/oda_api/api.py +++ b/oda_api/api.py @@ -336,8 +336,8 @@ def __init__(self, } } - def inspect_state(self, job_id=None): - params = dict(token=oda_api.token.discover_token()) + def inspect_state(self, job_id=None, group_by_job=False): + params = dict(token=oda_api.token.discover_token(), group_by_job=group_by_job) if job_id is not None: params['job_id'] = job_id diff --git a/oda_api/cli.py b/oda_api/cli.py index 3149836f..cdaa9d81 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -167,14 +167,15 @@ def mutate_token_payload(payload): @cli.command("inspect") @click.option("-s", "--store", default="dispatcher-state.json") @click.option("-j", "--job-id", default=None) +@click.option("--group-by-job", default=False, is_flag=True) @click.option("-l", "--local", default=False, is_flag=True) # @click.option("-V", "--validate", default=None) @click.pass_obj -def inspect_state(obj, store, job_id, local): +def inspect_state(obj, store, job_id, local, group_by_job): if local: state = json.load(open(store)) else: - state = obj['dispatcher'].inspect_state(job_id=job_id) + state = obj['dispatcher'].inspect_state(job_id=job_id, group_by_job=group_by_job) json.dump( state, open(store, "w"), From 71857330ece03a90f80af5611f9e66041cce935a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 13:28:57 +0100 Subject: [PATCH 02/17] dispatcher test branch --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index d0339578..70e0bf7d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ matplotlib numpy jsonschema astroquery --e git+https://github.com/oda-hub/dispatcher-app.git#egg=cdci_data_analysis[test] +-e git+https://github.com/oda-hub/dispatcher-app.git@flask_restx-version#egg=cdci_data_analysis[test] simplejson sentry_sdk rdflib From 74e9d7cf2a31034ab6f6785b956895d1fcff2ab4 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 14:37:05 +0100 Subject: [PATCH 03/17] master branch dispatcher --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 70e0bf7d..d0339578 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ matplotlib numpy jsonschema astroquery --e git+https://github.com/oda-hub/dispatcher-app.git@flask_restx-version#egg=cdci_data_analysis[test] +-e git+https://github.com/oda-hub/dispatcher-app.git#egg=cdci_data_analysis[test] simplejson sentry_sdk rdflib From b0c420a523fe9066297431620256bb4051a60479 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 17:51:05 +0100 Subject: [PATCH 04/17] different outputs if grouped by job_id --- oda_api/cli.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index cdaa9d81..0153195a 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -184,11 +184,17 @@ def inspect_state(obj, store, job_id, local, group_by_job): ) # if validate: - for record in sorted(state['records'], key=lambda r:r['mtime']): - print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) - for email in record.get('analysis_parameters', {}).get('email_history', []): - print(" - ", email) - + if not group_by_job: + for record in sorted(state['records'], key=lambda r:r['mtime']): + print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) + for email in record.get('analysis_parameters', {}).get('email_history', []): + print(" - ", email) + for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): + print(" - ", matrix_message) + else: + for record in state['records']: + print("job_id", record['job_id']) + # TODO which information should be printed? def main(): cli(obj={}) From e390b78fdeef6b7656907de6556318148f7e8349 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 17:51:09 +0100 Subject: [PATCH 05/17] dedicated test --- tests/test_cli.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index be286c18..a51dc366 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,9 +4,44 @@ import pytest import os import jwt +import time from oda_api import cli -from tests.test_basic import default_token_payload, secret_key +from cdci_data_analysis.pytest_fixtures import DispatcherJobState +# from tests.test_basic import default_token_payload, secret_key + +secret_key = 'secretkey_test' +default_exp_time = int(time.time()) + 5000 +default_token_payload = dict( + sub="mtm@mtmco.net", + name="mmeharga", + roles="general", + exp=default_exp_time, + tem=0, + mstout=True, + mssub=True +) + +@pytest.mark.parametrize('group_by_job', [True, False]) +def test_inspect_state(dispatcher_live_fixture, caplog, monkeypatch, group_by_job): + token_payload = default_token_payload.copy() + token_payload['roles'] = ['general', 'job manager'] + encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') + monkeypatch.setenv('ODA_TOKEN', encoded_token) + + DispatcherJobState.remove_scratch_folders() + + runner = CliRunner() + result = runner.invoke(cli.cli, ['-u', dispatcher_live_fixture, 'get', '-i', 'empty', '-p', 'dummy', '-a', 'product_type=Dummy'], obj={}) + assert result.exit_code == 0 + + runner = CliRunner() + args = ['-u', dispatcher_live_fixture, 'inspect'] + if group_by_job: + args.append('--group-by-job') + result = runner.invoke(cli.cli, args=args, obj={}) + + assert result.exit_code == 0 @pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile']) From 2736de1e815e1ca75390f1874bde090f9a2d10d2 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 19:10:46 +0100 Subject: [PATCH 06/17] no caplog for test_token_inspect --- tests/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index a51dc366..a2b03d0c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -23,7 +23,7 @@ ) @pytest.mark.parametrize('group_by_job', [True, False]) -def test_inspect_state(dispatcher_live_fixture, caplog, monkeypatch, group_by_job): +def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job): token_payload = default_token_payload.copy() token_payload['roles'] = ['general', 'job manager'] encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') From d33f3ff225ce2085bb9da73a5b37204c155e18b8 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 20:00:16 +0100 Subject: [PATCH 07/17] setting caplog logging level --- tests/test_cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index a2b03d0c..6e38fe8d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,4 @@ +import logging import re from typing import ChainMap from click.testing import CliRunner @@ -40,12 +41,13 @@ def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job): if group_by_job: args.append('--group-by-job') result = runner.invoke(cli.cli, args=args, obj={}) - + print(result) assert result.exit_code == 0 @pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile']) def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir): + caplog.set_level(logging.DEBUG) # reset any existing token locations os.makedirs(tmpdir, exist_ok=True) monkeypatch.setenv('HOME', tmpdir) From e658a445a37ea8fa86fd2573ed664cf061d38d0a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 20:10:54 +0100 Subject: [PATCH 08/17] comment --- tests/test_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cli.py b/tests/test_cli.py index 6e38fe8d..1506969e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -47,6 +47,7 @@ def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job): @pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile']) def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir): + # this to make sure the level is sufficient to capture the DEBUG logs caplog.set_level(logging.DEBUG) # reset any existing token locations os.makedirs(tmpdir, exist_ok=True) From 50b5c0841575e8dc1a70c382ea49e935b2699534 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Wed, 10 Jan 2024 20:18:03 +0100 Subject: [PATCH 09/17] setting logging level for test --- tests/test_cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 1506969e..6d7165b6 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -41,14 +41,13 @@ def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job): if group_by_job: args.append('--group-by-job') result = runner.invoke(cli.cli, args=args, obj={}) - print(result) assert result.exit_code == 0 @pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile']) def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir): # this to make sure the level is sufficient to capture the DEBUG logs - caplog.set_level(logging.DEBUG) + logging.getLogger('oda_api').setLevel("DEBUG") # reset any existing token locations os.makedirs(tmpdir, exist_ok=True) monkeypatch.setenv('HOME', tmpdir) From 3ab485c38b1804e6fb7648c840918906cbbe0144 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 09:24:59 +0100 Subject: [PATCH 10/17] using logger --- oda_api/cli.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index 0153195a..07567c87 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -186,14 +186,14 @@ def inspect_state(obj, store, job_id, local, group_by_job): # if validate: if not group_by_job: for record in sorted(state['records'], key=lambda r:r['mtime']): - print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) + logger.info("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) for email in record.get('analysis_parameters', {}).get('email_history', []): - print(" - ", email) + logger.info(" - ", email) for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): - print(" - ", matrix_message) + logger.info(" - ", matrix_message) else: for record in state['records']: - print("job_id", record['job_id']) + logger.info("job_id", record['job_id']) # TODO which information should be printed? def main(): From c0aebf52f2045977ca20388757ccc86bd6bb819b Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 09:31:41 +0100 Subject: [PATCH 11/17] using logger --- oda_api/cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index 07567c87..b194e053 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -186,14 +186,14 @@ def inspect_state(obj, store, job_id, local, group_by_job): # if validate: if not group_by_job: for record in sorted(state['records'], key=lambda r:r['mtime']): - logger.info("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) + logger.info(f"session_id: {record['session_id']}, job_id: {record['job_id']} - {datetime.fromtimestamp(record['mtime'])}") for email in record.get('analysis_parameters', {}).get('email_history', []): logger.info(" - ", email) for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): - logger.info(" - ", matrix_message) + logger.info(f" - {matrix_message}") else: for record in state['records']: - logger.info("job_id", record['job_id']) + logger.info(f"job_id: {record['job_id']}") # TODO which information should be printed? def main(): From 9edd5f50251ab3c023dde4676a5fa8591750b7b5 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 09:49:52 +0100 Subject: [PATCH 12/17] using logger --- oda_api/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index b194e053..a515a871 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -188,7 +188,7 @@ def inspect_state(obj, store, job_id, local, group_by_job): for record in sorted(state['records'], key=lambda r:r['mtime']): logger.info(f"session_id: {record['session_id']}, job_id: {record['job_id']} - {datetime.fromtimestamp(record['mtime'])}") for email in record.get('analysis_parameters', {}).get('email_history', []): - logger.info(" - ", email) + logger.info(f" - {email}") for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): logger.info(f" - {matrix_message}") else: From 0131de07a4050eeb2bb64314d4376d25100e2f26 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 10:56:09 +0100 Subject: [PATCH 13/17] logging session_id, extracting request's completion info --- oda_api/cli.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/oda_api/cli.py b/oda_api/cli.py index a515a871..96f694e0 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -195,6 +195,12 @@ def inspect_state(obj, store, job_id, local, group_by_job): for record in state['records']: logger.info(f"job_id: {record['job_id']}") # TODO which information should be printed? + for job_status_data in record['job_id']: + request_completed = job_status_data['request_completed'] + token_expired = job_status_data.get('token_expired', None) + scratch_dir_fn = job_status_data['scratch_dir_fn'] + session_id = scratch_dir_fn.split('_')[2] + logger.info(f"\tsession_id: {session_id}") def main(): cli(obj={}) From b0a1f05efbaf5d31199df9857651b8fc2023a08a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 11:12:18 +0100 Subject: [PATCH 14/17] typo --- oda_api/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index 96f694e0..51dca6af 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -195,7 +195,7 @@ def inspect_state(obj, store, job_id, local, group_by_job): for record in state['records']: logger.info(f"job_id: {record['job_id']}") # TODO which information should be printed? - for job_status_data in record['job_id']: + for job_status_data in record['job_status_data']: request_completed = job_status_data['request_completed'] token_expired = job_status_data.get('token_expired', None) scratch_dir_fn = job_status_data['scratch_dir_fn'] From 4d60419214055a936be492bf671fa331be2c59db Mon Sep 17 00:00:00 2001 From: burnout87 Date: Thu, 11 Jan 2024 12:33:03 +0100 Subject: [PATCH 15/17] checking request completion and logging --- oda_api/cli.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/oda_api/cli.py b/oda_api/cli.py index 51dca6af..0b88bb54 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -194,13 +194,20 @@ def inspect_state(obj, store, job_id, local, group_by_job): else: for record in state['records']: logger.info(f"job_id: {record['job_id']}") - # TODO which information should be printed? for job_status_data in record['job_status_data']: request_completed = job_status_data['request_completed'] token_expired = job_status_data.get('token_expired', None) scratch_dir_fn = job_status_data['scratch_dir_fn'] session_id = scratch_dir_fn.split('_')[2] - logger.info(f"\tsession_id: {session_id}") + msg = f'\tsession_id: {session_id}' + if not request_completed: + msg += ' - request did not complete' + if token_expired: + msg += ' because the token was expired' + else: + msg += ' - request completed successfully' + logger.info(msg) + def main(): cli(obj={}) From 650dea219d761220c1a1bc80e8fe8d991d066d49 Mon Sep 17 00:00:00 2001 From: burnout87 Date: Fri, 12 Jan 2024 16:37:09 +0100 Subject: [PATCH 16/17] logging token_expired no group_by_job --- oda_api/cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/oda_api/cli.py b/oda_api/cli.py index 0b88bb54..c0e74582 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -191,6 +191,15 @@ def inspect_state(obj, store, job_id, local, group_by_job): logger.info(f" - {email}") for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): logger.info(f" - {matrix_message}") + request_completed = record['request_completed'] + token_expired = record.get('token_expired', None) + if not request_completed: + msg = '\tRequest did not complete' + if token_expired: + msg += ' because the token was expired' + else: + msg = '\tRequest completed successfully' + logger.info(msg) else: for record in state['records']: logger.info(f"job_id: {record['job_id']}") From 5de1a516d9e665c00453c5cc17244862823d4f5a Mon Sep 17 00:00:00 2001 From: burnout87 Date: Fri, 12 Jan 2024 16:38:23 +0100 Subject: [PATCH 17/17] files to ignore --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0dee4bd1..51906787 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,7 @@ vlf_table.fits tests/.oda-token .ipynb_checkpoints -.venv \ No newline at end of file +.venv + +dispatcher-state.json +tests/dispatcher-state.json \ No newline at end of file