Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call ApiVersionsRequest during connection, prior to Sasl Handshake #2493

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def __init__(self, **configs):
self.config['api_version'] = self._client.config['api_version']
else:
# need to run check_version for get_api_versions()
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
self._client.check_version()

self._closed = False
self._refresh_controller_id()
Expand Down Expand Up @@ -294,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
time.sleep(1)
continue
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
controller_version = self._client.check_version(node_id=controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
Expand Down
128 changes: 66 additions & 62 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ def __init__(self, **configs):

# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)
self.config['api_version'] = self.check_version()

def _init_wakeup_socketpair(self):
self._wake_r, self._wake_w = socket.socketpair()
Expand Down Expand Up @@ -283,7 +282,7 @@ def _can_connect(self, node_id):

def _conn_state_change(self, node_id, sock, conn):
with self._lock:
if conn.connecting():
if conn.state is ConnectionStates.CONNECTING:
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
Expand All @@ -295,7 +294,13 @@ def _conn_state_change(self, node_id, sock, conn):
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()

elif conn.connected():
elif conn.state in (ConnectionStates.API_VERSIONS, ConnectionStates.AUTHENTICATING):
try:
self._selector.register(sock, selectors.EVENT_READ, conn)
except KeyError:
self._selector.modify(sock, selectors.EVENT_READ, conn)

elif conn.state is ConnectionStates.CONNECTED:
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
Expand All @@ -312,6 +317,8 @@ def _conn_state_change(self, node_id, sock, conn):

if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails = 0
if self._api_versions is None:
self._api_versions = conn._api_versions

else:
for node_id in list(self._conns.keys()):
Expand Down Expand Up @@ -378,11 +385,16 @@ def _should_recycle_connection(self, conn):

return False

def _maybe_connect(self, node_id):
def _init_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
conn = self._conns.get(node_id)

# Check if existing connection should be recreated because host/port changed
if conn is not None and self._should_recycle_connection(conn):
self._conns.pop(node_id)
conn = None

if conn is None:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
Expand All @@ -397,15 +409,8 @@ def _maybe_connect(self, node_id):
**self.config)
self._conns[node_id] = conn

# Check if existing connection should be recreated because host/port changed
elif self._should_recycle_connection(conn):
self._conns.pop(node_id)
return False

elif conn.connected():
return True

conn.connect()
if conn.disconnected():
conn.connect()
return conn.connected()

def ready(self, node_id, metadata_priority=True):
Expand Down Expand Up @@ -595,7 +600,7 @@ def poll(self, timeout_ms=None, future=None):

# Attempt to complete pending connections
for node_id in list(self._connecting):
self._maybe_connect(node_id)
self._init_connect(node_id)

# If we got a future that is already done, don't block in _poll
if future is not None and future.is_done:
Expand Down Expand Up @@ -709,11 +714,13 @@ def _poll(self, timeout):

for conn in six.itervalues(self._conns):
if conn.requests_timed_out():
timed_out = conn.timed_out_ifrs()
timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
log.warning('%s timed out after %s ms. Closing connection.',
conn, conn.config['request_timeout_ms'])
conn, timeout_ms)
conn.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
conn.config['request_timeout_ms']))
timeout_ms))

if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
Expand Down Expand Up @@ -896,67 +903,64 @@ def refresh_done(val_or_error):
def get_api_versions(self):
"""Return the ApiVersions map, if available.

Note: A call to check_version must previously have succeeded and returned
version 0.10.0 or later
Note: Only available after bootstrap; requires broker version 0.10.0 or later.

Returns: a map of dict mapping {api_key : (min_version, max_version)},
or None if ApiVersion is not supported by the kafka cluster.
"""
return self._api_versions

def check_version(self, node_id=None, timeout=2, strict=False):
def check_version(self, node_id=None, timeout=None, **kwargs):
"""Attempt to guess the version of a Kafka broker.

Note: It is possible that this method blocks longer than the
specified timeout. This can happen if the entire cluster
is down and the client enters a bootstrap backoff sleep.
This is only possible if node_id is None.
Keyword Arguments:
node_id (str, optional): Broker node id from cluster metadata. If None, attempts
to connect to any available broker until version is identified.
Default: None
timeout (num, optional): Maximum time in seconds to try to check broker version.
If unable to identify version before timeout, raise error (see below).
Default: api_version_auto_timeout_ms / 1000

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...

Raises:
NodeNotReadyError (if node_id is provided)
NoBrokersAvailable (if node_id is None)
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
self._lock.acquire()
end = time.time() + timeout
while time.time() < end:

# It is possible that least_loaded_node falls back to bootstrap,
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
self._lock.release()
raise Errors.NoBrokersAvailable()
self._maybe_connect(try_node)
conn = self._conns[try_node]

# We will intentionally cause socket failures
# These should not trigger metadata refresh
self._refresh_on_disconnects = False
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
if version >= (0, 10, 0):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
with self._lock:
end = time.time() + timeout
while time.time() < end:
time_remaining = max(end - time.time(), 0)
if node_id is not None and self.connection_delay(node_id) > 0:
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
if sleep_time > 0:
time.sleep(sleep_time)
continue
try_node = node_id or self.least_loaded_node()
if try_node is None:
sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0)
if sleep_time > 0:
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
time.sleep(sleep_time)
continue
log.debug('Attempting to check version with node %s', try_node)
self._init_connect(try_node)
conn = self._conns[try_node]

while conn.connecting() and time.time() < end:
timeout_ms = min((end - time.time()) * 1000, 200)
self.poll(timeout_ms=timeout_ms)

if conn._api_version is not None:
return conn._api_version

# Timeout
else:
if node_id is not None:
self._lock.release()
raise
finally:
self._refresh_on_disconnects = True

# Timeout
else:
self._lock.release()
raise Errors.NoBrokersAvailable()
raise Errors.NodeNotReadyError(node_id)
else:
raise Errors.NoBrokersAvailable()

def wakeup(self):
if self._waking or self._wake_w is None:
Expand Down
Loading