Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fake api_versions for old brokers, rename to ApiVersionsRequest, and handle error decoding #2494

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,10 +925,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
if version >= (0, 10, 0):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
self._api_versions = conn.get_api_versions()
self._lock.release()
return version
except Errors.NodeNotReadyError:
Expand Down
28 changes: 15 additions & 13 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,8 @@ def next_ifr_request_timeout_ms(self):

def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
if error_type is not Errors.NoError:
return False
self._api_versions = dict([
(api_key, (min_version, max_version))
for api_key, min_version, max_version in response.api_versions
Expand All @@ -1168,12 +1169,7 @@ def get_api_versions(self):
return self._api_versions

version = self.check_version()
if version < (0, 10, 0):
raise Errors.UnsupportedVersionError(
"ApiVersion not supported by cluster version {} < 0.10.0"
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
# _api_versions is set as a side effect of check_versions()
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
Expand Down Expand Up @@ -1204,7 +1200,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
if min_version <= struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
# We know that ApiVersionsResponse is only supported in 0.10+
# so if all else fails, choose that
return (0, 10, 0)

Expand Down Expand Up @@ -1236,12 +1232,14 @@ def reset_override_configs():
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
from kafka.protocol.admin import ListGroupsRequest
from kafka.protocol.api_versions import ApiVersionsRequest, OLD_BROKER_API_VERSIONS
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest

test_cases = [
# All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
# All cases starting from 0.10 will be based on ApiVersionsResponse
((0, 11), ApiVersionsRequest[1]()),
((0, 10), ApiVersionsRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
Expand Down Expand Up @@ -1274,11 +1272,15 @@ def reset_override_configs():
selector.close()

if f.succeeded():
if isinstance(request, ApiVersionRequest[0]):
if version >= (0, 10):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
# by looking at ApiVersionsResponse
api_versions = self._handle_api_version_response(f.value)
if not api_versions:
continue
version = self._infer_broker_version_from_api_versions(api_versions)
else:
self._api_versions = OLD_BROKER_API_VERSIONS.get(version)
log.info('Broker version identified as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
Expand Down
60 changes: 0 additions & 60 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,6 @@
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields


class ApiVersionResponse_v0(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16)))
)


class ApiVersionResponse_v1(Response):
API_KEY = 18
API_VERSION = 1
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16))),
('throttle_time_ms', Int32)
)


class ApiVersionResponse_v2(Response):
API_KEY = 18
API_VERSION = 2
SCHEMA = ApiVersionResponse_v1.SCHEMA


class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
SCHEMA = Schema()


class ApiVersionRequest_v1(Request):
API_KEY = 18
API_VERSION = 1
RESPONSE_TYPE = ApiVersionResponse_v1
SCHEMA = ApiVersionRequest_v0.SCHEMA


class ApiVersionRequest_v2(Request):
API_KEY = 18
API_VERSION = 2
RESPONSE_TYPE = ApiVersionResponse_v1
SCHEMA = ApiVersionRequest_v0.SCHEMA


ApiVersionRequest = [
ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2,
]
ApiVersionResponse = [
ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2,
]


class CreateTopicsResponse_v0(Response):
API_KEY = 19
API_VERSION = 0
Expand Down
151 changes: 151 additions & 0 deletions kafka/protocol/api_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int16, Int32, Schema


class BaseApiVersionsResponse(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16)))
)

@classmethod
def decode(cls, data):
if isinstance(data, bytes):
data = BytesIO(data)
# Check error_code, decode as v0 if any error
curr = data.tell()
err = Int16.decode(data)
data.seek(curr)
if err != 0:
return ApiVersionsResponse_v0.decode(data)
return super(BaseApiVersionsResponse, cls).decode(data)


class ApiVersionsResponse_v0(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16)))
)


class ApiVersionsResponse_v1(BaseApiVersionsResponse):
API_KEY = 18
API_VERSION = 1
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16))),
('throttle_time_ms', Int32)
)


class ApiVersionsResponse_v2(BaseApiVersionsResponse):
API_KEY = 18
API_VERSION = 2
SCHEMA = ApiVersionsResponse_v1.SCHEMA


class ApiVersionsRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionsResponse_v0
SCHEMA = Schema()


class ApiVersionsRequest_v1(Request):
API_KEY = 18
API_VERSION = 1
RESPONSE_TYPE = ApiVersionsResponse_v1
SCHEMA = ApiVersionsRequest_v0.SCHEMA


class ApiVersionsRequest_v2(Request):
API_KEY = 18
API_VERSION = 2
RESPONSE_TYPE = ApiVersionsResponse_v1
SCHEMA = ApiVersionsRequest_v0.SCHEMA


ApiVersionsRequest = [
ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2,
]
ApiVersionsResponse = [
ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2,
]


OLD_BROKER_API_VERSIONS = {
(0, 8, 0): {
0: (0, 0),
1: (0, 0),
2: (0, 0),
3: (0, 0),
},
(0, 8, 1): { # adds offset commit + fetch
0: (0, 0),
1: (0, 0),
2: (0, 0),
3: (0, 0),
8: (0, 0),
9: (0, 0),
},
(0, 8, 2): { # adds find coordinator
0: (0, 0),
1: (0, 0),
2: (0, 0),
3: (0, 0),
8: (0, 1),
9: (0, 1),
10: (0, 0),
},
(0, 9): { # adds group management (join/sync/leave/heartbeat)
0: (0, 1),
1: (0, 1),
2: (0, 0),
3: (0, 0),
8: (0, 2),
9: (0, 1),
10: (0, 0),
11: (0, 0),
12: (0, 0),
13: (0, 0),
14: (0, 0),
15: (0, 0),
16: (0, 0),
},
(0, 10): { # adds sasl + api versions [pulled from api_versions response]
0: (0, 2),
1: (0, 2),
2: (0, 0),
3: (0, 1),
4: (0, 0),
5: (0, 0),
6: (0, 2),
7: (1, 1),
8: (0, 2),
9: (0, 1),
10: (0, 0),
11: (0, 0),
12: (0, 0),
13: (0, 0),
14: (0, 0),
15: (0, 0),
16: (0, 0),
17: (0, 0),
18: (0, 0),
},
}