Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge two tests that are very similar #1891

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 15 additions & 35 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa


@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
send_messages(range(100, 200), partition=0)
send_messages(range(200, 300), partition=1)

# Start a consumer
consumer = kafka_consumer_factory(
auto_offset_reset='earliest', fetch_max_bytes=300)
seen_partitions = set()
for i in range(90):
poll_res = consumer.poll(timeout_ms=100)
for partition, msgs in poll_res.items():
for msg in msgs:
seen_partitions.add(partition)
def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
"""Check that messages larger than fetch_max_bytes are still received.

# Check that we fetched at least 1 message from both partitions
assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}


@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
# We send to only 1 partition so we don't have parallel requests to 2
# nodes for data.
send_messages(range(100, 200))
We are checking for both partition starvation and messages simply not being
received. The broker should reply with them, just making sure the consumer
isn't doing anything unexpected client-side that blocks them.
"""
send_messages(range(0, 100), partition=0)
send_messages(range(100, 200), partition=1)

# Start a consumer. FetchResponse_v3 should always include at least 1
# full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
# But 0.11.0.0 returns 1 MessageSet at a time when the messages are
# stored in the new v2 format by the broker.
#
# DP Note: This is a strange test. The consumer shouldn't care
# how many messages are included in a FetchResponse, as long as it is
# non-zero. I would not mind if we deleted this test. It caused
# a minor headache when testing 0.11.0.0.
group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
consumer = kafka_consumer_factory(
group_id=group,
auto_offset_reset='earliest',
consumer_timeout_ms=5000,
fetch_max_bytes=1)
consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)

messages = [next(consumer) for i in range(10)]
assert_message_count(messages, 10)

fetched_msgs = [next(consumer) for i in range(10)]
assert_message_count(fetched_msgs, 10)
# Check that we fetched at least 1 message from both partitions
seen_partitions = {(m.topic, m.partition) for m in messages}
assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}


@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
Expand Down