
Early cancellation intermittently leaves a rediculous connection in an odd state. #53

Open
Swoorup opened this issue Jun 29, 2022 · 4 comments

Comments

@Swoorup
Contributor

Swoorup commented Jun 29, 2022

Using rediculous with `timeout` can intermittently leave the connection in a bad state.

Repro test case: add the following to RedisCommandsSpec.scala and run it a few times.

  test("early termination"){
    redisConnection().flatMap{ connection => 
      val msg1 = "msg1" -> "msg1"
      val msg2 = "msg2" -> "msg2"
      val msg3 = "msg3" -> "msg3"

      val xopts = 
        RedisCommands.XReadOpts.default
          .copy(blockMillisecond = 0L.some, count = 1L.some)
          // .copy(count = 1L.some, blockMillisecond = 1000L.some)

      val offset: Set[RedisCommands.StreamOffset] = Set(RedisCommands.StreamOffset.From("foo", "$"))

      val extract = (resp: Option[List[RedisCommands.XReadResponse]]) => 
        resp.flatMap(_.headOption).flatMap(_.records.headOption).flatMap(_.keyValues.headOption)

      val action = 
        for {
          _ <- (
            RedisCommands.xadd[RedisIO]("foo", List(msg1)),
            RedisCommands.xadd[RedisIO]("foo", List(msg2)),
            RedisCommands.xadd[RedisIO]("foo", List(msg3))
          ).tupled.run(connection)
          msg1 <- RedisCommands.xread[RedisIO](offset, xopts).run(connection).timeout(50.milli).attempt.map{
            case Right(resp) => extract(resp)
            case Left(_) => None
          }
          empty <- RedisCommands.xread[RedisIO](offset, xopts).run(connection).timeout(50.milli).replicateA(100).attempt
          _ <- RedisCommands.del[RedisIO]("foo").run(connection)
        } yield msg1

      action.assertEquals(None)
    }
  }

Observe the error:

==> X io.chrisdavenport.rediculous.RedisCommandsSpec.early termination  0.298s io.chrisdavenport.rediculous.RedisError$Generic: Rediculous: Impossible Return List was Empty but we guarantee output matches input
    at io.chrisdavenport.rediculous.RedisConnection$.$anonfun$head$1(RedisConnection.scala:109)
    at cats.ApplicativeError$LiftFromOptionPartially$.apply$extension(ApplicativeError.scala:329)
    at cats.syntax.OptionOps$LiftToPartiallyApplied.apply(option.scala:374)
    at io.chrisdavenport.rediculous.RedisConnection$.head(RedisConnection.scala:109)
    at io.chrisdavenport.rediculous.RedisConnection$.$anonfun$runRequest$24(RedisConnection.scala:113)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at flatMap @ org.typelevel.keypool.KeyPool$.put(KeyPool.scala:250)
    at void @ org.typelevel.keypool.KeyPool$.reap(KeyPool.scala:161)
    at get @ fs2.internal.Scope.openScope(Scope.scala:281)
    at flatMap @ org.typelevel.keypool.KeyPool$.$anonfun$take$7(KeyPool.scala:278)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
    at clear @ org.http4s.client.middleware.Retry$.retryLoop$1(Retry.scala:77)
    at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)

The fix is probably to make the effect returned by `RedisConnection.explicitPipelineRequest` uncancelable.
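For intuition, here is a minimal plain-Scala model of why cancelling a pipelined request mid-flight corrupts the connection (no rediculous or cats-effect involved; `FakeConnection` and `desyncDemo` are entirely hypothetical names): replies are matched to requests purely by order, so an abandoned read leaves a stale reply queued for whoever reads next.

```scala
import scala.collection.mutable

// Hypothetical model of a pipelined connection: replies are matched to
// requests purely by arrival order; there are no request ids on the wire.
final class FakeConnection {
  private val replies = mutable.Queue.empty[String]
  def send(cmd: String): Unit = replies.enqueue(s"reply-to-$cmd")
  def read(): String = replies.dequeue()
}

// Simulate a timeout: XREAD is sent, but its reply is never consumed
// because the caller was cancelled between send and read.
def desyncDemo(): String = {
  val conn = new FakeConnection
  conn.send("XREAD")
  // caller is cancelled here, before conn.read() happens
  conn.send("DEL")
  conn.read() // the DEL caller receives the stale XREAD reply
}
```

Every later caller on that connection is now off by one, which is consistent with the "guarantee output matches input" invariant failing above.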

@ChristopherDavenport
Collaborator

ChristopherDavenport commented Jul 8, 2022

Ok, I think I've narrowed it down. I generally use the queued connection rather than the pooled one, which is why I haven't run into this. But with the pool you have the ability to interrupt the underlying system, which can leave connections in a bad state.

def makeSoftCancelable[F[_]: Concurrent, A](fa: F[A], supervisor: Supervisor[F]): F[A] =
  supervisor.supervise(fa)
    .flatMap(_.joinWith(Concurrent[F].raiseError(new java.util.concurrent.CancellationException("Outcome was Canceled"))))

I should be able to make this soft cancelable with a dispatcher in the pooled and direct versions to prevent these from leaking.
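As a rough model of what "soft" cancellation buys you (plain JVM threads standing in for the Supervisor and fibers; `softCancelable` is a hypothetical name, not rediculous API): the caller's wait can be abandoned on timeout, but the in-flight work itself is never interrupted, so the protocol read that keeps the connection in sync still completes.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

// "Soft" cancellation sketch: on timeout the caller stops waiting, but the
// task is never interrupted, so an in-flight protocol read runs to completion.
def softCancelable[A](work: () => A, timeoutMs: Long): Option[A] = {
  val ex = Executors.newSingleThreadExecutor()
  try {
    val fut = ex.submit(new Callable[A] { def call(): A = work() })
    try Some(fut.get(timeoutMs, TimeUnit.MILLISECONDS))
    catch { case _: TimeoutException => None } // abandon the join, not the work
  } finally ex.shutdown() // lets the running task finish; does not interrupt it
}
```

This mirrors `supervise(fa).flatMap(_.joinWith(...))`: cancelling the outer effect only cancels the join, while `fa` keeps running under the supervisor.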

@Swoorup
Contributor Author

Swoorup commented Jul 8, 2022

Great to hear. I do recall having issues with the pooled connection and then changing it to queued a few months ago, but I didn't note down what caused the issue, since it was intermittent.

@ChristopherDavenport
Collaborator

Hmmm, well that fixed that error and introduced a new one. We're in for a wild ride.

==> X io.chrisdavenport.rediculous.RedisCommandsSpec.early termination  30.004s java.util.concurrent.TimeoutException: Future timed out after [30 seconds]

@ChristopherDavenport
Collaborator

It's binary compatible though and should remain so, so I think we are good to release the next version and fix it afterwards.
