Skip to content

Commit

Permalink
Add support for RedisClient in "batch" mode to RedisClientPool (debas…
Browse files Browse the repository at this point in the history
…ishg#287)

* Version 3.42-SNAPSHOT

* Add support for RedisClient in batch mode to RedisPool, and some documentation clarifying pipeline/multi/batch modes.

* add .bsp to .gitignore
  • Loading branch information
noahlz authored Oct 30, 2021
1 parent 9321ce9 commit 67be3b5
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ src_managed/
project/boot/
project/plugins/project/
project/activator-sbt*
.bsp

.env

Expand Down
3 changes: 2 additions & 1 deletion .sbtopts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-J-Xms512m
-J-Xmx3g
-J-XX:MaxMetaspaceSize=512m
#-Dsbt.task.timings=true
#-Dsbt.task.timings=true
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ lazy val redisClient = (project in file(".")).settings(coreSettings : _*)

lazy val commonSettings: Seq[Setting[_]] = Seq(
organization := "net.debasishg",
version := "3.41",
version := "3.42-SNAPSHOT",
scalaVersion := "2.13.6",
crossScalaVersions := Seq("2.12.14", "2.11.12", "2.10.7"),

Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/com/redis/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import javax.net.ssl.SSLContext
import org.apache.commons.pool2._
import org.apache.commons.pool2.impl._

private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None, val timeout : Int = 0, val sslContext: Option[SSLContext] = None)
private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None, val timeout : Int = 0, val sslContext: Option[SSLContext] = None, batch: RedisClient.Mode = RedisClient.SINGLE)
extends PooledObjectFactory[RedisClient] {

// when we make an object it's already connected
override def makeObject: PooledObject[RedisClient] = {
new DefaultPooledObject[RedisClient](new RedisClient(host, port, database, secret, timeout, sslContext))
new DefaultPooledObject[RedisClient](new RedisClient(host, port, database, secret, timeout, sslContext, batch))
}

// quit & disconnect
Expand Down Expand Up @@ -42,7 +42,8 @@ class RedisClientPool(
val timeout: Int = 0,
val maxConnections: Int = RedisClientPool.UNLIMITED_CONNECTIONS,
val poolWaitTimeout: Long = 3000,
val sslContext: Option[SSLContext] = None
val sslContext: Option[SSLContext] = None,
val batch: RedisClient.Mode = RedisClient.SINGLE
) {

val objectPoolConfig = new GenericObjectPoolConfig[RedisClient]
Expand All @@ -54,7 +55,7 @@ class RedisClientPool(

val abandonedConfig = new AbandonedConfig
abandonedConfig.setRemoveAbandonedTimeout(TimeUnit.MILLISECONDS.toSeconds(poolWaitTimeout).toInt)
val pool = new GenericObjectPool(new RedisClientFactory(host, port, database, secret, timeout, sslContext), objectPoolConfig,abandonedConfig)
val pool = new GenericObjectPool(new RedisClientFactory(host, port, database, secret, timeout, sslContext, batch), objectPoolConfig,abandonedConfig)
override def toString: String = host + ":" + String.valueOf(port)

def withClient[T](body: RedisClient => T): T = {
Expand Down
25 changes: 22 additions & 3 deletions src/main/scala/com/redis/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ class RedisClient(override val host: String, override val port: Int,
)
override def toString: String = host + ":" + String.valueOf(port) + "/" + database

// with MULTI/EXEC
/**
* Execute operations in the context of a MULTI command.
* If you want to send the commands in batch mode, use the `batchedPipeline` method.
*
* @throws NullPointerException if code attempts to access the results of any Redis command within the block.
* @see https://redis.io/commands/multi
*/
def pipeline(f: PipelineClient => Any): Option[List[Any]] = {
send("MULTI", false)(asString) // flush reply stream
try {
Expand Down Expand Up @@ -186,6 +192,8 @@ class RedisClient(override val host: String, override val port: Int,
* </pre>
*
* Or the client may wish to track and get the promises as soon as the underlying <tt>Future</tt> is completed.
*
* @throws NullPointerException if code attempts to access the results of any Redis command within the block.
*/
def pipelineNoMulti(commands: Seq[() => Any]) = {
val ps = List.fill(commands.size)(Promise[Any]())
Expand All @@ -206,9 +214,15 @@ class RedisClient(override val host: String, override val port: Int,
ps
}

// batched pipelines : all commands submitted in batch
/**
* Executes all the provided commands a single communication, returning a list with all the results.
*
* @throws IllegalStateException if this client was not intialized for batch (pipelined) messaging
* @see https://redis.io/topics/pipelining
*/
def batchedPipeline(commands: List[() => Any]): Option[List[Any]] = {
assert(batch == BATCH)
if (batch != BATCH) throw new IllegalStateException("Cannot use batch operations for non-batch mode client")

commands.foreach { command =>
command()
}
Expand All @@ -218,6 +232,11 @@ class RedisClient(override val host: String, override val port: Int,
r
}

/**
* Redis client which sends all messages in the context of a MULTI command, providing transaction semantics.
*
* @see https://redis.io/commands/multi
*/
class PipelineClient(parent: RedisClient) extends RedisCommand(parent.batch) with PubOperations {
import com.redis.serialization.Parse

Expand Down

0 comments on commit 67be3b5

Please sign in to comment.