@@ -24,7 +24,6 @@ const {
 const { Buffer } = require('buffer');
 const MessageCache = require('./_consumer_cache');
 const { hrtime } = require('process');
-const { LinkedList } = require('./_linked-list');
 
 const ConsumerState = Object.freeze({
   INIT: 0,
@@ -203,11 +202,10 @@ class Consumer {
    * It's set to null when no fetch is in progress.
    */
   #fetchInProgress;
-
   /**
-   * List of DeferredPromises waiting on consumer queue to be non-empty.
+   * Are we waiting for the queue to be non-empty?
    */
-  #queueWaiters = new LinkedList();
+  #nonEmpty = null;
 
   /**
    * Whether any rebalance callback is in progress.
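The new `#nonEmpty` field is a single promise that fetchers await and that the queue-non-empty callback resolves, replacing the per-worker `#queueWaiters` list. A minimal, self-contained sketch of that deferred-promise gate; the `DeferredPromise` class here is an illustrative stand-in, not the binding's own helper:

```js
/* Illustrative stand-in for a deferred promise: a Promise whose resolve()
 * is callable from outside the executor. */
class DeferredPromise extends Promise {
  constructor(executor) {
    let resolveFn;
    super((resolve, reject) => {
      resolveFn = resolve;
      if (executor) executor(resolve, reject);
    });
    this.resolve = resolveFn;
  }
  /* Derived promises from .then()/.catch() should be plain Promises. */
  static get [Symbol.species]() { return Promise; }
}

/* Gate usage mirroring the new field: armed when a fetch finds nothing,
 * awaited by the next fetch attempt, resolved when the queue has data. */
let nonEmpty = null;

function armGate() { nonEmpty = new DeferredPromise(); }

async function waitIfEmpty() {
  if (nonEmpty) await nonEmpty;   // all waiters wake on a single resolve()
}

function notifyNonEmpty() {
  if (nonEmpty) { nonEmpty.resolve(); nonEmpty = null; }
}

armGate();
setTimeout(notifyNonEmpty, 10);   // simulated "queue is non-empty" event
waitIfEmpty().then(() => console.log('woken up'));
```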
@@ -363,7 +361,6 @@ class Consumer {
    */
   async #rebalanceCallback(err, assignment) {
     const isLost = this.#internalClient.assignmentLost();
-    this.#rebalanceCbInProgress = new DeferredPromise();
     let assignmentFnCalled = false;
     this.#logger.info(
       `Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s), isLost: ${isLost}`,
@@ -468,7 +465,7 @@ class Consumer {
      */
     const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
     if (workersToSpawn !== this.#workers.length) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
       /* We don't need to await the workers here. We are OK if the termination and respawning
        * occurs later, since even if we have a few more or few less workers for a while, it's
        * not a big deal. */
@@ -639,11 +636,14 @@ class Consumer {
     /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
      * TODO: add trampoline method for offset commit callback. */
     rdKafkaConfig['offset_commit_cb'] = true;
-    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
+    rdKafkaConfig['rebalance_cb'] = (err, assignment) => {
+      this.#rebalanceCbInProgress = new DeferredPromise();
+      this.#rebalanceCallback(err, assignment).catch(e =>
       {
         if (this.#logger)
           this.#logger.error(`Error from rebalance callback: ${e.stack}`);
       });
+    };
 
     /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
      * setting and set it to false. */
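The rebalance trampoline above now creates `#rebalanceCbInProgress` synchronously, before the async `#rebalanceCallback` starts, so a concurrently completing fetch cannot miss it. A hedged sketch of that ordering concern, using hypothetical names (`inProgress`, `rebalanceTrampoline`, `fetchOnce`) rather than the binding's actual control flow:

```js
/* Why the flag is set in the synchronous trampoline: everything after an
 * async function's first await runs on a later microtask, so setting the
 * flag inside the handler would leave a window where a finishing fetch
 * sees no rebalance in progress. */
let inProgress = null;

function rebalanceTrampoline(err, assignment, handleRebalance) {
  // Synchronous: visible immediately to any other code on this tick.
  let done;
  inProgress = new Promise((resolve) => { done = resolve; });
  handleRebalance(err, assignment)
    .catch((e) => console.error(`Error from rebalance callback: ${e.stack}`))
    .finally(() => { done(); inProgress = null; });
}

async function fetchOnce(consume) {
  const messages = await consume();
  if (inProgress) {
    // Let the rebalance settle before handing messages to workers.
    await inProgress;
  }
  return messages;
}
```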
@@ -904,6 +904,7 @@ class Consumer {
     const returnPayload = {
       batch,
       _stale: false,
+      _seeked: false,
       _lastResolvedOffset: { offset: -1, leaderEpoch: -1 },
       heartbeat: async () => { /* no op */ },
       pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
@@ -922,9 +923,25 @@ class Consumer {
 
   async #fetchAndResolveWith(takeFromCache, size) {
     if (this.#fetchInProgress) {
+      await this.#fetchInProgress;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
+      return null;
+    }
+
+    if (this.#nonEmpty) {
+      await this.#nonEmpty;
+      /* Restart with the checks as we might have
+       * a new fetch in progress already. */
       return null;
     }
 
+    if (this.#workerTerminationScheduled.resolved) {
+      /* Return without fetching. */
+      return null;
+    }
+
+    let err, messages, processedRebalance = false;
     try {
       this.#fetchInProgress = new DeferredPromise();
       const fetchResult = new DeferredPromise();
@@ -933,8 +950,9 @@ class Consumer {
       this.#internalClient.consume(size, (err, messages) =>
         fetchResult.resolve([err, messages]));
 
-      let [err, messages] = await fetchResult;
+      [err, messages] = await fetchResult;
       if (this.#rebalanceCbInProgress) {
+        processedRebalance = true;
         await this.#rebalanceCbInProgress;
         this.#rebalanceCbInProgress = null;
       }
@@ -956,6 +974,8 @@ class Consumer {
     } finally {
       this.#fetchInProgress.resolve();
       this.#fetchInProgress = null;
+      if (!err && !processedRebalance && this.#messageCache.assignedSize === 0)
+        this.#nonEmpty = new DeferredPromise();
     }
   }
 
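Several of the changed code paths return `null` after awaiting a condition so the caller re-runs its checks from the top, and the `finally` block re-arms `#nonEmpty` only when a clean fetch left the cache empty. A small sketch of that convention under illustrative names (`deferred`, `state`, `consume`):

```js
/* Sketch of the "return null and re-check" convention the fetch path uses:
 * each guard awaits its condition and returns null so the caller loops and
 * re-evaluates everything from the top. Names are illustrative stand-ins. */
function deferred() {
  let resolve;
  const promise = new Promise((r) => { resolve = r; });
  return { promise, resolve };
}

const state = {
  fetchInProgress: null,   // deferred while a consume() call is in flight
  nonEmpty: null,          // deferred while the queue is known to be empty
  terminating: false,
  cacheSize: 0,
};

async function tryFetch(consume) {
  if (state.fetchInProgress) { await state.fetchInProgress.promise; return null; }
  if (state.nonEmpty)        { await state.nonEmpty.promise;        return null; }
  if (state.terminating)     return null;

  let messages = [];
  let failed = false;
  try {
    state.fetchInProgress = deferred();
    messages = await consume();
    state.cacheSize += messages.length;
  } catch (e) {
    failed = true;
    throw e;
  } finally {
    state.fetchInProgress.resolve();
    state.fetchInProgress = null;
    if (!failed && state.cacheSize === 0)
      state.nonEmpty = deferred();   // nothing fetched: sleep until notified
  }
  return messages;
}

/* Caller loop: a null result just means "run the checks again". */
async function fetchLoop(consume) {
  let ms = null;
  while (ms === null && !state.terminating)
    ms = await tryFetch(consume);
  return ms;
}
```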
@@ -973,10 +993,13 @@ class Consumer {
     }
 
     /* It's possible that we get msg = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msg && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
@@ -1000,10 +1023,13 @@ class Consumer {
     }
 
     /* It's possible that we get msgs = null, but that's because partitionConcurrency
-     * exceeds the number of partitions containing messages. So in this case,
-     * we should not call for new fetches, rather, try to focus on what we have left.
+     * exceeds the number of partitions containing messages. So
+     * we should wait for a new partition to be available.
      */
     if (!msgs && this.#messageCache.assignedSize !== 0) {
+      await this.#messageCache.availablePartitions();
+      /* Restart with the checks as we might have
+       * the cache full. */
       return null;
     }
 
@@ -1316,7 +1342,7 @@ class Consumer {
 
     /* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
      * back to get it so it can be reprocessed. */
-    if (lastOffsetProcessed.offset !== lastOffset) {
+    if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) {
       const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? firstMessage.offset : (lastOffsetProcessed.offset + 1);
       const leaderEpoch = lastOffsetProcessed.offset === -1 ? firstMessage.leaderEpoch : lastOffsetProcessed.leaderEpoch;
       this.seek({
@@ -1348,36 +1374,27 @@ class Consumer {
     return ppc;
   }
 
-  #queueNonEmptyCb() {
-    for (const waiter of this.#queueWaiters) {
-      waiter.resolve();
+  #notifyNonEmpty() {
+    if (this.#nonEmpty) {
+      this.#nonEmpty.resolve();
+      this.#nonEmpty = null;
     }
+    if (this.#messageCache)
+      this.#messageCache.notifyAvailablePartitions();
   }
 
-  async #nextFetchRetry() {
-    if (this.#fetchInProgress) {
-      await this.#fetchInProgress;
-    } else {
-      /* Backoff a little. If m is null, we might be without messages
-       * or in available partition starvation, and calling consumeSingleCached
-       * in a tight loop will help no one.
-       * In case there is any message in the queue, we'll be woken up before the
-       * timer expires.
-       * We have a per-worker promise, otherwise we end up awakening
-       * other workers when they've already looped and just restarted awaiting.
-       * The `Promise` passed to `Timer.withTimeout` cannot be reused
-       * in next call to this method, to avoid memory leaks caused
-       * by `Promise.race`. */
-      const waiter = new DeferredPromise();
-      const waiterNode = this.#queueWaiters.addLast(waiter);
-      await Timer.withTimeout(1000, waiter);
-
-      /* Resolves the "extra" promise that has been spawned when creating the timer. */
-      waiter.resolve();
-      this.#queueWaiters.remove(waiterNode);
-    }
-  }
+  #queueNonEmptyCb() {
+    const nonEmptyAction = async () => {
+      if (this.#fetchInProgress)
+        await this.#fetchInProgress;
 
+      this.#notifyNonEmpty();
+    };
+    nonEmptyAction().catch((e) => {
+      this.#logger.error(`Error in queueNonEmptyCb: ${e}`,
+        this.#createConsumerBindingMessageMetadata());
+    });
+  }
   /**
    * Starts a worker to fetch messages/batches from the internal consumer and process them.
    *
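`#queueNonEmptyCb` now defers its notification until any in-flight fetch settles, then resolves the gate and tells the message cache that partitions may be available again, with rejections routed to the logger instead of becoming unhandled. A compact sketch of that wake-up path, with illustrative stand-ins for the private fields:

```js
/* Sketch of the event-driven wake-up that replaces the old 1s backoff:
 * the callback defers notification until any in-flight fetch settles, then
 * wakes the gate. The fire-and-forget async is wrapped so rejections are
 * logged instead of becoming unhandled. Names are illustrative. */
let fetchInProgress = null;   // Promise while a consume() call is in flight
let nonEmpty = null;          // { promise, resolve } while the queue is empty

function notifyNonEmpty(cache) {
  if (nonEmpty) {
    nonEmpty.resolve();
    nonEmpty = null;
  }
  if (cache) cache.notifyAvailablePartitions();
}

function queueNonEmptyCb(cache, logger) {
  const nonEmptyAction = async () => {
    if (fetchInProgress)
      await fetchInProgress;   // don't wake workers mid-fetch
    notifyNonEmpty(cache);
  };
  nonEmptyAction().catch((e) => logger.error(`Error in queueNonEmptyCb: ${e}`));
}
```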
@@ -1393,27 +1410,24 @@ class Consumer {
    */
   async #worker(config, perMessageProcessor, fetcher) {
     let ppc = null;
-
     while (!this.#workerTerminationScheduled.resolved) {
+      try {
+        const ms = await fetcher(ppc);
+        if (!ms)
+          continue;
 
-      const ms = await fetcher(ppc).catch(e => {
+        if (this.#pendingOperations.length) {
+          ppc = this.#discardMessages(ms, ppc);
+          break;
+        }
+
+        ppc = await perMessageProcessor(ms, config);
+      } catch (e) {
         /* Since this error cannot be exposed to the user in the current situation, just log and retry.
          * This is due to restartOnFailure being set to always true. */
         if (this.#logger)
           this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e}: ${e.stack}`, this.#createConsumerBindingMessageMetadata());
-      });
-
-      if (this.#pendingOperations.length) {
-        ppc = this.#discardMessages(ms, ppc);
-        break;
-      }
-
-      if (!ms) {
-        await this.#nextFetchRetry();
-        continue;
-      }
       }
-
-      ppc = await perMessageProcessor(ms, config);
     }
 
     if (ppc)
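The worker loop now wraps both the fetch and the per-message processing in one `try`/`catch`, continues on a `null` fetch (the fetcher itself waits on the gate), and breaks out when operations are pending. A skeleton of that loop with illustrative names:

```js
/* Skeleton of the reworked worker loop, with illustrative stand-ins for the
 * private fields: errors from either fetching or processing are logged and
 * retried, a null fetch simply loops (the fetcher itself now waits on the
 * non-empty gate), and queued operations terminate the worker. */
async function worker(state, fetcher, processor, logger) {
  let ppc = null;   // per-partition cache handle being worked on, if any
  while (!state.terminationScheduled) {
    try {
      const ms = await fetcher(ppc);
      if (!ms)
        continue;                    // nothing yet; fetcher already waited

      if (state.pendingOperations.length) {
        ppc = state.discardMessages(ms, ppc);
        break;                       // let the pending operation run
      }

      ppc = await processor(ms);
    } catch (e) {
      logger.error(`Worker error, retrying: ${e.stack}`);
    }
  }
  return ppc;
}
```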
@@ -1447,19 +1461,32 @@ class Consumer {
    * @private
    */
   async #cacheExpirationLoop() {
+    const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+    const maxFetchInterval = BigInt(1000 * 1e6);
     while (!this.#workerTerminationScheduled.resolved) {
       let now = hrtime.bigint();
-      const cacheExpiration = this.#lastFetchClockNs +
-        BigInt(this.#cacheExpirationTimeoutMs * 1e6);
+      const cacheExpirationTimeout = this.#lastFetchClockNs +
+        cacheExpirationInterval;
+      const maxFetchTimeout = this.#lastFetchClockNs +
+        maxFetchInterval;
 
-      if (now > cacheExpiration) {
+      if (now > cacheExpirationTimeout) {
         this.#addPendingOperation(() =>
           this.#clearCacheAndResetPositions());
         await this.#checkMaxPollIntervalNotExceeded(now);
         break;
       }
+      if (now > maxFetchTimeout) {
+        /* We need to continue fetching even when we're
+         * not getting any messages, for example when all partitions are
+         * paused. */
+        this.#notifyNonEmpty();
+      }
 
-      let interval = Number(cacheExpiration - now) / 1e6;
+      const awakeTime = maxFetchTimeout < cacheExpirationTimeout ?
+        maxFetchTimeout : cacheExpirationTimeout;
+
+      let interval = Number(awakeTime - now) / 1e6;
       if (interval < 100)
         interval = 100;
       await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
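The expiration loop now wakes at the earlier of two deadlines: the cache expiration and a new one-second maximum fetch interval, both computed in BigInt nanoseconds from `hrtime.bigint()`. A worked example of the arithmetic, assuming a 30-second cache expiration (the actual timeout is configured elsewhere):

```js
/* Worked example of the wake-up arithmetic, assuming a 30s cache expiration
 * and the new 1s max-fetch interval. hrtime.bigint() is nanoseconds, so the
 * millisecond intervals are scaled by 1e6 and kept as BigInt. */
const { hrtime } = require('process');

const cacheExpirationTimeoutMs = 30000;
const cacheExpirationInterval = BigInt(cacheExpirationTimeoutMs * 1e6); // 30s in ns
const maxFetchInterval = BigInt(1000 * 1e6);                            // 1s in ns

const lastFetchClockNs = hrtime.bigint();
const now = hrtime.bigint();

const cacheExpirationTimeout = lastFetchClockNs + cacheExpirationInterval;
const maxFetchTimeout = lastFetchClockNs + maxFetchInterval;

/* Sleep until the earlier deadline, but never poll more often than 100ms. */
const awakeTime = maxFetchTimeout < cacheExpirationTimeout
  ? maxFetchTimeout : cacheExpirationTimeout;
let interval = Number(awakeTime - now) / 1e6;   // back to milliseconds
if (interval < 100)
  interval = 100;
console.log(`sleeping ~${interval}ms before re-checking`);
```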
@@ -1481,6 +1508,13 @@ class Consumer {
     this.#pendingOperations = [];
   }
 
+  #resolveWorkerTerminationScheduled() {
+    if (this.#workerTerminationScheduled) {
+      this.#workerTerminationScheduled.resolve();
+      this.#queueNonEmptyCb();
+    }
+  }
+
   /**
    * Internal polling loop.
    * Spawns and awaits workers until disconnect is initiated.
@@ -1662,7 +1696,7 @@ class Consumer {
 
   #addPendingOperation(fun) {
     if (this.#pendingOperations.length === 0) {
-      this.#workerTerminationScheduled.resolve();
+      this.#resolveWorkerTerminationScheduled();
     }
     this.#pendingOperations.push(fun);
   }
@@ -1727,11 +1761,15 @@ class Consumer {
     }
   }
 
-  #markBatchPayloadsStale(topicPartitions) {
+  #markBatchPayloadsStale(topicPartitions, isSeek) {
     for (const topicPartition of topicPartitions) {
       const key = partitionKey(topicPartition);
-      if (this.#topicPartitionToBatchPayload.has(key))
-        this.#topicPartitionToBatchPayload.get(key)._stale = true;
+      if (this.#topicPartitionToBatchPayload.has(key)) {
+        const payload = this.#topicPartitionToBatchPayload.get(key);
+        payload._stale = true;
+        if (isSeek)
+          payload._seeked = true;
+      }
     }
   }
 
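A user-initiated seek now marks any affected `eachBatch` payload as `_seeked` in addition to `_stale`, which is what lets the batch-resolution change above skip its automatic seek-back. A small, self-contained sketch of the interplay between the two flags, using illustrative objects rather than the consumer's internal maps:

```js
/* Sketch of the two flags on an eachBatch payload: a user-initiated seek
 * marks the payload both stale (stop processing it) and seeked (skip the
 * automatic "seek back to the first unprocessed offset" afterwards).
 * Illustrative objects only, not the consumer's internal bookkeeping. */
const payloads = new Map();   // partition key -> batch payload

function markBatchPayloadsStale(keys, isSeek) {
  for (const key of keys) {
    const payload = payloads.get(key);
    if (!payload) continue;
    payload._stale = true;
    if (isSeek)
      payload._seeked = true;
  }
}

function resolveBatch(payload, lastOffsetProcessed, lastOffset, seek) {
  /* Only seek back for unprocessed messages if the user did not already
   * seek somewhere else themselves. */
  if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset)
    seek(lastOffsetProcessed.offset + 1);
}

/* Example: a user seek marks the payload, so resolveBatch does not fight it. */
payloads.set('t-0', { _stale: false, _seeked: false });
markBatchPayloadsStale(['t-0'], true);
resolveBatch(payloads.get('t-0'), { offset: 5 }, 9, (o) => console.log(`seek to ${o}`));
// prints nothing: the user's seek wins
```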
@@ -1757,7 +1795,7 @@ class Consumer {
       }
     }
     if (seekOffsets.length) {
-      await this.#seekInternal(seekOffsets, false);
+      await this.#seekInternal(seekOffsets);
     }
   }
 
@@ -1801,7 +1839,7 @@ class Consumer {
     }
 
     /* If anyone's using eachBatch, mark the batch as stale. */
-    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]);
+    this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset], true);
 
     this.#addPendingOperation(() =>
       this.#seekInternal([rdKafkaTopicPartitionOffset]));
@@ -2010,7 +2048,7 @@ class Consumer {
     }
 
     this.#disconnectStarted = true;
-    this.#workerTerminationScheduled.resolve();
+    this.#resolveWorkerTerminationScheduled();
     this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata());
     await this.#lock.write(async () => {