Skip to content

Commit

Permalink
- move some test helper functions to new module db_utils
Browse files Browse the repository at this point in the history
- add system test for autoreducer memory monitoring
  • Loading branch information
backmari committed Sep 4, 2024
1 parent e98f787 commit 13603bb
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 149 deletions.
4 changes: 2 additions & 2 deletions Dockerfile.autoreducer
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM registry.access.redhat.com/ubi9/ubi
# install various dependencies
RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
RUN dnf updateinfo
RUN dnf install -y procps-ng # pgrep is used for health check
RUN dnf install -y procps-ng numpy # pgrep is used for health check, numpy for high memory test

# Inspection tools
RUN dnf install -y vim
Expand All @@ -19,7 +19,7 @@ ARG CONFIG_FILE=tests/configuration/post_process_consumer.conf
COPY ${CONFIG_FILE} /etc/autoreduce/post_processing.conf

# install postprocessing
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.3/postprocessing-3.3.0-1.el9.noarch.rpm
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.3.1/postprocessing-3.3.1-1.el9.noarch.rpm

# install the fake test data
ARG DATA_TARBALL=/tmp/SNSdata.tar.gz
Expand Down
4 changes: 3 additions & 1 deletion tests/configuration/post_process_consumer.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@
],
"publish_url_template": "https://172.16.238.222/plots/$instrument/$run_number/upload_plot_data/",
"publisher_username": "workflow",
"publisher_password": "workflow"
"publisher_password": "workflow",
"system_mem_limit_perc": 10,
"mem_check_interval_sec": 0.1
}
20 changes: 20 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# third-party imports
from dotenv import dotenv_values
import psycopg2
import pytest

# standard imports
import requests
import time


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -56,3 +59,20 @@ def __contains__(self, substring: str):
return substring.replace(" ", "") in self.text.replace(" ", "")

return _HTTPText


@pytest.fixture(scope="session")
def db_connection():
    """Yield a session-wide psycopg2 connection built from the .env / .env.ci files."""
    settings = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
    assert settings
    connection = psycopg2.connect(
        host="localhost",
        database=settings["DATABASE_NAME"],
        user=settings["DATABASE_USER"],
        password=settings["DATABASE_PASS"],
        port=settings["DATABASE_PORT"],
    )
    # short grace period before handing the connection to the tests
    time.sleep(1)
    yield connection
    connection.close()
Empty file.
10 changes: 10 additions & 0 deletions tests/data/PG3/shared/autoreduce/reduce_PG3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env python
# Fake reduction script for the autoreducer memory-limit system test
# (tests/test_autoreducer_memory_limit.py). It deliberately allocates a
# very large numpy array so the post-processing agent's memory monitor
# should detect the overrun and terminate the job.
import numpy as np
import sys
import time
from datetime import datetime

# sys.argv[1] is the data file path passed in by the post-processing agent
print("Running reduction for " + sys.argv[1] + " at " + datetime.isoformat(datetime.now()))

# 100000 x 10000 float64 values: roughly 8e9 bytes (~7.5 GiB), intended to
# exceed the configured "system_mem_limit_perc" limit
_ = np.random.rand(100000, 10000)
# stay alive long enough for the periodic memory check to sample this process
time.sleep(5)
174 changes: 28 additions & 146 deletions tests/test_autoreducer_high_memory.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""This is to test that the reduction tasks go to the correct autoreducer node
depending on whether it requires high memory or not"""

import psycopg2
import requests
import time
from dotenv import dotenv_values

from tests.utils import db_utils


class TestAutoreducerQueues:
Expand All @@ -15,111 +14,16 @@ class TestAutoreducerQueues:
IPTS = "IPTS-1234"
run_number = 12345

def setup_class(cls):
config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
assert config
cls.conn = psycopg2.connect(
database=config["DATABASE_NAME"],
user=config["DATABASE_USER"],
password=config["DATABASE_PASS"],
port=config["DATABASE_PORT"],
host="localhost",
)
time.sleep(1)

def teardown_class(cls):
cls.conn.close()

def login(self, next, username, password):
# taken from test_RunPageView.py - consolidate as helper somewhere?
URL = "http://localhost/users/login?next="
client = requests.session()

# Retrieve the CSRF token first
client.get(URL) # sets the cookie
csrftoken = client.cookies["csrftoken"]

login_data = dict(username=username, password=password, csrfmiddlewaretoken=csrftoken)
return client.post(URL + next, data=login_data, timeout=None)

def create_test_data(self):
"""create the instrument, ipts and datarun if they don't already exist
returns the id for the created rundata"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()

if inst_id is None:
cursor.execute("INSERT INTO report_instrument (name) VALUES (%s);", (self.instrument,))
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()
conn.commit()

cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
if ipts_id is None:
cursor.execute(
"INSERT INTO report_ipts (expt_name, created_on) VALUES (%s, %s);",
("IPTS-1234", "2020-05-20 13:02:52.281964;"),
)
cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
conn.commit()

cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
if run_id is None:
cursor.execute(
"INSERT INTO report_datarun (run_number, ipts_id_id, instrument_id_id, file, created_on) "
"VALUES (%s, %s, %s, %s, %s);",
(
self.run_number,
ipts_id[0],
inst_id[0],
"/SNS/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5",
"2020-05-20 13:02:52.281964;",
),
)
cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
conn.commit()

