Skip to content

Commit 17de271

Browse files
authored
Merge pull request #178 from neutrons/autoreducer_mem_limit_test
System test for post-processing agent memory monitoring
2 parents 88f13a1 + 0a99c11 commit 17de271

File tree

8 files changed

+250
-149
lines changed

8 files changed

+250
-149
lines changed

Dockerfile.autoreducer

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ FROM registry.access.redhat.com/ubi9/ubi
33
# install various dependencies
44
RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
55
RUN dnf updateinfo
6-
RUN dnf install -y procps-ng # pgrep is used for health check
6+
RUN dnf install -y procps-ng python3-numpy # pgrep is used for health check, numpy for high memory test
77

88
# Inspection tools
99
RUN dnf install -y vim
@@ -19,7 +19,7 @@ ARG CONFIG_FILE=tests/configuration/post_process_consumer.conf
1919
COPY ${CONFIG_FILE} /etc/autoreduce/post_processing.conf
2020

2121
# install postprocessing
22-
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.3/postprocessing-3.3.0-1.el9.noarch.rpm
22+
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.3.1/postprocessing-3.3.1-1.el9.noarch.rpm
2323

2424
# install the fake test data
2525
ARG DATA_TARBALL=/tmp/SNSdata.tar.gz

tests/configuration/post_process_consumer.conf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,7 @@
2727
],
2828
"publish_url_template": "https://172.16.238.222/plots/$instrument/$run_number/upload_plot_data/",
2929
"publisher_username": "workflow",
30-
"publisher_password": "workflow"
30+
"publisher_password": "workflow",
31+
"system_mem_limit_perc": 10,
32+
"mem_check_interval_sec": 0.1
3133
}

tests/conftest.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
# third-party imports
2+
from dotenv import dotenv_values
3+
import psycopg2
24
import pytest
35

46
# standard imports
57
import requests
8+
import time
69

710

811
@pytest.fixture(scope="session")
@@ -56,3 +59,20 @@ def __contains__(self, substring: str):
5659
return substring.replace(" ", "") in self.text.replace(" ", "")
5760

5861
return _HTTPText
62+
63+
64+
@pytest.fixture(scope="session")
65+
def db_connection():
66+
"""Database connection with config from env files"""
67+
config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
68+
assert config
69+
conn = psycopg2.connect(
70+
database=config["DATABASE_NAME"],
71+
user=config["DATABASE_USER"],
72+
password=config["DATABASE_PASS"],
73+
port=config["DATABASE_PORT"],
74+
host="localhost",
75+
)
76+
time.sleep(1)
77+
yield conn
78+
conn.close()

tests/data/PG3/IPTS-4321/nexus/PG3_54321.nxs.h5

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#!/usr/bin/env python
2+
import numpy as np
3+
import sys
4+
import time
5+
from datetime import datetime
6+
7+
print("Running reduction for " + sys.argv[1] + " at " + datetime.isoformat(datetime.now()))
8+
9+
# intentionally take up a lot of memory (used to test job memory monitoring)
10+
_ = np.random.rand(100000, 10000)
11+
time.sleep(5)

tests/test_autoreducer_high_memory.py

Lines changed: 28 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
"""This is to test that the reduction tasks go to the correct autoreducer node
22
depending on whether it requires high memory or not"""
33

4-
import psycopg2
5-
import requests
64
import time
7-
from dotenv import dotenv_values
5+
6+
import tests.utils.db as db_utils
87

98

109
class TestAutoreducerQueues:
@@ -15,111 +14,16 @@ class TestAutoreducerQueues:
1514
IPTS = "IPTS-1234"
1615
run_number = 12345
1716

