Skip to content

Commit

Permalink
Merge pull request #801 from Debilski/feature/fix-crashing
Browse files Browse the repository at this point in the history
Add a send queue for outgoing messages to avoid broken players stalling the server
  • Loading branch information
Debilski authored Jun 28, 2024
2 parents 245a9ca + 520087e commit 0e578c0
Showing 1 changed file with 53 additions and 4 deletions.
57 changes: 53 additions & 4 deletions pelita/scripts/pelita_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
import random
import queue
import shlex
import signal
import subprocess
Expand Down Expand Up @@ -171,11 +172,57 @@ def cleanup(_signum, _frame):
self.ticks_progressbar = 0.0
self.ticks_process_cleanup = 0.0

self.send_queue = queue.SimpleQueue()

def handle_send_queue(self):
    """Retry delivery of queued messages to their pair sockets.

    Each queue entry is a ``(enqueue_time, dealer_id, message)`` tuple.
    Entries older than one second, or addressed to a dealer id that is no
    longer in ``self.connection_map``, are logged and dropped. Entries
    whose pair socket is not ready for writing are put back on the queue
    in their original order so they can be retried on the next call.
    """
    if self.send_queue.qsize() == 0:
        # The check for the qsize is not reliable and we only
        # use it as an optimisation to skip early
        return

    # A list (not a set) keeps the deferred messages in FIFO order, so
    # re-queued messages are not reordered.
    unsent = []
    try:
        while True:
            # Keep the try body minimal: only the non-blocking get is
            # expected to raise queue.Empty.
            try:
                data = self.send_queue.get(block=False)
            except queue.Empty:
                break

            queued_at, dealer_id, message = data
            if time.monotonic() - queued_at > 1:
                # The message has been waiting too long: discard it.
                _logger.warning(f"Could not send to dealer id {dealer_id.hex()}.")
                continue

            process_info = self.connection_map.get(dealer_id)
            if process_info is None:
                # The connection is no longer registered: discard.
                _logger.warning(f"Could not send to dealer id {dealer_id.hex()}.")
                continue

            process_info.info.last_msg = message

            # Problem: When the receiving end of the pair socket has crashed,
            # then a simple send will halt forever. Only send when the socket
            # reports that it is writable right now.
            if process_info.pair_socket.poll(0, flags=zmq.POLLOUT) == zmq.POLLOUT:
                process_info.pair_socket.send(message)
            else:
                unsent.append(data)
    finally:
        # Put deferred messages back even if a send raised, so that an
        # error on one socket does not lose messages for the others.
        for data in unsent:
            self.send_queue.put(data)

def handle_known_client(self, dealer_id, message, progress):
    """Forward ``message`` to the pair socket registered for ``dealer_id``.

    The send is attempted immediately without blocking. If it fails with a
    ``zmq.ZMQError``, the message is put on ``self.send_queue`` together
    with the current monotonic time so it can be retried later.

    ``progress`` is accepted but not used in this method.
    """
    process_info = self.connection_map[dealer_id]
    process_info.info.last_msg = message

    # Send exactly once and never block: a blocking send would halt the
    # server when the receiving end of the pair socket has crashed.
    try:
        process_info.pair_socket.send(message, flags=zmq.NOBLOCK)
    except zmq.ZMQError:
        # Could not send right now: enqueue with a timestamp for a retry.
        self.send_queue.put((time.monotonic(), dealer_id, message))


def handle_new_connection(self, dealer_id, message, progress):
try:
Expand Down Expand Up @@ -287,7 +334,7 @@ def handle_new_connection(self, dealer_id, message, progress):
_logger.info("Unknown incoming DEALER and not a request.")


def update_progress_bar(self, progress, process_info: ProcessInfo):
def update_progress_bar(self, progress, process_info: ProcessInfo, force_exit=False):
if not process_info.info.last_msg:
return

Expand Down Expand Up @@ -326,7 +373,7 @@ def update_progress_bar(self, progress, process_info: ProcessInfo):
except KeyError:
is_exit = False

if is_exit:
if is_exit or force_exit:
progress.stop_task(process_info.task)
progress.update(process_info.task, visible=False)
process_info.info.finished = True
Expand Down Expand Up @@ -388,6 +435,8 @@ def start(self):
# route message back
self.router_sock.send_multipart([process_info.dealer_id, message])

self.handle_send_queue()

# not every event needs to update the progress bars
if (now := time.monotonic()) - self.ticks_progressbar > 0.01:
self.ticks_progressbar = now
Expand All @@ -400,7 +449,7 @@ def start(self):
for process_info in list(self.connection_map.values()):
# check if the process has terminated
if process_info.proc.poll() is not None:
self.update_progress_bar(progress, process_info)
self.update_progress_bar(progress, process_info, force_exit=True)
# We need to unregister the socket or else the polling will take longer and longer
self.poll.unregister(process_info.pair_socket)
del self.connection_map[process_info.dealer_id]
Expand Down

0 comments on commit 0e578c0

Please sign in to comment.