Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Artemis Data Collector and display queue lengths #203

Merged
merged 3 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,26 @@ services:
webmon:
condition: service_healthy

artemis_data_collector:
restart: always
image: ghcr.io/neutrons/artemis_data_collector/artemis_data_collector:latest-prod
env_file:
- .env
environment:
INTERVAL: 60
ARTEMIS_URL: http://activemq:8161
ARTEMIS_USER: artemis
ARTEMIS_PASSWORD: artemis
ARTEMIS_BROKER_NAME: Artemis-Broker
QUEUE_LIST: "['REDUCTION.DATA_READY', 'REDUCTION.HIMEM.DATA_READY', 'CATALOG.ONCAT.DATA_READY', 'REDUCTION_CATALOG.DATA_READY']"
depends_on:
db:
condition: service_healthy
webmon:
condition: service_healthy
activemq:
condition: service_healthy

autoheal:
restart: always
image: willfarrell/autoheal
Expand Down
3 changes: 3 additions & 0 deletions src/webmon_app/reporting/dasmon/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ def diagnostics(request, instrument):
wf_diag = view_util.workflow_diagnostics()
# Post-processing
red_diag = view_util.postprocessing_diagnostics()
# reduction queue size
red_queue_size = report_view_util.reduction_queue_sizes()

breadcrumbs = view_util.get_monitor_breadcrumbs(instrument_id, "diagnostics")
template_values = {
Expand Down Expand Up @@ -371,6 +373,7 @@ def diagnostics(request, instrument):

template_values["wf_diagnostics"] = wf_diag
template_values["post_diagnostics"] = red_diag
template_values["reduction_queue_size"] = red_queue_size
template_values["action_messages"] = actions

notices = []
Expand Down
7 changes: 7 additions & 0 deletions src/webmon_app/reporting/report/admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from reporting.report.models import (
DataRun,
StatusQueue,
StatusQueueMessageCount,
RunStatus,
WorkflowSummary,
IPTS,
Expand Down Expand Up @@ -93,6 +94,11 @@ class StatusQueueAdmin(admin.ModelAdmin):
list_filter = ("is_workflow_input",)


class StatusQueueMessageCountAdmin(admin.ModelAdmin):
list_display = ("id", "queue_id", "message_count", "created_on")
list_filter = ("queue_id",)


class WorkflowSummaryAdmin(admin.ModelAdmin):
readonly_fields = ("run_id",)
list_filter = (
Expand Down Expand Up @@ -151,6 +157,7 @@ class InstrumentStatusAdmin(admin.ModelAdmin):

admin.site.register(DataRun, DataRunAdmin)
admin.site.register(StatusQueue, StatusQueueAdmin)
admin.site.register(StatusQueueMessageCount, StatusQueueMessageCountAdmin)
admin.site.register(RunStatus, RunStatusAdmin)
admin.site.register(WorkflowSummary, WorkflowSummaryAdmin)
admin.site.register(IPTS, IPTSAdmin)
Expand Down
13 changes: 13 additions & 0 deletions src/webmon_app/reporting/report/view_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
StatusQueue,
Task,
WorkflowSummary,
StatusQueueMessageCount,
)
from django.urls import reverse
from django.http import HttpResponseServerError
Expand Down Expand Up @@ -629,3 +630,15 @@ def find_skipped_runs(instrument_id, start_run_number=0):
except: # noqa: E722
logging.exception("Error finding missing runs:")
return missing_runs


def reduction_queue_sizes():
"""
Get the size of the message queues
"""
queue_sizes = []

for q in StatusQueueMessageCount.objects.values_list("queue_id").distinct():
queue_sizes.append(StatusQueueMessageCount.objects.filter(queue_id=q[0]).latest("created_on").to_dict())

return queue_sizes
24 changes: 24 additions & 0 deletions src/webmon_app/reporting/templates/dasmon/diagnostics.html
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@
{% endif %}
</div>

{% if reduction_queue_size %}
<p>
<b>Reduction queue length:</b>
<div style="margin-left:20px">
<table>
<tbody>
<tr>
<th>Queue</th>
<th>Last updated</th>
<th>Count</th>
</tr>
{% for item in reduction_queue_size %}
<tr>
<td>{{ item.queue }}</td>
<td>{{ item.created_on }}</td>
<td>{{ item.message_count }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<p>
</div>
{% endif %}

{% endblock %}

{% block nocontent %}{% endblock %}
21 changes: 21 additions & 0 deletions src/webmon_app/reporting/tests/test_report/test_view_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from reporting.report.models import StatusQueue
from reporting.report.models import Task
from reporting.report.models import WorkflowSummary
from reporting.report.models import StatusQueueMessageCount

import json

Expand Down Expand Up @@ -368,6 +369,26 @@ def test_find_skipped_runs(self):
missing_runs = find_skipped_runs(inst)
self.assertEqual(missing_runs[0], 5)

def test_get_status_queue_message_count(self):
from reporting.report.view_util import reduction_queue_sizes

sq1 = StatusQueue(name="TEST_QUEUE", is_workflow_input=True)
sq1.save()
sq2 = StatusQueue(name="TEST_QUEUE2", is_workflow_input=True)
sq2.save()

StatusQueueMessageCount(queue=sq1, message_count=10).save()
StatusQueueMessageCount(queue=sq1, message_count=11).save()
StatusQueueMessageCount(queue=sq1, message_count=12).save()
StatusQueueMessageCount(queue=sq2, message_count=42).save()

queue_size_list = reduction_queue_sizes()
self.assertEqual(len(queue_size_list), 2)
assert queue_size_list[0]["queue"] == "TEST_QUEUE"
assert queue_size_list[0]["message_count"] == 12
assert queue_size_list[1]["queue"] == "TEST_QUEUE2"
assert queue_size_list[1]["message_count"] == 42


if __name__ == "__main__":
pytest.main([__file__])
19 changes: 19 additions & 0 deletions src/workflow_app/workflow/database/report/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,25 @@ def __str__(self):
return self.name


class StatusQueueMessageCount(models.Model):
queue = models.ForeignKey(StatusQueue, on_delete=models.CASCADE)
message_count = models.IntegerField()
created_on = models.DateTimeField("Timestamp", auto_now_add=True)

class Meta:
app_label = "report"

def __str__(self):
return f"{self.queue}: {self.message_count} {self.created_on}"

def to_dict(self):
return {
"queue": str(self.queue),
"message_count": self.message_count,
"created_on": self.created_on,
}


class RunStatusManager(models.Manager):
def status(self, run_id, status_description):
"""
Expand Down
9 changes: 8 additions & 1 deletion tests/test_DASMONPageView.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
tree = etree.parse(StringIO(dasmon_diagnostics.text), parser)
table_content = tree.xpath("//tr/td//text()")
# verify number of entries in the tables
expected_number_of_entries = 48
expected_number_of_entries = 57
assert len(table_content) == expected_number_of_entries
# -- DASMON diagnostics
status = table_content[1]
Expand Down Expand Up @@ -63,6 +63,13 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
autoreducer_pid = table_content[21]
assert len(autoreducer) > 0
assert len(autoreducer_pid) > 0
# -- Reduction queue lengths
assert table_content[31] == "REDUCTION.DATA_READY"
assert table_content[33] == "0"
assert table_content[34] == "REDUCTION_CATALOG.DATA_READY"
assert table_content[36] == "0"
assert table_content[37] == "CATALOG.ONCAT.DATA_READY"
assert table_content[39] == "0"


if __name__ == "__main__":
Expand Down
Loading