From 61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 22 Aug 2019 21:14:37 -0700 Subject: [PATCH] Convert remaining `KafkaConsumer` tests to `pytest` (#1886) This makes it so the only remaining use of `unittest` is in the old tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are migrated to `pytest`. I also had to bump the test iterations up on one of the tests, I think there was a race condition there that was more commonly hit under pytest , planning to cleanup that in a followup PR. See https://github.com/dpkp/kafka-python/pull/1886#discussion_r316860737 for details. --- test/conftest.py | 26 ++ test/test_consumer_group.py | 2 + test/test_consumer_integration.py | 501 +++++++++++++++--------------- test/testutil.py | 11 + 4 files changed, 284 insertions(+), 256 deletions(-) diff --git a/test/conftest.py b/test/conftest.py index 5015cc7a1..267ac6aa9 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import uuid + import pytest from test.testutil import env_kafka_version, random_string @@ -137,3 +139,27 @@ def _set_conn_state(state): conn.connected = lambda: conn.state is ConnectionStates.CONNECTED conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED return conn + + +@pytest.fixture() +def send_messages(topic, kafka_producer, request): + """A factory that returns a send_messages function with a pre-populated + topic topic / producer.""" + + def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): + """ + messages is typically `range(0,100)` + partition is an int + """ + messages_and_futures = [] # [(message, produce_future),] + for i in number_range: + # request.node.name provides the test name (including parametrized values) + encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') + future = kafka_producer.send(topic, value=encoded_msg, partition=partition) + messages_and_futures.append((encoded_msg, future)) + kafka_producer.flush() + for (msg, f) in messages_and_futures: + assert f.succeeded() + return [msg for (msg, f) in messages_and_futures] + + return _send_messages diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 33676179d..58dc7ebf9 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic): assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED consumer.close() + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_consumer_topics(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) @@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic): assert len(consumer.partitions_for_topic(topic)) > 0 consumer.close() + @pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_group(kafka_broker, topic): num_partitions = 4 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index cb0524294..c7e2ebf5e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -4,7 +4,6 @@ from mock import patch import pytest -from kafka.vendor import six from kafka.vendor.six.moves import range from . import unittest @@ -23,34 +22,26 @@ ) from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string +from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): +def test_kafka_consumer(kafka_consumer_factory, send_messages): """Test KafkaConsumer""" - kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') - - # TODO replace this with a `send_messages()` pytest fixture - # as we will likely need this elsewhere - for i in range(0, 100): - kafka_producer.send(topic, partition=0, value=str(i).encode()) - for i in range(100, 200): - kafka_producer.send(topic, partition=1, value=str(i).encode()) - kafka_producer.flush() - + consumer = kafka_consumer_factory(auto_offset_reset='earliest') + send_messages(range(0, 100), partition=0) + send_messages(range(0, 100), partition=1) cnt = 0 - messages = {0: set(), 1: set()} - for message in kafka_consumer: + messages = {0: [], 1: []} + for message in consumer: logging.debug("Consumed message %s", repr(message)) cnt += 1 - messages[message.partition].add(message.offset) + messages[message.partition].append(message) if cnt >= 200: break - assert len(messages[0]) == 100 - assert len(messages[1]) == 100 - kafka_consumer.close() + assert_message_count(messages[0], 100) + assert_message_count(messages[1], 100) @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @@ -547,242 +538,240 @@ def test_fetch_buffer_size(self): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_kafka_consumer__blocking(self): - TIMEOUT_MS = 500 - consumer = self.kafka_consumer(auto_offset_reset='earliest', - enable_auto_commit=False, - consumer_timeout_ms=TIMEOUT_MS) - - # Manual assignment avoids overhead of consumer group mgmt - consumer.unsubscribe() - consumer.assign([TopicPartition(self.topic, 0)]) - # Ask for 5 messages, nothing in queue, block 500ms - with Timer() as t: - with self.assertRaises(StopIteration): - msg = next(consumer) - self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - - self.send_messages(0, range(0, 10)) - - # Ask for 5 messages, 10 in queue. Get 5 back, no blocking - messages = set() - with Timer() as t: - for i in range(5): +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): + TIMEOUT_MS = 500 + consumer = kafka_consumer_factory(auto_offset_reset='earliest', + enable_auto_commit=False, + consumer_timeout_ms=TIMEOUT_MS) + + # Manual assignment avoids overhead of consumer group mgmt + consumer.unsubscribe() + consumer.assign([TopicPartition(topic, 0)]) + + # Ask for 5 messages, nothing in queue, block 500ms + with Timer() as t: + with pytest.raises(StopIteration): + msg = next(consumer) + assert t.interval >= (TIMEOUT_MS / 1000.0) + + send_messages(range(0, 10)) + + # Ask for 5 messages, 10 in queue. Get 5 back, no blocking + messages = [] + with Timer() as t: + for i in range(5): + msg = next(consumer) + messages.append(msg) + assert_message_count(messages, 5) + assert t.interval < (TIMEOUT_MS / 1000.0) + + # Ask for 10 messages, get 5 back, block 500ms + messages = [] + with Timer() as t: + with pytest.raises(StopIteration): + for i in range(10): msg = next(consumer) - messages.add((msg.partition, msg.offset)) - self.assertEqual(len(messages), 5) - self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) - - # Ask for 10 messages, get 5 back, block 500ms - messages = set() - with Timer() as t: - with self.assertRaises(StopIteration): - for i in range(10): - msg = next(consumer) - messages.add((msg.partition, msg.offset)) - self.assertEqual(len(messages), 5) - self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") - def test_kafka_consumer__offset_commit_resume(self): - GROUP_ID = random_string(10) - - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.kafka_consumer( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - - # Grab the first 180 messages - output_msgs1 = [] - for _ in range(180): - m = next(consumer1) - output_msgs1.append((m.key, m.value)) - self.assert_message_count(output_msgs1, 180) - consumer1.close() - - # The total offset across both partitions should be at 180 - consumer2 = self.kafka_consumer( - group_id=GROUP_ID, - enable_auto_commit=True, - auto_commit_interval_ms=100, - auto_offset_reset='earliest', - ) - - # 181-200 - output_msgs2 = [] - for _ in range(20): - m = next(consumer2) - output_msgs2.append((m.key, m.value)) - self.assert_message_count(output_msgs2, 20) - self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200) - consumer2.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_max_bytes_simple(self): - self.send_messages(0, range(100, 200)) - self.send_messages(1, range(200, 300)) - - # Start a consumer - consumer = self.kafka_consumer( - auto_offset_reset='earliest', fetch_max_bytes=300) - seen_partitions = set([]) - for i in range(10): - poll_res = consumer.poll(timeout_ms=100) - for partition, msgs in six.iteritems(poll_res): - for msg in msgs: - seen_partitions.add(partition) - - # Check that we fetched at least 1 message from both partitions - self.assertEqual( - seen_partitions, set([ - TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)])) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_max_bytes_one_msg(self): - # We send to only 1 partition so we don't have parallel requests to 2 - # nodes for data. - self.send_messages(0, range(100, 200)) - - # Start a consumer. FetchResponse_v3 should always include at least 1 - # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time - # But 0.11.0.0 returns 1 MessageSet at a time when the messages are - # stored in the new v2 format by the broker. - # - # DP Note: This is a strange test. The consumer shouldn't care - # how many messages are included in a FetchResponse, as long as it is - # non-zero. I would not mind if we deleted this test. It caused - # a minor headache when testing 0.11.0.0. - group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) - consumer = self.kafka_consumer( - group_id=group, - auto_offset_reset='earliest', - consumer_timeout_ms=5000, - fetch_max_bytes=1) - - fetched_msgs = [next(consumer) for i in range(10)] - self.assertEqual(len(fetched_msgs), 10) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_for_time(self): - late_time = int(time.time()) * 1000 - middle_time = late_time - 1000 - early_time = late_time - 2000 - tp = TopicPartition(self.topic, 0) - - timeout = 10 - kafka_producer = self.kafka_producer() - early_msg = kafka_producer.send( - self.topic, partition=0, value=b"first", - timestamp_ms=early_time).get(timeout) - late_msg = kafka_producer.send( - self.topic, partition=0, value=b"last", - timestamp_ms=late_time).get(timeout) - - consumer = self.kafka_consumer() - offsets = consumer.offsets_for_times({tp: early_time}) - self.assertEqual(len(offsets), 1) - self.assertEqual(offsets[tp].offset, early_msg.offset) - self.assertEqual(offsets[tp].timestamp, early_time) - - offsets = consumer.offsets_for_times({tp: middle_time}) - self.assertEqual(offsets[tp].offset, late_msg.offset) - self.assertEqual(offsets[tp].timestamp, late_time) - - offsets = consumer.offsets_for_times({tp: late_time}) - self.assertEqual(offsets[tp].offset, late_msg.offset) - self.assertEqual(offsets[tp].timestamp, late_time) - - offsets = consumer.offsets_for_times({}) - self.assertEqual(offsets, {}) - - # Out of bound timestamps check - - offsets = consumer.offsets_for_times({tp: 0}) - self.assertEqual(offsets[tp].offset, early_msg.offset) - self.assertEqual(offsets[tp].timestamp, early_time) - - offsets = consumer.offsets_for_times({tp: 9999999999999}) - self.assertEqual(offsets[tp], None) - - # Beginning/End offsets - - offsets = consumer.beginning_offsets([tp]) - self.assertEqual(offsets, { - tp: early_msg.offset, - }) - offsets = consumer.end_offsets([tp]) - self.assertEqual(offsets, { - tp: late_msg.offset + 1 - }) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_search_many_partitions(self): - tp0 = TopicPartition(self.topic, 0) - tp1 = TopicPartition(self.topic, 1) - - kafka_producer = self.kafka_producer() - send_time = int(time.time() * 1000) - timeout = 10 - p0msg = kafka_producer.send( - self.topic, partition=0, value=b"XXX", - timestamp_ms=send_time).get(timeout) - p1msg = kafka_producer.send( - self.topic, partition=1, value=b"XXX", - timestamp_ms=send_time).get(timeout) - - consumer = self.kafka_consumer() - offsets = consumer.offsets_for_times({ - tp0: send_time, - tp1: send_time - }) - - self.assertEqual(offsets, { - tp0: OffsetAndTimestamp(p0msg.offset, send_time), - tp1: OffsetAndTimestamp(p1msg.offset, send_time) - }) - - offsets = consumer.beginning_offsets([tp0, tp1]) - self.assertEqual(offsets, { - tp0: p0msg.offset, - tp1: p1msg.offset - }) - - offsets = consumer.end_offsets([tp0, tp1]) - self.assertEqual(offsets, { - tp0: p0msg.offset + 1, - tp1: p1msg.offset + 1 - }) - consumer.close() - - @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") - def test_kafka_consumer_offsets_for_time_old(self): - consumer = self.kafka_consumer() - tp = TopicPartition(self.topic, 0) - - with self.assertRaises(UnsupportedVersionError): - consumer.offsets_for_times({tp: int(time.time())}) - - @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") - def test_kafka_consumer_offsets_for_times_errors(self): - consumer = self.kafka_consumer(fetch_max_wait_ms=200, - request_timeout_ms=500) - tp = TopicPartition(self.topic, 0) - bad_tp = TopicPartition(self.topic, 100) - - with self.assertRaises(ValueError): - consumer.offsets_for_times({tp: -1}) - - with self.assertRaises(KafkaTimeoutError): - consumer.offsets_for_times({bad_tp: 0}) + messages.append(msg) + assert_message_count(messages, 5) + assert t.interval >= (TIMEOUT_MS / 1000.0) + + +@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") +def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages): + GROUP_ID = random_string(10) + + send_messages(range(0, 100), partition=0) + send_messages(range(100, 200), partition=1) + + # Start a consumer and grab the first 180 messages + consumer1 = kafka_consumer_factory( + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', + ) + output_msgs1 = [] + for _ in range(180): + m = next(consumer1) + output_msgs1.append(m) + assert_message_count(output_msgs1, 180) + + # Normally we let the pytest fixture `kafka_consumer_factory` handle + # closing as part of its teardown. Here we manually call close() to force + # auto-commit to occur before the second consumer starts. That way the + # second consumer only consumes previously unconsumed messages. + consumer1.close() + + # Start a second consumer to grab 181-200 + consumer2 = kafka_consumer_factory( + group_id=GROUP_ID, + enable_auto_commit=True, + auto_commit_interval_ms=100, + auto_offset_reset='earliest', + ) + output_msgs2 = [] + for _ in range(20): + m = next(consumer2) + output_msgs2.append(m) + assert_message_count(output_msgs2, 20) + + # Verify the second consumer wasn't reconsuming messages that the first + # consumer already saw + assert_message_count(output_msgs1 + output_msgs2, 200) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): + send_messages(range(100, 200), partition=0) + send_messages(range(200, 300), partition=1) + + # Start a consumer + consumer = kafka_consumer_factory( + auto_offset_reset='earliest', fetch_max_bytes=300) + seen_partitions = set() + for i in range(90): + poll_res = consumer.poll(timeout_ms=100) + for partition, msgs in poll_res.items(): + for msg in msgs: + seen_partitions.add(partition) + + # Check that we fetched at least 1 message from both partitions + assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): + # We send to only 1 partition so we don't have parallel requests to 2 + # nodes for data. + send_messages(range(100, 200)) + + # Start a consumer. FetchResponse_v3 should always include at least 1 + # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time + # But 0.11.0.0 returns 1 MessageSet at a time when the messages are + # stored in the new v2 format by the broker. + # + # DP Note: This is a strange test. The consumer shouldn't care + # how many messages are included in a FetchResponse, as long as it is + # non-zero. I would not mind if we deleted this test. It caused + # a minor headache when testing 0.11.0.0. + group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) + consumer = kafka_consumer_factory( + group_id=group, + auto_offset_reset='earliest', + consumer_timeout_ms=5000, + fetch_max_bytes=1) + + fetched_msgs = [next(consumer) for i in range(10)] + assert_message_count(fetched_msgs, 10) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer): + late_time = int(time.time()) * 1000 + middle_time = late_time - 1000 + early_time = late_time - 2000 + tp = TopicPartition(topic, 0) + + timeout = 10 + early_msg = kafka_producer.send( + topic, partition=0, value=b"first", + timestamp_ms=early_time).get(timeout) + late_msg = kafka_producer.send( + topic, partition=0, value=b"last", + timestamp_ms=late_time).get(timeout) + + consumer = kafka_consumer + offsets = consumer.offsets_for_times({tp: early_time}) + assert len(offsets) == 1 + assert offsets[tp].offset == early_msg.offset + assert offsets[tp].timestamp == early_time + + offsets = consumer.offsets_for_times({tp: middle_time}) + assert offsets[tp].offset == late_msg.offset + assert offsets[tp].timestamp == late_time + + offsets = consumer.offsets_for_times({tp: late_time}) + assert offsets[tp].offset == late_msg.offset + assert offsets[tp].timestamp == late_time + + offsets = consumer.offsets_for_times({}) + assert offsets == {} + + # Out of bound timestamps check + + offsets = consumer.offsets_for_times({tp: 0}) + assert offsets[tp].offset == early_msg.offset + assert offsets[tp].timestamp == early_time + + offsets = consumer.offsets_for_times({tp: 9999999999999}) + assert offsets[tp] is None + + # Beginning/End offsets + + offsets = consumer.beginning_offsets([tp]) + assert offsets == {tp: early_msg.offset} + offsets = consumer.end_offsets([tp]) + assert offsets == {tp: late_msg.offset + 1} + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_producer, topic): + tp0 = TopicPartition(topic, 0) + tp1 = TopicPartition(topic, 1) + + send_time = int(time.time() * 1000) + timeout = 10 + p0msg = kafka_producer.send( + topic, partition=0, value=b"XXX", + timestamp_ms=send_time).get(timeout) + p1msg = kafka_producer.send( + topic, partition=1, value=b"XXX", + timestamp_ms=send_time).get(timeout) + + consumer = kafka_consumer + offsets = consumer.offsets_for_times({ + tp0: send_time, + tp1: send_time + }) + + assert offsets == { + tp0: OffsetAndTimestamp(p0msg.offset, send_time), + tp1: OffsetAndTimestamp(p1msg.offset, send_time) + } + + offsets = consumer.beginning_offsets([tp0, tp1]) + assert offsets == { + tp0: p0msg.offset, + tp1: p1msg.offset + } + + offsets = consumer.end_offsets([tp0, tp1]) + assert offsets == { + tp0: p0msg.offset + 1, + tp1: p1msg.offset + 1 + } + + +@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") +def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): + consumer = kafka_consumer + tp = TopicPartition(topic, 0) + + with pytest.raises(UnsupportedVersionError): + consumer.offsets_for_times({tp: int(time.time())}) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") +def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): + consumer = kafka_consumer_factory(fetch_max_wait_ms=200, + request_timeout_ms=500) + tp = TopicPartition(topic, 0) + bad_tp = TopicPartition(topic, 100) + + with pytest.raises(ValueError): + consumer.offsets_for_times({tp: -1}) + + with pytest.raises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) diff --git a/test/testutil.py b/test/testutil.py index 327226205..650f9bf29 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -47,6 +47,17 @@ def current_offset(client, topic, partition, kafka_broker=None): return offsets.offsets[0] +def assert_message_count(messages, num_messages): + """Check that we received the expected number of messages with no duplicates.""" + # Make sure we got them all + assert len(messages) == num_messages + # Make sure there are no duplicates + # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers, + # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes. + unique_messages = {(m.key, m.value) for m in messages} + assert len(unique_messages) == num_messages + + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None