Skip to content

Commit 91effe2

Browse files
committed
poc: generalized approach to extracting steps (#2065)
1 parent a687310 commit 91effe2

File tree

2 files changed

+68
-33
lines changed

2 files changed

+68
-33
lines changed

modules/core/src/main/scala/doobie/hi/connection.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ object connection {
8787
loggingInfo
8888
)
8989

90+
def executionWithResultSet[A](
91+
prepared: PreparedExecutionWithResultSet[A],
92+
loggingInfo: LoggingInfo
93+
): ConnectionIO[A] = executeWithResultSet(
94+
prepared.create,
95+
prepared.prep,
96+
prepared.exec,
97+
prepared.process,
98+
loggingInfo
99+
)
100+
90101
/** Create and execute a PreparedStatement which immediately returns the result without reading from a ResultSet. The
91102
* most common case is executing an INSERT/UPDATE and it returning the rows inserted/updated. If the query you're
92103
* executing returns a ResultSet, use `executeWithResultSet` instead for better logging and resource cleanup.
@@ -116,6 +127,17 @@ object connection {
116127
loggingInfo
117128
)
118129

130+
def executeWithoutResultSet[A](
131+
prepared: PreparedExecutionWithoutResultSet[A],
132+
loggingInfo: LoggingInfo
133+
): ConnectionIO[A] =
134+
executeWithoutResultSet(
135+
prepared.create,
136+
prepared.prep,
137+
prepared.exec,
138+
loggingInfo
139+
)
140+
119141
private def execImpl[A](
120142
create: ConnectionIO[PreparedStatement],
121143
prep: PreparedStatementIO[Unit],
@@ -226,6 +248,18 @@ object connection {
226248
} yield ele
227249
}
228250

251+
def stream[A: Read](
252+
prepared: PreparedExecutionStream,
253+
loggingInfo: LoggingInfo
254+
): Stream[ConnectionIO, A] =
255+
stream[A](
256+
prepared.create,
257+
prepared.prep,
258+
prepared.exec,
259+
prepared.chunkSize,
260+
loggingInfo
261+
)
262+
229263
// Old implementation, used by deprecated methods
230264
private def liftStream[A: Read](
231265
chunkSize: Int,
@@ -543,4 +577,24 @@ object connection {
543577
// val nativeTypeMap: ConnectionIO[Map[String, JdbcType]] = {
544578
// getMetaData(IFDMD.getTypeInfo.flatMap(IFDMD.embed(_, HRS.list[(String, JdbcType)].map(_.toMap))))
545579
// }
580+
581+
final case class PreparedExecutionWithResultSet[A](
582+
create: ConnectionIO[PreparedStatement],
583+
prep: PreparedStatementIO[Unit],
584+
exec: PreparedStatementIO[ResultSet],
585+
process: ResultSetIO[A]
586+
)
587+
588+
final case class PreparedExecutionWithoutResultSet[A](
589+
create: ConnectionIO[PreparedStatement],
590+
prep: PreparedStatementIO[Unit],
591+
exec: PreparedStatementIO[A]
592+
)
593+
594+
final case class PreparedExecutionStream(
595+
create: ConnectionIO[PreparedStatement],
596+
prep: PreparedStatementIO[Unit],
597+
exec: PreparedStatementIO[ResultSet],
598+
chunkSize: Int
599+
)
546600
}

modules/core/src/main/scala/doobie/util/query.scala

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import doobie.free.connection.ConnectionIO
1111
import doobie.free.preparedstatement.PreparedStatementIO
1212
import doobie.free.resultset.ResultSetIO
1313
import doobie.free.{connection as IFC, preparedstatement as IFPS}
14+
import doobie.hi.connection.PreparedExecutionWithResultSet
1415
import doobie.hi.{connection as IHC, preparedstatement as IHPS, resultset as IHRS}
1516
import doobie.util.MultiVersionTypeSupport.=:=
1617
import doobie.util.analysis.Analysis
@@ -20,8 +21,6 @@ import doobie.util.log.{LoggingInfo, Parameters}
2021
import doobie.util.pos.Pos
2122
import fs2.Stream
2223

23-
import java.sql.{PreparedStatement, ResultSet}
24-
2524
/** Module defining queries parameterized by input and output types. */
2625
object query {
2726

@@ -114,9 +113,8 @@ object query {
114113
def toMap[K, V](a: A)(implicit ev: B =:= (K, V), f: FactoryCompat[(K, V), Map[K, V]]): ConnectionIO[Map[K, V]] =
115114
toConnectionIO(a, IHRS.buildPair[Map, K, V](f, read.map(ev)))
116115

117-
/**
118-
* Just like `toMap` but allowing to alter `PreparedExecution`.
119-
*/
116+
/** Just like `toMap` but allowing to alter `PreparedExecution`.
117+
*/
120118
def toMapAlteringExecution[K, V](a: A, fn: PreparedExecutionUpdate[Map[K, V]])(implicit
121119
ev: B =:= (K, V),
122120
f: FactoryCompat[(K, V), Map[K, V]]
@@ -153,14 +151,22 @@ object query {
153151
toConnectionIO(a, IHRS.nel[B])
154152

155153
private def toConnectionIO[C](a: A, rsio: ResultSetIO[C]): ConnectionIO[C] =
156-
PreparedExecution(sql, a, rsio).execute(mkLoggingInfo(a))
154+
IHC.executionWithResultSet(preparedExecution(sql, a, rsio), mkLoggingInfo(a))
157155

158156
private def toConnectionIOAlteringExecution[C](
159157
a: A,
160158
rsio: ResultSetIO[C],
161159
fn: PreparedExecutionUpdate[C]
162160
): ConnectionIO[C] =
163-
fn(PreparedExecution(sql, a, rsio)).execute(mkLoggingInfo(a))
161+
IHC.executionWithResultSet(fn(preparedExecution(sql, a, rsio)), mkLoggingInfo(a))
162+
163+
private def preparedExecution[C](sql: String, a: A, rsio: ResultSetIO[C]): PreparedExecutionWithResultSet[C] =
164+
PreparedExecutionWithResultSet(
165+
create = IFC.prepareStatement(sql),
166+
prep = IHPS.set(a),
167+
exec = IFPS.executeQuery,
168+
process = rsio
169+
)
164170

165171
private def mkLoggingInfo(a: A): LoggingInfo =
166172
LoggingInfo(
@@ -258,32 +264,7 @@ object query {
258264

259265
}
260266

261-
type PreparedExecutionUpdate[A] = PreparedExecution[A] => PreparedExecution[A]
262-
263-
final case class PreparedExecution[C](
264-
create: ConnectionIO[PreparedStatement],
265-
prep: PreparedStatementIO[Unit],
266-
exec: PreparedStatementIO[ResultSet],
267-
process: ResultSetIO[C]
268-
) { ctx =>
269-
private[util] def execute(loggingInfo: LoggingInfo) = IHC.executeWithResultSet(
270-
create = ctx.create,
271-
prep = ctx.prep,
272-
exec = ctx.exec,
273-
process = ctx.process,
274-
loggingInfo = loggingInfo
275-
)
276-
}
277-
278-
private object PreparedExecution {
279-
def apply[C, A](sql: String, a: A, rsio: ResultSetIO[C])(implicit w: Write[A]): PreparedExecution[C] =
280-
PreparedExecution(
281-
create = IFC.prepareStatement(sql),
282-
prep = IHPS.set(a),
283-
exec = IFPS.executeQuery,
284-
process = rsio
285-
)
286-
}
267+
type PreparedExecutionUpdate[A] = PreparedExecutionWithResultSet[A] => PreparedExecutionWithResultSet[A]
287268

288269
/** An abstract query closed over its input arguments and yielding values of type `B`, without a specified
289270
* disposition. Methods provided on `[[Query0]]` allow the query to be interpreted as a stream or program in

0 commit comments

Comments
 (0)