Commit e05ba6f

Enable pylint workflow; disable/fix all outstanding errors (#2701)
1 parent c650a77 commit e05ba6f

File tree: 5 files changed (+24, -20 lines)


.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ jobs:
         python -m pip install --upgrade pip
         pip install -r requirements-dev.txt
     - name: Pylint
-      run: pylint --recursive=y --errors-only --exit-zero kafka test
+      run: pylint --recursive=y --errors-only kafka test
     - name: Setup java
       uses: actions/setup-java@v5
       with:
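Dropping `--exit-zero` is the substantive workflow change: pylint's exit status now gates CI, so any remaining error-level finding fails the job. Nearly everything else in this commit deals with pylint's E0606 (`possibly-used-before-assignment`). A minimal sketch of the pattern that triggers it, using hypothetical names rather than kafka-python's API:

```python
# Minimal E0606 repro; build_request is an illustrative stand-in.
def build_request(version):
    if version <= 1:
        request = ("find_coordinator", version)
    elif version <= 2:
        request = ("find_coordinator", version, 0)
    # pylint cannot prove the chain above is exhaustive: when
    # version > 2 nothing assigns `request`, so this return is
    # flagged as possibly-used-before-assignment (E0606).
    return request
```

Where an upstream version check already guarantees one branch runs, the commit keeps the code and silences the check inline with `# pylint: disable=E0606`; where no such guarantee exists, it adds an explicit `else` that raises (see `kafka/record/legacy_records.py` below).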

kafka/admin/client.py

Lines changed: 14 additions & 13 deletions
@@ -290,7 +290,7 @@ def _find_coordinator_id_request(self, group_id):
             request = FindCoordinatorRequest[version](group_id)
         elif version <= 2:
             request = FindCoordinatorRequest[version](group_id, 0)
-        return request
+        return request  # pylint: disable=E0606
 
     def _find_coordinator_id_process_response(self, response):
         """Process a FindCoordinatorResponse.

@@ -506,7 +506,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
         )
         # TODO convert structs to a more pythonic interface
         # TODO raise exceptions if errors
-        return self._send_request_to_controller(request)
+        return self._send_request_to_controller(request)  # pylint: disable=E0606
 
     def delete_topics(self, topics, timeout_ms=None):
         """Delete topics from the cluster.

@@ -680,7 +680,7 @@ def describe_acls(self, acl_filter):
             permission_type=acl_filter.permission_type
 
         )
-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         error_type = Errors.for_code(response.error_code)
         if error_type is not Errors.NoError:
             # optionally we could retry if error_type.retriable

@@ -793,7 +793,7 @@ def create_acls(self, acls):
         request = CreateAclsRequest[version](
             creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
         )
-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         return self._convert_create_acls_response_to_acls(acls, response)
 
     @staticmethod

@@ -907,7 +907,7 @@ def delete_acls(self, acl_filters):
         request = DeleteAclsRequest[version](
             filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
         )
-        response = self.send_request(request)
+        response = self.send_request(request)  # pylint: disable=E0606
         return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)
 
     @staticmethod

@@ -1269,14 +1269,15 @@ def _describe_consumer_groups_process_response(self, response):
                     # TODO: Fix GroupInformation defaults
                     described_group_information_list.append([])
                 group_description = GroupInformation._make(described_group_information_list)
-            error_code = group_description.error_code
-            error_type = Errors.for_code(error_code)
-            # Java has the note: KAFKA-6789, we can retry based on the error code
-            if error_type is not Errors.NoError:
-                raise error_type(
-                    "DescribeGroupsResponse failed with response '{}'."
-                    .format(response))
-            return group_description
+                error_code = group_description.error_code
+                error_type = Errors.for_code(error_code)
+                # Java has the note: KAFKA-6789, we can retry based on the error code
+                if error_type is not Errors.NoError:
+                    raise error_type(
+                        "DescribeGroupsResponse failed with response '{}'."
+                        .format(response))
+                return group_description
+        assert False, "DescribeGroupsResponse parsing failed"
 
     def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
         """Describe a set of consumer groups.

kafka/record/default_records.py

Lines changed: 2 additions & 2 deletions
@@ -245,7 +245,7 @@ def _maybe_uncompress(self):
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
                     uncompressed = zstd_decode(data.tobytes())
-                self._buffer = bytearray(uncompressed)
+                self._buffer = bytearray(uncompressed)  # pylint: disable=E0606
                 self._pos = 0
         self._decompressed = True

@@ -658,7 +658,7 @@ def _maybe_compress(self):
                 compressed = lz4_encode(data)
             elif self._compression_type == self.CODEC_ZSTD:
                 compressed = zstd_encode(data)
-            compressed_size = len(compressed)
+            compressed_size = len(compressed)  # pylint: disable=E0606
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
                 # uncompressed

kafka/record/legacy_records.py

Lines changed: 5 additions & 2 deletions
@@ -122,6 +122,9 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        else:
+            raise UnsupportedCodecError(
+                "Unrecognized compression type")
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))

@@ -206,7 +209,7 @@ def _decompress(self, key_offset):
                 uncompressed = lz4_decode_old_kafka(data.tobytes())
             else:
                 uncompressed = lz4_decode(data.tobytes())
-        return uncompressed
+        return uncompressed  # pylint: disable=E0606
 
     def _read_header(self, pos):
         if self._magic == 0:

@@ -483,7 +486,7 @@ def _maybe_compress(self):
             else:
                 compressed = lz4_encode(data)
             size = self.size_in_bytes(
-                0, timestamp=0, key=None, value=compressed)
+                0, timestamp=0, key=None, value=compressed)  # pylint: disable=E0606
             # We will try to reuse the same buffer if we have enough space
             if size > len(self._buffer):
                 self._buffer = bytearray(size)
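The `_assert_has_codec` hunk demonstrates the other repair strategy: make the if/elif dispatch exhaustive with an `else` that raises, so `checker` and `name` are provably bound wherever they are used. A self-contained sketch under stand-in constants and checkers (only the control-flow shape follows the library):

```python
# Stand-in codec constants and always-true checkers; illustrative only.
CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x01, 0x02, 0x03

class UnsupportedCodecError(Exception):
    pass

def assert_has_codec(compression_type):
    if compression_type == CODEC_GZIP:
        checker, name = (lambda: True), "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = (lambda: True), "snappy"
    elif compression_type == CODEC_LZ4:
        checker, name = (lambda: True), "lz4"
    else:
        # The added branch: no path now reaches the check below with
        # `checker`/`name` unbound, so no disable comment is needed.
        raise UnsupportedCodecError("Unrecognized compression type")
    if not checker():
        raise UnsupportedCodecError(
            "Libraries for {} compression codec not found".format(name))
```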

test/integration/fixtures.py

Lines changed: 2 additions & 2 deletions
@@ -299,7 +299,7 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,
 
         if self.external:
             self.child = ExternalService(self.host, self.port)
-            (self._client,) = self.get_clients(1, client_id='_internal_client')
+            self._client = next(self.get_clients(1, client_id='_internal_client'))
             self.running = True
         else:
             self._client = None

@@ -447,7 +447,7 @@ def start(self):
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
 
-        (self._client,) = self.get_clients(1, client_id='_internal_client')
+        self._client = next(self.get_clients(1, client_id='_internal_client'))
 
         self.out("Done!")
         self.running = True
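Both fixture changes replace single-element tuple unpacking with `next()`. At runtime the two are equivalent when `get_clients` produces exactly one client, but a linter cannot statically verify that the unpack is balanced against an arbitrary iterable, whereas `next()` simply takes the first item without asserting anything about length (presumably why the commit prefers it). A small illustration with a hypothetical generator:

```python
# make_clients is a hypothetical stand-in for the fixture's get_clients.
def make_clients(count):
    for i in range(count):
        yield "client-{}".format(i)

# Works at runtime, but the unpack implicitly claims "exactly one item",
# which static analysis cannot confirm for a generator:
(client,) = make_clients(1)

# Equivalent here, and makes no claim about how many items exist:
client = next(make_clients(1))
```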
