Skip to content

Commit f62a810

Browse files
authored
Upgrade to ZIO 2.0.0-RC4 (#324)
* upgrade zio version * optimize foreachpar * fix auto tupling issue
1 parent a275b17 commit f62a810

File tree

5 files changed

+127
-64
lines changed

5 files changed

+127
-64
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ inThisBuild(
2323
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
2424
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
2525

26-
val zioVersion = "2.0.0-RC3"
26+
val zioVersion = "2.0.0-RC4"
2727

2828
lazy val root = project
2929
.in(file("."))

zio-query/shared/src/main/scala/zio/query/ZQuery.scala

+37-47
Original file line numberDiff line numberDiff line change
@@ -606,14 +606,14 @@ final class ZQuery[-R, +E, +A] private (private val step: ZIO[R, Nothing, Result
606606
/**
607607
* Returns a new query that executes this one and times the execution.
608608
*/
609-
final def timed(implicit trace: ZTraceElement): ZQuery[R with Clock, E, (Duration, A)] =
609+
final def timed(implicit trace: ZTraceElement): ZQuery[R, E, (Duration, A)] =
610610
summarized(Clock.nanoTime)((start, end) => Duration.fromNanos(end - start))
611611

612612
/**
613613
* Returns an effect that will timeout this query, returning `None` if the
614614
* timeout elapses before the query was completed.
615615
*/
616-
final def timeout(duration: => Duration)(implicit trace: ZTraceElement): ZQuery[R with Clock, E, Option[A]] =
616+
final def timeout(duration: => Duration)(implicit trace: ZTraceElement): ZQuery[R, E, Option[A]] =
617617
timeoutTo(None)(Some(_))(duration)
618618

619619
/**
@@ -622,7 +622,7 @@ final class ZQuery[-R, +E, +A] private (private val step: ZIO[R, Nothing, Result
622622
*/
623623
final def timeoutFail[E1 >: E](e: => E1)(duration: => Duration)(implicit
624624
trace: ZTraceElement
625-
): ZQuery[R with Clock, E1, A] =
625+
): ZQuery[R, E1, A] =
626626
timeoutTo(ZQuery.fail(e))(ZQuery.succeedNow)(duration).flatten
627627

628628
/**
@@ -631,7 +631,7 @@ final class ZQuery[-R, +E, +A] private (private val step: ZIO[R, Nothing, Result
631631
*/
632632
final def timeoutFailCause[E1 >: E](cause: => Cause[E1])(duration: => Duration)(implicit
633633
trace: ZTraceElement
634-
): ZQuery[R with Clock, E1, A] =
634+
): ZQuery[R, E1, A] =
635635
timeoutTo(ZQuery.failCause(cause))(ZQuery.succeedNow)(duration).flatten
636636

637637
/**
@@ -641,7 +641,7 @@ final class ZQuery[-R, +E, +A] private (private val step: ZIO[R, Nothing, Result
641641
@deprecated("use timeoutFailCause", "0.3.0")
642642
final def timeoutHalt[E1 >: E](cause: => Cause[E1])(duration: => Duration)(implicit
643643
trace: ZTraceElement
644-
): ZQuery[R with Clock, E1, A] =
644+
): ZQuery[R, E1, A] =
645645
timeoutFailCause(cause)(duration)
646646

647647
/**
@@ -1216,17 +1216,7 @@ object ZQuery {
12161216
)(
12171217
f: A => ZQuery[R, E, B]
12181218
)(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: ZTraceElement): ZQuery[R, E, Collection[B]] =
1219-
if (as.isEmpty) ZQuery.succeed(bf.newBuilder(as).result())
1220-
else {
1221-
val iterator = as.iterator
1222-
var builder: ZQuery[R, E, Builder[B, Collection[B]]] = null
1223-
while (iterator.hasNext) {
1224-
val a = iterator.next()
1225-
if (builder eq null) builder = f(a).map(bf.newBuilder(as) += _)
1226-
else builder = builder.zipWithPar(f(a))(_ += _)
1227-
}
1228-
builder.map(_.result())
1229-
}
1219+
ZQuery(ZIO.foreachPar(Chunk.fromIterable(as))(f(_).step).map(Result.collectAllPar(_).map(bf.fromSpecific(as))))
12301220

12311221
/**
12321222
* Performs a query for each element in a Set, collecting the results
@@ -1513,38 +1503,38 @@ object ZQuery {
15131503
final class TimeoutTo[-R, +E, +A, +B](self: ZQuery[R, E, A], b: () => B) {
15141504
def apply[B1 >: B](
15151505
f: A => B1
1516-
)(duration: => Duration)(implicit trace: ZTraceElement): ZQuery[R with Clock, E, B1] =
1517-
ZQuery.environment[Clock].flatMap { clock =>
1518-
def race(
1519-
query: ZQuery[R, E, B1],
1520-
fiber: Fiber[Nothing, B1]
1521-
): ZQuery[R, E, B1] =
1522-
ZQuery {
1523-
query.step.raceWith[R, Nothing, Nothing, B1, Result[R, E, B1]](fiber.join)(
1524-
(leftExit, rightFiber) =>
1525-
leftExit.foldZIO(
1526-
cause => rightFiber.interrupt *> ZIO.succeedNow(Result.fail(cause)),
1527-
result =>
1528-
result match {
1529-
case Result.Blocked(blockedRequests, continue) =>
1530-
continue match {
1531-
case Continue.Effect(query) =>
1532-
ZIO.succeedNow(Result.blocked(blockedRequests, Continue.effect(race(query, fiber))))
1533-
case Continue.Get(io) =>
1534-
ZIO.succeedNow(
1535-
Result.blocked(blockedRequests, Continue.effect(race(ZQuery.fromZIO(io), fiber)))
1536-
)
1537-
}
1538-
case Result.Done(value) => rightFiber.interrupt *> ZIO.succeedNow(Result.done(value))
1539-
case Result.Fail(cause) => rightFiber.interrupt *> ZIO.succeedNow(Result.fail(cause))
1540-
}
1541-
),
1542-
(rightExit, leftFiber) => leftFiber.interrupt *> ZIO.succeedNow(Result.fromExit(rightExit))
1543-
)
1544-
}
1506+
)(duration: => Duration)(implicit trace: ZTraceElement): ZQuery[R, E, B1] = {
1507+
1508+
def race(
1509+
query: ZQuery[R, E, B1],
1510+
fiber: Fiber[Nothing, B1]
1511+
): ZQuery[R, E, B1] =
1512+
ZQuery {
1513+
query.step.raceWith[R, Nothing, Nothing, B1, Result[R, E, B1]](fiber.join)(
1514+
(leftExit, rightFiber) =>
1515+
leftExit.foldZIO(
1516+
cause => rightFiber.interrupt *> ZIO.succeedNow(Result.fail(cause)),
1517+
result =>
1518+
result match {
1519+
case Result.Blocked(blockedRequests, continue) =>
1520+
continue match {
1521+
case Continue.Effect(query) =>
1522+
ZIO.succeedNow(Result.blocked(blockedRequests, Continue.effect(race(query, fiber))))
1523+
case Continue.Get(io) =>
1524+
ZIO.succeedNow(
1525+
Result.blocked(blockedRequests, Continue.effect(race(ZQuery.fromZIO(io), fiber)))
1526+
)
1527+
}
1528+
case Result.Done(value) => rightFiber.interrupt *> ZIO.succeedNow(Result.done(value))
1529+
case Result.Fail(cause) => rightFiber.interrupt *> ZIO.succeedNow(Result.fail(cause))
1530+
}
1531+
),
1532+
(rightExit, leftFiber) => leftFiber.interrupt *> ZIO.succeedNow(Result.fromExit(rightExit))
1533+
)
1534+
}
15451535

1546-
ZQuery.fromZIO(clock.get.sleep(duration).interruptible.as(b()).fork).flatMap(fiber => race(self.map(f), fiber))
1547-
}
1536+
ZQuery.fromZIO(ZIO.sleep(duration).interruptible.as(b()).fork).flatMap(fiber => race(self.map(f), fiber))
1537+
}
15481538
}
15491539

15501540
final class ServiceWithPartiallyApplied[R](private val dummy: Boolean = true) extends AnyVal {

zio-query/shared/src/main/scala/zio/query/internal/Continue.scala

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package zio.query.internal
22

3+
import zio._
34
import zio.query._
45
import zio.query.internal.Continue._
56
import zio.stacktracer.TracingImplicits.disableAutoTrace
6-
import zio.{ CanFail, Cause, IO, Ref, ZEnvironment, ZIO, ZTraceElement }
77

88
/**
99
* A `Continue[R, E, A]` models a continuation of a blocked request that
@@ -167,6 +167,42 @@ private[query] object Continue {
167167
}
168168
}
169169

170+
/**
171+
* Collects a collection of continuation into a continuation returning a
172+
* collection of their results, in parallel.
173+
*/
174+
def collectAllPar[R, E, A, Collection[+Element] <: Iterable[Element]](
175+
continues: Collection[Continue[R, E, A]]
176+
)(implicit
177+
bf: BuildFrom[Collection[Continue[R, E, A]], A, Collection[A]],
178+
trace: ZTraceElement
179+
): Continue[R, E, Collection[A]] =
180+
continues.zipWithIndex
181+
.foldLeft[(Chunk[(ZQuery[R, E, A], Int)], Chunk[(IO[E, A], Int)])]((Chunk.empty, Chunk.empty)) {
182+
case ((queries, ios), (continue, index)) =>
183+
continue match {
184+
case Effect(query) => (queries :+ ((query, index)), ios)
185+
case Get(io) => (queries, ios :+ ((io, index)))
186+
}
187+
} match {
188+
case (Chunk(), ios) =>
189+
get(ZIO.collectAll(ios.map(_._1)).map(bf.fromSpecific(continues)))
190+
case (queries, ios) =>
191+
val query = ZQuery.collectAllPar(queries.map(_._1)).flatMap { as =>
192+
val array = Array.ofDim[AnyRef](continues.size)
193+
as.zip(queries.map(_._2)).foreach { case (a, i) =>
194+
array(i) = a.asInstanceOf[AnyRef]
195+
}
196+
ZQuery.fromZIO(ZIO.collectAll(ios.map(_._1))).map { as =>
197+
as.zip(ios.map(_._2)).foreach { case (a, i) =>
198+
array(i) = a.asInstanceOf[AnyRef]
199+
}
200+
bf.fromSpecific(continues)(array.asInstanceOf[Array[A]])
201+
}
202+
}
203+
effect(query)
204+
}
205+
170206
/**
171207
* Constructs a continuation that may perform arbitrary effects.
172208
*/

zio-query/shared/src/main/scala/zio/query/internal/Result.scala

+38-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package zio.query.internal
22

3+
import zio._
34
import zio.query.internal.Result._
45
import zio.query.{ DataSourceAspect, Described }
56
import zio.stacktracer.TracingImplicits.disableAutoTrace
6-
import zio.{ CanFail, Cause, Exit, ZEnvironment, ZTraceElement }
77

88
/**
99
* A `Result[R, E, A]` is the result of running one step of a `ZQuery`. A
@@ -111,6 +111,43 @@ private[query] object Result {
111111
def blocked[R, E, A](blockedRequests: BlockedRequests[R], continue: Continue[R, E, A]): Result[R, E, A] =
112112
Blocked(blockedRequests, continue)
113113

114+
/**
115+
* Collects a collection of results into a single result. Blocked requests
116+
* and their continuations will be executed in parallel.
117+
*/
118+
def collectAllPar[R, E, A, Collection[+Element] <: Iterable[Element]](results: Collection[Result[R, E, A]])(implicit
119+
bf: BuildFrom[Collection[Result[R, E, A]], A, Collection[A]],
120+
trace: ZTraceElement
121+
): Result[R, E, Collection[A]] =
122+
results.zipWithIndex
123+
.foldLeft[(Chunk[((BlockedRequests[R], Continue[R, E, A]), Int)], Chunk[(A, Int)], Chunk[(Cause[E], Int)])](
124+
(Chunk.empty, Chunk.empty, Chunk.empty)
125+
) { case ((blocked, done, fails), (result, index)) =>
126+
result match {
127+
case Blocked(br, c) => (blocked :+ (((br, c), index)), done, fails)
128+
case Done(a) => (blocked, done :+ ((a, index)), fails)
129+
case Fail(e) => (blocked, done, fails :+ ((e, index)))
130+
}
131+
} match {
132+
case (Chunk(), done, Chunk()) =>
133+
Result.done(bf.fromSpecific(results)(done.map(_._1)))
134+
case (blocked, done, Chunk()) =>
135+
val blockedRequests = blocked.map(_._1._1).foldLeft[BlockedRequests[R]](BlockedRequests.empty)(_ && _)
136+
val continue = Continue.collectAllPar(blocked.map(_._1._2)).map { as =>
137+
val array = Array.ofDim[AnyRef](results.size)
138+
as.zip(blocked.map(_._2)).foreach { case (a, i) =>
139+
array(i) = a.asInstanceOf[AnyRef]
140+
}
141+
done.foreach { case (a, i) =>
142+
array(i) = a.asInstanceOf[AnyRef]
143+
}
144+
bf.fromSpecific(results)(array.asInstanceOf[Array[A]])
145+
}
146+
Result.blocked(blockedRequests, continue)
147+
case (_, _, fail) =>
148+
Result.fail(fail.map(_._1).foldLeft[Cause[E]](Cause.empty)(_ && _))
149+
}
150+
114151
/**
115152
* Constructs a result that is done with the specified value.
116153
*/

zio-query/shared/src/test/scala/zio/query/ZQuerySpec.scala

+14-14
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ object ZQuerySpec extends ZIOBaseSpec {
172172
richUsers <- ZQuery.foreachPar(users) { user =>
173173
Sources
174174
.getPayment(user.paymentId)
175-
.zipPar(Sources.getAddress(user.addressId))
175+
.zip(Sources.getAddress(user.addressId))
176176
.map { case (payment, address) =>
177177
(user, payment, address)
178178
}
@@ -281,8 +281,8 @@ object ZQuerySpec extends ZIOBaseSpec {
281281
case object GetAllIds extends UserRequest[List[Int]]
282282
final case class GetNameById(id: Int) extends UserRequest[String]
283283

284-
val UserRequestDataSource: DataSource[Console, UserRequest[Any]] =
285-
DataSource.Batched.make[Console, UserRequest[Any]]("UserRequestDataSource") { requests =>
284+
val UserRequestDataSource: DataSource[Any, UserRequest[Any]] =
285+
DataSource.Batched.make[Any, UserRequest[Any]]("UserRequestDataSource") { requests =>
286286
ZIO.when(requests.toSet.size != requests.size)(ZIO.dieMessage("Duplicate requests)")) *>
287287
Console.printLine(requests.toString).orDie *>
288288
ZIO.succeed {
@@ -294,25 +294,25 @@ object ZQuerySpec extends ZIOBaseSpec {
294294
}
295295
}
296296

297-
val getAllUserIds: ZQuery[Console, Nothing, List[Int]] =
297+
val getAllUserIds: ZQuery[Any, Nothing, List[Int]] =
298298
ZQuery.fromRequest(GetAllIds)(UserRequestDataSource)
299299

300-
def getUserNameById(id: Int): ZQuery[Console, Nothing, String] =
300+
def getUserNameById(id: Int): ZQuery[Any, Nothing, String] =
301301
ZQuery.fromRequest(GetNameById(id))(UserRequestDataSource)
302302

303-
val getAllUserNames: ZQuery[Console, Nothing, List[String]] =
303+
val getAllUserNames: ZQuery[Any, Nothing, List[String]] =
304304
for {
305305
userIds <- getAllUserIds
306306
userNames <- ZQuery.foreachPar(userIds)(getUserNameById)
307307
} yield userNames
308308

309309
case object GetFoo extends Request[Nothing, String]
310-
val getFoo: ZQuery[Console, Nothing, String] = ZQuery.fromRequest(GetFoo)(
310+
val getFoo: ZQuery[Any, Nothing, String] = ZQuery.fromRequest(GetFoo)(
311311
DataSource.fromFunctionZIO("foo")(_ => Console.printLine("Running foo query") *> ZIO.succeed("foo"))
312312
)
313313

314314
case object GetBar extends Request[Nothing, String]
315-
val getBar: ZQuery[Console, Nothing, String] = ZQuery.fromRequest(GetBar)(
315+
val getBar: ZQuery[Any, Nothing, String] = ZQuery.fromRequest(GetBar)(
316316
DataSource.fromFunctionZIO("bar")(_ => Console.printLine("Running bar query") *> ZIO.succeed("bar"))
317317
)
318318

@@ -447,12 +447,12 @@ object ZQuerySpec extends ZIOBaseSpec {
447447
4 -> "d"
448448
)
449449

450-
def backendGetAll: ZIO[Console, Nothing, Map[Int, String]] =
450+
def backendGetAll: ZIO[Any, Nothing, Map[Int, String]] =
451451
for {
452452
_ <- Console.printLine("getAll called").orDie
453453
} yield testData
454454

455-
def backendGetSome(ids: Chunk[Int]): ZIO[Console, Nothing, Map[Int, String]] =
455+
def backendGetSome(ids: Chunk[Int]): ZIO[Any, Nothing, Map[Int, String]] =
456456
for {
457457
_ <- Console.printLine(s"getSome ${ids.mkString(", ")} called").orDie
458458
} yield ids.flatMap { id =>
@@ -468,10 +468,10 @@ object ZQuerySpec extends ZIOBaseSpec {
468468
final case class Get(id: Int) extends Req[String]
469469
}
470470

471-
val ds: DataSource.Batched[Console, Req[_]] = new DataSource.Batched[Console, Req[_]] {
471+
val ds: DataSource.Batched[Any, Req[_]] = new DataSource.Batched[Any, Req[_]] {
472472
override def run(
473473
requests: Chunk[Req[_]]
474-
)(implicit trace: ZTraceElement): ZIO[Console, Nothing, CompletedRequestMap] = {
474+
)(implicit trace: ZTraceElement): ZIO[Any, Nothing, CompletedRequestMap] = {
475475
val (all, oneByOne) = requests.partition {
476476
case Req.GetAll => true
477477
case Req.Get(_) => false
@@ -505,8 +505,8 @@ object ZQuerySpec extends ZIOBaseSpec {
505505
override val identifier: String = "test"
506506
}
507507

508-
def getAll: ZQuery[Console, DataSourceErrors, Map[Int, String]] =
508+
def getAll: ZQuery[Any, DataSourceErrors, Map[Int, String]] =
509509
ZQuery.fromRequest(Req.GetAll)(ds)
510-
def get(id: Int): ZQuery[Console, DataSourceErrors, String] =
510+
def get(id: Int): ZQuery[Any, DataSourceErrors, String] =
511511
ZQuery.fromRequest(Req.Get(id))(ds)
512512
}

0 commit comments

Comments
 (0)