Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize and compress transactions #1607

Merged
merged 27 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4d4d78e
Start to serialize transaction operations in core, client, multiplaye…
ddimaria Jul 24, 2024
fc69317
Prefer transaction compression
ddimaria Jul 25, 2024
eb29d18
Successfully handle compressed tranactions in core, client, multiplay…
ddimaria Jul 26, 2024
9ae1c75
Merge remote-tracking branch 'origin/qa' into serialize-and-compress-…
ddimaria Jul 26, 2024
0de38e1
Fix lints
ddimaria Jul 26, 2024
8053594
Merge remote-tracking branch 'origin/bincode' into serialize-and-comp…
ddimaria Jul 29, 2024
93b2114
Cleanup
ddimaria Jul 29, 2024
b710b3d
Avoid grid copy on postMessage
ddimaria Jul 29, 2024
17174b1
Add/remove headers for binary transactions
ddimaria Jul 29, 2024
3d19541
Fix typo
HactarCE Jul 30, 2024
ce8f454
Merge branch 'qa' into serialize-and-compress-transactions
ddimaria Aug 2, 2024
f1d868e
Address PR comments
ddimaria Aug 2, 2024
aaeb9e8
Merge branch 'qa' into serialize-and-compress-transactions
davidkircos Aug 6, 2024
b5d42a1
Merge branch 'qa' into serialize-and-compress-transactions
ddimaria Aug 7, 2024
7cc6ced
Delete Vite temp config file: quadratic-client/vite.config.js.timestamp
ddimaria Aug 7, 2024
2e9b8a1
Merge remote-tracking branch 'origin/qa' into serialize-and-compress-…
ddimaria Aug 7, 2024
89f5575
Fix bad merge
ddimaria Aug 7, 2024
08c3903
Import transactions individually for optimal performance
ddimaria Aug 8, 2024
830ec19
Add TODO comments
ddimaria Aug 8, 2024
9a0835b
Fix lints
ddimaria Aug 8, 2024
75da846
Merge remote-tracking branch 'origin/qa' into serialize-and-compress-…
ddimaria Aug 8, 2024
cdb44b1
Merge branch 'qa' into serialize-and-compress-transactions
AyushAgrawal-A2 Aug 10, 2024
0a1d477
Fix grid download issue
ddimaria Aug 13, 2024
83dca83
Merge remote-tracking branch 'origin/qa' into serialize-and-compress-…
ddimaria Aug 13, 2024
ef44229
Allow large file imports
ddimaria Aug 13, 2024
5692aa9
Remove base64-serde to skip extra encoding step
ddimaria Aug 14, 2024
19d96a1
Remove extra message serializations when broadcasting in mp
ddimaria Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quadratic-client/src/app/quadratic-core-types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export type Axis = "X" | "Y";
export interface Instant { seconds: number, }
export interface Duration { years: number, months: number, seconds: number, }
export interface RunError { span: Span | null, msg: RunErrorMsg, }
export type RunErrorMsg = { "PythonError": string } | "Spill" | { "Unimplemented": string } | "UnknownError" | { "InternalError": string } | { "Unterminated": string } | { "Expected": { expected: string, got: string | null, } } | { "Unexpected": string } | { "TooManyArguments": { func_name: string, max_arg_count: number, } } | { "MissingRequiredArgument": { func_name: string, arg_name: string, } } | "BadFunctionName" | "BadCellReference" | "BadNumber" | { "ExactArraySizeMismatch": { expected: ArraySize, got: ArraySize, } } | { "ExactArrayAxisMismatch": { axis: Axis, expected: number, got: number, } } | { "ArrayAxisMismatch": { axis: Axis, expected: number, got: number, } } | "EmptyArray" | "NonRectangularArray" | "NonLinearArray" | "ArrayTooBig" | "CircularReference" | "Overflow" | "DivideByZero" | "NegativeExponent" | "NotANumber" | "Infinity" | "IndexOutOfBounds" | "NoMatch" | "InvalidArgument";
export type RunErrorMsg = { "PythonError": string } | "Spill" | { "Unimplemented": string } | "UnknownError" | { "InternalError": string } | { "Unterminated": string } | { "Expected": { expected: string, got: string | null, } } | { "Unexpected": string } | { "TooManyArguments": { func_name: string, max_arg_count: number, } } | { "MissingRequiredArgument": { func_name: string, arg_name: string, } } | "BadFunctionName" | "BadCellReference" | "BadNumber" | "NaN" | { "ExactArraySizeMismatch": { expected: ArraySize, got: ArraySize, } } | { "ExactArrayAxisMismatch": { axis: Axis, expected: number, got: number, } } | { "ArrayAxisMismatch": { axis: Axis, expected: number, got: number, } } | "EmptyArray" | "NonRectangularArray" | "NonLinearArray" | "ArrayTooBig" | "CircularReference" | "Overflow" | "DivideByZero" | "NegativeExponent" | "NotANumber" | "Infinity" | "IndexOutOfBounds" | "NoMatch" | "InvalidArgument";
export interface Pos { x: bigint, y: bigint, }
export interface Rect { min: Pos, max: Pos, }
export interface Span { start: number, end: number, }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ export interface MultiplayerCoreSequenceNum {

export interface CoreMultiplayerTransaction {
type: 'coreMultiplayerTransaction';
operations: string;
operations: ArrayBuffer;
transaction_id: string;
}

export interface MultiplayerCoreReceiveTransactions {
type: 'multiplayerCoreReceiveTransactions';
transactions: string;
transactions: ReceiveTransaction[];
}

export interface MultiplayerCoreReceiveTransaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export interface ReceiveTransaction {
type: 'Transaction';
id: string;
file_id: string;
operations: string;
operations: string | Buffer;
sequence_num: number;
}

Expand All @@ -111,7 +111,7 @@ export interface SendGetTransactions {

export interface ReceiveTransactions {
type: 'Transactions';
transactions: string;
transactions: ReceiveTransaction[];
}

export interface Heartbeat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import { debugWebWorkersMessages } from '@/app/debugFlags';
import { CoreMultiplayerMessage, MultiplayerCoreMessage } from '../multiplayerCoreMessages';
import { ReceiveTransaction } from '../multiplayerTypes';
import { ReceiveTransaction, ReceiveTransactions } from '../multiplayerTypes';
import { multiplayerServer } from './multiplayerServer';

class MultiplayerCore {
Expand Down Expand Up @@ -51,10 +51,10 @@ class MultiplayerCore {
});
}

receiveTransactions(transactions: string) {
receiveTransactions(receive_transactions: ReceiveTransactions) {
this.send({
type: 'multiplayerCoreReceiveTransactions',
transactions,
transactions: receive_transactions.transactions,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import { User } from '@auth0/auth0-spa-js';
import * as Sentry from '@sentry/react';
import { Buffer } from 'buffer';
import sharedConstants from '../../../../../../updateAlertVersion.json';
import { debugShow, debugShowMultiplayer } from '../../../debugFlags';
import { ClientMultiplayerInit, MultiplayerState } from '../multiplayerClientMessages';
Expand Down Expand Up @@ -244,7 +245,7 @@ export class MultiplayerServer {
break;

case 'Transactions':
multiplayerCore.receiveTransactions(data.transactions);
multiplayerCore.receiveTransactions(data);
break;

case 'EnterRoom':
Expand Down Expand Up @@ -304,7 +305,7 @@ export class MultiplayerServer {
id: transactionMessage.transaction_id,
session_id: this.sessionId!,
file_id: this.fileId!,
operations: transactionMessage.operations,
operations: Buffer.from(transactionMessage.operations).toString('base64'),
};
this.send(message);
}
Expand Down
29 changes: 26 additions & 3 deletions quadratic-client/src/app/web-workers/quadraticCore/worker/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ import {
SummarizeSelectionResult,
} from '@/app/quadratic-core-types';
import initCore, { GridController } from '@/app/quadratic-core/quadratic_core';
import { MultiplayerCoreReceiveTransaction } from '@/app/web-workers/multiplayerWebWorker/multiplayerCoreMessages';
import {
MultiplayerCoreReceiveTransaction,
MultiplayerCoreReceiveTransactions,
} from '@/app/web-workers/multiplayerWebWorker/multiplayerCoreMessages';
import * as Sentry from '@sentry/react';
import { Buffer } from 'buffer';
import {
ClientCoreFindNextColumn,
ClientCoreFindNextRow,
Expand Down Expand Up @@ -286,6 +290,11 @@ class Core {
this.clientQueue.push(async () => {
if (!this.gridController) throw new Error('Expected gridController to be defined');
const data = message.transaction;

if (typeof data.operations === 'string') {
data.operations = Buffer.from(data.operations, 'base64');
}

this.gridController.multiplayerTransaction(data.id, data.sequence_num, data.operations);
offline.markTransactionSent(data.id);
if (await offline.unsentTransactionsCount()) {
Expand All @@ -298,11 +307,25 @@ class Core {
});
}

receiveTransactions(transactions: string) {
receiveTransactions(receive_transactions: MultiplayerCoreReceiveTransactions) {
return new Promise((resolve) => {
this.clientQueue.push(async () => {
if (!this.gridController) throw new Error('Expected gridController to be defined');
this.gridController.receiveMultiplayerTransactions(transactions);

for (let data of receive_transactions.transactions) {
// convert the base64 encoded string of operations into buffers
if (typeof data.operations === 'string') {
data.operations = Buffer.from(data.operations, 'base64');
}

this.gridController.multiplayerTransaction(data.id, data.sequence_num, data.operations);
ddimaria marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO(ddimaria): re-enable 5 - 7 days after we roll out the compressed
// transactions PR, so that we'll know all transactions are of the same version.
//
// const transactionsBuffer = JSON.stringify(receive_transactions.transactions);
// this.gridController.receiveMultiplayerTransactions(transactionsBuffer);
if (await offline.unsentTransactionsCount()) {
coreClient.sendMultiplayerState('syncing');
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ class CoreClient {
if (debugWebWorkers) console.log('[coreClient] initialized.');
}

private send(message: CoreClientMessage) {
self.postMessage(message);
private send(message: CoreClientMessage, transfer?: Transferable[]) {
if (transfer) {
self.postMessage(message, transfer);
} else {
self.postMessage(message);
}
}

private handleMessage = async (e: MessageEvent<ClientCoreMessage>) => {
Expand Down Expand Up @@ -338,7 +342,8 @@ class CoreClient {
return;

case 'clientCoreExport':
this.send({ type: 'coreClientExport', id: e.data.id, grid: await core.export() });
const exported = await core.export();
this.send({ type: 'coreClientExport', id: e.data.id, grid: exported }, [exported.buffer]);
return;

case 'clientCoreSearch':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { core } from './core';

declare var self: WorkerGlobalScope &
typeof globalThis & {
sendTransaction: (transactionId: string, operations: string) => void;
sendTransaction: (transactionId: string, operations: Uint8Array) => void;
requestTransactions: (sequenceNum: number) => void;
};

Expand All @@ -18,9 +18,13 @@ class CoreMultiplayer {
if (debugWebWorkers) console.log('[coreMultiplayer] initialized');
}

private send(message: CoreMultiplayerMessage) {
private send(message: CoreMultiplayerMessage, transfer?: Transferable[]) {
if (!this.coreMessagePort) throw new Error('Expected coreMessagePort to be defined in CoreMultiplayer');
this.coreMessagePort.postMessage(message);
if (transfer) {
this.coreMessagePort.postMessage(message, transfer);
} else {
this.coreMessagePort.postMessage(message);
}
}

private handleMessage = (e: MessageEvent<MultiplayerCoreMessage>) => {
Expand All @@ -36,20 +40,23 @@ class CoreMultiplayer {
break;

case 'multiplayerCoreReceiveTransactions':
core.receiveTransactions(e.data.transactions);
core.receiveTransactions(e.data);
break;

default:
console.warn('[coreMultiplayer] Unhandled message type', e.data);
}
};

sendTransaction = (transactionId: string, operations: string) => {
this.send({
type: 'coreMultiplayerTransaction',
operations,
transaction_id: transactionId,
});
sendTransaction = (transactionId: string, operations: ArrayBuffer) => {
this.send(
{
type: 'coreMultiplayerTransaction',
operations,
transaction_id: transactionId,
},
[operations]
);
};

requestTransactions = (sequenceNum: number) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
declare var self: WorkerGlobalScope &
typeof globalThis & {
addUnsentTransaction: (transactionId: string, transaction: string, operations: number) => void;
sendTransaction: (transactionId: string, operations: string) => void;
sendTransaction: (transactionId: string, operations: ArrayBuffer) => void;
sendImportProgress: (
filename: string,
current: number,
Expand Down Expand Up @@ -94,8 +94,8 @@ export const addUnsentTransaction = (transactionId: string, transactions: string
return self.addUnsentTransaction(transactionId, transactions, operations);
};

export const jsSendTransaction = (transactionId: string, operations: string) => {
return self.sendTransaction(transactionId, operations);
export const jsSendTransaction = (transactionId: string, operations: Uint8Array) => {
return self.sendTransaction(transactionId, operations.buffer);
};

export const jsTime = (name: string) => console.time(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ impl PendingTransaction {
&& !self.is_server()
{
let transaction_id = self.id.to_string();
match serde_json::to_string(&self.forward_operations) {

match Transaction::serialize_and_compress(&self.forward_operations) {
Ok(ops) => {
crate::wasm_bindings::js::jsSendTransaction(transaction_id, ops);
}
Expand Down
Loading
Loading