Skip to content

Commit 64b1799

Browse files
authored
Improve performance of ZQuery#run (#497)
* Improve performance of `ZQuery#run` * Implement using a custom scope * Reset finalizers to avoid leaks * Cleanups
1 parent a57ea1d commit 64b1799

File tree

7 files changed

+93
-12
lines changed

7 files changed

+93
-12
lines changed

benchmarks/src/main/scala/zio/query/BenchmarkUtil.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ object BenchmarkUtil extends Runtime[Any] { self =>
1414

1515
def unsafeRunCache[E, A](query: ZQuery[Any, E, A], cache: Cache): A =
1616
Unsafe.unsafe(implicit unsafe => self.unsafe.run(query.runCache(cache)).getOrThrowFiberFailure())
17+
18+
def unsafeRunZIO[E, A](query: ZIO[Any, E, A]): A =
19+
Unsafe.unsafe(implicit unsafe => self.unsafe.run(query).getOrThrowFiberFailure())
1720
}

benchmarks/src/main/scala/zio/query/DataSourceBenchmark.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package zio.query
33
import cats.effect.IO
44
import cats.effect.unsafe.implicits._
55
import cats.syntax.all._
6-
import fetch.Fetch
6+
import fetch.{Fetch, fetchM}
77
import org.openjdk.jmh.annotations.{Scope => JScope, _}
88
import zio.query.BenchmarkUtil._
99
import zio.{Chunk, ZIO}
@@ -74,7 +74,7 @@ class DataSourceBenchmark {
7474

7575
object ZQueryImpl {
7676
case class Req(i: Int) extends Request[Nothing, Int]
77-
val ds = DataSource.fromFunctionBatchedZIO("PlusOne") { reqs: Chunk[Req] => ZIO.succeed(reqs.map(_.i + 1)) }
77+
val ds = DataSource.fromFunctionBatchedZIO("PlusOne") { (reqs: Chunk[Req]) => ZIO.succeed(reqs.map(_.i + 1)) }
7878
}
7979

8080
object FetchImpl {

benchmarks/src/main/scala/zio/query/FromRequestBenchmark.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,5 @@ class FromRequestBenchmark {
6969
}
7070

7171
private case class Req(i: Int) extends Request[Nothing, Int]
72-
private val ds = DataSource.fromFunctionBatchedZIO("Datasource") { reqs: Chunk[Req] => ZIO.succeed(reqs.map(_.i)) }
72+
private val ds = DataSource.fromFunctionBatchedZIO("Datasource") { (reqs: Chunk[Req]) => ZIO.succeed(reqs.map(_.i)) }
7373
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package zio.query
2+
3+
import org.openjdk.jmh.annotations.{Scope => JScope, _}
4+
import zio.{Chunk, ZIO}
5+
import zio.query.BenchmarkUtil._
6+
7+
import java.util.concurrent.TimeUnit
8+
9+
@Measurement(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
10+
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
11+
@Fork(1)
12+
@Threads(1)
13+
@State(JScope.Thread)
14+
@BenchmarkMode(Array(Mode.Throughput))
15+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
16+
class ZQueryBenchmark {
17+
val cache = Cache.unsafeMake()
18+
19+
val qs1 = Chunk.fill(1000)(ZQuery.succeedNow("foo").runCache(cache))
20+
val qs2 = Chunk.fill(1000)(ZQuery.succeed("foo").runCache(cache))
21+
22+
@Benchmark
23+
@OperationsPerInvocation(1000)
24+
def zQueryRunSucceedNowBenchmark() =
25+
unsafeRunZIO(ZIO.collectAllDiscard(qs1))
26+
27+
@Benchmark
28+
@OperationsPerInvocation(1000)
29+
def zQueryRunSucceedBenchmark() =
30+
unsafeRunZIO(ZIO.collectAllDiscard(qs2))
31+
}

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ lazy val allScalas = List("2.12", "2.13", "3.3")
1313
inThisBuild(
1414
List(
1515
name := "ZIO Query",
16-
zioVersion := "2.1.4",
16+
zioVersion := "2.1.6",
1717
scalaVersion := scalaV,
1818
developers := List(
1919
Developer(

zio-query/shared/src/main/scala/zio/query/ZQuery.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -536,12 +536,17 @@ final class ZQuery[-R, +E, +A] private (private val step: ZIO[R, Nothing, Result
536536
def runCache(cache: => Cache)(implicit trace: Trace): ZIO[R, E, A] =
537537
asExitOrElse(null) match {
538538
case null =>
539-
ZIO.acquireReleaseExitWith {
540-
Scope.make
541-
} { (scope: Scope.Closeable, exit: Exit[E, A]) =>
542-
scope.close(exit)
543-
} { scope =>
544-
ZQuery.currentScope.locally(scope)(ZQuery.currentCache.locally(cache)(runToZIO))
539+
ZIO.uninterruptibleMask { restore =>
540+
ZIO.withFiberRuntime[R, E, A] { (state, _) =>
541+
val scope = QueryScope.make()
542+
state.setFiberRef(ZQuery.currentCache, cache)
543+
state.setFiberRef(ZQuery.currentScope, scope)
544+
restore(runToZIO).exitWith { exit =>
545+
state.deleteFiberRef(ZQuery.currentCache)
546+
state.deleteFiberRef(ZQuery.currentScope)
547+
scope.closeAndExitWith(exit)
548+
}
549+
}
545550
}
546551
case exit => exit
547552
}
@@ -1826,8 +1831,8 @@ object ZQuery {
18261831
val currentCache: FiberRef[Cache] =
18271832
FiberRef.unsafe.make(Cache.unsafeMake())(Unsafe.unsafe)
18281833

1829-
val currentScope: FiberRef[Scope] =
1830-
FiberRef.unsafe.make[Scope](Scope.global)(Unsafe.unsafe)
1834+
val currentScope: FiberRef[QueryScope] =
1835+
FiberRef.unsafe.make[QueryScope](QueryScope.NoOp)(Unsafe.unsafe)
18311836

18321837
final class Acquire[-R, +E, +A](private val acquire: () => ZIO[R, E, A]) extends AnyVal {
18331838
def apply[R1](release: A => URIO[R1, Any]): Release[R with R1, E, A] =
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package zio.query.internal
2+
3+
import zio._
4+
import zio.stacktracer.TracingImplicits.disableAutoTrace
5+
6+
import java.util.concurrent.atomic.AtomicReference
7+
8+
/**
9+
* Lightweight variant of [[zio.Scope]], optimized for usage with ZQuery
10+
*/
11+
sealed trait QueryScope {
12+
def addFinalizerExit(f: Exit[Any, Any] => UIO[Any])(implicit trace: Trace): UIO[Unit]
13+
def closeAndExitWith[E, A](exit: Exit[E, A])(implicit trace: Trace): IO[E, A]
14+
}
15+
16+
private[query] object QueryScope {
17+
def make(): QueryScope = new Default
18+
19+
case object NoOp extends QueryScope {
20+
def addFinalizerExit(f: Exit[Any, Any] => UIO[Any])(implicit trace: Trace): UIO[Unit] = ZIO.unit
21+
def closeAndExitWith[E, A](exit: Exit[E, A])(implicit trace: Trace): IO[E, A] = exit
22+
}
23+
24+
final private class Default extends QueryScope {
25+
private val ref = new AtomicReference(List.empty[Exit[Any, Any] => UIO[Any]])
26+
27+
def addFinalizerExit(f: Exit[Any, Any] => UIO[Any])(implicit trace: Trace): UIO[Unit] =
28+
ZIO.succeed {
29+
ref.updateAndGet(f :: _)
30+
()
31+
}
32+
33+
def closeAndExitWith[E, A](exit: Exit[E, A])(implicit trace: Trace): IO[E, A] = {
34+
val finalizers = ref.get
35+
if (finalizers.isEmpty) exit
36+
else {
37+
ref.set(Nil)
38+
ZIO.foreachDiscard(finalizers)(_(exit)) *> exit
39+
}
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)