Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

group_by_job option inspect_state #232

Merged
merged 19 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ vlf_table.fits
tests/.oda-token

.ipynb_checkpoints
.venv
.venv

dispatcher-state.json
tests/dispatcher-state.json
4 changes: 2 additions & 2 deletions oda_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ def __init__(self,
}
}

def inspect_state(self, job_id=None):
params = dict(token=oda_api.token.discover_token())
def inspect_state(self, job_id=None, group_by_job=False):
params = dict(token=oda_api.token.discover_token(), group_by_job=group_by_job)

if job_id is not None:
params['job_id'] = job_id
Expand Down
41 changes: 35 additions & 6 deletions oda_api/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@
@cli.command("inspect")
@click.option("-s", "--store", default="dispatcher-state.json")
@click.option("-j", "--job-id", default=None)
@click.option("--group-by-job", default=False, is_flag=True)
@click.option("-l", "--local", default=False, is_flag=True)
# @click.option("-V", "--validate", default=None)
@click.pass_obj
def inspect_state(obj, store, job_id, local):
def inspect_state(obj, store, job_id, local, group_by_job):
if local:
state = json.load(open(store))
else:
state = obj['dispatcher'].inspect_state(job_id=job_id)
state = obj['dispatcher'].inspect_state(job_id=job_id, group_by_job=group_by_job)
json.dump(
state,
open(store, "w"),
Expand All @@ -183,10 +184,38 @@
)

# if validate:
for record in sorted(state['records'], key=lambda r:r['mtime']):
print("session_id", record['session_id'], "job_id", record['job_id'], datetime.fromtimestamp(record['mtime']))
for email in record.get('analysis_parameters', {}).get('email_history', []):
print(" - ", email)
if not group_by_job:
for record in sorted(state['records'], key=lambda r:r['mtime']):
logger.info(f"session_id: {record['session_id']}, job_id: {record['job_id']} - {datetime.fromtimestamp(record['mtime'])}")
for email in record.get('analysis_parameters', {}).get('email_history', []):
logger.info(f" - {email}")

Check warning on line 191 in oda_api/cli.py

View check run for this annotation

Codecov / codecov/patch

oda_api/cli.py#L191

Added line #L191 was not covered by tests
for matrix_message in record.get('analysis_parameters', {}).get('matrix_message_history', []):
logger.info(f" - {matrix_message}")

Check warning on line 193 in oda_api/cli.py

View check run for this annotation

Codecov / codecov/patch

oda_api/cli.py#L193

Added line #L193 was not covered by tests
request_completed = record['request_completed']
token_expired = record.get('token_expired', None)
if not request_completed:
msg = '\tRequest did not complete'
if token_expired:
msg += ' because the token was expired'

Check warning on line 199 in oda_api/cli.py

View check run for this annotation

Codecov / codecov/patch

oda_api/cli.py#L197-L199

Added lines #L197 - L199 were not covered by tests
else:
msg = '\tRequest completed successfully'
logger.info(msg)
else:
for record in state['records']:
logger.info(f"job_id: {record['job_id']}")
for job_status_data in record['job_status_data']:
request_completed = job_status_data['request_completed']
token_expired = job_status_data.get('token_expired', None)
scratch_dir_fn = job_status_data['scratch_dir_fn']
session_id = scratch_dir_fn.split('_')[2]
msg = f'\tsession_id: {session_id}'
if not request_completed:
msg += ' - request did not complete'
if token_expired:
msg += ' because the token was expired'

Check warning on line 215 in oda_api/cli.py

View check run for this annotation

Codecov / codecov/patch

oda_api/cli.py#L213-L215

Added lines #L213 - L215 were not covered by tests
else:
msg += ' - request completed successfully'
logger.info(msg)


def main():
Expand Down
39 changes: 38 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,53 @@
import logging
import re
from typing import ChainMap
from click.testing import CliRunner
import pytest
import os
import jwt
import time

from oda_api import cli
from tests.test_basic import default_token_payload, secret_key
from cdci_data_analysis.pytest_fixtures import DispatcherJobState
# from tests.test_basic import default_token_payload, secret_key

secret_key = 'secretkey_test'
default_exp_time = int(time.time()) + 5000
default_token_payload = dict(
sub="[email protected]",
name="mmeharga",
roles="general",
exp=default_exp_time,
tem=0,
mstout=True,
mssub=True
)

@pytest.mark.parametrize('group_by_job', [True, False])
def test_inspect_state(dispatcher_live_fixture, monkeypatch, group_by_job):
token_payload = default_token_payload.copy()
token_payload['roles'] = ['general', 'job manager']
encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256')
monkeypatch.setenv('ODA_TOKEN', encoded_token)

DispatcherJobState.remove_scratch_folders()

runner = CliRunner()
result = runner.invoke(cli.cli, ['-u', dispatcher_live_fixture, 'get', '-i', 'empty', '-p', 'dummy', '-a', 'product_type=Dummy'], obj={})
assert result.exit_code == 0

runner = CliRunner()
args = ['-u', dispatcher_live_fixture, 'inspect']
if group_by_job:
args.append('--group-by-job')
result = runner.invoke(cli.cli, args=args, obj={})
assert result.exit_code == 0


@pytest.mark.parametrize('token_placement', ['no', 'env', 'homedotfile', 'cwddotfile'])
def test_token_inspect(token_placement, default_token, monkeypatch, caplog, tmpdir):
# this to make sure the level is sufficient to capture the DEBUG logs
logging.getLogger('oda_api').setLevel("DEBUG")
# reset any existing token locations
os.makedirs(tmpdir, exist_ok=True)
monkeypatch.setenv('HOME', tmpdir)
Expand Down
Loading