Skip to content

Django Channels Initializaiton #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/.env.docker-compose
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ DJANGO_MINIO_STORAGE_ACCESS_KEY=minioAccessKey
DJANGO_MINIO_STORAGE_SECRET_KEY=minioSecretKey
DJANGO_STORAGE_BUCKET_NAME=django-storage
DJANGO_MINIO_STORAGE_MEDIA_URL=http://localhost:9000/django-storage
REDIS_URL=redis://redis:6379
1 change: 1 addition & 0 deletions dev/.env.docker-compose-native
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ DJANGO_MINIO_STORAGE_ENDPOINT=localhost:9000
DJANGO_MINIO_STORAGE_ACCESS_KEY=minioAccessKey
DJANGO_MINIO_STORAGE_SECRET_KEY=minioSecretKey
DJANGO_STORAGE_BUCKET_NAME=django-storage
REDIS_URL=redis://localhost:6379
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ services:
- rabbitmq:/var/lib/rabbitmq/mnesia
logging:
driver: none
redis:
image: redis:latest
volumes:
- redis-data:/redis/data
ports:
- ${DOCKER_REDIS_PORT-6379}:6379

minio:
image: minio/minio:latest
Expand Down Expand Up @@ -48,3 +54,4 @@ volumes:
postgres:
minio:
rabbitmq:
redis-data:
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
install_requires=[
# Pinned August 2024
'celery==5.4.0',
'channels[daphne]==4.2.0',
'channels-redis==4.2.1',
'django==5.0.7',
'django-configurations[database,email]==2.5.1',
'django-extensions==3.2.3',
Expand Down
2 changes: 2 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ passenv =
DJANGO_MINIO_STORAGE_ACCESS_KEY
DJANGO_MINIO_STORAGE_ENDPOINT
DJANGO_MINIO_STORAGE_SECRET_KEY
REDIS_URL
extras =
dev
deps =
Expand All @@ -66,6 +67,7 @@ passenv =
DJANGO_MINIO_STORAGE_ACCESS_KEY
DJANGO_MINIO_STORAGE_ENDPOINT
DJANGO_MINIO_STORAGE_SECRET_KEY
REDIS_URL
extras =
dev
commands =
Expand Down
19 changes: 17 additions & 2 deletions uvdat/asgi.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import configurations.importer
from django.core.asgi import get_asgi_application
from django.urls import path

from uvdat.core.notifications import AppConsumer

os.environ['DJANGO_SETTINGS_MODULE'] = 'uvdat.settings'
if not os.environ.get('DJANGO_CONFIGURATION'):
raise ValueError('The environment variable "DJANGO_CONFIGURATION" must be set.')
configurations.importer.install()

application = get_asgi_application()
application = ProtocolTypeRouter(
{
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(
[
path('app/', AppConsumer.as_asgi(), name='app-ws'),
]
)
),
}
)
143 changes: 143 additions & 0 deletions uvdat/core/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from abc import ABC
from contextlib import contextmanager
import dataclasses
from enum import Enum
import time
from typing import Any

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
from channels.layers import get_channel_layer


@dataclasses.dataclass
class ProgressMessage:
description: str | None = None
progress: float | None = None
status: str | None = None
custom: dict | None = None
type: str = 'task_tracker_progress'


class TaskStatus(Enum):
QUEUED = 'queued'
RUNNING = 'running'
SUCCEEDED = 'succeeded'
FAILED = 'failed'


class TaskTracker:
def __init__(self, state: Any, group_names: list[str], initial_description: str = ''):
# 'state' is an opaque value used to identify what is being tracked
self.state = state
self.groups = group_names
channel_layer = get_channel_layer()
self._sync_group_send = async_to_sync(channel_layer.group_send)
self._description = initial_description
self._progress = -1.0
self._status = TaskStatus.QUEUED
self._dirty = True
self._last_flush = 0

def send_message(self, payload: ProgressMessage):
for group in self.groups:
self._sync_group_send(
group,
{
'type': 'send_notification',
'message': {
**dataclasses.asdict(payload),
'state': self.state,
},
},
)

def flush(self, max_rate_seconds: float | None = None):
if self._dirty and (
not max_rate_seconds or time.time() - self._last_flush > max_rate_seconds
):
self._last_flush = time.time()
self.send_message(
ProgressMessage(
status=self._status.value,
progress=self._progress,
description=self._description,
)
)
self._dirty = False

@contextmanager
def running(self):
self.status = TaskStatus.RUNNING
self.flush()

try:
yield self
self.status = TaskStatus.SUCCEEDED
except Exception:
self.status = TaskStatus.FAILED
self.description = 'An error occurred'
raise
finally:
self.flush()
for group in self.groups:
self._sync_group_send(
group,
{
'type': 'close_connection',
'message': {},
},
)

@property
def description(self):
return self._description

@description.setter
def description(self, value: str):
self._dirty = True
self._description = value

@property
def progress(self):
return self._progress

@progress.setter
def progress(self, value: float):
self._dirty = True
self._progress = value

@property
def status(self):
return self._status

@status.setter
def status(self, value: TaskStatus):
self._dirty = True
self._status = value


class TaskTrackerConsumer(JsonWebsocketConsumer, ABC):
# This must be set by subclasses during connect()
group_name = None

def connect(self):
async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
self.accept()

def disconnect(self, code):
async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name)

def send_notification(self, event):
self.send_json(content=event['message'])

def close_connection(self, event):
self.close(reason='task complete')


class AppConsumer(TaskTrackerConsumer):
# TODO probably rename this to something more descriptive
def connect(self):
self.user = self.scope['user']
self.group_name = f'app_{self.user.pk}'
super().connect()
17 changes: 17 additions & 0 deletions uvdat/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def split_env_str(s: str | None, sep: str) -> list[str]:


class UvdatMixin(ConfigMixin):
ASGI_APPLICATION = 'uvdat.asgi.application'
WSGI_APPLICATION = 'uvdat.wsgi.application'
ROOT_URLCONF = 'uvdat.urls'

Expand All @@ -31,10 +32,26 @@ class UvdatMixin(ConfigMixin):
ACCOUNT_EMAIL_VERIFICATION = 'none'
ACCOUNT_LOGOUT_REDIRECT_URL = '/accounts/login/'

CHANNEL_LAYERS = {
'default': {
# TODO: switch to channels_redis.pubsub.RedisPubSubChannelLayer when it's out of beta
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [
{
'address': os.environ['REDIS_URL'],
'db': 1,
}
],
},
},
}

@staticmethod
def mutate_configuration(configuration: ComposedConfiguration) -> None:
# Install local apps first, to ensure any overridden resources are found first
configuration.INSTALLED_APPS = [
'daphne',
'django.contrib.gis',
'django_large_image',
'uvdat.core.apps.CoreConfig',
Expand Down
Loading