Graph synchronization on peer connection for improved data handshake #1404
base: master
Conversation
@bitnom Awesome!!!! So here's the review, let's vote: the AI fix brute-force requests all nodes (souls) rather than only the pending ones (though those are glitchy), leading to a less buggy re-sync on reconnect, but with unbounded network usage. It also looks like it has the side effect of requesting an entire node even if the previous query only asked for one item off that node, so watch out if that node happens to be something large.

Vote 👍 to accept it: it stabs the reconnect re-sync bugs with a brutal but inefficient fix, but we need all the help we can get and this helps move things along!!!! I'm inclined to accept it anyway, since most people are pulling from NPM, not bleeding-edge GitHub. A couple of people in the Twitter/X group chat (can I add you please? I'm active there) have already voted yes too, but I'll let people vote here as well.
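To make the trade-off described above concrete, here is a minimal sketch (plain objects, no GUN dependency; the soul names and the narrowed-GET wire shape are illustrative assumptions, not taken from the PR) of why requesting every known soul in full scales with graph size rather than with subscription count:

```javascript
// Hypothetical local state: two souls are known locally, but only one
// field of one soul was ever subscribed to.
const graph = {
  user:  { name: 'alice', bio: 'hi' },
  posts: { p1: 'first post' },
};
const pending = { user: { '.': 'name' } }; // narrowed subscription (assumed shape)

// Pre-patch behaviour (sketch): one GET per *pending* soul, narrowed to a field.
const pendingGets = Object.entries(pending).map(([soul, lex]) => ({
  get: { '#': soul, ...lex },
}));

// Post-patch behaviour (sketch): one full-node GET per soul in the *whole* graph.
const fullGets = Object.keys(graph).map((soul) => ({ get: { '#': soul } }));

console.log(pendingGets.length); // 1 — bounded by active subscriptions
console.log(fullGets.length);    // 2 — bounded only by local graph size
```

The point is only the asymmetry: the pre-patch message count follows what a peer asked for, while the post-patch count follows everything the peer knows.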
I did not see any improvement in syncing data that was created offline (only a local network exists on the test system!) :-( My setup: the server knows only the data which is set after the peers have been added.

```javascript
import GUN from 'gun';
import http from 'node:http';
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { rm, mkdir } from 'node:fs/promises';

const port = Number(process.env?.HTTP_PORT ?? '8889');

const createGunClientDefaultOptions = (name) => ({
    file: `peer_cache/${name}.json`,
    axe: false,
    peers: [],
});

if (isMainThread) {
    try {
        // clean up the relay cache
        await rm('./radata', { recursive: true, force: true });
        console.info('Directory "radata" and all its subdirectories have been removed.');
        // clean up the peer caches
        await rm('./peer_cache', { recursive: true, force: true });
        console.info('Directory "peer_cache" and all its subdirectories have been removed.');
    } catch (error) {
        console.error('Error removing directory:', error);
    }
    await mkdir('./peer_cache', { recursive: true });

    if (Number.isNaN(port) || port < 1 || port > 65535) {
        console.error(`Invalid port number "${port}". Please provide a number between 1 and 65535.`);
        process.exit(1);
    }

    const server = http.createServer(GUN.serve('./')).listen(port, () => {
        const gunServerUrl = `http://localhost:${port}/gun`;
        console.info('Gun server started: ' + gunServerUrl);

        const workerBob = new Worker(new URL(import.meta.url), { workerData: { name: 'bob', options: { gunOptions: { peers: [] } } } });
        workerBob.on('message', (message) => {
            console.log('Message from worker Bob:', message);
        });
        const workerFred = new Worker(new URL(import.meta.url), { workerData: { name: 'fred', options: { gunOptions: { peers: [] } } } });
        workerFred.on('message', (message) => {
            console.log('Message from worker Fred:', message);
        });
        const workerAlice = new Worker(new URL(import.meta.url), { workerData: { name: 'alice', options: { gunOptions: { peers: [] } } } });
        workerAlice.on('message', (message) => {
            console.log('Message from worker Alice:', message);
        });

        // Bob and Fred start offline (no peers); Alice joins later.
        workerBob.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });
        workerFred.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });
        //workerAlice.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });

        let nextStepTimer = 3000;
        setTimeout(() => {
            console.log('Step: subscribe');
            workerBob.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
            workerFred.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
            //workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        }, nextStepTimer);
        nextStepTimer += 1000;
        setTimeout(() => {
            console.log('Step: set');
            workerBob.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { bob: 't1' } } });
            workerFred.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { fred: 't1' } } });
            //workerAlice.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { alice: 't1' } } });
        }, nextStepTimer);
        nextStepTimer += 1000;
        setTimeout(() => {
            console.log('Step: peers');
            workerBob.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
            //workerAlice.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
            workerFred.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
        }, nextStepTimer);
        nextStepTimer += 1000;
        // setTimeout(() => {
        //     console.log('Step: subscribe2');
        //     workerBob.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        //     workerFred.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        //     workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        // }, nextStepTimer);
        // nextStepTimer += 1000;
        setTimeout(() => {
            console.log('Step: set2');
            workerBob.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { bob: 't2' } } });
            //workerAlice.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { alice: 't2' } } });
            workerFred.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { fred: 't2' } } });
        }, nextStepTimer);
        nextStepTimer += 1000;
        setTimeout(() => {
            console.log('Step: Alice init & peer');
            workerAlice.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [gunServerUrl] } } });
        }, nextStepTimer);
        nextStepTimer += 1000;
        setTimeout(() => {
            console.log('Step: Alice subscribe');
            workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        }, nextStepTimer);
        nextStepTimer += 1000;
    });
    const gun = GUN({ web: server });
} else { // worker threads: bob, fred, alice
    const { name, options } = workerData;
    const debugOptions = JSON.stringify(options.gunOptions);
    let gun = null; // created lazily on the 'init' command
    console.info(`${name}: Gun client started with options: ${debugOptions}`);

    parentPort.on('message', ({ cmd, msgoptions }) => {
        console.info(`${name}: Received command: ${cmd} with options: ${JSON.stringify(msgoptions)}`);
        switch (cmd) {
            case 'init': {
                const clientOptions = { ...createGunClientDefaultOptions(name), ...msgoptions.gunOptions };
                gun = GUN(clientOptions);
                break;
            }
            case 'peers':
                gun.opt({ peers: msgoptions.peers });
                gun.get('key').get('heartbeat').put('heartbeat');
                break;
            case 'subscribe':
                gun.get(msgoptions.key).on((data, key) => {
                    parentPort.postMessage(`received data: ${JSON.stringify(data)} with key: ${key}`);
                });
                break;
            case 'set':
                gun.get(msgoptions.key).set(msgoptions.value);
                break;
            case 'put':
                gun.get(msgoptions.key).put(msgoptions.value);
                break;
            default:
                console.error(`Unknown command "${cmd}"`); // was `type`, which is undefined here
        }
    });
}
```
This pull request addresses inconsistencies in data synchronization between peers, particularly when a peer reconnects after being offline or during periods of rapid updates (as reported in issues #392 and #259). Previously, a manual browser refresh was often required to fully sync data across all connected clients.
Problem:

The existing peer connection handshake (the `hi` event in `src/mesh.js`) primarily relied on the local peer's active subscriptions (`root.next`) to determine what data to request from a newly connected peer. This could lead to missed data whenever `root.next` was stale or did not cover all relevant data.

Solution:

The `root.on('hi', ...)` handler in `src/mesh.js` has been enhanced to perform a more comprehensive data handshake: the peer now iterates over the souls in its local graph (`root.graph`) and sends `GET` requests for these souls to the other peer. Since this occurs reciprocally, both peers attempt to fetch each other's entire known graph state. Requesting `GET` data based on `root.next` (specific, active subscriptions) is retained to complement the full graph sync, ensuring fine-grained interests are also covered.

Benefits:

This more aggressive synchronization strategy ensures that peers exchange all known data upon establishing a connection. This allows peers to recover updates missed while disconnected through the reciprocal `GET` requests and subsequent `PUT` replies.

Considerations:

This change may lead to increased initial bandwidth usage for peers with very large graphs during the connection handshake. However, this trade-off prioritizes data consistency and aims to resolve the reported synchronization failures.
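The enhanced handshake described above can be sketched roughly as follows. This is a simplified stand-alone sketch, not the actual `src/mesh.js` code: the `onHi` function name, the `send` transport callback, and the message shapes are assumptions for illustration, and a real implementation would likely deduplicate souls that appear in both `root.graph` and `root.next`.

```javascript
// Sketch of the enhanced 'hi' handshake: on peer connect, request every
// soul already present in the local graph, then re-send GETs for the
// active subscriptions as before.
function onHi(root, peer, send) {
  // full-graph sync: one GET per locally known soul
  for (const soul of Object.keys(root.graph || {})) {
    send(peer, { get: { '#': soul } });
  }
  // retained behaviour: fine-grained GETs from active subscriptions
  for (const soul of Object.keys(root.next || {})) {
    send(peer, { get: { '#': soul } });
  }
}

// Tiny demo with a mocked root and transport:
const sent = [];
onHi(
  { graph: { a: {}, b: {} }, next: { a: {} } },
  'peer-1',
  (peer, msg) => sent.push(msg)
);
console.log(sent.length); // 3 GETs: a, b (graph) + a (subscription)
```

Because both sides run this on `hi`, each peer ends up asking for everything the other side knows, which is what makes the bandwidth consideration above proportional to graph size.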
Related Issues:
The way this PR was coded (roocode) is related to #1370
AI: roocode + Gemini 2.5 Pro
Full roocode Session: https://gist.github.com/bitnom/64c911fcbe074a2f7a715eb6bd9baf2d
Note: This is an experimental way of making a PR, so there could be mistakes. However, if you point out the mistakes, I can continue the roocode session to resolve them (if nobody wants to do it manually).