Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1 from poslegm/quill-3.11
Browse files · Browse the repository at this point in the history
  • Loading branch information
kubukoz authored Nov 22, 2021
2 parents 949e4f8 + 315e5c7 commit b89ffd0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 34 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ For now docs are [here](docs/src/main/mdoc/docs/main.md), we'll have a site in t

## Version compatibility

| This library | Doobie | Quill |
| ------------ | --------- | ----- |
| 0.0.1 | 1.0.0-RC1 | 3.8.0 |
| This library | Doobie | Quill |
| ------------ | --------- | ------ |
| 0.0.1 | 1.0.0-RC1 | 3.8.0 |
| 0.0.2 | 1.0.0-RC1 | 3.11.0 |

## Migrating from original integration

Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ val root = project
.settings(
name := "doobie-quill",
libraryDependencies ++= Seq(
"io.getquill" %% "quill-jdbc" % "3.8.0",
"io.getquill" %% "quill-jdbc" % "3.11.0",
"org.tpolecat" %% "doobie-core" % "1.0.0-RC1",
"org.tpolecat" %% "doobie-postgres" % "1.0.0-RC1" % Test,
"org.scalameta" %% "munit" % "0.7.29" % Test,
) ++ compilerPlugins,
Test / fork := true
)
116 changes: 86 additions & 30 deletions src/main/scala/org/polyvariant/doobiequill/DoobieContextBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import fs2.Stream
import io.getquill.NamingStrategy
import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.context.StreamingContext
import io.getquill.context.ExecutionInfo
import java.sql.Connection
import scala.annotation.nowarn
import scala.util.Success
import scala.util.Try
import doobie.enumerated.AutoGeneratedKeys
Expand Down Expand Up @@ -38,56 +40,84 @@ trait DoobieContextBase[Dialect <: SqlIdiom, Naming <: NamingStrategy]
// to log.underlying below.
private val log: ContextLogger = new ContextLogger("DoobieContext")

// Pulls the live JDBC Connection out of the PreparedStatementIO context and
// threads it into `f`; the execute* methods below use this to hand the
// Connection to the connection-taking Prepare/Extractor functions.
private def useConnection[A](f: Connection => PreparedStatementIO[A]): PreparedStatementIO[A] =
for {
  connection <- FPS.getConnection
  result <- f(connection)
} yield result

// Applies the Prepare function to the statement, then logs the SQL text and
// the bound parameters through the ContextLogger.
// NOTE(review): this span is a rendered diff with lost +/- markers — the
// pre-change line (`FPS.raw(p)`, Prepare applied without a Connection) and the
// post-change lines (`p(_, connection)`, Prepare now also receives the current
// Connection) are interleaved below; only the implicit-Connection form is live.
private def prepareAndLog(
sql: String,
p: Prepare,
): PreparedStatementIO[Unit] = FPS.raw(p).flatMap { case (params, _) =>
)(
implicit connection: Connection
): PreparedStatementIO[Unit] = FPS.raw(p(_, connection)).flatMap { case (params, _) =>
FPS.delay(log.logQuery(sql, params))
}

