diff --git a/kafka/client_async.py b/kafka/client_async.py index 95278ae3a..84ed07f22 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -714,11 +714,13 @@ def _poll(self, timeout): for conn in six.itervalues(self._conns): if conn.requests_timed_out(): + timed_out = conn.timed_out_ifrs() + timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 log.warning('%s timed out after %s ms. Closing connection.', - conn, conn.config['request_timeout_ms']) + conn, timeout_ms) conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % - conn.config['request_timeout_ms'])) + timeout_ms)) if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) diff --git a/kafka/conn.py b/kafka/conn.py index 524be0095..dee8c1890 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1153,11 +1153,13 @@ def recv(self): """ responses = self._recv() if not responses and self.requests_timed_out(): + timed_out = self.timed_out_ifrs() + timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 log.warning('%s timed out after %s ms. Closing connection.', - self, self.config['request_timeout_ms']) + self, timeout_ms) self.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % - self.config['request_timeout_ms'])) + timeout_ms)) return () # augment responses w/ correlation_id, future, and timestamp @@ -1235,6 +1237,11 @@ def _recv(self): def requests_timed_out(self): return self.next_ifr_request_timeout_ms() == 0 + def timed_out_ifrs(self): + now = time.time() + ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2]) + return list(filter(lambda ifr: ifr[2] <= now, ifrs)) + def next_ifr_request_timeout_ms(self): with self._lock: if self.in_flight_requests: