Skip to content

Commit 9fb898d

Browse files
Remove Old Mutex (#662)
1 parent a1aa18c commit 9fb898d

File tree

11 files changed

+26
-42
lines changed

11 files changed

+26
-42
lines changed

.changeset/cuddly-dingos-check.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
'@powersync/node': patch
3+
---
4+
Fixed an issue where `readLock` and `writeLock` calls were unnecessarily serialized due to a shared mutex. This did not affect individual calls to `get`, `getAll`, or `getOptional`.

.changeset/short-countries-swim.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
[Internal] Removed shared mutex implementation of `readLock` and `writeLock`.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema.js';
1616
import { BaseObserver } from '../utils/BaseObserver.js';
1717
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
1818
import { throttleTrailing } from '../utils/async.js';
19-
import { mutexRunExclusive } from '../utils/mutex.js';
2019
import { ConnectionManager } from './ConnectionManager.js';
2120
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2221
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
@@ -149,12 +148,6 @@ export const isPowerSyncDatabaseOptionsWithSettings = (test: any): test is Power
149148
};
150149

151150
export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDBListener> {
152-
/**
153-
* Transactions should be queued in the DBAdapter, but we also want to prevent
154-
* calls to `.execute` while an async transaction is running.
155-
*/
156-
protected static transactionMutex: Mutex = new Mutex();
157-
158151
/**
159152
* Returns true if the connection is closed.
160153
*/
@@ -678,8 +671,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
678671
* @returns The query result as an object with structured key-value pairs
679672
*/
680673
async execute(sql: string, parameters?: any[]) {
681-
await this.waitForReady();
682-
return this.database.execute(sql, parameters);
674+
return this.writeLock((tx) => tx.execute(sql, parameters));
683675
}
684676

685677
/**
@@ -753,7 +745,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
753745
*/
754746
async readLock<T>(callback: (db: DBAdapter) => Promise<T>) {
755747
await this.waitForReady();
756-
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, () => callback(this.database));
748+
return this.database.readLock(callback);
757749
}
758750

759751
/**
@@ -762,10 +754,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
762754
*/
763755
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>) {
764756
await this.waitForReady();
765-
return mutexRunExclusive(AbstractPowerSyncDatabase.transactionMutex, async () => {
766-
const res = await callback(this.database);
767-
return res;
768-
});
757+
return this.database.writeLock(callback);
769758
}
770759

