@@ -416,47 +416,49 @@ def __iter__(self):
416416
417417 def _fetch (self ):
418418 # Create fetch request payloads for all the partitions
419- requests = []
420- partitions = self .fetch_offsets .keys ()
419+ partitions = dict (( p , self . buffer_size )
420+ for p in self .fetch_offsets .keys () )
421421 while partitions :
422- for partition in partitions :
422+ requests = []
423+ for partition , buffer_size in partitions .iteritems ():
423424 requests .append (FetchRequest (self .topic , partition ,
424425 self .fetch_offsets [partition ],
425- self . buffer_size ))
426+ buffer_size ))
426427 # Send request
427428 responses = self .client .send_fetch_request (
428429 requests ,
429430 max_wait_time = int (self .fetch_max_wait_time ),
430431 min_bytes = self .fetch_min_bytes )
431432
432- retry_partitions = set ()
433+ retry_partitions = {}
433434 for resp in responses :
434435 partition = resp .partition
436+ buffer_size = partitions [partition ]
435437 try :
436438 for message in resp .messages :
437439 # Put the message in our queue
438440 self .queue .put ((partition , message ))
439441 self .fetch_offsets [partition ] = message .offset + 1
440442 except ConsumerFetchSizeTooSmall :
441443 if (self .max_buffer_size is not None and
442- self . buffer_size == self .max_buffer_size ):
444+ buffer_size == self .max_buffer_size ):
443445 log .error ("Max fetch size %d too small" ,
444446 self .max_buffer_size )
445447 raise
446448 if self .max_buffer_size is None :
447- self . buffer_size *= 2
449+ buffer_size *= 2
448450 else :
449- self . buffer_size = max (self . buffer_size * 2 ,
450- self .max_buffer_size )
451+ buffer_size = max (buffer_size * 2 ,
452+ self .max_buffer_size )
451453 log .warn ("Fetch size too small, increase to %d (2x) "
452- "and retry" , self . buffer_size )
453- retry_partitions . add ( partition )
454+ "and retry" , buffer_size )
455+ retry_partitions [ partition ] = buffer_size
454456 except ConsumerNoMoreData as e :
455457 log .debug ("Iteration was ended by %r" , e )
456458 except StopIteration :
457459 # Stop iterating through this partition
458460 log .debug ("Done iterating over partition %s" % partition )
459- partitions = retry_partitions
461+ partitions = retry_partitions
460462
461463def _mp_consume (client , group , topic , chunk , queue , start , exit , pause , size ):
462464 """
0 commit comments