You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We want to deliver updates in real time to end clients, where django applications have that ability (eg use channels or graphql subscriptions).
Current state
Previously, we've embedded in django-twined, consumers that allow such updates to happen baed on a class called ReelMessage and a consumer. However, this has proven very inflexible (requireing specific implementation) and fallen out of date - presently unused in any systems so we are removing it to allow for use with any kind of real time system.
Proposed Solution
It would be good to provide an example or documentation showing how to do this.
Working Example
In the meantime, here are some components that were used as a basis in the previous, inflexible system, that could form a basis but be warned these were written prior to the Service Usage Events architecture so you probably want to hook to the signals framework in order to issue updates here instead of using this code directly - expect a substantially different implementation - this is just for reference.
importloggingimportosfromasgiref.syncimportasync_to_syncfromchannels.generic.websocketimportWebsocketConsumerfromdjango.confimportsettingsfromdjango_twined.messagesimportReelMessagefromdjango_twined.tasksimportasklogger=logging.getLogger(__name__)
classServiceConsumer(WebsocketConsumer):
"""A high-level consumer allowing subscription to service messages, resource messages and the current twine. Resource messages are sent only to a subset of clients (like members of a chat room). TODO ACTUALLY GET THIS OUT PROPERLY - Currently uses the default ~~ When in danger or in doubt, run in circles, scream and shout ~~ """@propertydefservice_name(self):
"""The service name for this connection"""# NB We define this as a property so it's an instance attribute, rather than a class attribute, without# having to overload __init__.returnstr(self.scope["url_route"]["kwargs"].get("service_name", None))
@propertydefservice_version(self):
"""The service version for this connection TODO ACTUALLY GET THIS OUT PROPERLY - Currently uses the default. Figure out a good way of listing the available services and versions. Possibly using a service table. Also need to consider the channel layer name which should include the version """# NB We define this as a property so it's an instance attribute, rather than a class attribute, without# having to overload __init__.returnsettings.SERVICES[self.service_name]["default_version"]
@propertydefresource_name(self):
"""The resource group name for this connection, if any"""# NB We define this as a property so it's an instance attribute, rather than a class attribute, without# having to overload __init__.returnstr(self.scope["url_route"]["kwargs"].get("resource_name", None))
defconnect(self):
"""Accept connection to the service, and send twine to the client as acknowledgement"""# The service name should be given in the URL somewhere, if not, don't accept the connectionifself.service_nameisNone:
self.close()
# Add this channel to the service group (unused at present)# async_to_sync(self.channel_layer.group_add)(self.service_name, self.channel_name)# Optionally add this channel to a particular resource group, if one is found in the URL kwargsifself.resource_nameisnotNone:
async_to_sync(self.channel_layer.group_add)(self.resource_name, self.channel_name)
# Accept the connectionself.accept()
# Send the twine for this service to the connected client# TODO this might be better done via a conventional view.# TODO Check read-configuration permissions and make the configuration availableself._send_twine()
defdisconnect(self, code):
"""Accept a disconnection from the service and associated groups gracefully"""# Disconnect from the serviceasync_to_sync(self.channel_layer.group_discard)(self.service_name, self.channel_name)
# Disconnect from the resource group, if a memberifself.resource_nameisnotNone:
async_to_sync(self.channel_layer.group_discard)(self.resource_name, self.channel_name)
defreceive(self, text_data=None, bytes_data=None):
"""Receive service instructions from WebSocket"""# Parse the incoming message datatry:
message=ReelMessage(src=text_data)
exceptException:
ReelMessage(status="error", reference=message.reference, hints="Unable to understand your message").send(
self
)
returnifmessage.action=="ping":
ReelMessage(
action="pong",
status="success",
reference=message.reference,
hints="U wanna play? I can pong your ping all day.",
).send(self)
ifmessage.action=="ask":
# Forward the inbound message to the resource groupifself.resource_name:
message.group_send(self.resource_name)
ask(self.service_name, self.service_version, message, self.resource_nameorself.channel_name)
# # Forward the inbount message to the resource group# if self.resource_group_name:# ReelMessage(src=text_data, action="asked").group_send(self.resource_group_name)## # TODO more elegant send API for ReelMessage so I can just provide an iterable of group names# # or *Consumer instances# message.group_send(self.resource_group_name)# else:# message.send(self)## # TODO more elegant send API for ReelMessage so I can just provide an iterable of group names# # or *Consumer instances# message.group_send(self.resource_group_name)# else:# message.send(self)#else:
ReelMessage(
action=message.action,
status="error",
hint="Unknown action. Perhaps you meant to send the message to the analysis consumer?",
).send(self)
def_send_twine(self):
"""Refresh the client with everything it needs to know about the twine and the analyses it has going on"""app_path=settings.TWINED_SERVICES[self.service_name][self.service_version]["path"]
withopen(os.path.join(app_path, "twine.json")) asfp:
twine=fp.read()
ReelMessage(action="connect", twine=twine).send(self)
consumers/analysis_log.py
importloggingfromchannels.generic.websocketimportWebsocketConsumerfromdjango_twined.messagesimportReelMessagelogger=logging.getLogger(__name__)
classAnalysisLogConsumer(WebsocketConsumer):
"""A consumer allowing logs for an analysis to be streamed in plain text directly from a python logger, then converted into ReelMessages which get logged to the analysis group. """@propertydefanalysis_id(self):
returnstr(self.scope["url_route"]["kwargs"].get("analysis_id"))
@propertydefanalysis_group_name(self):
returnf"analysis-{self.analysis_id}"defconnect(self):
"""Accept connection to to enable log forwarding to the analysis group"""self.accept()
defreceive(self, text_data=None, bytes_data=None):
"""Receive log text, wrap it properly as part of the messaging system and forward it to the analysis group"""ReelMessage(action="log", value=text_data).group_send(self.analysis_group_name)
consumers/analysis.py
importloggingfromuuidimportuuid4fromasgiref.syncimportasync_to_syncfromchannels.generic.websocketimportWebsocketConsumerfromdjango.core.cacheimportcachefromdjango_twined.messagesimportReelMessagefromdjango_twined.tasksimportasklogger=logging.getLogger(__name__)
classAnalysisConsumer(WebsocketConsumer):
"""A consumer allowing creation and monitoring of analyses ~~ When in danger or in doubt, run in circles, scream and shout ~~ I want to: load a configuration at startup of the server. I want to: load a frontend which knows about my twine and configuration settings. I want to: connect my frontend to a backend. I want to: supply a backend with input data periodically. - I can write a quick python script to do this from local. I want to: use the frontend to show some graphic of the most recent input data, continuously updating. I want to: use the frontend to show an input form. I want to: use the frontend to show an output (disabled) form with results. """defconnect(self):
"""Accept connection to the analysis, and forward any prior messages to the client"""self.group_id=str(self.scope["url_route"]["kwargs"].get("analysis_id", uuid4()))
# Set the group nameself.group_name=f"analysis-{self.group_id}"# Add this channel to the groupasync_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
# Accept the connectionself.accept()
# Flush the cache of messages from this group so farformessageincache.get(self.group_name, default=list()):
self.send(text_data=message)
defdisconnect(self, close_code):
"""Accept a disconnection from the analysis gracefully"""async_to_sync(self.channel_layer.group_discard)(self.group_name, self.channel_name)
defreceive(self, text_data=None, bytes_data=None):
"""Receive service instructions from WebSocket"""# Get the incoming message datatry:
message=ReelMessage(src=text_data)
exceptExceptionase:
logger.warning(f"Possible error (or malformed message): {e.args[0]}")
ReelMessage(status="error", hints="Unable to understand your message").send(self)
return# Attempt to respondtry:
# Run a new analysisifmessage.action=="ask":
ask(self.group_id, message).send(self)
exceptExceptionase:
logger.error(e.args[0])
ReelMessage(
action=message.action,
status="error",
).send(self)
return# # Append changes to the cache for this room, allowing us to flush all changes through to new connection# # TODO Race condition exists here. Either rewrite automerge for python and keep a master record, or use# # a locking strategy enabling us to ensure all changes are recorded or use a synchronous websocketconsumer# # to properly persist the change history# # https://pypi.org/project/python-redis-lock/# # https://github.com/joanvila/aioredlock# changes = cache.get(self.room_name, default=list())# changes.append(text_data)# cache.set(self.room_name, changes)# data = {# "type": "reel_message",# "message": text_data,# "message_bytes": bytes_data,# }# await self.channel_layer.group_send(self.room_group_name, data)defreel_message(self, event):
"""Send a message from the analysis to the client, serialising if necessary"""self.send(text_data=event["message"])
messages.py
importjsonimportloggingfromtimeimporttimefromuuidimportuuid4importdjango.dispatchfromasgiref.syncimportasync_to_syncfromchannels.layersimportget_channel_layerlogger=logging.getLogger(__name__)
reel_message_sent=django.dispatch.Signal()
channel_layer=get_channel_layer()
MESSAGE_FIELDS= (
"action",
"children",
"configuration_manifest",
"configuration_values",
"credentials",
"data",
"hints",
"input_manifest",
"input_values",
"logs",
"output_manifest",
"output_values",
"reference",
"status",
"timestamp",
"twine",
"uuid",
"value",
)
classReelMessage:
"""Class to manage incoming and outgoing messages"""def__init__(self, src=None, **kwargs):
"""Constructor for ServiceMessage"""ifsrcisnotNone:
kwargs= {
**json.loads(src),
**kwargs,
}
# TODO schema based validation with no extra fields allowedif"action"notinkwargs.keys():
raiseException("No action specified")
kwargs["status"] =kwargs.pop("status", "success")
# Check no uuid supplied and create one for this messageifkwargs.get("uuid", None) isnotNone:
raiseValueError("Cannot instantiate a reel message with a UUID. Each message must have a unique UUID.")
kwargs["uuid"] =str(uuid4())
# Add a server timestampifkwargs.get("timestamp", None) isnotNone:
raiseValueError(
"Cannot instantiate a reel message with a timestamp. Each message must determine its timestamp."
)
kwargs["timestamp"] =time()
self.__dict__=dict((k, kwargs.get(k, None)) forkinMESSAGE_FIELDS)
defserialise(self):
"""Serialise self to a string"""to_serialise=dict((k, getattr(self, k)) forkinMESSAGE_FIELDSifgetattr(self, k) isnotNone)
returnjson.dumps(to_serialise)
defsend(self, obj):
"""Send this message to an individual channel name Also sends a signal allowing 3rd party apps to execute callbacks for certain messages """serialised=self.serialise()
logger.debug("Sending ReelMessage to channel '%s': %s", obj, serialised)
obj.send(serialised)
reel_message_sent.send(sender=self.__class__, message=self, to_channel=obj, to_group=None)
defgroup_send(self, group_name, message_type="reel_message"):
"""Send this message to a group over channels Also sends a signal allowing 3rd party apps to execute callbacks for certain messages """serialised=self.serialise()
logger.debug("Group sending ReelMessage: %s", serialised)
async_to_sync(channel_layer.group_send)(group_name, {"type": message_type, "message": serialised})
reel_message_sent.send(sender=self.__class__, message=self, to_channel=None, to_group=group_name)
The text was updated successfully, but these errors were encountered:
thclark
changed the title
Add example of how to connecxt to question status updates over a websocket or subscription
Add example of how to connect to question status updates over a websocket or subscription
Feb 6, 2024
Feature request
Use Case
We want to deliver updates in real time to end clients, where django applications have that ability (eg use channels or graphql subscriptions).
Current state
Previously, we've embedded in django-twined, consumers that allow such updates to happen baed on a class called ReelMessage and a consumer. However, this has proven very inflexible (requireing specific implementation) and fallen out of date - presently unused in any systems so we are removing it to allow for use with any kind of real time system.
Proposed Solution
It would be good to provide an example or documentation showing how to do this.
Working Example
In the meantime, here are some components that were used as a basis in the previous, inflexible system, that could form a basis but be warned these were written prior to the Service Usage Events architecture so you probably want to hook to the signals framework in order to issue updates here instead of using this code directly - expect a substantially different implementation - this is just for reference.
routing.py
consumers/service.py
consumers/analysis_log.py
consumers/analysis.py
messages.py
The text was updated successfully, but these errors were encountered: