From a1a0584d25a84da9243fa3569103ba69f27c3d5a Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 2 May 2024 15:34:33 +1000 Subject: [PATCH] Change default catalog data_ready to OnCat --- Dockerfile.catalog | 9 ----- docker-compose.yml | 14 -------- src/catalog/catalog_process.py | 34 ------------------ .../reporting/fixtures/db_init.json | 36 +++++++++++++++++-- .../workflow/database/report/models.py | 4 +-- src/workflow_app/workflow/settings.py | 2 +- .../workflow/tests/database_models_test.py | 20 +++++------ tests/test_PostProcessWorkflow.py | 10 +++--- tests/test_SubmitPostprocessing.py | 2 +- 9 files changed, 52 insertions(+), 79 deletions(-) delete mode 100644 Dockerfile.catalog delete mode 100644 src/catalog/catalog_process.py diff --git a/Dockerfile.catalog b/Dockerfile.catalog deleted file mode 100644 index d411521e..00000000 --- a/Dockerfile.catalog +++ /dev/null @@ -1,9 +0,0 @@ -FROM continuumio/miniconda3:4.12.0 - -COPY conda_environment.yml . -RUN conda env update --name base --file conda_environment.yml - -COPY src/catalog/catalog_process.py . - -ENTRYPOINT [ "python" ] -CMD ["catalog_process.py"] diff --git a/docker-compose.yml b/docker-compose.yml index 40640dea..07944063 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -158,20 +158,6 @@ services: webmon: condition: service_healthy - catalog_process: - restart: always - build: - context: . - dockerfile: Dockerfile.catalog - network: host - environment: - ICAT_USER: "icat" - ICAT_PASSCODE: "icat" - ACTIVE_MQ_PORTS: 61613 - depends_on: - activemq: - condition: service_healthy - autoheal: restart: always image: willfarrell/autoheal diff --git a/src/catalog/catalog_process.py b/src/catalog/catalog_process.py deleted file mode 100644 index 09014d6f..00000000 --- a/src/catalog/catalog_process.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -import os -import time -import stomp - - -class Listener(stomp.ConnectionListener): - def on_message(self, frame) -> None: - if frame.headers["destination"] == "/queue/CATALOG.DATA_READY": - conn.send("/queue/CATALOG.STARTED", frame.body) - time.sleep(0.1) - conn.send("/queue/CATALOG.COMPLETE", frame.body) - else: - conn.send("/queue/REDUCTION_CATALOG.STARTED", frame.body) - time.sleep(0.1) - conn.send("/queue/REDUCTION_CATALOG.COMPLETE", frame.body) - - -if __name__ == "__main__": - # Parse ENV vars - ICAT_USER = os.environ.get("ICAT_USER", default="icat") - ICAT_PASSCODE = os.environ.get("ICAT_PASSCODE", default="icat") - PORTS = os.environ.get("ACTIVE_MQ_PORTS", default=61613) - BROKERS = [(os.environ.get("ACTIVE_MQ_HOST", default="activemq"), int(PORTS))] - - # Start the connection - conn = stomp.Connection(host_and_ports=BROKERS) - conn.set_listener("", Listener()) - conn.connect(ICAT_USER, ICAT_PASSCODE, wait=True) - conn.subscribe(destination="/queue/CATALOG.DATA_READY", id=1, ack="auto") - conn.subscribe(destination="/queue/REDUCTION_CATALOG.DATA_READY", id=1, ack="auto") - - while conn.is_connected(): - time.sleep(1) diff --git a/src/webmon_app/reporting/fixtures/db_init.json b/src/webmon_app/reporting/fixtures/db_init.json index e1e50322..83931701 100644 --- a/src/webmon_app/reporting/fixtures/db_init.json +++ b/src/webmon_app/reporting/fixtures/db_init.json @@ -197,6 +197,38 @@ "is_workflow_input": true } }, +{ + "model": "report.statusqueue", + "pk": 24, + "fields": { + "name": "CATALOG.ONCAT.DATA_READY", + "is_workflow_input": false + } +}, +{ + "model": "report.statusqueue", + "pk": 25, + "fields": { + "name": "CATALOG.ONCAT.STARTED", + "is_workflow_input": true + } +}, +{ + "model": "report.statusqueue", + "pk": 26, + "fields": { + "name": "CATALOG.ONCAT.ERROR", + "is_workflow_input": true + } +}, +{ + "model": "report.statusqueue", + "pk": 27, + "fields": { + "name": "CATALOG.ONCAT.COMPLETE", + "is_workflow_input": true + } +}, { "model": "report.task", "pk": 1, @@ -216,11 +248,11 @@ "input_queue_id": 1, "task_class": " ", "task_queue_ids": [ - 4, + 24, 7 ], "success_queue_ids": [ - 6, + 27, 11 ] } diff --git a/src/workflow_app/workflow/database/report/models.py b/src/workflow_app/workflow/database/report/models.py index 36e27044..3cf5a164 100644 --- a/src/workflow_app/workflow/database/report/models.py +++ b/src/workflow_app/workflow/database/report/models.py @@ -378,9 +378,9 @@ def update(self): self.complete = False # Look for cataloging status - if len(RunStatus.objects.status(self.run_id, "CATALOG.COMPLETE")) > 0: + if len(RunStatus.objects.status(self.run_id, "CATALOG.ONCAT.COMPLETE")) > 0: self.cataloged = True - if len(RunStatus.objects.status(self.run_id, "CATALOG.STARTED")) > 0: + if len(RunStatus.objects.status(self.run_id, "CATALOG.ONCAT.STARTED")) > 0: self.catalog_started = True # Check whether we need reduction (default is no) diff --git a/src/workflow_app/workflow/settings.py b/src/workflow_app/workflow/settings.py index 6483c4e4..641d60f0 100644 --- a/src/workflow_app/workflow/settings.py +++ b/src/workflow_app/workflow/settings.py @@ -11,7 +11,7 @@ # Default queue names POSTPROCESS_INFO = "POSTPROCESS.INFO" POSTPROCESS_ERROR = "POSTPROCESS.ERROR" -CATALOG_DATA_READY = "CATALOG.DATA_READY" +CATALOG_DATA_READY = "CATALOG.ONCAT.DATA_READY" REDUCTION_DATA_READY = "REDUCTION.DATA_READY" REDUCTION_CATALOG_DATA_READY = "REDUCTION_CATALOG.DATA_READY" diff --git a/src/workflow_app/workflow/tests/database_models_test.py b/src/workflow_app/workflow/tests/database_models_test.py index c8e8a179..11ee583e 100644 --- a/src/workflow_app/workflow/tests/database_models_test.py +++ b/src/workflow_app/workflow/tests/database_models_test.py @@ -112,8 +112,8 @@ def test_update_catalog(self, runStatusObjectsMock, djangoModelsMock): def status_lookup(run_id, param_string): vals = { - "CATALOG.COMPLETE": 1, - "CATALOG.STARTED": 0, + "CATALOG.ONCAT.COMPLETE": 1, + "CATALOG.ONCAT.STARTED": 0, "REDUCTION.NOT_NEEDED": 0, "REDUCTION.DISABLED": 0, "REDUCTION.COMPLETE": 0, @@ -141,8 +141,8 @@ def test_update_reduction_need(self, runStatusObjectsMock, djangoModelsMock): def status_lookup(run_id, param_string): vals = { - "CATALOG.COMPLETE": 0, - "CATALOG.STARTED": 0, + "CATALOG.ONCAT.COMPLETE": 0, + "CATALOG.ONCAT.STARTED": 0, "REDUCTION.NOT_NEEDED": 1, "REDUCTION.DISABLED": 1, "REDUCTION.COMPLETE": 0, @@ -169,8 +169,8 @@ def test_update_reduction_status(self, runStatusObjectsMock, djangoModelsMock): def status_lookup(run_id, param_string): vals = { - "CATALOG.COMPLETE": 0, - "CATALOG.STARTED": 0, + "CATALOG.ONCAT.COMPLETE": 0, + "CATALOG.ONCAT.STARTED": 0, "REDUCTION.NOT_NEEDED": 0, "REDUCTION.DISABLED": 0, "REDUCTION.COMPLETE": 1, @@ -198,8 +198,8 @@ def test_update_reduction_catalog(self, runStatusObjectsMock, djangoModelsMock): def status_lookup(run_id, param_string): vals = { - "CATALOG.COMPLETE": 0, - "CATALOG.STARTED": 0, + "CATALOG.ONCAT.COMPLETE": 0, + "CATALOG.ONCAT.STARTED": 0, "REDUCTION.NOT_NEEDED": 0, "REDUCTION.DISABLED": 0, "REDUCTION.COMPLETE": 0, @@ -227,8 +227,8 @@ def test_update_overall(self, runStatusObjectsMock, djangoModelsMock): def status_lookup(run_id, param_string): vals = { - "CATALOG.COMPLETE": 1, - "CATALOG.STARTED": 0, + "CATALOG.ONCAT.COMPLETE": 1, + "CATALOG.ONCAT.STARTED": 0, "REDUCTION.NOT_NEEDED": 1, "REDUCTION.DISABLED": 0, "REDUCTION.COMPLETE": 0, diff --git a/tests/test_PostProcessWorkflow.py b/tests/test_PostProcessWorkflow.py index 38298ae5..66a257b3 100644 --- a/tests/test_PostProcessWorkflow.py +++ b/tests/test_PostProcessWorkflow.py @@ -1,7 +1,6 @@ import psycopg2 import requests import time -import pytest from django.conf import settings from dotenv import dotenv_values @@ -74,17 +73,16 @@ def get_status_errors(self, cursor, datarun_id): errors.append(result) return errors - @pytest.mark.skip("Skipping due to catalog start message not sending in CI") def test_catalog(self): cursor = self.__class__.conn.cursor() datarun_id = self.get_datarun_id(cursor, "arcs", "IPTS-27800", "214583") queue_id = self.get_queue_id(cursor, "CATALOG.REQUEST") - started_id = self.get_queue_id(cursor, "CATALOG.STARTED") - ready_id = self.get_queue_id(cursor, "CATALOG.DATA_READY") - complete_id = self.get_queue_id(cursor, "CATALOG.COMPLETE") - error_id = self.get_queue_id(cursor, "CATALOG.ERROR") + started_id = self.get_queue_id(cursor, "CATALOG.ONCAT.STARTED") + ready_id = self.get_queue_id(cursor, "CATALOG.ONCAT.DATA_READY") + complete_id = self.get_queue_id(cursor, "CATALOG.ONCAT.COMPLETE") + error_id = self.get_queue_id(cursor, "CATALOG.ONCAT.ERROR") # get current catalog status counts queues = [queue_id, started_id, ready_id, complete_id] diff --git a/tests/test_SubmitPostprocessing.py b/tests/test_SubmitPostprocessing.py index 458c02e3..a7a0dbde 100644 --- a/tests/test_SubmitPostprocessing.py +++ b/tests/test_SubmitPostprocessing.py @@ -123,7 +123,7 @@ def testPostProcessingSubmitMissingRunfile(self): self.send_request("POSTPROCESS.DATA_READY", RUN_NUMBER_MISSING, requestType="submit") new_status_count = self.get_status_count(RUN_NUMBER_MISSING) - assert new_status_count - status_count == 6 + assert new_status_count - status_count == 5 @pytest.mark.parametrize("run_number", [RUN_NUMBER_GOOD, RUN_NUMBER_MISSING]) def testPostProcessingFind(self, run_number):