Skip to content

Commit 8c4ee41

Browse files
Fix resource leak on cancellation
1 parent 134a4e7 commit 8c4ee41

File tree

24 files changed

+364
-112
lines changed

24 files changed

+364
-112
lines changed

armeria-backend/cats-ce2/src/main/scala/sttp/client3/armeria/cats/ArmeriaCatsBackend.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package sttp.client3.armeria.cats
22

3-
import cats.effect.{Concurrent, Resource, Sync}
3+
import cats.effect.{Concurrent, Resource, Sync, ExitCase}
44
import com.linecorp.armeria.client.WebClient
55
import com.linecorp.armeria.common.HttpData
66
import com.linecorp.armeria.common.stream.StreamMessage
@@ -17,6 +17,12 @@ private final class ArmeriaCatsBackend[F[_]: Concurrent](client: WebClient, clos
1717

1818
override val streams: NoStreams = NoStreams
1919

20+
override protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T] =
21+
Concurrent[F].guaranteeCase(effect) { exit =>
22+
if (exit == ExitCase.Completed) Concurrent[F].unit
23+
else Concurrent[F].recoverWith(finalizer) { case t => Concurrent[F].delay(t.printStackTrace()) }
24+
}
25+
2026
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Nothing] =
2127
new BodyFromStreamMessage[F, Nothing] {
2228

armeria-backend/cats/src/main/scala/sttp/client3/armeria/cats/ArmeriaCatsBackend.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ private final class ArmeriaCatsBackend[F[_]: Async](client: WebClient, closeFact
1717

1818
override val streams: NoStreams = NoStreams
1919

20+
override protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T] =
21+
Async[F].guaranteeCase(effect) { outcome =>
22+
if (outcome.isSuccess) Async[F].unit
23+
else Async[F].onError(finalizer) { case t => Async[F].delay(t.printStackTrace()) }
24+
}
25+
2026
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Nothing] =
2127
new BodyFromStreamMessage[F, Nothing] {
2228

armeria-backend/fs2-ce2/src/main/scala/sttp/client3/armeria/fs2/ArmeriaFs2Backend.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package sttp.client3.armeria.fs2
22

3-
import cats.effect.{ConcurrentEffect, Resource, Sync}
3+
import cats.effect.{ConcurrentEffect, Resource, Sync, ExitCase}
44
import com.linecorp.armeria.client.WebClient
55
import com.linecorp.armeria.common.HttpData
66
import com.linecorp.armeria.common.stream.StreamMessage
@@ -19,6 +19,12 @@ private final class ArmeriaFs2Backend[F[_]: ConcurrentEffect](client: WebClient,
1919

2020
override val streams: Fs2Streams[F] = Fs2Streams[F]
2121

22+
override protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T] =
23+
ConcurrentEffect[F].guaranteeCase(effect) { exitCase =>
24+
if (exitCase == ExitCase.Completed) ConcurrentEffect[F].unit
25+
else ConcurrentEffect[F].onError(finalizer) { case t => ConcurrentEffect[F].delay(t.printStackTrace()) }
26+
}
27+
2228
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Fs2Streams[F]] =
2329
new BodyFromStreamMessage[F, Fs2Streams[F]] {
2430

armeria-backend/fs2/src/main/scala/sttp/client3/armeria/fs2/ArmeriaFs2Backend.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ private final class ArmeriaFs2Backend[F[_]: Async](client: WebClient, closeFacto
2020

2121
override val streams: Fs2Streams[F] = Fs2Streams[F]
2222

23+
override protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T] =
24+
Async[F].guaranteeCase(effect) { outcome =>
25+
if (outcome.isSuccess) Async[F].unit
26+
else Async[F].onError(finalizer) { case t => Async[F].delay(t.printStackTrace()) }
27+
}
28+
2329
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Fs2Streams[F]] =
2430
new BodyFromStreamMessage[F, Fs2Streams[F]] {
2531

armeria-backend/monix/src/main/scala/sttp/client3/armeria/monix/ArmeriaMonixBackend.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package sttp.client3.armeria.monix
22

3+
import cats.effect.ExitCase
34
import com.linecorp.armeria.client.WebClient
45
import com.linecorp.armeria.common.HttpData
56
import com.linecorp.armeria.common.stream.StreamMessage
@@ -19,6 +20,11 @@ private final class ArmeriaMonixBackend(client: WebClient, closeFactory: Boolean
1920

2021
override val streams: MonixStreams = MonixStreams
2122

23+
override protected def ensureOnAbnormal[T](effect: Task[T])(finalizer: => Task[Unit]): Task[T] =
24+
effect.guaranteeCase { exit =>
25+
if (exit == ExitCase.Completed) Task.unit else finalizer.onErrorHandleWith(t => Task.eval(t.printStackTrace()))
26+
}
27+
2228
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, MonixStreams] =
2329
new BodyFromStreamMessage[Task, MonixStreams] {
2430

armeria-backend/scalaz/src/main/scala/sttp/client3/armeria/scalaz/ArmeriaScalazBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ private final class ArmeriaScalazBackend(client: WebClient, closeFactory: Boolea
1717

1818
override val streams: NoStreams = NoStreams
1919

20+
override protected def ensureOnAbnormal[T](effect: Task[T])(finalizer: => Task[Unit]): Task[T] =
21+
effect.handleWith { case e =>
22+
finalizer.handleWith { case e2 => Task(e.addSuppressed(e2)) }.flatMap(_ => Task.fail(e))
23+
}
24+
2025
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, Nothing] =
2126
new BodyFromStreamMessage[Task, Nothing] {
2227

armeria-backend/src/main/scala/sttp/client3/armeria/AbstractArmeriaBackend.scala

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,32 @@ abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
5050

5151
type SE = S with Effect[F]
5252

53+
// #1987: see the comments in HttpClientAsyncBackend
54+
protected def ensureOnAbnormal[T](effect: F[T])(finalizer: => F[Unit]): F[T]
55+
5356
protected def bodyFromStreamMessage: BodyFromStreamMessage[F, S]
5457

5558
protected def streamToPublisher(stream: streams.BinaryStream): Publisher[HttpData]
5659

5760
override def responseMonad: MonadError[F] = monad
5861

59-
override def send[T, R >: SE](request: Request[T, R]): F[Response[T]] =
60-
monad.suspend(adjustExceptions(request)(execute(request)))
62+
override def send[T, R >: SE](request: Request[T, R]): F[Response[T]] = {
63+
// #1987: see the comments in HttpClientAsyncBackend
64+
val armeriaCtx = new AtomicReference[ClientRequestContext]()
65+
ensureOnAbnormal {
66+
monad.suspend(adjustExceptions(request)(execute(request, armeriaCtx)))
67+
} {
68+
monad.eval {
69+
val ctx = armeriaCtx.get()
70+
if (ctx != null) ctx.cancel()
71+
}
72+
}
73+
}
6174

62-
private def execute[T, R >: SE](request: Request[T, R]): F[Response[T]] = {
75+
private def execute[T, R >: SE](
76+
request: Request[T, R],
77+
armeriaCtx: AtomicReference[ClientRequestContext]
78+
): F[Response[T]] = {
6379
val captor = Clients.newContextCaptor()
6480
try {
6581
val armeriaRes = requestToArmeria(request).execute()
@@ -82,6 +98,7 @@ abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
8298
noopCanceler
8399
}
84100
case Success(ctx) =>
101+
armeriaCtx.set(ctx)
85102
fromArmeriaResponse(request, armeriaRes, ctx)
86103
}
87104
} catch {

armeria-backend/src/main/scala/sttp/client3/armeria/future/ArmeriaFutureBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ private final class ArmeriaFutureBackend(client: WebClient, closeFactory: Boolea
1818

1919
override val streams: NoStreams = NoStreams
2020

21+
override protected def ensureOnAbnormal[T](effect: Future[T])(finalizer: => Future[Unit]): Future[T] =
22+
effect.recoverWith { case e =>
23+
finalizer.recoverWith { case e2 => e.addSuppressed(e2); Future.failed(e) }.flatMap(_ => Future.failed(e))
24+
}
25+
2126
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Future, Nothing] =
2227
new BodyFromStreamMessage[Future, Nothing] {
2328

armeria-backend/zio/src/main/scala/sttp/client3/armeria/zio/ArmeriaZioBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ private final class ArmeriaZioBackend(runtime: Runtime[Any], client: WebClient,
2222

2323
override val streams: ZioStreams = ZioStreams
2424

25+
override protected def ensureOnAbnormal[T](effect: Task[T])(finalizer: => Task[Unit]): Task[T] = effect.onExit {
26+
exit =>
27+
if (exit.isSuccess) ZIO.unit else finalizer.catchAll(t => ZIO.logErrorCause("Error in finalizer", Cause.fail(t)))
28+
}.resurrect
29+
2530
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, ZioStreams] =
2631
new BodyFromStreamMessage[Task, ZioStreams] {
2732

armeria-backend/zio1/src/main/scala/sttp/client3/armeria/zio/ArmeriaZioBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ private final class ArmeriaZioBackend(runtime: Runtime[Any], client: WebClient,
2323

2424
override val streams: ZioStreams = ZioStreams
2525

26+
override protected def ensureOnAbnormal[T](effect: Task[T])(finalizer: => Task[Unit]): Task[T] = effect.onExit {
27+
exit =>
28+
if (exit.succeeded) ZIO.unit else finalizer.catchAll(t => ZIO.effect(t.printStackTrace()).orDie)
29+
}.resurrect
30+
2631
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, ZioStreams] =
2732
new BodyFromStreamMessage[Task, ZioStreams] {
2833

0 commit comments

Comments
 (0)