return run_id

def get_status_queue_id(self, cursor, queue_name):
"""return the id for the statusqueue with the provided name"""
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

if queue_id is None:
cursor.execute(
"INSERT INTO report_statusqueue (name, is_workflow_input) VALUES (%s, %s);", (queue_name, False)
)
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

return queue_id[0]

def set_reduction_request_queue(self, queue_name):
def set_reduction_request_queue(self, conn, queue_name):
"""create the task to send REDUCTION.REQUEST to the provided queue"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()[0]

queue_id = self.get_status_queue_id(cursor, queue_name)
success_queue_id = self.get_status_queue_id(cursor, "REDUCTION.COMPLETE")
reduction_request_queue_id = self.get_status_queue_id(cursor, "REDUCTION.REQUEST")
queue_id = db_utils.get_status_queue_id(conn, queue_name)
success_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.COMPLETE")
reduction_request_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.REQUEST")

cursor.execute(
"SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
Expand Down Expand Up @@ -153,77 +57,55 @@ def set_reduction_request_queue(self, queue_name):
)
conn.commit()

def clear_previous_runstatus(self, run_id):
"""remove all previous run statuses for the given run_id"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
# delete all information entries for runstatus
cursor.execute(
"DELETE FROM report_information WHERE run_status_id_id IN (SELECT id FROM report_runstatus "
"WHERE run_id_id = %s);",
run_id,
)
cursor.execute("DELETE FROM report_runstatus WHERE run_id_id = %s;", run_id)
conn.commit()

def get_autoreducer_hostname(self, run_id):
def get_autoreducer_hostname(self, conn, run_id):
"""return the hostname that executed the task that is stored in the report information"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = self.get_status_queue_id(cursor, "REDUCTION.STARTED")
queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.STARTED")
cursor.execute("SELECT id FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
runstatus_id = cursor.fetchone()[0]
cursor.execute("SELECT description FROM report_information WHERE run_status_id_id = %s", (runstatus_id,))
description = cursor.fetchone()[0]
return description

def check_run_status_exist(self, run_id, queue_name):
"""return if the run status was created for the given run_id and queue_name"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = self.get_status_queue_id(cursor, queue_name)
cursor.execute("SELECT * FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
return cursor.fetchone() is not None

def test_normal_reduction_queue(self):
def test_normal_reduction_queue(self, db_connection, request_page):
# switch to the REDUCTION.DATA_READY queue and check that the task goes to the correct node
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
db_utils.clear_previous_runstatus(db_connection, run_id)

self.set_reduction_request_queue("REDUCTION.DATA_READY")
self.set_reduction_request_queue(db_connection, "REDUCTION.DATA_READY")

# login and send reduction request
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(1.0)

assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert not self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(run_id) == "autoreducer"
assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer"

def test_himem_reduction_queue(self):
def test_himem_reduction_queue(self, db_connection, request_page):
# switch to the REDUCTION.HIMEM.DATA_READY queue and check that the task goes to the correct node
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
db_utils.clear_previous_runstatus(db_connection, run_id)

self.set_reduction_request_queue("REDUCTION.HIMEM.DATA_READY")
self.set_reduction_request_queue(db_connection, "REDUCTION.HIMEM.DATA_READY")
# login and send reduction request
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(1.0)

assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert not self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(run_id) == "autoreducer.himem"
assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer.himem"
36 changes: 36 additions & 0 deletions tests/test_autoreducer_memory_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
Test of the autoreducer memory management that sets a max limit
on the memory used by reduction scripts.
"""

import time

from tests.utils import db_utils


class TestAutoreducerMemoryLimit:
    """System test for the autoreducer memory monitor.

    Triggers a reduction of a PG3 run whose reduction script (reduce_PG3.py)
    allocates a large amount of memory, then verifies via the database that
    the job was started, terminated, and logged as an error.
    """

    # web-monitor login credentials used for the reduction request
    user = "InstrumentScientist"
    pwd = "InstrumentScientist"
    # instrument/experiment/run identifiers for the synthetic test data
    instrument = "pg3"
    IPTS = "IPTS-4321"
    run_number = 54321

    def test_reduction_script_exceeds_memory_limit(self, db_connection, request_page):
        """test that the reduction is terminated and an error is logged"""
        # create (or reuse) the instrument/IPTS/run rows and wipe any statuses
        # left over from a previous test run
        run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
        db_utils.clear_previous_runstatus(db_connection, run_id)

        # login and send reduction request
        response = request_page("/report/pg3/54321/reduce/", self.user, self.pwd)
        assert response.status_code == 200
        assert response.url.endswith("/report/pg3/54321/")

        # wait for reduction job to be terminated and database to be updated
        time.sleep(5.0)

        # the run should have progressed through request/start/data-ready and
        # then been flagged as an error by the memory monitor
        assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
        assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
        assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
        assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.ERROR")

        assert db_utils.check_error_msg_contains(db_connection, run_id, "Total memory usage exceeded limit")
Loading

0 comments on commit 13603bb

Please sign in to comment.