diff --git a/dev/.env.docker-compose b/dev/.env.docker-compose index 38c7d82..58c1a51 100644 --- a/dev/.env.docker-compose +++ b/dev/.env.docker-compose @@ -7,3 +7,4 @@ DJANGO_MINIO_STORAGE_ACCESS_KEY=minioAccessKey DJANGO_MINIO_STORAGE_SECRET_KEY=minioSecretKey DJANGO_STORAGE_BUCKET_NAME=django-storage DJANGO_MINIO_STORAGE_MEDIA_URL=http://localhost:9000/django-storage +REDIS_URL=redis://redis:6379 diff --git a/dev/.env.docker-compose-native b/dev/.env.docker-compose-native index 1b612c1..08ce1e0 100644 --- a/dev/.env.docker-compose-native +++ b/dev/.env.docker-compose-native @@ -6,3 +6,4 @@ DJANGO_MINIO_STORAGE_ENDPOINT=localhost:9000 DJANGO_MINIO_STORAGE_ACCESS_KEY=minioAccessKey DJANGO_MINIO_STORAGE_SECRET_KEY=minioSecretKey DJANGO_STORAGE_BUCKET_NAME=django-storage +REDIS_URL=redis://localhost:6379 diff --git a/docker-compose.yml b/docker-compose.yml index 250857f..86adc76 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,12 @@ services: - rabbitmq:/var/lib/rabbitmq/mnesia logging: driver: none + redis: + image: redis:latest + volumes: + - redis-data:/redis/data + ports: + - ${DOCKER_REDIS_PORT-6379}:6379 minio: image: minio/minio:latest @@ -48,3 +54,4 @@ volumes: postgres: minio: rabbitmq: + redis-data: diff --git a/setup.py b/setup.py index 18e1e91..d61e60a 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,8 @@ install_requires=[ # Pinned August 2024 'celery==5.4.0', + 'channels[daphne]==4.2.0', + 'channels-redis==4.2.1', 'django==5.0.7', 'django-configurations[database,email]==2.5.1', 'django-extensions==3.2.3', diff --git a/tox.ini b/tox.ini index 6214ad0..36c7579 100644 --- a/tox.ini +++ b/tox.ini @@ -45,6 +45,7 @@ passenv = DJANGO_MINIO_STORAGE_ACCESS_KEY DJANGO_MINIO_STORAGE_ENDPOINT DJANGO_MINIO_STORAGE_SECRET_KEY + REDIS_URL extras = dev deps = @@ -66,6 +67,7 @@ passenv = DJANGO_MINIO_STORAGE_ACCESS_KEY DJANGO_MINIO_STORAGE_ENDPOINT DJANGO_MINIO_STORAGE_SECRET_KEY + REDIS_URL extras = dev commands = diff --git a/uvdat/asgi.py b/uvdat/asgi.py index 6597b6c..17585b0 100644 --- a/uvdat/asgi.py +++ b/uvdat/asgi.py @@ -1,11 +1,26 @@ import os - +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter, URLRouter import configurations.importer from django.core.asgi import get_asgi_application +from django.urls import path + +from uvdat.core.notifications import AppConsumer os.environ['DJANGO_SETTINGS_MODULE'] = 'uvdat.settings' if not os.environ.get('DJANGO_CONFIGURATION'): raise ValueError('The environment variable "DJANGO_CONFIGURATION" must be set.') configurations.importer.install() -application = get_asgi_application() +application = ProtocolTypeRouter( + { + 'http': get_asgi_application(), + 'websocket': AuthMiddlewareStack( + URLRouter( + [ + path('app/', AppConsumer.as_asgi(), name='app-ws'), + ] + ) + ), + } +) diff --git a/uvdat/core/notifications.py b/uvdat/core/notifications.py new file mode 100644 index 0000000..5664425 --- /dev/null +++ b/uvdat/core/notifications.py @@ -0,0 +1,143 @@ +from abc import ABC +from contextlib import contextmanager +import dataclasses +from enum import Enum +import time +from typing import Any + +from asgiref.sync import async_to_sync +from channels.generic.websocket import JsonWebsocketConsumer +from channels.layers import get_channel_layer + + +@dataclasses.dataclass +class ProgressMessage: + description: str | None = None + progress: float | None = None + status: str | None = None + custom: dict | None = None + type: str = 'task_tracker_progress' + + +class TaskStatus(Enum): + QUEUED = 'queued' + RUNNING = 'running' + SUCCEEDED = 'succeeded' + FAILED = 'failed' + + +class TaskTracker: + def __init__(self, state: Any, group_names: list[str], initial_description: str = ''): + # 'state' is an opaque value used to identify what is being tracked + self.state = state + self.groups = group_names + channel_layer = get_channel_layer() + self._sync_group_send = async_to_sync(channel_layer.group_send) + self._description = initial_description + self._progress = -1.0 + self._status = TaskStatus.QUEUED + self._dirty = True + self._last_flush = 0 + + def send_message(self, payload: ProgressMessage): + for group in self.groups: + self._sync_group_send( + group, + { + 'type': 'send_notification', + 'message': { + **dataclasses.asdict(payload), + 'state': self.state, + }, + }, + ) + + def flush(self, max_rate_seconds: float | None = None): + if self._dirty and ( + not max_rate_seconds or time.time() - self._last_flush > max_rate_seconds + ): + self._last_flush = time.time() + self.send_message( + ProgressMessage( + status=self._status.value, + progress=self._progress, + description=self._description, + ) + ) + self._dirty = False + + @contextmanager + def running(self): + self.status = TaskStatus.RUNNING + self.flush() + + try: + yield self + self.status = TaskStatus.SUCCEEDED + except Exception: + self.status = TaskStatus.FAILED + self.description = 'An error occurred' + raise + finally: + self.flush() + for group in self.groups: + self._sync_group_send( + group, + { + 'type': 'close_connection', + 'message': {}, + }, + ) + + @property + def description(self): + return self._description + + @description.setter + def description(self, value: str): + self._dirty = True + self._description = value + + @property + def progress(self): + return self._progress + + @progress.setter + def progress(self, value: float): + self._dirty = True + self._progress = value + + @property + def status(self): + return self._status + + @status.setter + def status(self, value: TaskStatus): + self._dirty = True + self._status = value + + +class TaskTrackerConsumer(JsonWebsocketConsumer, ABC): + # This must be set by subclasses during connect() + group_name = None + + def connect(self): + async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name) + self.accept() + + def disconnect(self, code): + async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name) + + def send_notification(self, event): + self.send_json(content=event['message']) + + def close_connection(self, event): + self.close(reason='task complete') + + +class AppConsumer(TaskTrackerConsumer): + # TODO probably rename this to something more descriptive + def connect(self): + self.user = self.scope['user'] + self.group_name = f'app_{self.user.pk}' + super().connect() \ No newline at end of file diff --git a/uvdat/settings.py b/uvdat/settings.py index 758cd6e..675e9ab 100644 --- a/uvdat/settings.py +++ b/uvdat/settings.py @@ -23,6 +23,7 @@ def split_env_str(s: str | None, sep: str) -> list[str]: class UvdatMixin(ConfigMixin): + ASGI_APPLICATION = 'uvdat.asgi.application' WSGI_APPLICATION = 'uvdat.wsgi.application' ROOT_URLCONF = 'uvdat.urls' @@ -31,10 +32,26 @@ class UvdatMixin(ConfigMixin): ACCOUNT_EMAIL_VERIFICATION = 'none' ACCOUNT_LOGOUT_REDIRECT_URL = '/accounts/login/' + CHANNEL_LAYERS = { + 'default': { + # TODO: switch to channels_redis.pubsub.RedisPubSubChannelLayer when it's out of beta + 'BACKEND': 'channels_redis.core.RedisChannelLayer', + 'CONFIG': { + 'hosts': [ + { + 'address': os.environ['REDIS_URL'], + 'db': 1, + } + ], + }, + }, + } + @staticmethod def mutate_configuration(configuration: ComposedConfiguration) -> None: # Install local apps first, to ensure any overridden resources are found first configuration.INSTALLED_APPS = [ + 'daphne', 'django.contrib.gis', 'django_large_image', 'uvdat.core.apps.CoreConfig',