Skip to content

Commit 6e0fb4f

Browse files
authored
Fix materializer hash mismatch by making ExecutionCompleted/Cancelled materializers deterministic (#39)
- Add cellId to ExecutionCompleted and ExecutionCancelled event schemas - Remove ctx.query() calls from materializers to make them pure functions - Update all event commits to include cellId in the payload - This fixes LiveStore.UnexpectedError about materializer hash mismatch The issue was that materializers were using ctx.query() to look up cellId during materialization, making them non-deterministic. LiveStore requires materializers to be pure functions without side effects, with all necessary data passed via the event payload.
1 parent 0520ec5 commit 6e0fb4f

File tree

6 files changed

+40
-48
lines changed

6 files changed

+40
-48
lines changed

packages/dev-server-kernel-ls-client/src/kernel-adapter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ async function processExecution(queueEntry: any) {
565565
const hasErrors = outputs.some(o => o.type === "error");
566566
store.commit(events.executionCompleted({
567567
queueId: queueEntry.id,
568+
cellId: queueEntry.cellId,
568569
status: hasErrors ? "error" : "success",
569570
error: hasErrors ? "Execution completed with errors" : undefined,
570571
}));
@@ -580,6 +581,7 @@ async function processExecution(queueEntry: any) {
580581
try {
581582
store.commit(events.executionCompleted({
582583
queueId: queueEntry.id,
584+
cellId: queueEntry.cellId,
583585
status: "error",
584586
error: error instanceof Error ? error.message : String(error),
585587
}));
@@ -624,6 +626,7 @@ assignedWorkSubscription = store.subscribe(assignedWorkQuery$ as any, {
624626
try {
625627
store.commit(events.executionCompleted({
626628
queueId: queueEntry.id,
629+
cellId: queueEntry.cellId,
627630
status: "error",
628631
error: error instanceof Error ? error.message : String(error),
629632
}));

packages/dev-server-kernel-ls-client/test/kernel-adapter.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ describe('Kernel Adapter', () => {
206206
// Complete execution
207207
store.commit(events.executionCompleted({
208208
queueId,
209+
cellId,
209210
status: 'success',
210211
}))
211212

@@ -384,6 +385,7 @@ describe('Kernel Adapter', () => {
384385
if (i % 2 === 0) {
385386
store.commit(events.executionCompleted({
386387
queueId,
388+
cellId: `cell-${i}`,
387389
status: 'success'
388390
}))
389391
}

shared/schema.ts

Lines changed: 30 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ export const events = {
288288
name: "v1.ExecutionCompleted",
289289
schema: Schema.Struct({
290290
queueId: Schema.String,
291+
cellId: Schema.String,
291292
status: Schema.Literal("success", "error", "cancelled"),
292293
error: Schema.optional(Schema.String),
293294
}),
@@ -297,6 +298,7 @@ export const events = {
297298
name: "v1.ExecutionCancelled",
298299
schema: Schema.Struct({
299300
queueId: Schema.String,
301+
cellId: Schema.String,
300302
cancelledBy: Schema.String,
301303
reason: Schema.String,
302304
}),
@@ -491,55 +493,35 @@ const materializers = State.SQLite.materializers(events, {
491493
];
492494
},
493495

494-
"v1.ExecutionCompleted": ({ queueId, status }, ctx) => {
495-
// Get the queue entry to find the cell ID
496-
const queueEntries = ctx.query(
497-
tables.executionQueue.select().where({ id: queueId }).limit(1),
498-
);
499-
if (queueEntries.length === 0) return [];
500-
501-
const queueEntry = queueEntries[0] as any;
502-
503-
return [
504-
// Update execution queue
505-
tables.executionQueue
506-
.update({
507-
status: status === "success" ? "completed" : "failed",
508-
})
509-
.where({ id: queueId }),
510-
// Update cell execution state
511-
tables.cells
512-
.update({
513-
executionState: status === "success" ? "completed" : "error",
514-
})
515-
.where({ id: queueEntry.cellId }),
516-
];
517-
},
518-
519-
"v1.ExecutionCancelled": ({ queueId }, ctx) => {
520-
// Get the queue entry to find the cell ID
521-
const queueEntries = ctx.query(
522-
tables.executionQueue.select().where({ id: queueId }).limit(1),
523-
);
524-
if (queueEntries.length === 0) return [];
525-
526-
const queueEntry = queueEntries[0] as any;
496+
"v1.ExecutionCompleted": ({ queueId, cellId, status }) => [
497+
// Update execution queue
498+
tables.executionQueue
499+
.update({
500+
status: status === "success" ? "completed" : "failed",
501+
})
502+
.where({ id: queueId }),
503+
// Update cell execution state
504+
tables.cells
505+
.update({
506+
executionState: status === "success" ? "completed" : "error",
507+
})
508+
.where({ id: cellId }),
509+
],
527510

528-
return [
529-
// Update execution queue
530-
tables.executionQueue
531-
.update({
532-
status: "cancelled",
533-
})
534-
.where({ id: queueId }),
535-
// Update cell execution state
536-
tables.cells
537-
.update({
538-
executionState: "idle",
539-
})
540-
.where({ id: queueEntry.cellId }),
541-
];
542-
},
511+
"v1.ExecutionCancelled": ({ queueId, cellId }) => [
512+
// Update execution queue
513+
tables.executionQueue
514+
.update({
515+
status: "cancelled",
516+
})
517+
.where({ id: queueId }),
518+
// Update cell execution state
519+
tables.cells
520+
.update({
521+
executionState: "idle",
522+
})
523+
.where({ id: cellId }),
524+
],
543525

544526
// Output materializers
545527
"v1.CellOutputAdded": ({

test/fixtures/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ export const mockEvents = {
120120
name: "v1.ExecutionCompleted",
121121
args: {
122122
queueId: mockExecutionQueueEntry.id,
123+
cellId: mockCellData.id,
123124
status: "success" as const,
124125
error: null,
125126
},

test/integration/execution-flow.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ describe.skip("End-to-End Execution Flow", () => {
185185
store.commit(
186186
events.executionCompleted({
187187
queueId,
188+
cellId,
188189
status: "success",
189190
}),
190191
);
@@ -303,6 +304,7 @@ describe.skip("End-to-End Execution Flow", () => {
303304
store.commit(
304305
events.executionCompleted({
305306
queueId,
307+
cellId,
306308
status: "error",
307309
error: "ValueError: Test error",
308310
}),
@@ -423,6 +425,7 @@ describe.skip("End-to-End Execution Flow", () => {
423425
store.commit(
424426
events.executionCompleted({
425427
queueId,
428+
cellId: cellIds[i],
426429
status: "success",
427430
}),
428431
);

test/integration/reactivity-debugging.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ describe("Reactivity Debugging", () => {
168168
store.commit(
169169
events.executionCompleted({
170170
queueId: "multi-sub-queue",
171+
cellId: "multi-sub-cell",
171172
status: "success",
172173
}),
173174
);

0 commit comments

Comments
 (0)