Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast Ability #11

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ object RedisCommands {
RedisCtx[F].unkeyed(NEL.of("COMMAND", "COUNT"))

def clientsetname[F[_]: RedisCtx](connectionName: String): F[String] =
RedisCtx[F].unkeyed(NEL.of("CLIENT", "SETNAME", connectionName.encode))
RedisCtx[F].broadcast(NEL.of("CLIENT", "SETNAME", connectionName.encode))

def zrank[F[_]: RedisCtx](key: String, member: String): F[Long] =
RedisCtx[F].keyed(key, NEL.of("ZRANK", key.encode, member.encode))
Expand Down Expand Up @@ -386,7 +386,7 @@ object RedisCommands {
RedisCtx[F].unkeyed(NEL.of("CONFIG", "SET", parameter.encode, value.encode))

def scriptflush[F[_]: RedisCtx]: F[Status] =
RedisCtx[F].unkeyed(NEL.of("SCRIPT", "FLUSH"))
RedisCtx[F].broadcast(NEL.of("SCRIPT", "FLUSH"))

def dbsize[F[_]: RedisCtx]: F[Long] =
RedisCtx[F].unkeyed(NEL.of("DBSIZE"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object RedisConnection{
) extends RedisConnection[F]
private case class DirectConnection[F[_]](socket: Socket[F]) extends RedisConnection[F]

private case class Cluster[F[_]](queue: Queue[F, Chunk[(Deferred[F, Either[Throwable, Resp]], Option[String], Option[(String, Int)], Int, Resp)]]) extends RedisConnection[F]
private case class Cluster[F[_]](queue: Queue[F, Chunk[(Deferred[F, Either[Throwable, Resp]], RedisCtx.CtxType, Option[(String, Int)], Int, Resp)]]) extends RedisConnection[F]

// Guarantees With Socket That Each Call Receives a Response
// Chunk must be non-empty but to do so incurs a penalty
Expand Down Expand Up @@ -57,7 +57,7 @@ object RedisConnection{

def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])(
inputs: NonEmptyList[NonEmptyList[String]],
key: Option[String]
ctx: RedisCtx.CtxType
): F[F[NonEmptyList[Resp]]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[NonEmptyList[Resp]] = explicitPipelineRequest[F](socket, chunk).flatMap(l => Sync[F].delay(l.toNel.getOrElse(throw RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))
Expand All @@ -74,7 +74,7 @@ object RedisConnection{
c.traverse(_._1.get).flatMap(_.sequence.traverse(l => Sync[F].delay(l.toNel.getOrElse(throw RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))).rethrow
}
}
case Cluster(queue) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map((_, key, None, 0, resp))).flatMap{ c =>
case Cluster(queue) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map((_, ctx, None, 0, resp))).flatMap{ c =>
queue.enqueue1(c).as {
c.traverse(_._1.get).flatMap(_.sequence.traverse(l => Sync[F].delay(l.toNel.getOrElse(throw RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))).rethrow
}
Expand All @@ -83,11 +83,11 @@ object RedisConnection{
}

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[String], key: Option[String]): F[F[Either[Resp, A]]] =
runRequestInternal(connection)(NonEmptyList.of(input), key).map(_.map(nel => RedisResult[A].decode(nel.head)))
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[String], ctx: RedisCtx.CtxType): F[F[Either[Resp, A]]] =
runRequestInternal(connection)(NonEmptyList.of(input), ctx).map(_.map(nel => RedisResult[A].decode(nel.head)))

def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[String], key: Option[String]): Redis[F, A] = Redis(Kleisli{connection: RedisConnection[F] =>
runRequest(connection)(input, key).map{ fE =>
def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[String], ctx: RedisCtx.CtxType): Redis[F, A] = Redis(Kleisli{connection: RedisConnection[F] =>
runRequest(connection)(input, ctx).map{ fE =>
fE.flatMap{
case Right(a) => a.pure[F]
case Left([email protected](_)) => ApplicativeError[F, Throwable].raiseError[A](e)
Expand Down Expand Up @@ -196,7 +196,7 @@ object RedisConnection{
).build
sockets <- Resource.liftF(keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
refTopology <- Resource.liftF(Ref[F].of(sockets))
queue <- Resource.liftF(Queue.bounded[F, Chunk[(Deferred[F, Either[Throwable,Resp]], Option[String], Option[(String, Int)], Int, Resp)]](maxQueued))
queue <- Resource.liftF(Queue.bounded[F, Chunk[(Deferred[F, Either[Throwable,Resp]], RedisCtx.CtxType, Option[(String, Int)], Int, Resp)]](maxQueued))
cluster = Cluster(queue)
refreshTopology = refTopology.get.flatMap(_.random).flatMap{ case (host, port) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
Expand All @@ -208,22 +208,20 @@ object RedisConnection{
Stream.eval(refTopology.get).map{topo =>
Stream.eval(topo.random[F]).flatMap{ default =>
Stream.emits(
chunk.toList.groupBy{ case (_, s, server,_,_) => // TODO Investigate Efficient Group By
server.orElse(s.flatMap(key => topo.served(HashSlot.find(key)))).getOrElse(default) // Explicitly Set Server, Key Hashslot Server, or a default server if none selected.
}.toSeq
sortToServers(chunk, {s => topo.served(HashSlot.find(s))}, default, topo.all).toList
).evalMap{
case (server, rest) =>
keypool.map(_._1).take(server).use{m =>
val out = Chunk.seq(rest.map(_._5))
val out = Chunk.seq(rest.map(_._5).toList)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
n.zipWithIndex.traverse_[F, Unit]{
case (ref, i) =>
val (toSet, key, _, retries, initialCommand) = rest(i)
val (toSet, key, _, retries, initialCommand) = rest.toList(i)
ref match {
case [email protected](s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
refreshTopology.attempt.void >>
Expand Down Expand Up @@ -284,4 +282,41 @@ object RedisConnection{
Either.catchNonFatal(port.toInt).toOption.map((host, _))
} else None
}

private def sortToServers[F[_]](
chunk: Chunk[(Deferred[F, Either[Throwable, Resp]], RedisCtx.CtxType, Option[(String, Int)], Int, Resp)],
f: String => Option[(String, Int)],
default: (String, Int),
all: Set[(String, Int)]
): Map[(String, Int), NonEmptyList[(Deferred[F, Either[Throwable, Resp]], RedisCtx.CtxType, Option[(String, Int)], Int, Resp)]] = {
groupByMany(chunk){ case (_, ctx, server, _, _) =>
server.map(Set(_)).getOrElse(
ctx match {
case RedisCtx.CtxType.Keyed(key) => f(key).map(Set(_)).getOrElse(Set(default))
case RedisCtx.CtxType.Random => Set(default)
case RedisCtx.CtxType.Broadcast => all
}
)
}
}

private def groupByMany[F[_], A](fa: F[A]): GroupByMany[F, A]= new GroupByMany(fa)

private class GroupByMany[F[_], A](fa: F[A]){
def apply[K](f: A => Set[K])(implicit F: Foldable[F]): Map[K, NonEmptyList[A]] = {
Foldable[F].foldRight(fa, Eval.now(Map.empty[K, NonEmptyList[A]])){
case (x, eva) => eva.map{ acc =>
val k = f(x)
k.foldLeft(acc){ case (acc, k) =>
val nextAcc = acc.get(k).fold(
acc + (k -> NonEmptyList.of(x))
)( value =>
acc + (k -> value.prepend(x))
)
nextAcc
}
}
}.value
}
}
}
14 changes: 12 additions & 2 deletions core/src/main/scala/io/chrisdavenport/rediculous/RedisCtx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@ please consult your library documentation.
trait RedisCtx[F[_]]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): F[A]
def unkeyed[A: RedisResult](command: NonEmptyList[String]): F[A]
def broadcast[A: RedisResult](command: NonEmptyList[String]): F[A]
}

object RedisCtx {

def apply[F[_]](implicit ev: RedisCtx[F]): ev.type = ev

sealed trait CtxType
object CtxType {
case class Keyed(key: String) extends CtxType
case object Broadcast extends CtxType
case object Random extends CtxType
}

implicit def redis[F[_]: Concurrent]: RedisCtx[Redis[F, *]] = new RedisCtx[Redis[F, *]]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): Redis[F,A] =
RedisConnection.runRequestTotal(command, Some(key))
RedisConnection.runRequestTotal(command, CtxType.Keyed(key))
def unkeyed[A: RedisResult](command: NonEmptyList[String]): Redis[F, A] =
RedisConnection.runRequestTotal(command, None)
RedisConnection.runRequestTotal(command, CtxType.Random)
def broadcast[A: RedisResult](command: NonEmptyList[String]): Redis[F, A] =
RedisConnection.runRequestTotal(command, CtxType.Broadcast)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ object RedisPipeline {
(i, base, value) <- State.get
_ <- State.set((i + 1, command :: base, value))
} yield RedisTransaction.Queued(l => RedisResult[A].decode(l(i)))})

def broadcast[A: RedisResult](command: NonEmptyList[String]): RedisPipeline[A] =
unkeyed(command)
}

implicit val applicative: Applicative[RedisPipeline] = new Applicative[RedisPipeline]{
Expand Down Expand Up @@ -58,7 +61,7 @@ object RedisPipeline {
Redis(Kleisli{c: RedisConnection[F] =>
val ((_, commandsR, key), RedisTransaction.Queued(f)) = tx.value.value.run((0, List.empty, None)).value
val commands = commandsR.reverse.toNel
commands.traverse(nelCommands => RedisConnection.runRequestInternal(c)(nelCommands, key) // We Have to Actually Send A Command
commands.traverse(nelCommands => RedisConnection.runRequestInternal(c)(nelCommands, key.map(s => RedisCtx.CtxType.Keyed(s)).getOrElse(RedisCtx.CtxType.Random)) // We Have to Actually Send A Command
.map{fNel => RedisConnection.closeReturn(fNel.map(a => f(a.toList)))}
).flatMap{fOpt =>
fOpt.map(_.pure[F]).getOrElse(F.raiseError(RedisError.Generic("Rediculous: Attempted to Pipeline Empty Command")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ object RedisTransaction {
(i, base, value) <- State.get
_ <- State.set((i + 1, command :: base, value))
} yield Queued(l => RedisResult[A].decode(l(i)))})

def broadcast[A: RedisResult](command: NonEmptyList[String]): RedisTransaction[A] = unkeyed(command)
}
implicit val applicative: Applicative[RedisTransaction] = new Applicative[RedisTransaction]{
def pure[A](a: A) = RedisTransaction(Monad[RedisTxState].pure(Monad[Queued].pure(a)))
Expand Down Expand Up @@ -122,7 +124,7 @@ object RedisTransaction {
NonEmptyList.of("MULTI"),
commands ++
List(NonEmptyList.of("EXEC"))
), key).map{_.flatMap{_.last match {
), key.map(s => RedisCtx.CtxType.Keyed(s)).getOrElse(RedisCtx.CtxType.Random)).map{_.flatMap{_.last match {
case Resp.Array(Some(a)) => f(a).fold[TxResult[A]](e => TxResult.Error(e.toString), TxResult.Success(_)).pure[F]
case Resp.Array(None) => (TxResult.Aborted: TxResult[A]).pure[F]
case other => ApplicativeError[F, Throwable].raiseError(RedisError.Generic(s"EXEC returned $other"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ object ClusterCommands {
}
}
final case class ClusterSlots(l: List[ClusterSlot]){

def all: Set[(String, Int)] = l.flatMap(_.replicas).map(s => (s.host, s.port)).toSet

def served(bucket: Int): Option[(String, Int)] =
l.collectFirst{
case ClusterSlot(start, end, master :: _) if start <= bucket && end >= bucket => (master.host, master.port)
Expand Down