Skip to content

Commit

Permalink
Add conn.timed_out_ifrs(); report per-request timeout in error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp committed Feb 25, 2025
1 parent 2855dbf commit b66b885
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
6 changes: 4 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,11 +714,13 @@ def _poll(self, timeout):

for conn in six.itervalues(self._conns):
if conn.requests_timed_out():
timed_out = conn.timed_out_ifrs()
timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
log.warning('%s timed out after %s ms. Closing connection.',
conn, conn.config['request_timeout_ms'])
conn, timeout_ms)
conn.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
conn.config['request_timeout_ms']))
timeout_ms))

if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
Expand Down
11 changes: 9 additions & 2 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,13 @@ def recv(self):
"""
responses = self._recv()
if not responses and self.requests_timed_out():
timed_out = self.timed_out_ifrs()
timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
log.warning('%s timed out after %s ms. Closing connection.',
self, self.config['request_timeout_ms'])
self, timeout_ms)
self.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
self.config['request_timeout_ms']))
timeout_ms))
return ()

# augment responses w/ correlation_id, future, and timestamp
Expand Down Expand Up @@ -1235,6 +1237,11 @@ def _recv(self):
def requests_timed_out(self):
return self.next_ifr_request_timeout_ms() == 0

def timed_out_ifrs(self):
now = time.time()
ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2])
return list(filter(lambda ifr: ifr[2] <= now, ifrs))

def next_ifr_request_timeout_ms(self):
with self._lock:
if self.in_flight_requests:
Expand Down

0 comments on commit b66b885

Please sign in to comment.