Skip to content

Commit f32425a

Browse files
committed
feat: add has_data_available() method to BaseConnection
1 parent 2ad6af7 commit f32425a

File tree

5 files changed

+115
-0
lines changed

5 files changed

+115
-0
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,19 @@ This package contains a `setup.py` so it can be installed with `python3 setup.py
1313

1414
## Usage
1515
See included `examples/` folder for various use cases.
16+
17+
For patch subscriptions, `BaseConnection.has_data_available()` can be used to poll a
18+
subscription socket without blocking on `receive_json()` or `get_object_model_patch()`.
19+
This is useful when object model updates arrive less frequently than another data source.
20+
21+
```python
22+
from dsf.connections import SubscribeConnection, SubscriptionMode
23+
24+
subscription = SubscribeConnection(SubscriptionMode.PATCH)
25+
subscription.connect()
26+
object_model = subscription.get_object_model()
27+
28+
while True:
29+
if subscription.has_data_available():
30+
object_model.update_from_json(subscription.get_object_model_patch())
31+
```

docs/source/index.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@ Welcome to dsf's documentation!
1313
API
1414
===
1515

16+
Subscription Polling
17+
====================
18+
19+
Patch subscriptions can be polled without blocking by checking
20+
``BaseConnection.has_data_available()`` before reading the next patch.
21+
22+
.. code-block:: python
23+
24+
from dsf.connections import SubscribeConnection, SubscriptionMode
25+
26+
subscription = SubscribeConnection(SubscriptionMode.PATCH)
27+
subscription.connect()
28+
object_model = subscription.get_object_model()
29+
30+
while True:
31+
if subscription.has_data_available():
32+
object_model.update_from_json(subscription.get_object_model_patch())
33+
1634
.. automodule:: dsf
1735
:members:
1836
.. automodule:: dsf.connections

src/dsf/connections/base_connection.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import select
23
import socket
34
import time
45
from typing import Optional
@@ -84,6 +85,17 @@ def receive_response(self):
8485
json_string = self.receive_json()
8586
return responses.decode_response(json.loads(json_string))
8687

88+
def has_data_available(self) -> bool:
89+
"""Return whether buffered or socket data is available to be read without blocking."""
90+
if self.get_json_object_end_index(self.input) > 1:
91+
return True
92+
93+
if not self.socket:
94+
return False
95+
96+
readable, _, _ = select.select([self.socket], [], [], 0)
97+
return bool(readable)
98+
8799
def receive_json(self) -> str:
88100
"""Receive the JSON response from the server"""
89101
if not self.socket:

tests/test_base_connection.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import unittest
2+
from unittest.mock import patch
3+
4+
from src.dsf.connections.base_connection import BaseConnection
5+
6+
7+
class TestBaseConnection(unittest.TestCase):
8+
def test_has_data_available_returns_true_for_complete_buffered_json(self):
9+
connection = BaseConnection()
10+
connection.input = '{"key":1}'
11+
12+
self.assertTrue(connection.has_data_available())
13+
14+
def test_has_data_available_returns_false_for_partial_buffer_without_socket_data(self):
15+
connection = BaseConnection()
16+
connection.input = '{"key"'
17+
connection.socket = object()
18+
19+
with patch('src.dsf.connections.base_connection.select.select', return_value=([], [], [])):
20+
self.assertFalse(connection.has_data_available())
21+
22+
def test_has_data_available_returns_true_when_socket_is_readable(self):
23+
connection = BaseConnection()
24+
connection.socket = object()
25+
26+
with patch(
27+
'src.dsf.connections.base_connection.select.select',
28+
return_value=([connection.socket], [], []),
29+
):
30+
self.assertTrue(connection.has_data_available())
31+
32+
33+
if __name__ == '__main__':
34+
unittest.main()

tests/test_subscribe_object_model.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
class TestSubscribeObjectModel(unittest.TestCase):
1818
"""Test suite for the object model subscription example."""
1919

20+
@staticmethod
21+
def _wait_for_data_available(subscribe_connection: SubscribeConnection, timeout: float = 1.0) -> bool:
22+
deadline = time.time() + timeout
23+
while time.time() < deadline:
24+
if subscribe_connection.has_data_available():
25+
return True
26+
time.sleep(0.01)
27+
return False
28+
2029
def setUp(self):
2130
"""Set up test environment before each test."""
2231
self.tmp_dir = tempfile.TemporaryDirectory()
@@ -131,6 +140,32 @@ def test_subscribe_object_model(self):
131140
# Verify the test completed successfully
132141
self.assertTrue(self.dcs_passed.is_set(), "The mock DCS did not complete successfully")
133142

143+
def test_has_data_available_during_subscription_flow(self):
144+
"""Test that has_data_available reflects queued model and patch updates."""
145+
subscribe_connection = SubscribeConnection(SubscriptionMode.PATCH)
146+
subscribe_connection.connect(self.mock_dcs_socket_file)
147+
148+
self.assertTrue(
149+
self._wait_for_data_available(subscribe_connection),
150+
"Expected the initial object model to be readable",
151+
)
152+
153+
subscribe_connection.get_object_model()
154+
155+
self.assertTrue(
156+
self._wait_for_data_available(subscribe_connection),
157+
"Expected the next object model patch to be readable after acknowledge",
158+
)
159+
160+
subscribe_connection.get_object_model_patch()
161+
162+
self.assertFalse(subscribe_connection.has_data_available())
163+
164+
subscribe_connection.close()
165+
166+
self.server_thread.join(timeout=5)
167+
self.assertTrue(self.dcs_passed.is_set(), "The mock DCS did not complete successfully")
168+
134169

135170
if __name__ == "__main__":
136171
unittest.main()

0 commit comments

Comments
 (0)