diff --git a/.gitignore b/.gitignore index 0dee4bd1..51906787 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,7 @@ vlf_table.fits tests/.oda-token .ipynb_checkpoints -.venv \ No newline at end of file +.venv + +dispatcher-state.json +tests/dispatcher-state.json \ No newline at end of file diff --git a/oda_api/api.py b/oda_api/api.py index 4dc2dc2c..a668012b 100644 --- a/oda_api/api.py +++ b/oda_api/api.py @@ -337,8 +337,8 @@ def __init__(self, } } - def inspect_state(self, job_id=None): - params = dict(token=oda_api.token.discover_token()) + def inspect_state(self, job_id=None, group_by_job=False): + params = dict(token=oda_api.token.discover_token(), group_by_job=group_by_job) if job_id is not None: params['job_id'] = job_id diff --git a/oda_api/cli.py b/oda_api/cli.py index 3149836f..c0e74582 100644 --- a/oda_api/cli.py +++ b/oda_api/cli.py @@ -167,14 +167,15 @@ def mutate_token_payload(payload): @cli.command("inspect") @click.option("-s", "--store", default="dispatcher-state.json") @click.option("-j", "--job-id", default=None) +@click.option("--group-by-job", default=False, is_flag=True) @click.option("-l", "--local", default=False, is_flag=True) # @click.option("-V", "--validate", default=None) @click.pass_obj -def inspect_state(obj, store, job_id, local): +def inspect_state(obj, store, job_id, local, group_by_job): if local: state = json.load(open(store)) else: - state = obj['dispatcher'].inspect_state(job_id=job_id) + state = obj['dispatcher'].inspect_state(job_id=job_id, group_by_job=group_by_job) json.dump( state, open(store, "w"), @@ -183,10 +184,38 @@ def inspect_state(obj, store, job_id, local): ) # if validate: - for record in sorted(state['records'], key=lambda r:r['mtime']): - print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime'])) - for email in record.get('analysis_parameters', {}).get('email_history', []): - print(" - ", email) + if not group_by_job: + for record in sorted(state['records'], key=lambda r:r['mtime']): + logger.info(f"session_id: {record['session_id']}, job_id: {record['job_id']} - {datetime.fromtimestamp(record['mtime'])}") + for email in record.get('analysis_parameters', {}).get('email_history', []): + logger.info(f" - {email}") + for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []): + logger.info(f" - {matrix_message}") + request_completed = record['request_completed'] + token_expired = record.get('token_expired', None) + if not request_completed: + msg = '\tRequest did not complete' + if token_expired: + msg += ' because the token was expired' + else: + msg = '\tRequest completed successfully' + logger.info(msg) + else: + for record in state['records']: + logger.info(f"job_id: {record['job_id']}") + for job_status_data in record['job_status_data']: + request_completed = job_status_data['request_completed'] + token_expired = job_status_data.get('token_expired', None) + scratch_dir_fn = job_status_data['scratch_dir_fn'] + session_id = scratch_dir_fn.split('_')[2] + msg = f'\tsession_id: {session_id}' + if not request_completed: + msg += ' - request did not complete' + if token_expired: + msg += ' because the token was expired' + else: + msg += ' - request completed successfully' + logger.info(msg) def main(): diff --git a/tests/test_cli.py b/tests/test_cli.py index be286c18..6d7165b6 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,16 +1,53 @@ +import logging import re from typing import ChainMap from click.testing import CliRunner import pytest import os import jwt +import time from oda_api import cli -from tests.test_basic import default_token_payload, secret_key +from cdci_data_analysis.pytest_fixtures import DispatcherJobState +# from tests.test_basic import default_token_payload, secret_key + +secret_key = 'secretkey_test' +default_exp_time = int(time.time()) + 5000 +default_token_payload = dict( + sub="mtm@mtmco.net", + name="mmeharga", + roles="general", + exp=default_exp_time, + tem=0, + mstout=True, + mssub=True +) + +@pytest.mark.parametrize('group_by_job', [True, False]) +def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job): + token_payload = default_token_payload.copy() + token_payload['roles'] = ['general', 'job manager'] + encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') + monkeypatch.setenv('ODA_TOKEN', encoded_token) + + DispatcherJobState.remove_scratch_folders() + + runner = CliRunner() + result = runner.invoke(cli.cli, ['-u', dispatcher_live_fixture, 'get', '-i', 'empty', '-p', 'dummy', '-a', 'product_type=Dummy'], obj={}) + assert result.exit_code == 0 + + runner = CliRunner() + args = ['-u', dispatcher_live_fixture, 'inspect'] + if group_by_job: + args.append('--group-by-job') + result = runner.invoke(cli.cli, args=args, obj={}) + assert result.exit_code == 0 @pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile']) def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir): + # this to make sure the level is sufficient to capture the DEBUG logs + logging.getLogger('oda_api').setLevel("DEBUG") # reset any existing token locations os.makedirs(tmpdir, exist_ok=True) monkeypatch.setenv('HOME', tmpdir)