The realtime-pubsub-client is a Python client library for interacting with Realtime Pub/Sub applications. It enables developers to manage real-time WebSocket
connections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to
interact with realtime applications, supporting features like publishing/sending messages, subscribing to topics,
handling acknowledgements, and waiting for replies with timeout support.
- WebSocket Connection Management: Seamlessly connect and disconnect from the Realtime Pub/Sub service with automatic reconnection support.
- Topic Subscription: Subscribe to and unsubscribe from topics to receive messages.
- Topic Publishing: Publish messages to specific topics with optional message types and compression.
- Message Sending: Send messages to backend applications with optional message types and compression.
- Event Handling: Handle incoming messages with custom event listeners.
- Acknowledgements and Replies: Wait for gateway acknowledgements or replies to messages with timeout support.
- Error Handling: Robust error handling and logging capabilities.
- Asynchronous Support: Built using asyncio for efficient asynchronous programming.
Install the realtime-pubsub-client library via pip:
pip install realtime-pubsub-client
This guide will help you set up and use the realtime-pubsub-client library in your Python project.
First, import the RealtimeClient class and create a new instance with the required configuration:
import asyncio
import logging
import os
from realtime_pubsub_client import RealtimeClient

async def main():
    async def get_url():
        # replace with your access token retrieval strategy
        access_token = os.environ.get('ACCESS_TOKEN')
        app_id = os.environ.get('APP_ID')
        # return the WebSocket URL with the access token
        return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"

    client_options = {
        'logger': logging.getLogger('RealtimeClient'),
        'websocket_options': {
            'url_provider': get_url,
        },
    }
    client = RealtimeClient(client_options)

    async def on_session_started(connection_info):
        print('Connection ID:', connection_info['id'])
        # Subscribe to topics here
        await client.subscribe_remote_topic('topic1')
        await client.subscribe_remote_topic('topic2')

    client.on('session.started', on_session_started)
    await client.connect()
    await client.wait_for('session.started')

asyncio.run(main())
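As written, get_url builds a URL containing the text "None" when either environment variable is missing. A slightly more defensive provider might look like the sketch below. The name build_ws_url is a hypothetical helper, not part of the library, and the host and path layout are simply copied from the example above.

```python
import os
from urllib.parse import quote, urlencode

# Host and path layout copied from the example above; adjust for your app.
BASE_URL = "wss://genesis.r7.21no.de/apps"


def build_ws_url(app_id, access_token):
    """Hypothetical helper: build the WebSocket URL, rejecting empty values."""
    if not app_id:
        raise ValueError("APP_ID is not set")
    if not access_token:
        raise ValueError("ACCESS_TOKEN is not set")
    # Percent-encode both parts so unusual characters cannot break the URL.
    query = urlencode({"access_token": access_token})
    return f"{BASE_URL}/{quote(app_id, safe='')}?{query}"


async def get_url():
    # Same role as the url_provider above, reading from the environment.
    return build_ws_url(os.environ.get("APP_ID"), os.environ.get("ACCESS_TOKEN"))
```

Failing early with a clear message is usually easier to diagnose than a rejected connection attempt.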
You can handle messages for specific topics and message types:
Note: The topic and message type are separated by a dot (.) in the event name.
def handle_message(message, reply_fn):
    # Message handling logic here
    print('Received message:', message['data']['payload'])

client.on('topic1.action1', handle_message)
Wildcard subscriptions are also supported:
client.on('topic1.*', handle_message)
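The library's own matching rules are not spelled out here. As a rough mental model only, the sketch below treats an event name as topic.message_type and uses shell-style matching from the standard library. The names matches and dispatch are hypothetical and are not part of the client API.

```python
from fnmatch import fnmatchcase


def matches(pattern, event_name):
    """Return True if a listener pattern such as 'topic1.*' covers event_name."""
    return fnmatchcase(event_name, pattern)


def dispatch(listeners, event_name, message):
    """Call every handler whose pattern matches; return the patterns that fired."""
    fired = []
    for pattern, handler in listeners:
        if matches(pattern, event_name):
            handler(message)
            fired.append(pattern)
    return fired


received = []
listeners = [
    ("topic1.action1", received.append),  # exact topic and message type
    ("topic1.*", received.append),        # any message type on topic1
    ("topic2.*", received.append),        # a different topic, should not fire
]
fired = dispatch(listeners, "topic1.action1", {"data": {"payload": "hi"}})
print(fired)
```

In this model a message can reach both an exact listener and a wildcard listener, so handlers registered under overlapping patterns should tolerate being called for the same message.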
Publish messages to a topic:
await client.publish('topic1', 'Hello, world!', message_type='text-message')
Set up event listeners to handle incoming messages:
async def handle_message(message, reply_fn):
    # Processing the message
    print('Received message:', message['data']['payload'])
    # Sending a reply
    await reply_fn('Message received!', 'ok')

client.on('topic1.text-message', handle_message)
- wait_for_ack(timeout=None): Waits for an acknowledgement of the message, with an optional timeout in seconds.
- wait_for_reply(timeout=None): Waits for a reply to the message, with an optional timeout in seconds.
Wait for the Realtime Gateway acknowledgement after publishing a message:
waiter = await client.publish('secure/peer-to-peer1', 'Hi', message_type='text-message')
await waiter.wait_for_ack()
Wait for the Realtime Gateway acknowledgement after sending a message:
waiter = await client.send({
    # Message payload
}, message_type='create')
await waiter.wait_for_ack()
Wait for a reply with a timeout:
waiter = await client.send({
    # Message payload
}, message_type='create')
await waiter.wait_for_reply(timeout=5)  # Wait for up to 5 seconds
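This guide does not say what the client raises when the timeout expires. One generic way to keep a missing reply from hanging a coroutine is to add an outer guard with asyncio.wait_for, as in the sketch below. FakeWaiter is a stand-in written for this example, not the library's WaitFor class, and reply_or_none is a hypothetical helper name.

```python
import asyncio


class FakeWaiter:
    """Stand-in for the object returned by send(); it never receives a reply."""

    async def wait_for_reply(self):
        await asyncio.sleep(3600)  # simulate a reply that never arrives


async def reply_or_none(waiter, timeout):
    """Return the reply, or None if nothing arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(waiter.wait_for_reply(), timeout=timeout)
    except asyncio.TimeoutError:
        # The pending wait is cancelled by asyncio.wait_for before we get here.
        return None


result = asyncio.run(reply_or_none(FakeWaiter(), timeout=0.05))
print(result)
```

Returning None keeps the calling code simple. Re-raising or retrying may be more appropriate when a missing reply indicates a real failure.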
Handle errors and disconnections:
def on_error(error):
    print('WebSocket error:', error)

def on_close(event):
    print('WebSocket closed:', event)

client.on('error', on_error)
client.on('close', on_close)
RealtimeClient(config)
Creates a new RealtimeClient instance.
- config: Configuration options for the client.
- connect(): Connects the client to the WebSocket Messaging Gateway.
  await client.connect()
- disconnect(): Terminates the WebSocket connection.
  await client.disconnect()
- subscribe_remote_topic(topic): Subscribes the connection to a remote topic.
  await client.subscribe_remote_topic(topic)
- unsubscribe_remote_topic(topic): Unsubscribes the connection from a remote topic.
  await client.unsubscribe_remote_topic(topic)
- publish(topic, payload, message_type="broadcast", compress=False, message_id=None): Publishes a message to a topic. Returns a WaitFor instance to wait for acknowledgements or replies.
  waiter = await client.publish(topic, payload)
- send(payload, compress=False, message_id=None): Sends a message to the server. Returns a WaitFor instance to wait for acknowledgements or replies. The usage examples above also pass message_type as a keyword argument.
  waiter = await client.send(payload, message_type='create')
- wait(ms): Waits for a specified duration in milliseconds. Utility function for waiting in async functions.
  await wait(ms)
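The wait(ms) utility is described only by its signature. A helper with the same behaviour can be sketched in a few lines on top of asyncio.sleep. This is an assumption about how such a function might work, not the library's actual implementation.

```python
import asyncio
import time


async def wait(ms):
    """Pause the current coroutine for roughly `ms` milliseconds."""
    # asyncio.sleep takes seconds, so convert from milliseconds.
    await asyncio.sleep(ms / 1000)


async def demo():
    start = time.monotonic()
    await wait(50)
    return time.monotonic() - start


elapsed = asyncio.run(demo())
```

Because it yields to the event loop, other tasks such as incoming message handlers keep running while the caller is paused.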
- 'session.started': Emitted when the session starts.
  client.on('session.started', on_session_started)
- 'error': Emitted on WebSocket errors.
  client.on('error', on_error)
- 'close': Emitted when the WebSocket connection closes.
  client.on('close', on_close)
- Custom Events: Handle custom events based on topic and message type.
  client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)
  Note: Wildcard subscriptions are also supported.
This library is licensed under the MIT License.
For more detailed examples and advanced configurations, please refer to the documentation.
- Ensure that you have an account and an app set up with Realtime Pub/Sub.
- Customize the url_provider or URL to retrieve the access token for connecting to your realtime application.
- Implement the get_auth_token function according to your authentication mechanism.
- Optionally use the logger option to integrate with your application's logging system.
- Handle errors and disconnections gracefully to improve the robustness of your application.
- Make sure to handle timeouts when waiting for replies to avoid hanging operations.
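For the logger option mentioned in the notes above, any standard logging.Logger should be a natural fit, since the earlier example passes logging.getLogger('RealtimeClient'). The sketch below configures such a logger. The name make_client_logger and the format string are illustrative choices, not library requirements.

```python
import logging


def make_client_logger(level=logging.INFO):
    """Hypothetical helper: a named logger with a single stream handler."""
    logger = logging.getLogger("RealtimeClient")
    logger.setLevel(level)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


logger = make_client_logger(logging.DEBUG)
# Passed to the client the same way as in the configuration example above.
client_options_logging = {"logger": logger}
```

Raising the level to DEBUG during development and lowering it in production is a common way to keep connection diagnostics available without flooding application logs.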
Feel free to contribute to this project by submitting issues or pull requests on GitHub.