18-
def setup_class(cls):
19-
config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
20-
assert config
21-
cls.conn = psycopg2.connect(
22-
database=config["DATABASE_NAME"],
23-
user=config["DATABASE_USER"],
24-
password=config["DATABASE_PASS"],
25-
port=config["DATABASE_PORT"],
26-
host="localhost",
27-
)
28-
time.sleep(1)
29-
30-
def teardown_class(cls):
31-
cls.conn.close()
32-
33-
def login(self, next, username, password):
34-
# taken from test_RunPageView.py - consolidate as helper somewhere?
35-
URL = "http://localhost/users/login?next="
36-
client = requests.session()
37-
38-
# Retrieve the CSRF token first
39-
client.get(URL) # sets the cookie
40-
csrftoken = client.cookies["csrftoken"]
41-
42-
login_data = dict(username=username, password=password, csrfmiddlewaretoken=csrftoken)
43-
return client.post(URL + next, data=login_data, timeout=None)
44-
45-
def create_test_data(self):
46-
"""create the instrument, ipts and datarun if they don't already exist
47-
48-
returns the id for the created rundata"""
49-
conn = TestAutoreducerQueues.conn
50-
cursor = conn.cursor()
51-
52-
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
53-
inst_id = cursor.fetchone()
54-
55-
if inst_id is None:
56-
cursor.execute("INSERT INTO report_instrument (name) VALUES (%s);", (self.instrument,))
57-
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
58-
inst_id = cursor.fetchone()
59-
conn.commit()
60-
61-
cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
62-
ipts_id = cursor.fetchone()
63-
if ipts_id is None:
64-
cursor.execute(
65-
"INSERT INTO report_ipts (expt_name, created_on) VALUES (%s, %s);",
66-
("IPTS-1234", "2020-05-20 13:02:52.281964;"),
67-
)
68-
cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
69-
ipts_id = cursor.fetchone()
70-
conn.commit()
71-
72-
cursor.execute(
73-
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
74-
(self.run_number, ipts_id[0], inst_id[0]),
75-
)
76-
run_id = cursor.fetchone()
77-
if run_id is None:
78-
cursor.execute(
79-
"INSERT INTO report_datarun (run_number, ipts_id_id, instrument_id_id, file, created_on) "
80-
"VALUES (%s, %s, %s, %s, %s);",
81-
(
82-
self.run_number,
83-
ipts_id[0],
84-
inst_id[0],
85-
"/SNS/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5",
86-
"2020-05-20 13:02:52.281964;",
87-
),
88-
)
89-
cursor.execute(
90-
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
91-
(self.run_number, ipts_id[0], inst_id[0]),
92-
)
93-
run_id = cursor.fetchone()
94-
conn.commit()
95-
96-
return run_id
97-
98-
def get_status_queue_id(self, cursor, queue_name):
99-
"""return the id for the statusqueue for the provided name"""
100-
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
101-
queue_id = cursor.fetchone()
102-
103-
if queue_id is None:
104-
cursor.execute(
105-
"INSERT INTO report_statusqueue (name, is_workflow_input) VALUES (%s, %s);", (queue_name, False)
106-
)
107-
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
108-
queue_id = cursor.fetchone()
109-
110-
return queue_id[0]
111-
112-
def set_reduction_request_queue(self, queue_name):
17+
def set_reduction_request_queue(self, conn, queue_name):
11318
"""create the task to send REDUCTION.REQUEST to the provided queue"""
114-
conn = TestAutoreducerQueues.conn
11519
cursor = conn.cursor()
11620

11721
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
11822
inst_id = cursor.fetchone()[0]
11923

120-
queue_id = self.get_status_queue_id(cursor, queue_name)
121-
success_queue_id = self.get_status_queue_id(cursor, "REDUCTION.COMPLETE")
122-
reduction_request_queue_id = self.get_status_queue_id(cursor, "REDUCTION.REQUEST")
24+
queue_id = db_utils.get_status_queue_id(conn, queue_name)
25+
success_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.COMPLETE")
26+
reduction_request_queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.REQUEST")
12327

12428
cursor.execute(
12529
"SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
@@ -153,77 +57,55 @@ def set_reduction_request_queue(self, queue_name):
15357
)
15458
conn.commit()
15559

