Skip to content

Commit

Permalink
feat(zero-cache): time-slice IVM for fairness and responsiveness (#3651)
Browse files Browse the repository at this point in the history
  • Loading branch information
darkgnotic authored Jan 29, 2025
1 parent bec7870 commit 698a837
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 2 deletions.
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/zero-cache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"fastify": "^5.0.0",
"jose": "^5.9.3",
"json-custom-numbers": "^3.1.1",
"magic-stopwatch": "^1.0.1",
"nanoid": "^5.0.8",
"pg": "^8.11.3",
"pg-format": "npm:pg-format-fix@^1.0.5",
Expand Down
33 changes: 31 additions & 2 deletions packages/zero-cache/src/services/view-syncer/view-syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Lock} from '@rocicorp/lock';
import type {LogContext} from '@rocicorp/logger';
import {resolver} from '@rocicorp/resolver';
import type {JWTPayload} from 'jose';
import {Stopwatch} from 'magic-stopwatch';
import type {Row} from 'postgres';
import {
manualSpan,
Expand Down Expand Up @@ -728,7 +729,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
}
// #processChanges does batched de-duping of rows. Wrap all pipelines in
// a single generator in order to maximize de-duping.
await this.#processChanges(
const processTime = await this.#processChanges(
lc,
generateRowChanges(),
updater,
Expand All @@ -755,7 +756,10 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
// Signal clients to commit.
pokers.forEach(poker => poker.end());

lc.info?.(`finished processing queries (${Date.now() - start} ms)`);
const wallTime = Date.now() - start;
lc.info?.(
`finished processing queries (process: ${processTime} ms, wall: ${wallTime} ms)`,
);
});
}

Expand Down Expand Up @@ -858,6 +862,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
});
}

/** Returns the time spent processing rows (i.e. excludes yielded time) */
#processChanges(
lc: LogContext,
changes: Iterable<RowChange>,
Expand All @@ -866,6 +871,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
transformationHashToHash: Map<string, string>,
) {
return startAsyncSpan(tracer, 'vs.#processChanges', async () => {
const stopwatch = new Stopwatch({type: 'date'});
const start = Date.now();
const rows = new CustomKeyMap<RowID, RowUpdate>(rowIDString);
let total = 0;
Expand Down Expand Up @@ -937,12 +943,25 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
if (rows.size % CURSOR_PAGE_SIZE === 0) {
await processBatch();
}

if (rows.size % TIME_SLICE_CHECK_SIZE === 0) {
const {elapsed} = stopwatch;
if (elapsed > TIME_SLICE_MS) {
lc.debug?.(`yielding at ${rows.size} rows (${elapsed} ms)`);
stopwatch.stop(RECORD_LAP);
await yieldProcess();
stopwatch.start();
}
}
}
if (rows.size) {
await processBatch();
}
span.setAttribute('totalRows', total);
});

stopwatch.stop(RECORD_LAP);
return stopwatch.laps.reduce((total, lap) => total + lap.elapsed, 0);
});
}

Expand Down Expand Up @@ -1036,7 +1055,17 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService {
}
}

// Update CVR after every 10000 rows.
const CURSOR_PAGE_SIZE = 10000;
// Check the elapsed time every 500 rows.
const TIME_SLICE_CHECK_SIZE = 500;
// Yield the process after churning for > 500ms.
const TIME_SLICE_MS = 500;
const RECORD_LAP = true; // Readability for Stopwatch.stop(recordlap: boolean);

function yieldProcess() {
return new Promise(resolve => setTimeout(resolve, 0));
}

function contentsAndVersion(row: Row) {
const {[ZERO_VERSION_COLUMN_NAME]: version, ...contents} = row;
Expand Down
1 change: 1 addition & 0 deletions packages/zero/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"js-xxhash": "^4.0.0",
"json-custom-numbers": "^3.1.1",
"kasi": "^1.1.0",
"magic-stopwatch": "^1.0.1",
"nanoid": "^5.0.8",
"pg": "^8.11.3",
"pg-format": "npm:pg-format-fix@^1.0.5",
Expand Down

0 comments on commit 698a837

Please sign in to comment.