From 410f03b4efa75342201fd0ec68b223a6ae8a7a72 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 22 Aug 2019 21:27:57 -0700 Subject: [PATCH] Merge two tests that are very similar Previously the `test_kafka_consumer_max_bytes_simple()` was seeing occasional test failures because it was doing only 10 iterations. And much of the purpose of it was gutted when Kafka 0.11 came out and changed the behavior. So this merges the two tests into one which should be relatively straightforward. Further discussion in https://github.com/dpkp/kafka-python/pull/1886/files#r316860737 --- test/test_consumer_integration.py | 50 ++++++++++--------------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c7e2ebf5e..38eec82df 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): - send_messages(range(100, 200), partition=0) - send_messages(range(200, 300), partition=1) - - # Start a consumer - consumer = kafka_consumer_factory( - auto_offset_reset='earliest', fetch_max_bytes=300) - seen_partitions = set() - for i in range(90): - poll_res = consumer.poll(timeout_ms=100) - for partition, msgs in poll_res.items(): - for msg in msgs: - seen_partitions.add(partition) +def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages): + """Check that messages larger than fetch_max_bytes are still received. - # Check that we fetched at least 1 message from both partitions - assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): - # We send to only 1 partition so we don't have parallel requests to 2 - # nodes for data. - send_messages(range(100, 200)) + We are checking for both partition starvation and messages simply not being + received. The broker should reply with them, just making sure the consumer + isn't doing anything unexpected client-side that blocks them. + """ + send_messages(range(0, 100), partition=0) + send_messages(range(100, 200), partition=1) # Start a consumer. FetchResponse_v3 should always include at least 1 # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time # But 0.11.0.0 returns 1 MessageSet at a time when the messages are # stored in the new v2 format by the broker. - # - # DP Note: This is a strange test. The consumer shouldn't care - # how many messages are included in a FetchResponse, as long as it is - # non-zero. I would not mind if we deleted this test. It caused - # a minor headache when testing 0.11.0.0. - group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) - consumer = kafka_consumer_factory( - group_id=group, - auto_offset_reset='earliest', - consumer_timeout_ms=5000, - fetch_max_bytes=1) + consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1) + + messages = [next(consumer) for i in range(25)] + assert_message_count(messages, 25) - fetched_msgs = [next(consumer) for i in range(10)] - assert_message_count(fetched_msgs, 10) + # Check that we fetched at least 1 message from both partitions + seen_partitions = {(m.topic, m.partition) for m in messages} + assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")