// Prepares `sql`, logs it with its parameters, executes it, and collects all
// result rows into a List via `extractor`.
// NOTE(review): rendered diff with lost +/- markers — the old inner body (no
// Connection in scope) and the new body (wrapped in
// `useConnection { implicit connection => ... }`) both appear below; only the
// `useConnection` version is the live code. `info`/`dc` are accepted to match
// the Quill context signature but are not read here.
override def executeQuery[A](
sql: String,
prepare: Prepare = identityPrepare,
extractor: Extractor[A] = identityExtractor,
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[List[A]] =
HC.prepareStatement(sql) {
prepareAndLog(sql, prepare) *>
HPS.executeQuery {
HRS.list(extractor)
}
useConnection { implicit connection =>
prepareAndLog(sql, prepare) *>
HPS.executeQuery {
HRS.list(extractor)
}
}
}

// Like executeQuery, but reads a single row via `HRS.getUnique(extractor)`
// (presumably failing when the result set is empty or has multiple rows —
// TODO confirm against doobie's getUnique semantics).
// NOTE(review): rendered diff with lost +/- markers — old and new inner bodies
// are interleaved below; the `useConnection { implicit connection => ... }`
// version is the live code.
override def executeQuerySingle[A](
sql: String,
prepare: Prepare = identityPrepare,
extractor: Extractor[A] = identityExtractor,
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[A] =
HC.prepareStatement(sql) {
prepareAndLog(sql, prepare) *>
HPS.executeQuery {
HRS.getUnique(extractor)
}
useConnection { implicit connection =>
prepareAndLog(sql, prepare) *>
HPS.executeQuery {
HRS.getUnique(extractor)
}
}
}

// Streams query results chunked at `fetchSize` (falling back to
// DefaultChunkSize). The @nowarn suppresses unused-parameter warnings for the
// `info`/`dc` signature parameters.
// NOTE(review): rendered diff with lost +/- markers — the old direct
// `HC.stream(...)` call and the new for-comprehension (which first obtains the
// raw Connection with `Stream.eval(FC.raw(identity))` so it can be passed
// explicitly to prepareAndLog and extractorToRead) are both shown below; the
// for-comprehension is the live code.
@nowarn("msg=is never used")
def streamQuery[A](
fetchSize: Option[Int],
sql: String,
prepare: Prepare = identityPrepare,
extractor: Extractor[A] = identityExtractor,
)(
info: ExecutionInfo,
dc: DatasourceContext,
): Stream[ConnectionIO, A] =
HC.stream(
sql,
prepareAndLog(sql, prepare),
fetchSize.getOrElse(DefaultChunkSize),
)(extractor)
for {
connection <- Stream.eval(FC.raw(identity))
result <-
HC.stream(
sql,
prepareAndLog(sql, prepare)(connection),
fetchSize.getOrElse(DefaultChunkSize),
)(extractorToRead(extractor)(connection))
} yield result

// Executes a DML statement (insert/update/delete) and returns the update
// count widened to Long.
// NOTE(review): rendered diff with lost +/- markers — old and new inner bodies
// are interleaved below; the `useConnection { implicit connection => ... }`
// version is the live code.
override def executeAction[A](
sql: String,
prepare: Prepare = identityPrepare,
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[Long] =
HC.prepareStatement(sql) {
prepareAndLog(sql, prepare) *>
HPS.executeUpdate.map(_.toLong)
useConnection { implicit connection =>
prepareAndLog(sql, prepare) *>
HPS.executeUpdate.map(_.toLong)
}
}

private def prepareConnections[A](returningBehavior: ReturnAction) =
Expand All @@ -103,42 +133,68 @@ trait DoobieContextBase[Dialect <: SqlIdiom, Naming <: NamingStrategy]
prepare: Prepare = identityPrepare,
extractor: Extractor[A],
returningBehavior: ReturnAction,
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[A] =
prepareConnections[A](returningBehavior)(sql) {
prepareAndLog(sql, prepare) *>
FPS.executeUpdate *>
HPS.getGeneratedKeys(HRS.getUnique(extractor))
useConnection { implicit connection =>
prepareAndLog(sql, prepare) *>
FPS.executeUpdate *>
HPS.getGeneratedKeys(HRS.getUnique(extractor))
}
}

// Batch analogue of prepareAndLog: applies the Prepare function and logs the
// parameters of one batch item.
// NOTE(review): rendered diff with lost +/- markers — the complete OLD
// definition (two lines, `FPS.raw(p)`) is immediately followed by the complete
// NEW definition (`FPS.raw(p(_, connection))` with an implicit Connection);
// only the second definition is the live code.
private def prepareBatchAndLog(sql: String, p: Prepare): PreparedStatementIO[Unit] =
FPS.raw(p) flatMap { case (params, _) => FPS.delay(log.logBatchItem(sql, params)) }
private def prepareBatchAndLog(
sql: String,
p: Prepare,
)(
implicit connection: Connection
): PreparedStatementIO[Unit] =
FPS.raw(p(_, connection)) flatMap { case (params, _) =>
FPS.delay(log.logBatchItem(sql, params))
}

// Executes each BatchGroup as a JDBC batch: logs the SQL once, adds each
// prepared item with FPS.addBatch, runs the batch, and flattens the per-item
// update counts (as Long) across all groups.
// NOTE(review): rendered diff with lost +/- markers — old and new inner bodies
// are interleaved below; the `useConnection { implicit connection => ... }`
// version is the live code.
override def executeBatchAction(
groups: List[BatchGroup]
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[List[Long]] = groups.flatTraverse { case BatchGroup(sql, preps) =>
HC.prepareStatement(sql) {
FPS.delay(log.underlying.debug("Batch: {}", sql)) *>
preps.traverse(prepareBatchAndLog(sql, _) *> FPS.addBatch) *>
Nested(HPS.executeBatch).map(_.toLong).value
useConnection { implicit connection =>
FPS.delay(log.underlying.debug("Batch: {}", sql)) *>
preps.traverse(prepareBatchAndLog(sql, _) *> FPS.addBatch) *>
Nested(HPS.executeBatch).map(_.toLong).value
}
}
}

// Executes each returning batch group and reads the generated keys for every
// item via `extractor`. The statement is prepared through
// prepareConnections(returningBehavior) so key generation matches the
// requested ReturnAction.
// NOTE(review): rendered diff with lost +/- markers — old and new inner bodies
// are interleaved below; the `useConnection { implicit connection => ... }`
// version is the live code.
override def executeBatchActionReturning[A](
groups: List[BatchGroupReturning],
extractor: Extractor[A],
)(
info: ExecutionInfo,
dc: DatasourceContext,
): ConnectionIO[List[A]] = groups.flatTraverse {
case BatchGroupReturning(sql, returningBehavior, preps) =>
prepareConnections(returningBehavior)(sql) {
FPS.delay(log.underlying.debug("Batch: {}", sql)) *>
preps.traverse(prepareBatchAndLog(sql, _) *> FPS.addBatch) *>
HPS.executeBatch *>
HPS.getGeneratedKeys(HRS.list(extractor))

useConnection { implicit connection =>
FPS.delay(log.underlying.debug("Batch: {}", sql)) *>
preps.traverse(prepareBatchAndLog(sql, _) *> FPS.addBatch) *>
HPS.executeBatch *>
HPS.getGeneratedKeys(HRS.list(extractor))
}
}
}

// Turn an extractor into a `Read` so we can use the existing resultset.
// NOTE(review): rendered diff with lost +/- markers — the complete OLD
// conversion (extractor called with only the ResultSet) is followed by the
// complete NEW conversion (extractor also receives the Connection, supplied
// implicitly); only the implicit-Connection form is the live code.
private implicit def extractorToRead[A](ex: Extractor[A]): Read[A] =
new Read[A](Nil, (rs, _) => ex(rs))
private implicit def extractorToRead[A](
ex: Extractor[A]
)(
implicit connection: Connection
): Read[A] = new Read[A](Nil, (rs, _) => ex(rs, connection))

// No-op: this context owns no closeable resources of its own.
override def close(): Unit = {}
Expand Down

0 comments on commit b89ffd0

Please sign in to comment.