Skip to content

Commit

Permalink
fix consumer retry logic (fixes #135)
Browse files Browse the repository at this point in the history
Fixes bug in the follow condition:

* Starting buffer size is 1024, max buffer size is 2048, both set on an instance level
* Fetch from p0, p1 and received response
* p0 has more than 1024 bytes, consumer doubles buffer size to 2048 and marks p0 for retry
* p1 has more than 1024 bytes, consumer tries to double buffer size, but sees that it's at
  the max and raises ConsumerFetchSizeTooSmall

The fix changes the logic to the following:

* Starting buffer size is 1024 set on a per-partition level, max buffer size is 2048 set on an instance level
* Fetch from p0, p1 and received response
* p0 has more than 1024 bytes, consumer doubles buffer size to 2048 for p0 and marks p0 for retry
* p1 has more than 1024 bytes, consumer doubles buffer size to 2048 for p1 and marks p1 for retry
* Consumer sees that there are partitions to retry, repeats parsing loop
* p0 sent all the bytes this time, consumer yields these messages
* p1 sent all the bytes this time, consumer yields these messages
  • Loading branch information
Carlo Cabanilla committed Feb 28, 2014
1 parent 4abf7ee commit a798c1e
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,47 +404,49 @@ def __iter__(self):

def _fetch(self):
    """Fetch messages for all tracked partitions and enqueue them.

    Keeps a *per-partition* fetch buffer size so one oversized partition
    cannot exhaust the shared budget for the others (the bug this commit
    addresses). Partitions whose responses were truncated because the
    fetch size was too small are retried with a doubled buffer until
    their messages fit or ``self.max_buffer_size`` is reached.

    Side effects:
        Puts ``(partition, message)`` tuples on ``self.queue`` and
        advances ``self.fetch_offsets[partition]`` past each message.

    Raises:
        ConsumerFetchSizeTooSmall: if a partition's messages still do
            not fit at ``self.max_buffer_size``.
    """
    # Start every partition at the instance-level default; each
    # partition's buffer then grows independently on retry.
    partitions = dict((p, self.buffer_size)
                      for p in self.fetch_offsets.keys())
    while partitions:
        requests = []
        for partition, buffer_size in partitions.items():
            requests.append(FetchRequest(self.topic, partition,
                                         self.fetch_offsets[partition],
                                         buffer_size))
        # Send request
        responses = self.client.send_fetch_request(
            requests,
            max_wait_time=int(self.fetch_max_wait_time),
            min_bytes=self.fetch_min_bytes)

        retry_partitions = {}
        for resp in responses:
            partition = resp.partition
            buffer_size = partitions[partition]
            try:
                for message in resp.messages:
                    # Put the message in our queue
                    self.queue.put((partition, message))
                    self.fetch_offsets[partition] = message.offset + 1
            except ConsumerFetchSizeTooSmall:
                if (self.max_buffer_size is not None and
                        buffer_size == self.max_buffer_size):
                    log.error("Max fetch size %d too small",
                              self.max_buffer_size)
                    # Bare raise preserves the original traceback
                    # (``raise e`` in Py2 would rebuild it here).
                    raise
                if self.max_buffer_size is None:
                    # No cap configured: grow without bound.
                    buffer_size *= 2
                else:
                    # BUG FIX: was max(buffer_size * 2, max_buffer_size),
                    # which let buffer_size overshoot the cap; the
                    # equality guard above then never matched and the
                    # consumer doubled forever instead of raising.
                    buffer_size = min(buffer_size * 2,
                                      self.max_buffer_size)
                log.warn("Fetch size too small, increase to %d (2x) "
                         "and retry", buffer_size)
                retry_partitions[partition] = buffer_size
            except ConsumerNoMoreData as e:
                log.debug("Iteration was ended by %r", e)
            except StopIteration:
                # Stop iterating through this partition
                log.debug("Done iterating over partition %s" % partition)
        # Loop again with only the partitions that need a bigger buffer.
        partitions = retry_partitions

def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
Expand Down

0 comments on commit a798c1e

Please sign in to comment.