diff --git a/kafka/client_async.py b/kafka/client_async.py index 67014488f..d2fdc8aa0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -925,10 +925,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): try: remaining = end - time.time() version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) - if version >= (0, 10, 0): - # cache the api versions map if it's available (starting - # in 0.10 cluster version) - self._api_versions = conn.get_api_versions() + self._api_versions = conn.get_api_versions() self._lock.release() return version except Errors.NodeNotReadyError: diff --git a/kafka/conn.py b/kafka/conn.py index ab3fc6944..4abbbdec8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1156,7 +1156,8 @@ def next_ifr_request_timeout_ms(self): def _handle_api_version_response(self, response): error_type = Errors.for_code(response.error_code) - assert error_type is Errors.NoError, "API version check failed" + if error_type is not Errors.NoError: + return False self._api_versions = dict([ (api_key, (min_version, max_version)) for api_key, min_version, max_version in response.api_versions @@ -1168,12 +1169,7 @@ def get_api_versions(self): return self._api_versions version = self.check_version() - if version < (0, 10, 0): - raise Errors.UnsupportedVersionError( - "ApiVersion not supported by cluster version {} < 0.10.0" - .format(version)) - # _api_versions is set as a side effect of check_versions() on a cluster - # that supports 0.10.0 or later + # _api_versions is set as a side effect of check_versions() return self._api_versions def _infer_broker_version_from_api_versions(self, api_versions): @@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions): if min_version <= struct.API_VERSION <= max_version: return broker_version - # We know that ApiVersionResponse is only supported in 0.10+ + # We know that ApiVersionsResponse is only supported in 0.10+ # so if all else fails, choose that return (0, 10, 0) @@ -1236,12 +1232,14 @@ def reset_override_configs(): # vanilla MetadataRequest. If the server did not recognize the first # request, both will be failed with a ConnectionError that wraps # socket.error (32, 54, or 104) - from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest + from kafka.protocol.admin import ListGroupsRequest + from kafka.protocol.api_versions import ApiVersionsRequest, OLD_BROKER_API_VERSIONS from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest test_cases = [ - # All cases starting from 0.10 will be based on ApiVersionResponse - ((0, 10), ApiVersionRequest[0]()), + # All cases starting from 0.10 will be based on ApiVersionsResponse + ((0, 11), ApiVersionsRequest[1]()), + ((0, 10), ApiVersionsRequest[0]()), ((0, 9), ListGroupsRequest[0]()), ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), @@ -1274,11 +1272,15 @@ def reset_override_configs(): selector.close() if f.succeeded(): - if isinstance(request, ApiVersionRequest[0]): + if version >= (0, 10): # Starting from 0.10 kafka broker we determine version - # by looking at ApiVersionResponse + # by looking at ApiVersionsResponse api_versions = self._handle_api_version_response(f.value) + if not api_versions: + continue version = self._infer_broker_version_from_api_versions(api_versions) + else: + self._api_versions = OLD_BROKER_API_VERSIONS.get(version) log.info('Broker version identified as %s', '.'.join(map(str, version))) log.info('Set configuration api_version=%s to skip auto' ' check_version requests on startup', version) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 87768f839..3da5c5419 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -4,66 +4,6 @@ from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields -class ApiVersionResponse_v0(Response): - API_KEY = 18 - API_VERSION = 0 - SCHEMA = Schema( - ('error_code', Int16), - ('api_versions', Array( - ('api_key', Int16), - ('min_version', Int16), - ('max_version', Int16))) - ) - - -class ApiVersionResponse_v1(Response): - API_KEY = 18 - API_VERSION = 1 - SCHEMA = Schema( - ('error_code', Int16), - ('api_versions', Array( - ('api_key', Int16), - ('min_version', Int16), - ('max_version', Int16))), - ('throttle_time_ms', Int32) - ) - - -class ApiVersionResponse_v2(Response): - API_KEY = 18 - API_VERSION = 2 - SCHEMA = ApiVersionResponse_v1.SCHEMA - - -class ApiVersionRequest_v0(Request): - API_KEY = 18 - API_VERSION = 0 - RESPONSE_TYPE = ApiVersionResponse_v0 - SCHEMA = Schema() - - -class ApiVersionRequest_v1(Request): - API_KEY = 18 - API_VERSION = 1 - RESPONSE_TYPE = ApiVersionResponse_v1 - SCHEMA = ApiVersionRequest_v0.SCHEMA - - -class ApiVersionRequest_v2(Request): - API_KEY = 18 - API_VERSION = 2 - RESPONSE_TYPE = ApiVersionResponse_v1 - SCHEMA = ApiVersionRequest_v0.SCHEMA - - -ApiVersionRequest = [ - ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2, -] -ApiVersionResponse = [ - ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2, -] - - class CreateTopicsResponse_v0(Response): API_KEY = 19 API_VERSION = 0 diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py new file mode 100644 index 000000000..55a833010 --- /dev/null +++ b/kafka/protocol/api_versions.py @@ -0,0 +1,151 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, Int16, Int32, Schema + + +class BaseApiVersionsResponse(Response): + API_KEY = 18 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))) + ) + + @classmethod + def decode(cls, data): + if isinstance(data, bytes): + data = BytesIO(data) + # Check error_code, decode as v0 if any error + curr = data.tell() + err = Int16.decode(data) + data.seek(curr) + if err != 0: + return ApiVersionsResponse_v0.decode(data) + return super(BaseApiVersionsResponse, cls).decode(data) + + +class ApiVersionsResponse_v0(Response): + API_KEY = 18 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))) + ) + + +class ApiVersionsResponse_v1(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))), + ('throttle_time_ms', Int32) + ) + + +class ApiVersionsResponse_v2(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 2 + SCHEMA = ApiVersionsResponse_v1.SCHEMA + + +class ApiVersionsRequest_v0(Request): + API_KEY = 18 + API_VERSION = 0 + RESPONSE_TYPE = ApiVersionsResponse_v0 + SCHEMA = Schema() + + +class ApiVersionsRequest_v1(Request): + API_KEY = 18 + API_VERSION = 1 + RESPONSE_TYPE = ApiVersionsResponse_v1 + SCHEMA = ApiVersionsRequest_v0.SCHEMA + + +class ApiVersionsRequest_v2(Request): + API_KEY = 18 + API_VERSION = 2 + RESPONSE_TYPE = ApiVersionsResponse_v1 + SCHEMA = ApiVersionsRequest_v0.SCHEMA + + +ApiVersionsRequest = [ + ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2, +] +ApiVersionsResponse = [ + ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2, +] + + +OLD_BROKER_API_VERSIONS = { + (0, 8, 0): { + 0: (0, 0), + 1: (0, 0), + 2: (0, 0), + 3: (0, 0), + }, + (0, 8, 1): { # adds offset commit + fetch + 0: (0, 0), + 1: (0, 0), + 2: (0, 0), + 3: (0, 0), + 8: (0, 0), + 9: (0, 0), + }, + (0, 8, 2): { # adds find coordinator + 0: (0, 0), + 1: (0, 0), + 2: (0, 0), + 3: (0, 0), + 8: (0, 1), + 9: (0, 1), + 10: (0, 0), + }, + (0, 9): { # adds group management (join/sync/leave/heartbeat) + 0: (0, 1), + 1: (0, 1), + 2: (0, 0), + 3: (0, 0), + 8: (0, 2), + 9: (0, 1), + 10: (0, 0), + 11: (0, 0), + 12: (0, 0), + 13: (0, 0), + 14: (0, 0), + 15: (0, 0), + 16: (0, 0), + }, + (0, 10): { # adds sasl + api versions [pulled from api_versions response] + 0: (0, 2), + 1: (0, 2), + 2: (0, 0), + 3: (0, 1), + 4: (0, 0), + 5: (0, 0), + 6: (0, 2), + 7: (1, 1), + 8: (0, 2), + 9: (0, 1), + 10: (0, 0), + 11: (0, 0), + 12: (0, 0), + 13: (0, 0), + 14: (0, 0), + 15: (0, 0), + 16: (0, 0), + 17: (0, 0), + 18: (0, 0), + }, +}