Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/napari_tiled_browser/models/tiled_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from qtpy.QtCore import QObject, QRunnable, Signal


class TiledSubscriberSignals(QObject):
finished = Signal()
results = Signal(object)


class TiledSubscriber(QRunnable):
def __init__(
self,
*,
client,
node_path_parts,
**kwargs,
):
super().__init__()
self.signals = TiledSubscriberSignals()
self.client = client
self.node_path_parts = node_path_parts

def run(self):
catalog_sub = self.client.subscribe()
catalog_sub.child_created.add_callback(on_new_child)
catalog_sub.start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works.

Long-term question; QRunnable is generally intended for fire-and-forget short-term tasks. Is it appropriate for this task, which will run ~forever waiting for new scans to appear? This might be a use case for QThreadPool`.

Again, what you have works. Just something keep in the back of your minds.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question probably comes down to how much interaction is needed between the worker thread and the main thread.

That does seem to imply that QThread would be more appropriate that QThreadPool + QRunnble for the catalog subscriber here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The child data subscribers however might be better off as individual QRunnables in a QThreadPool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

  • Every sub should be started a QThread because they could be long-running (like a long scan or a never-completed scan).
  • The callbacks should be run using a custom executor:
class QtExecutor:
    "Wrap QtThreadPool in concurrent.futures.Executor-compatible API"
    …

The above is easier than it might sound: you just need the methods submit(f, *args) and shutdown(wait: bool).

Then inject your custom executor, overwriting Tiled’s default ThreadPoolExecutor like x.subscribe(executor=QtExecutor(QtThreadPool())).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus, the threads listening to websockets for seconds/minutes/hours are QThread and the threads doing the work of the callbacks (which is always a short job) are QRunmable in a QThreadPool.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Qthread largely has the same code signature as Qrunnable, anything else I need to add?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment about QtExector.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, that is not a thing that exists—it is a thing you'll need to define, to wrap the QtThreadPool interface in a class that present the Executor interface expected by Tiled.



def on_new_child(update):
"A new child node has been created in a container."
child = update.child()
print(child)
sub = child.subscribe()
# Is the child also a container?
if child.structure_family == "container":
# Recursively subscribe to the children of this new container.
sub.child_created.add_callback(on_new_child)
else:
# Subscribe to data updates (i.e. appended table rows or array slices).
sub.new_data.add_callback(on_new_data)
# Launch the subscription.
# Ask the server to replay from the very first update, if we already
# missed some.
sub.start_in_thread(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we need handle the case where replaying from message #1 overlaps with data we already have.

Is there a unique identifier in the update payload to know whether we are receiving data that is already in the table / image stack?



def on_new_data(update):
"Data has been updated (maybe appended) to an array or table."
print(update.data())
8 changes: 8 additions & 0 deletions src/napari_tiled_browser/qt/tiled_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from napari_tiled_browser.models.tiled_selector import TiledSelector
from napari_tiled_browser.models.tiled_worker import TiledWorker
from napari_tiled_browser.models.tiled_subscriber import TiledSubscriber
from napari_tiled_browser.qt.tiled_search import QTiledSearchWidget

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -248,6 +249,13 @@ def fetch_table_data(self):
runnable.signals.results.connect(self.populate_table)
self.thread_pool.start(runnable)

def subscribe_to_table_data(self):
runnable = TiledSubscriber(
client=self.model.client,
node_path_parts=self.model.node_path_parts,
)
self.thread_pool.start(runnable)

def populate_table(self, results):
_logger.debug("QTiledBrowser.populate_table()...")
original_state = {}
Expand Down