@@ -338,6 +338,55 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
338338 }
339339 }
340340
341+ test(" streaming consumer cancels the task after timeout" ) {
342+ val c = createConfig()
343+ import c ._
344+
345+ RabbitMQConnection .fromConfig[Task ](config, ex).withResource { rabbitConnection =>
346+ val count = Random .nextInt(100 ) + 100
347+
348+ logger.info(s " Sending $count messages " )
349+
350+ val delivered = new AtomicInteger (0 )
351+ val executed = new AtomicInteger (0 )
352+
353+ rabbitConnection.newStreamingConsumer[Bytes ](" testingStreamingWithTimeout" , Monitor .noOp()).withResource { cons =>
354+ val stream = cons.deliveryStream
355+ .mapAsyncUnordered(50 ) {
356+ _.handleWith { d =>
357+ Task .delay(delivered.incrementAndGet()) >>
358+ Task .sleep(800 .millis) >>
359+ // the consumer has timeout to 500ms so this should never get executed!
360+ Task {
361+ logger.info(s " Executed: ${d.properties.messageId.getOrElse(" -no-message-id-" )}" )
362+ executed.incrementAndGet()
363+ }.as(Ack )
364+ }
365+ }
366+
367+ rabbitConnection.newProducer[Bytes ](" testing" , Monitor .noOp()).withResource { sender =>
368+ for (_ <- 1 to count) {
369+ sender.send(" test" , Bytes .copyFromUtf8(Random .nextString(10 ))).await
370+ }
371+
372+ // it takes some time before the stats appear... :-|
373+ eventually(timeout(Span (5 , Seconds )), interval(Span (0.5 , Seconds ))) {
374+ assertResult(count)(testHelper.queue.getPublishedCount(queueName1))
375+ }
376+
377+ ex.execute(() => stream.compile.drain.runSyncUnsafe()) // run the stream
378+
379+ eventually(timeout(Span (60 , Seconds )), interval(Span (1 , Seconds ))) {
380+ println(s " D: ${delivered.get()} EX: ${executed.get()}" )
381+ assert(delivered.get() >= count)
382+ assertResult(0 )(executed.get())
383+ assert(testHelper.exchange.getPublishedCount(exchange5) > 0 )
384+ }
385+ }
386+ }
387+ }
388+ }
389+
341390 test(" can be closed properly" ) {
342391 val c = createConfig()
343392 import c ._
@@ -421,7 +470,7 @@ class StreamingConsumerLiveTest extends TestBase with ScalaFutures {
421470
422471 createStream().map(_ => processedFromRest.incrementAndGet()).compile.drain.startAndForget.await // run asynchronously
423472
424- eventually(timeout(Span (5 , Seconds )), interval(Span (0.2 , Seconds ))) {
473+ eventually(timeout(Span (10 , Seconds )), interval(Span (0.2 , Seconds ))) {
425474 println(" D: " + processedFromRest.get())
426475 assertResult(9 )(processedFromRest.get())
427476 assertResult(0 )(testHelper.queue.getMessagesCount(queueName1))
0 commit comments