771760
/**

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { Mutex } from 'async-mutex';
21
import Logger, { ILogger } from 'js-logger';
3-
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter.js';
2+
import { DBAdapter, extractTableUpdates, Transaction } from '../../../db/DBAdapter.js';
43
import { BaseObserver } from '../../../utils/BaseObserver.js';
54
import { MAX_OP_ID } from '../../constants.js';
65
import {
@@ -26,7 +25,6 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
2625

2726
constructor(
2827
private db: DBAdapter,
29-
private mutex: Mutex,
3028
private logger: ILogger = Logger.get('SqliteBucketStorage')
3129
) {
3230
super();

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
6464
}
6565

6666
protected generateBucketStorageAdapter(): BucketStorageAdapter {
67-
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
67+
return new SqliteBucketStorage(this.database);
6868
}
6969

7070
connect(

packages/react-native/src/db/PowerSyncDatabase.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import {
99
} from '@powersync/common';
1010
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
1111
import { ReactNativeStreamingSyncImplementation } from '../sync/stream/ReactNativeStreamingSyncImplementation';
12-
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
1312
import { ReactNativeBucketStorageAdapter } from './../sync/bucket/ReactNativeBucketStorageAdapter';
13+
import { ReactNativeQuickSqliteOpenFactory } from './adapters/react-native-quick-sqlite/ReactNativeQuickSQLiteOpenFactory';
1414

1515
/**
1616
* A PowerSync database which provides SQLite functionality
@@ -39,7 +39,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
3939
}
4040

4141
protected generateBucketStorageAdapter(): BucketStorageAdapter {
42-
return new ReactNativeBucketStorageAdapter(this.database, AbstractPowerSyncDatabase.transactionMutex);
42+
return new ReactNativeBucketStorageAdapter(this.database);
4343
}
4444

4545
protected generateSyncStreamImplementation(
@@ -60,14 +60,4 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
6060
identifier: this.database.name
6161
});
6262
}
63-
64-
async readLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
65-
await this.waitForReady();
66-
return this.database.readLock(callback);
67-
}
68-
69-
async writeLock<T>(callback: (db: DBAdapter) => Promise<T>): Promise<T> {
70-
await this.waitForReady();
71-
return this.database.writeLock(callback);
72-
}
7363
}

packages/web/src/db/PowerSyncDatabase.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
172172
}
173173

174174
protected generateBucketStorageAdapter(): BucketStorageAdapter {
175-
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
175+
return new SqliteBucketStorage(this.database);
176176
}
177177

178178
protected async runExclusive<T>(cb: () => Promise<T>) {

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ export class SharedSyncImplementation
371371
const syncParams = this.syncParams!;
372372
// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
373373
return new WebStreamingSyncImplementation({
374-
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
374+
adapter: new SqliteBucketStorage(this.dbAdapter!, this.logger),
375375
remote: new WebRemote(
376376
{
377377
invalidateCredentials: async () => {

packages/web/tests/bucket_storage.test.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
SyncDataBucket
1212
} from '@powersync/common';
1313
import { PowerSyncDatabase, WASQLitePowerSyncDatabaseOpenFactory } from '@powersync/web';
14-
import { Mutex } from 'async-mutex';
1514
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
1615
import { testSchema } from './utils/testDb';
1716

@@ -73,7 +72,7 @@ describe('Bucket Storage', { sequential: true }, () => {
7372
schema: testSchema
7473
});
7574
await db.waitForReady();
76-
bucketStorage = new SqliteBucketStorage(db.database, new Mutex());
75+
bucketStorage = new SqliteBucketStorage(db.database);
7776
});
7877

7978
afterEach(async () => {
@@ -418,7 +417,7 @@ describe('Bucket Storage', { sequential: true }, () => {
418417

419418
let powersync = factory.getInstance();
420419
await powersync.waitForReady();
421-
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
420+
bucketStorage = new SqliteBucketStorage(powersync.database);
422421

423422
await bucketStorage.saveSyncData(
424423
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])
@@ -462,7 +461,7 @@ describe('Bucket Storage', { sequential: true }, () => {
462461

463462
let powersync = factory.getInstance();
464463
await powersync.waitForReady();
465-
bucketStorage = new SqliteBucketStorage(powersync.database, new Mutex());
464+
bucketStorage = new SqliteBucketStorage(powersync.database);
466465

467466
await bucketStorage.saveSyncData(
468467
new SyncDataBatch([new SyncDataBucket('bucket1', [putAsset1_1, putAsset2_2, putAsset1_3], false)])

packages/web/tests/multiple_instances.test.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
SharedWebStreamingSyncImplementationOptions,
1111
WebRemote
1212
} from '@powersync/web';
13-
import { Mutex } from 'async-mutex';
1413

1514
import { beforeAll, describe, expect, it, vi } from 'vitest';
1615
import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter';
@@ -126,7 +125,7 @@ describe('Multiple Instances', { sequential: true }, () => {
126125
// They need to use the same identifier to use the same shared worker.
127126
const identifier = 'streaming-sync-shared';
128127
const syncOptions1: SharedWebStreamingSyncImplementationOptions = {
129-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
128+
adapter: new SqliteBucketStorage(db.database),
130129
remote: new WebRemote(connector1),
131130
uploadCrud: async () => {
132131
await connector1.uploadData(db);
@@ -140,7 +139,7 @@ describe('Multiple Instances', { sequential: true }, () => {
140139
// Generate the second streaming sync implementation
141140
const connector2 = new TestConnector();
142141
const syncOptions2: SharedWebStreamingSyncImplementationOptions = {
143-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
142+
adapter: new SqliteBucketStorage(db.database),
144143
remote: new WebRemote(connector1),
145144
uploadCrud: async () => {
146145
await connector2.uploadData(db);
@@ -190,7 +189,7 @@ describe('Multiple Instances', { sequential: true }, () => {
190189

191190
// Create the first streaming client
192191
const stream1 = new SharedWebStreamingSyncImplementation({
193-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
192+
adapter: new SqliteBucketStorage(db.database),
194193
remote: new WebRemote(connector1),
195194
uploadCrud: async () => {
196195
triggerUpload1();
@@ -216,7 +215,7 @@ describe('Multiple Instances', { sequential: true }, () => {
216215
});
217216

218217
const stream2 = new SharedWebStreamingSyncImplementation({
219-
adapter: new SqliteBucketStorage(db.database, new Mutex()),
218+
adapter: new SqliteBucketStorage(db.database),
220219
remote: new WebRemote(connector1),
221220
uploadCrud: async () => {
222221
triggerUpload2();

0 commit comments

Comments
 (0)