Add example of how to connect to question status updates over a websocket or subscription #64

Open
thclark opened this issue Feb 6, 2024 · 0 comments
Labels
documentation Improvements or additions to documentation

Comments


thclark commented Feb 6, 2024

Feature request

Use Case

We want to deliver updates in real time to end clients, where Django applications have that ability (e.g. using Channels or GraphQL subscriptions).

Current state

Previously, we embedded consumers in django-twined that allowed such updates, based on a class called ReelMessage and a consumer. However, this proved very inflexible (requiring a specific implementation) and has fallen out of date. It is presently unused in any system, so we are removing it to allow use with any kind of real-time system.

Proposed Solution

It would be good to provide an example or documentation showing how to do this.

Working Example

In the meantime, here are some components from the previous, inflexible system that could form a starting point. Be warned: these were written prior to the Service Usage Events architecture, so you probably want to hook into the signals framework to issue updates rather than using this code directly. Expect a substantially different implementation; this is just for reference.
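
Before the legacy components, here is a rough sketch of the signal-driven direction mentioned above. It is an illustration under assumptions, not the django-twined API: the helper names (`question_group_name`, `build_status_event`, `make_status_receiver`), the group naming convention and the `question_id` / `status` arguments are all hypothetical, and the channel layer is replaced by a plain callable so the snippet runs without Django or Channels installed.

```python
import json
from typing import Any, Callable, Dict


def question_group_name(question_id: str) -> str:
    """Hypothetical naming convention for the group that clients of one question join."""
    return f"question-{question_id}"


def build_status_event(question_id: str, status: str) -> Dict[str, Any]:
    """Build a channel-layer style event for a status update.

    Channels dispatches on "type", replacing dots with underscores, so a consumer
    would handle this event in a method called `question_status`.
    """
    return {
        "type": "question.status",
        "message": json.dumps({"question_id": question_id, "status": status}),
    }


def make_status_receiver(group_send: Callable[[str, Dict[str, Any]], None]):
    """Return a signal-receiver-shaped callable that forwards updates to a group."""

    def receiver(sender=None, question_id=None, status=None, **kwargs):
        # Assumed signal arguments; the real signal payload may differ.
        group_send(question_group_name(question_id), build_status_event(question_id, status))

    return receiver


# Stand-in for the channel layer: record what would have been sent.
sent = []
receiver = make_status_receiver(lambda group, event: sent.append((group, event)))
receiver(sender=None, question_id="1234", status="in_progress")
print(sent[0][0], sent[0][1]["message"])
```

In a real project the injected callable would presumably be `async_to_sync(get_channel_layer().group_send)`, and the receiver would be connected to whichever signal the events framework emits using that signal's `.connect()` method.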

routing.py

from django.urls import path

from .consumers import AnalysisConsumer, ServiceConsumer


websocket_urlpatterns = [
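    # NB ServiceConsumer reads `service_name` (and optionally `resource_name`) from the URL kwargs,
    # which this route does not capture
    path(r"service/", ServiceConsumer.as_asgi()),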
    path(r"analyses/", AnalysisConsumer.as_asgi()),
    path(r"analyses/<uuid:analysis_id>/", AnalysisConsumer.as_asgi()),
]

consumers/service.py

import logging
import os

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.conf import settings
from django_twined.messages import ReelMessage
from django_twined.tasks import ask


logger = logging.getLogger(__name__)


class ServiceConsumer(WebsocketConsumer):
    """A high-level consumer allowing subscription to service messages, resource messages and the current twine.

    Resource messages are sent only to a subset of clients (like members of a chat room).

        TODO ACTUALLY GET THIS OUT PROPERLY - Currently uses the default

    ~~ When in danger or in doubt, run in circles, scream and shout ~~

    """

    @property
    def service_name(self):
        """The service name for this connection, or None if the URL did not supply one"""
        # NB We define this as a property so it's an instance attribute, rather than a class attribute, without
        # having to overload __init__.
        name = self.scope["url_route"]["kwargs"].get("service_name")
        # Return None (not the string "None") when absent, so the `is None` check in connect() works
        return None if name is None else str(name)

    @property
    def service_version(self):
        """The service version for this connection

        TODO ACTUALLY GET THIS OUT PROPERLY - Currently uses the default. Figure out a good way of listing the available
         services and versions. Possibly using a service table. Also need to consider the channel layer name which
         should include the version

        """
        # NB We define this as a property so it's an instance attribute, rather than a class attribute, without
        # having to overload __init__.
        return settings.SERVICES[self.service_name]["default_version"]

    @property
    def resource_name(self):
        """The resource group name for this connection, if any"""
        # NB We define this as a property so it's an instance attribute, rather than a class attribute, without
        # having to overload __init__.
        name = self.scope["url_route"]["kwargs"].get("resource_name")
        # Return None (not the string "None") when absent, so the `is not None` checks below work
        return None if name is None else str(name)

    def connect(self):
        """Accept connection to the service, and send twine to the client as acknowledgement"""

        # The service name should be given in the URL somewhere, if not, don't accept the connection
        if self.service_name is None:
            self.close()
            return

        # Add this channel to the service group (unused at present)
        # async_to_sync(self.channel_layer.group_add)(self.service_name, self.channel_name)

        # Optionally add this channel to a particular resource group, if one is found in the URL kwargs
        if self.resource_name is not None:
            async_to_sync(self.channel_layer.group_add)(self.resource_name, self.channel_name)

        # Accept the connection
        self.accept()

        # Send the twine for this service to the connected client
        #   TODO this might be better done via a conventional view.
        #   TODO Check read-configuration permissions and make the configuration available
        self._send_twine()

    def disconnect(self, code):
        """Accept a disconnection from the service and associated groups gracefully"""

        # Disconnect from the service group (has no effect while the group_add in connect() is commented out)
        if self.service_name is not None:
            async_to_sync(self.channel_layer.group_discard)(self.service_name, self.channel_name)

        # Disconnect from the resource group, if a member
        if self.resource_name is not None:
            async_to_sync(self.channel_layer.group_discard)(self.resource_name, self.channel_name)

    def receive(self, text_data=None, bytes_data=None):
        """Receive service instructions from WebSocket"""

        # Parse the incoming message data
        try:
            message = ReelMessage(src=text_data)
        except Exception:
            # Nothing was parsed, so there is no reference to echo back. ReelMessage requires an action;
            # "error" is a placeholder value here, not an established action name.
            ReelMessage(action="error", status="error", hints="Unable to understand your message").send(self)
            return

        if message.action == "ping":
            ReelMessage(
                action="pong",
                status="success",
                reference=message.reference,
                hints="U wanna play? I can pong your ping all day.",
            ).send(self)

        elif message.action == "ask":

            # Forward the inbound message to the resource group
            if self.resource_name:
                message.group_send(self.resource_name)

            ask(self.service_name, self.service_version, message, self.resource_name or self.channel_name)

            # # Forward the inbound message to the resource group
            # if self.resource_group_name:
            #     ReelMessage(src=text_data, action="asked").group_send(self.resource_group_name)
            #
            #     # TODO more elegant send API for ReelMessage so I can just provide an iterable of group names
            #     #  or *Consumer instances
            #     message.group_send(self.resource_group_name)
            # else:
            #     message.send(self)

        else:
            # `hints` (not `hint`) is the field name listed in MESSAGE_FIELDS
            ReelMessage(
                action=message.action,
                status="error",
                hints="Unknown action. Perhaps you meant to send the message to the analysis consumer?",
            ).send(self)

    def _send_twine(self):
        """Refresh the client with everything it needs to know about the twine and the analyses it has going on"""

        app_path = settings.TWINED_SERVICES[self.service_name][self.service_version]["path"]
        with open(os.path.join(app_path, "twine.json")) as fp:
            twine = fp.read()

        ReelMessage(action="connect", twine=twine).send(self)

consumers/analysis_log.py

import logging

from channels.generic.websocket import WebsocketConsumer
from django_twined.messages import ReelMessage


logger = logging.getLogger(__name__)


class AnalysisLogConsumer(WebsocketConsumer):
    """A consumer allowing logs for an analysis to be streamed in plain text directly from a python logger, then
    converted into ReelMessages which get logged to the analysis group.
    """

    @property
    def analysis_id(self):
        return str(self.scope["url_route"]["kwargs"].get("analysis_id"))

    @property
    def analysis_group_name(self):
        return f"analysis-{self.analysis_id}"

    def connect(self):
        """Accept connection to enable log forwarding to the analysis group"""
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        """Receive log text, wrap it properly as part of the messaging system and forward it to the analysis group"""
        ReelMessage(action="log", value=text_data).group_send(self.analysis_group_name)
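
For context on the sending side, the consumer above expects plain log text to arrive over the socket. A minimal sketch of how a Python logger could be wired to produce that text follows, using a standard `logging.Handler`. The `ForwardingHandler` name and the injected `send` callable are hypothetical; in practice `send` would be whatever function writes a text frame to the websocket, and here it is a list append so the snippet runs on its own.

```python
import logging


class ForwardingHandler(logging.Handler):
    """Forward each formatted log record to a callable (e.g. a websocket send function)."""

    def __init__(self, send):
        super().__init__()
        self._send = send

    def emit(self, record):
        try:
            self._send(self.format(record))
        except Exception:
            # Standard logging convention: never let a logging failure crash the caller
            self.handleError(record)


# Stand-in for a websocket connection: collect the frames that would be sent
received = []

handler = ForwardingHandler(received.append)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

analysis_logger = logging.getLogger("analysis-sketch")
analysis_logger.setLevel(logging.INFO)
analysis_logger.propagate = False  # keep the sketch's output out of the root logger
analysis_logger.addHandler(handler)

analysis_logger.info("Started analysis %s", "abc")
print(received)
```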

consumers/analysis.py

import logging
from uuid import uuid4

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from django.core.cache import cache
from django_twined.messages import ReelMessage
from django_twined.tasks import ask


logger = logging.getLogger(__name__)


class AnalysisConsumer(WebsocketConsumer):
    """A consumer allowing creation and monitoring of analyses


    ~~ When in danger or in doubt, run in circles, scream and shout ~~


    I want to: load a configuration at startup of the server.
    I want to: load a frontend which knows about my twine and configuration settings.
    I want to: connect my frontend to a backend.
    I want to: supply a backend with input data periodically.
            - I can write a quick python script to do this from local.
    I want to: use the frontend to show some graphic of the most recent input data, continuously updating.
    I want to: use the frontend to show an input form.
    I want to: use the frontend to show an output (disabled) form with results.

    """

    def connect(self):
        """Accept connection to the analysis, and forward any prior messages to the client"""
        self.group_id = str(self.scope["url_route"]["kwargs"].get("analysis_id", uuid4()))

        # Set the group name
        self.group_name = f"analysis-{self.group_id}"

        # Add this channel to the group
        async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)

        # Accept the connection
        self.accept()

        # Flush the cache of messages from this group so far
        for message in cache.get(self.group_name, default=list()):
            self.send(text_data=message)

    def disconnect(self, close_code):
        """Accept a disconnection from the analysis gracefully"""
        async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name)

    def receive(self, text_data=None, bytes_data=None):
        """Receive service instructions from WebSocket"""

        # Get the incoming message data
        try:
            message = ReelMessage(src=text_data)
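        except Exception as e:
            logger.warning("Possible error (or malformed message): %s", e)
            # ReelMessage requires an action; "error" is a placeholder value, not an established action name
            ReelMessage(action="error", status="error", hints="Unable to understand your message").send(self)
            return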

        # Attempt to respond
        try:
            # Run a new analysis
            if message.action == "ask":
                ask(self.group_id, message).send(self)

        except Exception as e:
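            # Log the exception itself; e.args may be empty, so indexing it is unsafe
            logger.error("Unable to respond to message: %s", e)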
            ReelMessage(
                action=message.action,
                status="error",
            ).send(self)
            return

        # # Append changes to the cache for this room, allowing us to flush all changes through to new connection
        # # TODO Race condition exists here. Either rewrite automerge for python and keep a master record, or use
        # #  a locking strategy enabling us to ensure all changes are recorded or use a synchronous websocketconsumer
        # #  to properly persist the change history
        # #  https://pypi.org/project/python-redis-lock/
        # #  https://github.com/joanvila/aioredlock
        # changes = cache.get(self.room_name, default=list())
        # changes.append(text_data)
        # cache.set(self.room_name, changes)
        # data = {
        #     "type": "reel_message",
        #     "message": text_data,
        #     "message_bytes": bytes_data,
        # }
        # await self.channel_layer.group_send(self.room_group_name, data)

    def reel_message(self, event):
        """Send a message from the analysis to the client, serialising if necessary"""
        self.send(text_data=event["message"])

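
The commented-out TODO in `receive()` notes a race when appending to the cached history (get, append, set). A minimal sketch of guarding that read-modify-write is shown below, with an in-memory dict and a `threading.Lock` as stand-ins. `GroupHistory` is a hypothetical name, and a process-local lock does not protect a cache shared between processes; that case would need something like the distributed locks linked in the TODO.

```python
import threading
from collections import defaultdict


class GroupHistory:
    """In-memory stand-in for a per-group cache of past messages."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = defaultdict(list)

    def append(self, group_name, text_data):
        # Hold the lock for the whole read-modify-write so no append is lost
        with self._lock:
            self._messages[group_name].append(text_data)

    def replay(self, group_name):
        # Return a copy so callers can iterate without holding the lock
        with self._lock:
            return list(self._messages[group_name])


history = GroupHistory()
threads = [threading.Thread(target=history.append, args=("analysis-1", f"m{i}")) for i in range(50)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(history.replay("analysis-1")))
```

A newly connected client would then be sent everything returned by `replay()`, much as `connect()` above flushes the cached messages.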
messages.py

import json
import logging
from time import time
from uuid import uuid4

import django.dispatch
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer


logger = logging.getLogger(__name__)

reel_message_sent = django.dispatch.Signal()

channel_layer = get_channel_layer()


MESSAGE_FIELDS = (
    "action",
    "children",
    "configuration_manifest",
    "configuration_values",
    "credentials",
    "data",
    "hints",
    "input_manifest",
    "input_values",
    "logs",
    "output_manifest",
    "output_values",
    "reference",
    "status",
    "timestamp",
    "twine",
    "uuid",
    "value",
)


class ReelMessage:
    """Class to manage incoming and outgoing messages"""

    def __init__(self, src=None, **kwargs):
        """Constructor for ReelMessage"""
        if src is not None:
            kwargs = {
                **json.loads(src),
                **kwargs,
            }
            # TODO schema based validation with no extra fields allowed

        if "action" not in kwargs:
            raise Exception("No action specified")

        kwargs["status"] = kwargs.pop("status", "success")

        # Check no uuid supplied and create one for this message
        if kwargs.get("uuid", None) is not None:
            raise ValueError("Cannot instantiate a reel message with a UUID. Each message must have a unique UUID.")
        kwargs["uuid"] = str(uuid4())

        # Add a server timestamp
        if kwargs.get("timestamp", None) is not None:
            raise ValueError(
                "Cannot instantiate a reel message with a timestamp. Each message must determine its timestamp."
            )
        kwargs["timestamp"] = time()

        self.__dict__ = dict((k, kwargs.get(k, None)) for k in MESSAGE_FIELDS)

    def serialise(self):
        """Serialise self to a string"""
        to_serialise = dict((k, getattr(self, k)) for k in MESSAGE_FIELDS if getattr(self, k) is not None)
        return json.dumps(to_serialise)

    def send(self, obj):
        """Send this message to an individual channel name
        Also sends a signal allowing 3rd party apps to execute callbacks for certain messages
        """
        serialised = self.serialise()
        logger.debug("Sending ReelMessage to channel '%s': %s", obj, serialised)
        obj.send(serialised)
        reel_message_sent.send(sender=self.__class__, message=self, to_channel=obj, to_group=None)

    def group_send(self, group_name, message_type="reel_message"):
        """Send this message to a group over channels
        Also sends a signal allowing 3rd party apps to execute callbacks for certain messages
        """
        serialised = self.serialise()
        logger.debug("Group sending ReelMessage: %s", serialised)
        async_to_sync(channel_layer.group_send)(group_name, {"type": message_type, "message": serialised})
        reel_message_sent.send(sender=self.__class__, message=self, to_channel=None, to_group=group_name)
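
To make the wire format concrete, here is a small sketch of what a client might send, inferred from `MESSAGE_FIELDS` and the constructor checks above. `build_client_message` and `CLIENT_FIELDS` are hypothetical helpers, not part of django-twined. The sketch mirrors two behaviours of `ReelMessage`: `uuid` and `timestamp` are rejected on input because the server assigns them, and fields outside the known list are dropped.

```python
import json

# MESSAGE_FIELDS from messages.py, minus the two fields the server assigns itself
CLIENT_FIELDS = (
    "action", "children", "configuration_manifest", "configuration_values", "credentials",
    "data", "hints", "input_manifest", "input_values", "logs", "output_manifest",
    "output_values", "reference", "status", "twine", "value",
)
SERVER_ASSIGNED = {"uuid", "timestamp"}


def build_client_message(action, **fields):
    """Serialise a message a client could send over the socket (hypothetical helper)."""
    forbidden = SERVER_ASSIGNED & set(fields)
    if forbidden:
        raise ValueError(f"Fields assigned by the server cannot be supplied: {sorted(forbidden)}")
    payload = {"action": action}
    # Keep only known fields with a value, as ReelMessage.serialise() does on the way out
    payload.update({k: v for k, v in fields.items() if k in CLIENT_FIELDS and v is not None})
    return json.dumps(payload)


ping = build_client_message("ping", reference="abc-1")
print(ping)
```

Sending the resulting string to the service consumer should produce a `pong` reply carrying the same `reference`.
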
@thclark thclark added the documentation Improvements or additions to documentation label Feb 6, 2024
@thclark thclark changed the title Add example of how to connecxt to question status updates over a websocket or subscription Add example of how to connect to question status updates over a websocket or subscription Feb 6, 2024
Projects
Status: Priority 1 (Low)
Development

No branches or pull requests

1 participant