diff --git a/kafka/admin/client.py b/kafka/admin/client.py index c9e51e5c9..867e661a0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -219,7 +219,7 @@ def __init__(self, **configs): self.config['api_version'] = self._client.config['api_version'] else: # need to run check_version for get_api_versions() - self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) + self._client.check_version() self._closed = False self._refresh_controller_id() @@ -294,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000): time.sleep(1) continue # verify the controller is new enough to support our requests - controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) + controller_version = self._client.check_version(node_id=controller_id) if controller_version < (0, 10, 0): raise IncompatibleBrokerVersion( "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." diff --git a/kafka/client_async.py b/kafka/client_async.py index be19cf80b..c73840846 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -238,8 +238,7 @@ def __init__(self, **configs): # Check Broker Version if not set explicitly if self.config['api_version'] is None: - check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 - self.config['api_version'] = self.check_version(timeout=check_timeout) + self.config['api_version'] = self.check_version() elif self.config['api_version'] in BROKER_API_VERSIONS: self._api_versions = BROKER_API_VERSIONS[self.config['api_version']] elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS: @@ -289,7 +288,7 @@ def _can_connect(self, node_id): def _conn_state_change(self, node_id, sock, conn): with self._lock: - if conn.connecting(): + if conn.state is ConnectionStates.CONNECTING: # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) @@ -301,7 +300,13 @@ def _conn_state_change(self, node_id, sock, conn): if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() - elif conn.connected(): + elif conn.state in (ConnectionStates.API_VERSIONS, ConnectionStates.AUTHENTICATING): + try: + self._selector.register(sock, selectors.EVENT_READ, conn) + except KeyError: + self._selector.modify(sock, selectors.EVENT_READ, conn) + + elif conn.state is ConnectionStates.CONNECTED: log.debug("Node %s connected", node_id) if node_id in self._connecting: self._connecting.remove(node_id) @@ -318,6 +323,8 @@ def _conn_state_change(self, node_id, sock, conn): if self.cluster.is_bootstrap(node_id): self._bootstrap_fails = 0 + if self._api_versions is None: + self._api_versions = conn._api_versions else: for node_id in list(self._conns.keys()): @@ -384,11 +391,16 @@ def _should_recycle_connection(self, conn): return False - def _maybe_connect(self, node_id): + def _init_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" with self._lock: conn = self._conns.get(node_id) + # Check if existing connection should be recreated because host/port changed + if conn is not None and self._should_recycle_connection(conn): + self._conns.pop(node_id) + conn = None + if conn is None: broker = self.cluster.broker_metadata(node_id) assert broker, 'Broker id %s not in current metadata' % (node_id,) @@ -403,15 +415,8 @@ def _maybe_connect(self, node_id): **self.config) self._conns[node_id] = conn - # Check if existing connection should be recreated because host/port changed - elif self._should_recycle_connection(conn): - self._conns.pop(node_id) - return False - - elif conn.connected(): - return True - - conn.connect() + if conn.disconnected(): + conn.connect() return conn.connected() def ready(self, node_id, metadata_priority=True): @@ -601,7 +606,7 @@ def poll(self, timeout_ms=None, future=None): # Attempt to complete pending connections for node_id in list(self._connecting): - self._maybe_connect(node_id) + self._init_connect(node_id) # If we got a future that is already done, don't block in _poll if future is not None and future.is_done: @@ -715,11 +720,13 @@ def _poll(self, timeout): for conn in six.itervalues(self._conns): if conn.requests_timed_out(): + timed_out = conn.timed_out_ifrs() + timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 log.warning('%s timed out after %s ms. Closing connection.', - conn, conn.config['request_timeout_ms']) + conn, timeout_ms) conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % - conn.config['request_timeout_ms'])) + timeout_ms)) if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000) @@ -902,65 +909,64 @@ def refresh_done(val_or_error): def get_api_versions(self): """Return the ApiVersions map, if available. - Note: A call to check_version must previously have succeeded and returned - version 0.10.0 or later + Note: Only available after bootstrap; requires broker version 0.10.0 or later. Returns: a map of dict mapping {api_key : (min_version, max_version)}, or None if ApiVersion is not supported by the kafka cluster. """ return self._api_versions - def check_version(self, node_id=None, timeout=2, strict=False): + def check_version(self, node_id=None, timeout=None, **kwargs): """Attempt to guess the version of a Kafka broker. - Note: It is possible that this method blocks longer than the - specified timeout. This can happen if the entire cluster - is down and the client enters a bootstrap backoff sleep. - This is only possible if node_id is None. + Keyword Arguments: + node_id (str, optional): Broker node id from cluster metadata. If None, attempts + to connect to any available broker until version is identified. + Default: None + timeout (num, optional): Maximum time in seconds to try to check broker version. + If unable to identify version before timeout, raise error (see below). + Default: api_version_auto_timeout_ms / 1000 Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc Raises: NodeNotReadyError (if node_id is provided) NoBrokersAvailable (if node_id is None) - UnrecognizedBrokerVersion: please file bug if seen! - AssertionError (if strict=True): please file bug if seen! """ - self._lock.acquire() - end = time.time() + timeout - while time.time() < end: - - # It is possible that least_loaded_node falls back to bootstrap, - # which can block for an increasing backoff period - try_node = node_id or self.least_loaded_node() - if try_node is None: - self._lock.release() - raise Errors.NoBrokersAvailable() - self._maybe_connect(try_node) - conn = self._conns[try_node] - - # We will intentionally cause socket failures - # These should not trigger metadata refresh - self._refresh_on_disconnects = False - try: - remaining = end - time.time() - version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) - if not self._api_versions: - self._api_versions = conn.get_api_versions() - self._lock.release() - return version - except Errors.NodeNotReadyError: - # Only raise to user if this is a node-specific request + timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000) + with self._lock: + end = time.time() + timeout + while time.time() < end: + time_remaining = max(end - time.time(), 0) + if node_id is not None and self.connection_delay(node_id) > 0: + sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0) + if sleep_time > 0: + time.sleep(sleep_time) + continue + try_node = node_id or self.least_loaded_node() + if try_node is None: + sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0) + if sleep_time > 0: + log.warning('No node available during check_version; sleeping %.2f secs', sleep_time) + time.sleep(sleep_time) + continue + log.debug('Attempting to check version with node %s', try_node) + self._init_connect(try_node) + conn = self._conns[try_node] + + while conn.connecting() and time.time() < end: + timeout_ms = min((end - time.time()) * 1000, 200) + self.poll(timeout_ms=timeout_ms) + + if conn._api_version is not None: + return conn._api_version + + # Timeout + else: if node_id is not None: - self._lock.release() - raise - finally: - self._refresh_on_disconnects = True - - # Timeout - else: - self._lock.release() - raise Errors.NoBrokersAvailable() + raise Errors.NodeNotReadyError(node_id) + else: + raise Errors.NoBrokersAvailable() def wakeup(self): if self._waking or self._wake_w is None: diff --git a/kafka/conn.py b/kafka/conn.py index 4d1c36b95..8a0cdd193 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,8 +24,10 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest -from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.admin import DescribeAclsRequest_v2, DescribeClientQuotasRequest, ListGroupsRequest, SaslHandShakeRequest +from kafka.protocol.api_versions import ApiVersionsRequest +from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.offset import OffsetRequest from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest @@ -92,12 +94,12 @@ class SSLWantWriteError(Exception): class ConnectionStates(object): - DISCONNECTING = '' DISCONNECTED = '' CONNECTING = '' HANDSHAKE = '' CONNECTED = '' AUTHENTICATING = '' + API_VERSIONS = '' class BrokerConnection(object): @@ -169,7 +171,7 @@ class BrokerConnection(object): Default: None api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker - api version. Only applies if api_version is None + api version. Only applies if api_version is None. Default: 2000. selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector @@ -215,6 +217,7 @@ class BrokerConnection(object): 'ssl_password': None, 'ssl_ciphers': None, 'api_version': None, + 'api_version_auto_timeout_ms': 2000, 'selector': selectors.DefaultSelector, 'state_change_callback': lambda node_id, sock, conn: True, 'metrics': None, @@ -228,6 +231,12 @@ class BrokerConnection(object): } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") + VERSION_CHECKS = ( + ((0, 9), ListGroupsRequest[0]()), + ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), + ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), + ((0, 8, 0), MetadataRequest[0]([])), + ) def __init__(self, host, port, afi, **configs): self.host = host @@ -236,6 +245,8 @@ def __init__(self, host, port, afi, **configs): self._sock_afi = afi self._sock_addr = None self._api_versions = None + self._api_version = None + self._check_version_idx = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -300,6 +311,7 @@ def __init__(self, host, port, afi, **configs): self._ssl_context = None if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] + self._api_versions_future = None self._sasl_auth_future = None self.last_attempt = 0 self._gai = [] @@ -403,17 +415,9 @@ def connect(self): self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() - - elif self.config['security_protocol'] == 'SASL_PLAINTEXT': - log.debug('%s: initiating SASL authentication', self) - self.state = ConnectionStates.AUTHENTICATING - self.config['state_change_callback'](self.node_id, self._sock, self) - else: - # security_protocol PLAINTEXT - log.info('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() + log.debug('%s: checking broker Api Versions', self) + self.state = ConnectionStates.API_VERSIONS self.config['state_change_callback'](self.node_id, self._sock, self) # Connection failed @@ -432,15 +436,25 @@ def connect(self): if self.state is ConnectionStates.HANDSHAKE: if self._try_handshake(): log.debug('%s: completed SSL handshake.', self) - if self.config['security_protocol'] == 'SASL_SSL': - log.debug('%s: initiating SASL authentication', self) - self.state = ConnectionStates.AUTHENTICATING - else: - log.info('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() + log.debug('%s: checking broker Api Versions', self) + self.state = ConnectionStates.API_VERSIONS self.config['state_change_callback'](self.node_id, self._sock, self) + if self.state is ConnectionStates.API_VERSIONS: + if self._try_api_versions_check(): + # _try_api_versions_check has side-effects: possibly disconnected on socket errors + if self.state is ConnectionStates.API_VERSIONS: + if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): + log.debug('%s: initiating SASL authentication', self) + self.state = ConnectionStates.AUTHENTICATING + self.config['state_change_callback'](self.node_id, self._sock, self) + else: + # security_protocol PLAINTEXT + log.info('%s: Connection complete.', self) + self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() + self.config['state_change_callback'](self.node_id, self._sock, self) + if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): @@ -521,6 +535,75 @@ def _try_handshake(self): return False + def _try_api_versions_check(self): + if self._api_versions_future is None: + if self.config['api_version'] is not None: + self._api_version = self.config['api_version'] + return True + elif self._check_version_idx is None: + # TODO: Implement newer versions + # ((3, 9), ApiVersionsRequest[4]()), + # ((2, 4), ApiVersionsRequest[3]()), + # ((2, 0), ApiVersionsRequest[2]()), + # ((0, 11), ApiVersionsRequest[1]()), + # ((0, 10), ApiVersionsRequest[0]()), + request = ApiVersionsRequest[0]() + future = Future() + response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8)) + response.add_callback(self._handle_api_versions_response, future) + response.add_errback(self._handle_api_versions_failure, future) + self._api_versions_future = future + elif self._check_version_idx < len(self.VERSION_CHECKS): + version, request = self.VERSION_CHECKS[self._check_version_idx] + future = Future() + response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8)) + response.add_callback(self._handle_check_version_response, future, version) + response.add_errback(self._handle_check_version_failure, future) + self._api_versions_future = future + else: + raise 'Unable to determine broker version.' + + for r, f in self.recv(): + f.success(r) + + # A connection error during blocking send could trigger close() which will reset the future + if self._api_versions_future is None: + return False + elif self._api_versions_future.failed(): + ex = self._api_versions_future.exception + if not isinstance(ex, Errors.KafkaConnectionError): + raise ex + return self._api_versions_future.succeeded() + + def _handle_api_versions_response(self, future, response): + error_type = Errors.for_code(response.error_code) + # if error_type i UNSUPPORTED_VERSION: retry w/ latest version from response + assert error_type is Errors.NoError, "API version check failed" + self._api_versions = dict([ + (api_key, (min_version, max_version)) + for api_key, min_version, max_version in response.api_versions + ]) + self._api_version = self._infer_broker_version_from_api_versions(self._api_versions) + future.success(self._api_version) + self.connect() + + def _handle_api_versions_failure(self, future, ex): + future.failure(ex) + self._check_version_idx = 0 + + def _handle_check_version_response(self, future, version, _response): + log.info('Broker version identified as %s', '.'.join(map(str, version))) + #log.info('Set configuration api_version=%s to skip auto' + # ' check_version requests on startup', version) + self._api_versions = BROKER_API_VERSIONS[version] + self._api_version = version + future.success(version) + self.connect() + + def _handle_check_version_failure(self, future, ex): + future.failure(ex) + self._check_version_idx += 1 + def _try_authenticate(self): assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10, 0) @@ -528,7 +611,7 @@ def _try_authenticate(self): # Build a SaslHandShakeRequest message request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) future = Future() - sasl_response = self._send(request) + sasl_response = self._send(request, blocking=True) sasl_response.add_callback(self._handle_sasl_handshake_response, future) sasl_response.add_errback(lambda f, e: f.failure(e), future) self._sasl_auth_future = future @@ -553,23 +636,28 @@ def _handle_sasl_handshake_response(self, future, response): return future.failure(error_type(self)) if self.config['sasl_mechanism'] not in response.enabled_mechanisms: - return future.failure( + future.failure( Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' % (self.config['sasl_mechanism'], response.enabled_mechanisms))) elif self.config['sasl_mechanism'] == 'PLAIN': - return self._try_authenticate_plain(future) + self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': - return self._try_authenticate_gssapi(future) + self._try_authenticate_gssapi(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': - return self._try_authenticate_oauth(future) + self._try_authenticate_oauth(future) elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): - return self._try_authenticate_scram(future) + self._try_authenticate_scram(future) else: - return future.failure( + future.failure( Errors.UnsupportedSaslMechanismError( 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + assert future.is_done, 'SASL future not complete after mechanism processing!' + if future.failed(): + self.close(error=future.exception) + else: + self.connect() def _send_bytes(self, data): """Send some data via non-blocking IO @@ -879,7 +967,15 @@ def connecting(self): different states, such as SSL handshake, authorization, etc).""" return self.state in (ConnectionStates.CONNECTING, ConnectionStates.HANDSHAKE, - ConnectionStates.AUTHENTICATING) + ConnectionStates.AUTHENTICATING, + ConnectionStates.API_VERSIONS) + + def initializing(self): + """Returns True if socket is connected but full connection is not complete. + During this time the connection may send api requests to the broker to + check api versions and perform SASL authentication.""" + return self.state in (ConnectionStates.AUTHENTICATING, + ConnectionStates.API_VERSIONS) def disconnected(self): """Return True iff socket is closed""" @@ -927,6 +1023,7 @@ def close(self, error=None): return log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '') self._update_reconnect_backoff() + self._api_versions_future = None self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -948,15 +1045,14 @@ def close(self, error=None): # drop lock before state change callback and processing futures self.config['state_change_callback'](self.node_id, sock, self) sock.close() - for (_correlation_id, (future, _timestamp)) in ifrs: + for (_correlation_id, (future, _timestamp, _timeout)) in ifrs: future.failure(error) def _can_send_recv(self): """Return True iff socket is ready for requests / responses""" - return self.state in (ConnectionStates.AUTHENTICATING, - ConnectionStates.CONNECTED) + return self.connected() or self.initializing() - def send(self, request, blocking=True): + def send(self, request, blocking=True, request_timeout_ms=None): """Queue request for async network send, return Future()""" future = Future() if self.connecting(): @@ -965,9 +1061,9 @@ def send(self, request, blocking=True): return future.failure(Errors.KafkaConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request, blocking=blocking) + return self._send(request, blocking=blocking, request_timeout_ms=request_timeout_ms) - def _send(self, request, blocking=True): + def _send(self, request, blocking=True, request_timeout_ms=None): future = Future() with self._lock: if not self._can_send_recv(): @@ -980,9 +1076,11 @@ def _send(self, request, blocking=True): log.debug('%s Request %d: %s', self, correlation_id, request) if request.expect_response(): - sent_time = time.time() assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' - self.in_flight_requests[correlation_id] = (future, sent_time) + sent_time = time.time() + request_timeout_ms = request_timeout_ms or self.config['request_timeout_ms'] + timeout_at = sent_time + (request_timeout_ms / 1000) + self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at) else: future.success(None) @@ -1061,18 +1159,20 @@ def recv(self): """ responses = self._recv() if not responses and self.requests_timed_out(): + timed_out = self.timed_out_ifrs() + timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 log.warning('%s timed out after %s ms. Closing connection.', - self, self.config['request_timeout_ms']) + self, timeout_ms) self.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % - self.config['request_timeout_ms'])) + timeout_ms)) return () # augment responses w/ correlation_id, future, and timestamp for i, (correlation_id, response) in enumerate(responses): try: with self._lock: - (future, timestamp) = self.in_flight_requests.pop(correlation_id) + (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id) except KeyError: self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) return () @@ -1143,27 +1243,21 @@ def _recv(self): def requests_timed_out(self): return self.next_ifr_request_timeout_ms() == 0 + def timed_out_ifrs(self): + now = time.time() + ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2]) + return list(filter(lambda ifr: ifr[2] <= now, ifrs)) + def next_ifr_request_timeout_ms(self): with self._lock: if self.in_flight_requests: - get_timestamp = lambda v: v[1] - oldest_at = min(map(get_timestamp, - self.in_flight_requests.values())) - next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0 + get_timeout = lambda v: v[2] + next_timeout = min(map(get_timeout, + self.in_flight_requests.values())) return max(0, (next_timeout - time.time()) * 1000) else: return float('inf') - def _handle_api_versions_response(self, response): - error_type = Errors.for_code(response.error_code) - if error_type is not Errors.NoError: - return False - self._api_versions = dict([ - (api_key, (min_version, max_version)) - for api_key, min_version, max_version in response.api_versions - ]) - return self._api_versions - def get_api_versions(self): if self._api_versions is not None: return self._api_versions @@ -1178,6 +1272,20 @@ def _infer_broker_version_from_api_versions(self, api_versions): test_cases = [ # format (, ) # Make sure to update consumer_integration test check when adding newer versions. + # ((3, 9), FetchRequest[17]), + # ((3, 8), ProduceRequest[11]), + # ((3, 7), FetchRequest[16]), + # ((3, 6), AddPartitionsToTxnRequest[4]), + # ((3, 5), FetchRequest[15]), + # ((3, 4), StopReplicaRequest[3]), # broker-internal api... + # ((3, 3), DescribeAclsRequest[3]), + # ((3, 2), JoinGroupRequest[9]), + # ((3, 1), FetchRequest[13]), + # ((3, 0), ListOffsetsRequest[7]), + # ((2, 8), ProduceRequest[9]), + # ((2, 7), FetchRequest[12]), + # ((2, 6), ListGroupsRequest[4]), + # ((2, 5), JoinGroupRequest[7]), ((2, 6), DescribeClientQuotasRequest[0]), ((2, 5), DescribeAclsRequest_v2), ((2, 4), ProduceRequest[8]), @@ -1204,120 +1312,24 @@ def _infer_broker_version_from_api_versions(self, api_versions): # so if all else fails, choose that return (0, 10, 0) - def check_version(self, timeout=2, strict=False, topics=[]): + def check_version(self, timeout=2, **kwargs): """Attempt to guess the broker version. + Keyword Arguments: + timeout (numeric, optional): Maximum number of seconds to block attempting + to connect and check version. Default 2 + Note: This is a blocking call. Returns: version tuple, i.e. (3, 9), (2, 4), etc ... + + Raises: NodeNotReadyError on timeout """ timeout_at = time.time() + timeout - log.info('Probing node %s broker version', self.node_id) - # Monkeypatch some connection configurations to avoid timeouts - override_config = { - 'request_timeout_ms': timeout * 1000, - 'max_in_flight_requests_per_connection': 5 - } - stashed = {} - for key in override_config: - stashed[key] = self.config[key] - self.config[key] = override_config[key] - - def reset_override_configs(): - for key in stashed: - self.config[key] = stashed[key] - - # kafka kills the connection when it doesn't recognize an API request - # so we can send a test request and then follow immediately with a - # vanilla MetadataRequest. If the server did not recognize the first - # request, both will be failed with a ConnectionError that wraps - # socket.error (32, 54, or 104) - from kafka.protocol.admin import ListGroupsRequest - from kafka.protocol.api_versions import ApiVersionsRequest - from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS - from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest - - test_cases = [ - # All cases starting from 0.10 will be based on ApiVersionsResponse - ((0, 11), ApiVersionsRequest[1]()), - ((0, 10, 0), ApiVersionsRequest[0]()), - ((0, 9), ListGroupsRequest[0]()), - ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), - ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), - ((0, 8, 0), MetadataRequest[0](topics)), - ] - - for version, request in test_cases: - if not self.connect_blocking(timeout_at - time.time()): - reset_override_configs() - raise Errors.NodeNotReadyError() - f = self.send(request) - # HACK: sleeping to wait for socket to send bytes - time.sleep(0.1) - # when broker receives an unrecognized request API - # it abruptly closes our socket. - # so we attempt to send a second request immediately - # that we believe it will definitely recognize (metadata) - # the attempt to write to a disconnected socket should - # immediately fail and allow us to infer that the prior - # request was unrecognized - mr = self.send(MetadataRequest[0](topics)) - - if not (f.is_done and mr.is_done) and self._sock is not None: - selector = self.config['selector']() - selector.register(self._sock, selectors.EVENT_READ) - while not (f.is_done and mr.is_done): - selector.select(1) - for response, future in self.recv(): - future.success(response) - selector.close() - - if f.succeeded(): - if version >= (0, 10, 0): - # Starting from 0.10 kafka broker we determine version - # by looking at ApiVersionsResponse - api_versions = self._handle_api_versions_response(f.value) - if not api_versions: - continue - version = self._infer_broker_version_from_api_versions(api_versions) - else: - if version not in BROKER_API_VERSIONS: - raise Errors.UnrecognizedBrokerVersion(version) - self._api_versions = BROKER_API_VERSIONS[version] - log.info('Broker version identified as %s', '.'.join(map(str, version))) - log.info('Set configuration api_version=%s to skip auto' - ' check_version requests on startup', version) - break - - # Only enable strict checking to verify that we understand failure - # modes. For most users, the fact that the request failed should be - # enough to rule out a particular broker version. - if strict: - # If the socket flush hack did not work (which should force the - # connection to close and fail all pending requests), then we - # get a basic Request Timeout. This is not ideal, but we'll deal - if isinstance(f.exception, Errors.RequestTimedOutError): - pass - - # 0.9 brokers do not close the socket on unrecognized api - # requests (bug...). In this case we expect to see a correlation - # id mismatch - elif (isinstance(f.exception, Errors.CorrelationIdError) and - version > (0, 9)): - pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) - log.info("Broker is not v%s -- it did not recognize %s", - version, request.__class__.__name__) + if not self.connect_blocking(timeout_at - time.time()): + raise Errors.NodeNotReadyError() else: - reset_override_configs() - raise Errors.UnrecognizedBrokerVersion() - - reset_override_configs() - return version + return self._api_version def __str__(self): return "" % ( diff --git a/test/test_client_async.py b/test/test_client_async.py index b9b415012..d3fb9dffa 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -58,7 +58,7 @@ def test_can_connect(cli, conn): assert cli._can_connect(0) # Node is connected, can't reconnect - assert cli._maybe_connect(0) is True + assert cli._init_connect(0) is True assert not cli._can_connect(0) # Node is disconnected, can connect @@ -70,10 +70,10 @@ def test_can_connect(cli, conn): assert not cli._can_connect(0) -def test_maybe_connect(cli, conn): +def test_init_connect(cli, conn): try: # Node not in metadata, raises AssertionError - cli._maybe_connect(2) + cli._init_connect(2) except AssertionError: pass else: @@ -83,7 +83,7 @@ def test_maybe_connect(cli, conn): assert 0 not in cli._conns conn.state = ConnectionStates.DISCONNECTED conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) - assert cli._maybe_connect(0) is False + assert cli._init_connect(0) is False assert cli._conns[0] is conn @@ -127,8 +127,8 @@ def test_ready(mocker, cli, conn): def test_is_ready(mocker, cli, conn): - cli._maybe_connect(0) - cli._maybe_connect(1) + cli._init_connect(0) + cli._init_connect(1) # metadata refresh blocks ready nodes assert cli.is_ready(0) @@ -171,14 +171,14 @@ def test_close(mocker, cli, conn): assert conn.close.call_count == call_count # Single node close - cli._maybe_connect(0) + cli._init_connect(0) assert conn.close.call_count == call_count cli.close(0) call_count += 1 assert conn.close.call_count == call_count # All node close - cli._maybe_connect(1) + cli._init_connect(1) cli.close() # +2 close: node 1, node bootstrap (node 0 already closed) call_count += 2 @@ -190,7 +190,7 @@ def test_is_disconnected(cli, conn): conn.state = ConnectionStates.DISCONNECTED assert not cli.is_disconnected(0) - cli._maybe_connect(0) + cli._init_connect(0) assert cli.is_disconnected(0) conn.state = ConnectionStates.CONNECTING @@ -215,7 +215,7 @@ def test_send(cli, conn): assert isinstance(f.exception, Errors.NodeNotReadyError) conn.state = ConnectionStates.CONNECTED - cli._maybe_connect(0) + cli._init_connect(0) # ProduceRequest w/ 0 required_acks -> no response request = ProduceRequest[0](0, 0, []) assert request.expect_response() is False @@ -344,7 +344,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=False) mocker.patch.object(client, '_can_connect', return_value=True) - mocker.patch.object(client, '_maybe_connect', return_value=True) + mocker.patch.object(client, '_init_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) now = time.time() diff --git a/test/test_conn.py b/test/test_conn.py index fb4172814..25565ddc4 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -26,13 +26,16 @@ def dns_lookup(mocker): def _socket(mocker): socket = mocker.MagicMock() socket.connect_ex.return_value = 0 + socket.send.side_effect = lambda d: len(d) + socket.recv.side_effect = BlockingIOError("mocked recv") mocker.patch('socket.socket', return_value=socket) return socket @pytest.fixture -def conn(_socket, dns_lookup): +def conn(_socket, dns_lookup, mocker): conn = BrokerConnection('localhost', 9092, socket.AF_INET) + mocker.patch.object(conn, '_try_api_versions_check', return_value=True) return conn @@ -216,12 +219,13 @@ def test_recv_disconnected(_socket, conn): conn.send(req) # Empty data on recv means the socket is disconnected + _socket.recv.side_effect = None _socket.recv.return_value = b'' # Attempt to receive should mark connection as disconnected - assert conn.connected() + assert conn.connected(), 'Not connected: %s' % conn.state conn.recv() - assert conn.disconnected() + assert conn.disconnected(), 'Not disconnected: %s' % conn.state def test_recv(_socket, conn): @@ -347,14 +351,14 @@ def test_requests_timed_out(conn): # No in-flight requests, not timed out assert not conn.requests_timed_out() - # Single request, timestamp = now (0) - conn.in_flight_requests[0] = ('foo', 0) + # Single request, timeout_at > now (0) + conn.in_flight_requests[0] = ('foo', 0, 1) assert not conn.requests_timed_out() # Add another request w/ timestamp > request_timeout ago request_timeout = conn.config['request_timeout_ms'] expired_timestamp = 0 - request_timeout - 1 - conn.in_flight_requests[1] = ('bar', expired_timestamp) + conn.in_flight_requests[1] = ('bar', 0, expired_timestamp) assert conn.requests_timed_out() # Drop the expired request and we should be good to go again