Skip to content

Commit

Permalink
Merge pull request #33 from cortexclick/planetscale_client
Browse files Browse the repository at this point in the history
Use Planetscale Client instead of raw Connection, to enable concurrent queries.
  • Loading branch information
jacobwgillespie authored Aug 15, 2024
2 parents fb1baf0 + d9a4e78 commit c9c08d3
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {cast, Config, connect, Connection, Field} from '@planetscale/database'
import {Client, Config, Connection, Field, cast} from '@planetscale/database'
import {parseJSON} from 'date-fns'
import {
CompiledQuery,
Expand Down Expand Up @@ -71,22 +71,22 @@ export class PlanetScaleDialect implements Dialect {
return new MysqlQueryCompiler()
}

createIntrospector(db: Kysely<any>): DatabaseIntrospector {
createIntrospector(db: Kysely<unknown>): DatabaseIntrospector {
return new MysqlIntrospector(db)
}
}

class PlanetScaleDriver implements Driver {
#config: PlanetScaleDialectConfig
#client: Client

constructor(config: PlanetScaleDialectConfig) {
this.#config = config
this.#client = new Client({cast: inflateDates, ...config})
}

async init(): Promise<void> {}

async acquireConnection(): Promise<DatabaseConnection> {
return new PlanetScaleConnection(this.#config)
return new PlanetScaleConnection(this.#client)
}

async beginTransaction(conn: PlanetScaleConnection): Promise<void> {
Expand All @@ -109,27 +109,35 @@ class PlanetScaleDriver implements Driver {
const sharedConnections = new WeakMap<PlanetScaleDialectConfig, Connection>()

class PlanetScaleConnection implements DatabaseConnection {
#config: PlanetScaleDialectConfig
#conn: Connection
#transactionClient?: PlanetScaleConnection
#client: Client
#transactionConn?: Connection
#useSharedConnection: boolean

constructor(config: PlanetScaleDialectConfig, isForTransaction = false) {
this.#config = config
const useSharedConnection = config.useSharedConnection && !isForTransaction
const sharedConnection = useSharedConnection ? sharedConnections.get(config) : undefined
this.#conn = sharedConnection ?? connect({cast: inflateDates, ...config})
if (useSharedConnection) sharedConnections.set(config, this.#conn)
get #config(): Config {
return this.#client.config
}

constructor(client: Client, useSharedConnection = false, isForTransaction = false) {
this.#client = client
this.#useSharedConnection = useSharedConnection && !isForTransaction
if (this.#useSharedConnection) sharedConnections.set(this.#config, sharedConnections.get(this.#config) ?? this.#client.connection())
}

async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
if (this.#transactionClient) return this.#transactionClient.executeQuery(compiledQuery)
if (this.#transactionConn) return this.execute(compiledQuery, this.#transactionConn)

return this.#useSharedConnection
? this.execute(compiledQuery, sharedConnections.get(this.#config) || this.#client)
: this.execute(compiledQuery, this.#client)
}

private async execute<O>(compiledQuery: CompiledQuery, conn: Pick<Connection, 'execute'>): Promise<QueryResult<O>> {
// If no custom formatter is provided, format dates as DB date strings
const parameters = this.#config.format
? compiledQuery.parameters
: compiledQuery.parameters.map((param) => (param instanceof Date ? formatDate(param) : param))

const results = await this.#conn.execute(compiledQuery.sql, parameters)
const results = await conn.execute(compiledQuery.sql, parameters)

// @planetscale/database versions older than 1.3.0 return errors directly, rather than throwing
if ((results as any).error) {
Expand All @@ -150,25 +158,25 @@ class PlanetScaleConnection implements DatabaseConnection {
}

async beginTransaction() {
this.#transactionClient = this.#transactionClient ?? new PlanetScaleConnection(this.#config, true)
await this.#transactionClient.#conn.execute('BEGIN')
this.#transactionConn = this.#transactionConn ?? this.#client.connection()
await this.#transactionConn.execute('BEGIN')
}

async commitTransaction() {
if (!this.#transactionClient) throw new Error('No transaction to commit')
if (!this.#transactionConn) throw new Error('No transaction to commit')
try {
await this.#transactionClient.#conn.execute('COMMIT')
await this.#transactionConn.execute('COMMIT')
} finally {
this.#transactionClient = undefined
this.#transactionConn = undefined
}
}

async rollbackTransaction() {
if (!this.#transactionClient) throw new Error('No transaction to rollback')
if (!this.#transactionConn) throw new Error('No transaction to rollback')
try {
await this.#transactionClient.#conn.execute('ROLLBACK')
await this.#transactionConn.execute('ROLLBACK')
} finally {
this.#transactionClient = undefined
this.#transactionConn = undefined
}
}

Expand Down

0 comments on commit c9c08d3

Please sign in to comment.