Skip to content

Commit

Permalink
Merge pull request #232 from oda-hub/group_by_job-option-inspect_state
Browse files Browse the repository at this point in the history
group_by_job option inspect_state
  • Loading branch information
burnout87 committed Jan 18, 2024
2 parents d56ad3a + 5de1a51 commit b0b57ea
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 10 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ vlf_table.fits
tests/.oda-token

.ipynb_checkpoints
.venv
.venv

dispatcher-state.json
tests/dispatcher-state.json
4 changes: 2 additions & 2 deletions oda_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ def __init__(self,
}
}

def inspect_state(self, job_id=None):
params = dict(token=oda_api.token.discover_token())
def inspect_state(self, job_id=None, group_by_job=False):
params = dict(token=oda_api.token.discover_token(), group_by_job=group_by_job)

if job_id is not None:
params['job_id'] = job_id
Expand Down
41 changes: 35 additions & 6 deletions oda_api/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ def mutate_token_payload(payload):
@cli.command("inspect")
@click.option("-s", "--store", default="dispatcher-state.json")
@click.option("-j", "--job-id", default=None)
@click.option("--group-by-job", default=False, is_flag=True)
@click.option("-l", "--local", default=False, is_flag=True)
# @click.option("-V", "--validate", default=None)
@click.pass_obj
def inspect_state(obj, store, job_id, local):
def inspect_state(obj, store, job_id, local, group_by_job):
if local:
state = json.load(open(store))
else:
state = obj['dispatcher'].inspect_state(job_id=job_id)
state = obj['dispatcher'].inspect_state(job_id=job_id, group_by_job=group_by_job)
json.dump(
state,
open(store, "w"),
Expand All @@ -183,10 +184,38 @@ def inspect_state(obj, store, job_id, local):
)

# if validate:
for record in sorted(state['records'], key=lambda r:r['mtime']):
print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime']))
for email in record.get('analysis_parameters', {}).get('email_history', []):
print(" - ", email)
if not group_by_job:
for record in sorted(state['records'], key=lambda r:r['mtime']):
logger.info(f"session_id: {record['session_id']}, job_id: {record['job_id']} - {datetime.fromtimestamp(record['mtime'])}")
for email in record.get('analysis_parameters', {}).get('email_history', []):
logger.info(f" - {email}")
for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []):
logger.info(f" - {matrix_message}")
request_completed = record['request_completed']
token_expired = record.get('token_expired', None)
if not request_completed:
msg = '\tRequest did not complete'
if token_expired:
msg += ' because the token was expired'
else:
msg = '\tRequest completed successfully'
logger.info(msg)
else:
for record in state['records']:
logger.info(f"job_id: {record['job_id']}")
for job_status_data in record['job_status_data']:
request_completed = job_status_data['request_completed']
token_expired = job_status_data.get('token_expired', None)
scratch_dir_fn = job_status_data['scratch_dir_fn']
session_id = scratch_dir_fn.split('_')[2]
msg = f'\tsession_id: {session_id}'
if not request_completed:
msg += ' - request did not complete'
if token_expired:
msg += ' because the token was expired'
else:
msg += ' - request completed successfully'
logger.info(msg)


def main():
Expand Down
39 changes: 38 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,53 @@
import logging
import re
from typing import ChainMap
from click.testing import CliRunner
import pytest
import os
import jwt
import time

from oda_api import cli
from tests.test_basic import default_token_payload, secret_key
from cdci_data_analysis.pytest_fixtures import DispatcherJobState
# from tests.test_basic import default_token_payload, secret_key

secret_key = 'secretkey_test'
default_exp_time = int(time.time()) + 5000
default_token_payload = dict(
sub="[email protected]",
name="mmeharga",
roles="general",
exp=default_exp_time,
tem=0,
mstout=True,
mssub=True
)

@pytest.mark.parametrize('group_by_job', [True, False])
def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job):
token_payload = default_token_payload.copy()
token_payload['roles'] = ['general', 'job manager']
encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256')
monkeypatch.setenv('ODA_TOKEN', encoded_token)

DispatcherJobState.remove_scratch_folders()

runner = CliRunner()
result = runner.invoke(cli.cli, ['-u', dispatcher_live_fixture, 'get', '-i', 'empty', '-p', 'dummy', '-a', 'product_type=Dummy'], obj={})
assert result.exit_code == 0

runner = CliRunner()
args = ['-u', dispatcher_live_fixture, 'inspect']
if group_by_job:
args.append('--group-by-job')
result = runner.invoke(cli.cli, args=args, obj={})
assert result.exit_code == 0


@pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile'])
def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir):
# this to make sure the level is sufficient to capture the DEBUG logs
logging.getLogger('oda_api').setLevel("DEBUG")
# reset any existing token locations
os.makedirs(tmpdir, exist_ok=True)
monkeypatch.setenv('HOME', tmpdir)
Expand Down

0 comments on commit b0b57ea

Please sign in to comment.