156-
def clear_previous_runstatus(self, run_id):
157-
"""remove all previous run statuses for the given run_id"""
158-
conn = TestAutoreducerQueues.conn
159-
cursor = conn.cursor()
160-
# delete all information entries for runstatus
161-
cursor.execute(
162-
"DELETE FROM report_information WHERE run_status_id_id IN (SELECT id FROM report_runstatus "
163-
"WHERE run_id_id = %s);",
164-
run_id,
165-
)
166-
cursor.execute("DELETE FROM report_runstatus WHERE run_id_id = %s;", run_id)
167-
conn.commit()
168-
169-
def get_autoreducer_hostname(self, run_id):
60+
def get_autoreducer_hostname(self, conn, run_id):
17061
"""return the hostname that executed the task that is stored in the report information"""
171-
conn = TestAutoreducerQueues.conn
17262
cursor = conn.cursor()
173-
queue_id = self.get_status_queue_id(cursor, "REDUCTION.STARTED")
63+
queue_id = db_utils.get_status_queue_id(conn, "REDUCTION.STARTED")
17464
cursor.execute("SELECT id FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
17565
runstatus_id = cursor.fetchone()[0]
17666
cursor.execute("SELECT description FROM report_information WHERE run_status_id_id = %s", (runstatus_id,))
17767
description = cursor.fetchone()[0]
17868
return description
17969

180-
def check_run_status_exist(self, run_id, queue_name):
181-
"""return if the run status was created for the given run_id and queue_name"""
182-
conn = TestAutoreducerQueues.conn
183-
cursor = conn.cursor()
184-
queue_id = self.get_status_queue_id(cursor, queue_name)
185-
cursor.execute("SELECT * FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
186-
return cursor.fetchone() is not None
187-
188-
def test_normal_reduction_queue(self):
70+
def test_normal_reduction_queue(self, db_connection, request_page):
18971
# switch to the REDUCTION.DATA_READY queue and check that the task goes to the correct node
190-
run_id = self.create_test_data()
191-
self.clear_previous_runstatus(run_id)
72+
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
73+
db_utils.clear_previous_runstatus(db_connection, run_id)
19274

193-
self.set_reduction_request_queue("REDUCTION.DATA_READY")
75+
self.set_reduction_request_queue(db_connection, "REDUCTION.DATA_READY")
19476

19577
# login and send reduction request
196-
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
78+
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
19779
assert response.status_code == 200
19880
assert response.url.endswith("/report/vulcan/12345/")
19981

20082
# wait for database to get updated
20183
time.sleep(1.0)
20284

203-
assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
204-
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
205-
assert self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
206-
assert not self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
85+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
86+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
87+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
88+
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")
20789

208-
assert self.get_autoreducer_hostname(run_id) == "autoreducer"
90+
assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer"
20991

210-
def test_himem_reduction_queue(self):
92+
def test_himem_reduction_queue(self, db_connection, request_page):
21193
# switch to the REDUCTION.HIMEM.DATA_READY queue and check that the task goes to the correct node
212-
run_id = self.create_test_data()
213-
self.clear_previous_runstatus(run_id)
94+
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
95+
db_utils.clear_previous_runstatus(db_connection, run_id)
21496

215-
self.set_reduction_request_queue("REDUCTION.HIMEM.DATA_READY")
97+
self.set_reduction_request_queue(db_connection, "REDUCTION.HIMEM.DATA_READY")
21698
# login and send reduction request
217-
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
99+
response = request_page("/report/vulcan/12345/reduce/", self.user, self.pwd)
218100
assert response.status_code == 200
219101
assert response.url.endswith("/report/vulcan/12345/")
220102

221103
# wait for database to get updated
222104
time.sleep(1.0)
223105

224-
assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
225-
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
226-
assert not self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
227-
assert self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
106+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
107+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
108+
assert not db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
109+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.HIMEM.DATA_READY")
228110

229-
assert self.get_autoreducer_hostname(run_id) == "autoreducer.himem"
111+
assert self.get_autoreducer_hostname(db_connection, run_id) == "autoreducer.himem"
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
Test of the autoreducer memory management that sets a max limit
3+
on the memory used by reduction scripts.
4+
"""
5+
6+
import time
7+
import tests.utils.db as db_utils
8+
9+
10+
class TestAutoreducerMemoryLimit:
11+
user = "InstrumentScientist"
12+
pwd = "InstrumentScientist"
13+
instrument = "pg3"
14+
IPTS = "IPTS-4321"
15+
run_number = 54321
16+
17+
def test_reduction_script_exceeds_memory_limit(self, db_connection, request_page):
18+
"""test that the reduction is terminated and an error is logged"""
19+
run_id = db_utils.add_instrument_data_run(db_connection, self.instrument, self.IPTS, self.run_number)
20+
db_utils.clear_previous_runstatus(db_connection, run_id)
21+
22+
# login and send reduction request
23+
response = request_page("/report/pg3/54321/reduce/", self.user, self.pwd)
24+
assert response.status_code == 200
25+
assert response.url.endswith("/report/pg3/54321/")
26+
27+
# wait for reduction job to be terminated and database to be updated
28+
time.sleep(5.0)
29+
30+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.REQUEST")
31+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.STARTED")
32+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.DATA_READY")
33+
assert db_utils.check_run_status_exist(db_connection, run_id, "REDUCTION.ERROR")
34+
35+
assert db_utils.check_error_msg_contains(db_connection, run_id, "Total memory usage exceeded limit")

0 commit comments

Comments
 (0)