Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type MockedFunction,
vi,
} from 'vitest';
import {resolver} from '@rocicorp/resolver';
import {createSilentLogContext} from '../../../../shared/src/logging-test-utils.ts';
import type {Queue} from '../../../../shared/src/queue.ts';
import {sleep} from '../../../../shared/src/sleep.ts';
Expand Down Expand Up @@ -43,7 +44,7 @@ import type {ReplicaState} from '../replicator/replicator.ts';
import {updateReplicationWatermark} from '../replicator/schema/replication-state.ts';
import {type FakeReplicator} from '../replicator/test-utils.ts';
import {CVRStore} from './cvr-store.ts';
import {CVRQueryDrivenUpdater} from './cvr.ts';
import {CVRQueryDrivenUpdater, CVRUpdater} from './cvr.ts';
import type {DrainCoordinator} from './drain-coordinator.ts';
import {ttlClockFromNumber} from './ttl-clock.ts';
import {
Expand Down Expand Up @@ -71,9 +72,15 @@ import {
USERS_QUERY,
} from './view-syncer-test-util.ts';
import type {ViewSyncerService} from './view-syncer.ts';
import {ClientHandler} from './client-handler.ts';
import {PipelineDriver} from './pipeline-driver.ts';
import {type SyncContext} from './view-syncer.ts';

describe('view-syncer/service', () => {
afterEach(() => {
vi.restoreAllMocks();
});

let storageDB: Database;
let replicaDbFile: DbFile;
let replica: Database;
Expand Down Expand Up @@ -4893,4 +4900,61 @@ describe('view-syncer/service', () => {
`[ProtocolError: Reconnect required]`,
);
});

test('stop waits for in-flight changeDesiredQueries', async () => {
// Bring pipelines into the synced state so changeDesiredQueries hits
// #syncQueryPipelineSet and currentPermissions.
const client = connect(SYNC_CONTEXT, [
{op: 'put', hash: 'query-hash1', ast: ISSUES_QUERY},
]);
await nextPoke(client);

stateChanges.push({state: 'version-ready'});
await nextPoke(client);

// Gate the CVR flush so we can stop while a config update is in-flight.
const {promise: flushStarted, resolve: signalFlushStarted} =
resolver<void>();
const allowFlush = resolver<void>();
const originalFlush = CVRUpdater.prototype.flush;
vi.spyOn(CVRUpdater.prototype, 'flush').mockImplementation(async function (
this: CVRUpdater,
...args: Parameters<CVRUpdater['flush']>
) {
signalFlushStarted();
await allowFlush.promise;
return originalFlush.apply(this, args);
});
const failSpy = vi.spyOn(ClientHandler.prototype, 'fail');
let flushReleased = false;
let destroyCalledAfterRelease = false;
const originalDestroy = PipelineDriver.prototype.destroy;
const destroySpy = vi
.spyOn(PipelineDriver.prototype, 'destroy')
.mockImplementation(function (this: PipelineDriver) {
destroyCalledAfterRelease = flushReleased;
return originalDestroy.call(this);
});

// Start the config update; it will block on the flush gate.
const changePromise = vs.changeDesiredQueries(SYNC_CONTEXT, [
'changeDesiredQueries',
{
desiredQueriesPatch: [
{op: 'put', hash: 'query-hash2', ast: USERS_QUERY},
],
},
]);

await flushStarted;
const stopPromise = vs.stop();
flushReleased = true;
allowFlush.resolve();
await Promise.all([stopPromise, viewSyncerDone, changePromise]);

// The in-flight update should finish without failing the client.
expect(failSpy).not.toHaveBeenCalled();
expect(destroySpy).toHaveBeenCalled();
expect(destroyCalledAfterRelease).toBe(true);
});
});
12 changes: 8 additions & 4 deletions packages/zero-cache/src/services/view-syncer/view-syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,13 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
if (this.#drainCoordinator.shouldDrain()) {
this.#drainCoordinator.drainNextIn(this.#totalHydrationTimeMs());
}
this.#cleanup();
await this.#cleanup();
} catch (e) {
this.#lc[getLogLevel(e)]?.(
`stopping view-syncer ${this.id}: ${String(e)}`,
e,
);
this.#cleanup(e);
await this.#cleanup(e);
} finally {
// Always wait for the cvrStore to flush, regardless of how the service
// was stopped.
Expand Down Expand Up @@ -2015,18 +2015,22 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
return this.#stopped.promise;
}

#cleanup(err?: unknown) {
async #cleanup(err?: unknown) {
this.#stopTTLClockInterval();
this.#stopExpireTimer();

this.#pipelines.destroy();
for (const client of this.#clients.values()) {
if (err) {
client.fail(err);
} else {
client.close(`closed clientGroupID=${this.id}`);
}
}

// Wait for existing lock logic to complete before
// cleaning up the pipelines and closing db connections.
await this.#lock.withLock(() => {});
this.#pipelines.destroy();
}

/**
Expand Down
Loading