
Graph synchronization on peer connection for improved data handshake #1404


Open. bitnom wants to merge 1 commit into master.

bitnom

@bitnom bitnom commented May 10, 2025

This pull request addresses inconsistencies in data synchronization between peers, particularly when a peer reconnects after being offline or during periods of rapid updates (as reported in issues #392 and #259). Previously, a manual browser refresh was often required to fully sync data across all connected clients.

Problem:
The existing peer connection handshake (hi event in src/mesh.js) primarily relied on the local peer's active subscriptions (root.next) to determine what data to request from a newly connected peer. This could lead to:

  • Missed updates that occurred while a peer was offline.
  • Failure to propagate changes made by a rejoining peer while it was offline.
  • Incomplete data discovery if a peer's root.next was stale or did not cover all relevant data.

Solution:
The root.on('hi', ...) handler in src/mesh.js has been enhanced to perform a more comprehensive data handshake:

  1. Full Graph Synchronization: Upon connection or reconnection, each peer now iterates over all souls in its local root.graph and sends GET requests for these souls to the other peer. Since this occurs reciprocally, both peers attempt to fetch each other's entire known graph state.
  2. Active Subscription Synchronization: The original logic to GET data based on root.next (specific, active subscriptions) is retained to complement the full graph sync, ensuring fine-grained interests are also covered.
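The two request sources above can be sketched with plain objects. This is a toy model, not the code in src/mesh.js: `buildHandshakeGets`, `localGraph` and `localNext` are hypothetical names, the graph and subscription maps are simplified stand-ins for root.graph and root.next, and the de-duplication step is an assumption added for illustration. The `{ get: { '#': soul } }` shape is modeled on GUN's GET messages.

```javascript
// Toy model of the handshake: collect one GET per soul from the local graph,
// then add souls that only appear in the active-subscription map.
// `graph` maps soul -> node; `next` maps soul -> subscription state.
function buildHandshakeGets(graph, next) {
  const requests = [];
  const seen = new Set();

  // 1. Full graph sync: one GET per soul held locally.
  for (const soul of Object.keys(graph)) {
    seen.add(soul);
    requests.push({ get: { '#': soul } });
  }

  // 2. Active subscriptions: souls being listened to, possibly with no local data yet.
  for (const soul of Object.keys(next)) {
    if (seen.has(soul)) continue; // assumption: skip souls already requested
    seen.add(soul);
    requests.push({ get: { '#': soul } });
  }
  return requests;
}

const localGraph = {
  users: { _: { '#': 'users' }, frank: { '#': 'users/frank' } },
  'users/frank': { _: { '#': 'users/frank' }, name: 'Frank' },
};
const localNext = { users: {}, myset: {} };

const handshake = buildHandshakeGets(localGraph, localNext);
console.log(handshake.map((m) => m.get['#'])); // [ 'users', 'users/frank', 'myset' ]
```

Note that the request count grows with the number of souls in the local graph, which is where the bandwidth trade-off comes from.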

Benefits:
This more aggressive synchronization strategy ensures that peers exchange all known data upon establishing a connection. This allows:

  • Gun's HAM (Hypothetical Amnesia Machine) conflict-resolution algorithm to correctly merge differing data states.
  • AXE's subscription logic to establish necessary subscriptions based on the comprehensive exchange of GET requests and subsequent PUT replies.
  • More reliable data consistency across peers, reducing the need for manual refreshes.
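To show why exchanging full node state lets two diverged peers converge, here is a simplified per-field merge in the spirit of HAM. It is a sketch under stated assumptions, not GUN's implementation: `mergeNode` is a hypothetical helper, and the rules (newer state wins, equal states fall back to comparing the JSON-stringified values, states ahead of the local clock are held back) are a reduced version of the behavior. The node shape follows GUN's convention of keeping the soul under `_['#']` and per-key states under `_['>']`.

```javascript
// Simplified state-based merge of `incoming` into `current`.
// Returns the merged node plus the keys that were held back as "future" writes.
function mergeNode(current, incoming, now = Date.now()) {
  const merged = {
    ...current,
    _: { '#': current._['#'], '>': { ...current._['>'] } },
  };
  const deferred = [];

  for (const key of Object.keys(incoming)) {
    if (key === '_') continue; // metadata is handled per key below
    const inState = incoming._['>'][key];
    const curState = current._['>'][key];

    // A write stamped ahead of the local clock is not applied yet.
    if (inState > now) { deferred.push(key); continue; }

    const curMissing = curState === undefined;
    const newer = curMissing || inState > curState;
    // Equal states: deterministic tie-break so both peers pick the same value.
    const tieWon = !curMissing && inState === curState &&
      JSON.stringify(incoming[key]) > JSON.stringify(current[key]);

    if (newer || tieWon) {
      merged[key] = incoming[key];
      merged._['>'][key] = inState;
    }
  }
  return { merged, deferred };
}

// Two peers edited the same node while disconnected.
const mine = {
  _: { '#': 'profile', '>': { status: 200, bob: 100 } },
  status: 'online',
  bob: 't1',
};
const theirs = {
  _: { '#': 'profile', '>': { status: 150, fred: 110 } },
  status: 'away',
  fred: 't1',
};

const result = mergeNode(mine, theirs, 1000);
console.log(result.merged.status, result.merged.bob, result.merged.fred); // online t1 t1
```

Because the decision depends only on the per-key states and values, the peer holding `theirs` reaches the same merged node when it receives `mine`, provided each side actually receives the other's data.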

Considerations:
This change may lead to increased initial bandwidth usage for peers with very large graphs during the connection handshake. However, this trade-off prioritizes data consistency and aims to resolve the reported synchronization failures.

Related Issues:


The way this PR was coded (roocode) is related to #1370

AI: roocode + Gemini 2.5 Pro
Full roocode Session: https://gist.github.com/bitnom/64c911fcbe074a2f7a715eb6bd9baf2d

Note: This is an experimental way of making a PR. There could be mistakes. However, if you point out the mistakes, I can continue the roocode session to resolve them (If nobody wants to do it manually).

@amark
Owner

amark commented Jun 15, 2025

@bitnom awwwwwsome!!!!

So here's the review, let's vote:

The AI fix brute-force asks for all nodes (souls) rather than only the pending ones (tho that was glitchy). That leads to a less buggy re-sync on re-connect, but with unbounded network usage. It looks like it'll also have a side effect of requesting an entire node even if the previous query only asked for 1 item off the node, so if that node happens to be something like get('users').get('frank') then suddenly all users (not just Frank) will start streaming back to the browser.
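To make that side effect concrete, here is a toy sketch (not GUN's actual lookup code). `relayGraph` and `answerGet` are hypothetical; the request shape, with the soul under '#' and an optional single key under '.', is modeled on GUN's GET messages. It compares a key-scoped request with a soul-only request against the same node.

```javascript
// Toy relay-side graph: the "users" node links to three user nodes.
const relayGraph = {
  users: {
    frank: { '#': 'users/frank' },
    alice: { '#': 'users/alice' },
    bob: { '#': 'users/bob' },
  },
};

// Answer a GET: with '.', return only that key; without it, return the whole node.
function answerGet(graph, get) {
  const node = graph[get['#']];
  if (!node) return {};
  if (get['.'] === undefined) return { ...node };
  const key = get['.'];
  return key in node ? { [key]: node[key] } : {};
}

// What get('users').get('frank') is interested in: a single key.
const narrow = answerGet(relayGraph, { '#': 'users', '.': 'frank' });
// What a soul-only handshake request pulls back: every key on the node.
const wide = answerGet(relayGraph, { '#': 'users' });

console.log(Object.keys(narrow)); // [ 'frank' ]
console.log(Object.keys(wide));   // [ 'frank', 'alice', 'bob' ]
```

With three users the difference is small; with a large `users` node the soul-only request returns every entry on each reconnect.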

vote 👍 accepting it: stabs the re-connect resync bugs with a brutal but inefficient fix, however we need all the help we can get and this helps move things along!!!!
vote 👎 rejecting it: would prevent accidental DDoS.

I'm inclined to accept it anyways cause most people are pulling from NPM not bleeding edge GH. A couple people in the twitter/X groupchat (can I add you please? I'm active there) have already voted yes also, but will let people vote here too.

@aheissenberger
Contributor

I did not see any improvements on syncing data which was created offline (only local network exists on test system!) :-(

My setup:

  1. initialize 2 Users with no peers
  2. both users subscribe to myset
  3. both users set data
  4. both users add a peer
  5. both users set data
  6. initialize a third user with a peer
  7. third user subscribe to myset

The server knows only the data which is set after the peers have been added.
The third user does not get any of the data set by the first two users.

import GUN from 'gun';
import http from 'node:http';
import { Worker, isMainThread, parentPort, workerData } from 'node:worker_threads';
import { rm, mkdir } from 'node:fs/promises';

const port = Number(process.env?.HTTP_PORT ?? "8889");

const createGunClientDefaultOptions = (name) => ({
    file: `peer_cache/${name}.json`,
    axe: false,
    peers: [],
});

if (isMainThread) {

    try {
        // cleanup the relay cache
        await rm('./radata', { recursive: true, force: true });
        console.info('Directory "radata" and all its subdirectories have been removed.');
        // cleanup the peers cache
        await rm('./peer_cache', { recursive: true, force: true });
        console.info('Directory "peer_cache" and all its subdirectories have been removed.');
    } catch (error) {
        console.error('Error removing directory:', error);
    }

    await mkdir('./peer_cache', { recursive: true });

    if (isNaN(port) || port < 1 || port > 65535) {
        console.error(`Invalid port number "${port}". Please provide a number between 1 and 65535.`);
        process.exit(1);
    }
    const server = http.createServer(GUN.serve('./')).listen(port, () => {
        const gunServerUrl = `http://localhost:${port}/gun`;
        console.info(
            'Gun server started: ' + gunServerUrl
        );

        const workerBob = new Worker(new URL(import.meta.url), { workerData: { name: 'bob', options: { gunOptions: { peers: [] } } } });
        workerBob.on('message', (message) => {
            console.log('Message from worker Bob:', message);
        });
        const workerFred = new Worker(new URL(import.meta.url), { workerData: { name: 'fred', options: { gunOptions: { peers: [] } } } });
        workerFred.on('message', (message) => {
            console.log('Message from worker Fred:', message);
        });
        const workerAlice = new Worker(new URL(import.meta.url), { workerData: { name: 'alice', options: { gunOptions: { peers: [] } } } });
        workerAlice.on('message', (message) => {
            console.log('Message from worker Alice:', message);
        });

        workerBob.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });
        workerFred.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });
        //workerAlice.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [] } } });

        let nextStepTimer = 3000

        setTimeout(() => {
            console.log('Step: subscribe')
            workerBob.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
            workerFred.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
            //workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        }, nextStepTimer);
        nextStepTimer += 1000;

        setTimeout(() => {
            console.log('Step: set')
            workerBob.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { bob: 't1' } } });
            workerFred.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { fred: 't1' } } });
            //workerAlice.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { alice: 't1' } } });
        }, nextStepTimer);
        nextStepTimer += 1000;

        setTimeout(() => {
            console.log('Step: peers')
            workerBob.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
            //workerAlice.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
            workerFred.postMessage({ cmd: 'peers', msgoptions: { peers: [gunServerUrl] } });
        }, nextStepTimer);
        nextStepTimer += 1000;

        // setTimeout(() => {
        //     console.log('Step: subscribe2')
        //     workerBob.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        //     workerFred.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        //     workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        // }, nextStepTimer);
        // nextStepTimer+=1000;

        setTimeout(() => {
            console.log('Step: set2')
            workerBob.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { bob: 't2' } } });
            //workerAlice.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { alice: 't2' } } });
            workerFred.postMessage({ cmd: 'set', msgoptions: { key: 'myset', value: { fred: 't2' } } });
        }, nextStepTimer);
        nextStepTimer += 1000;

        setTimeout(() => {
            console.log('Step: Alice init & peer')
            workerAlice.postMessage({ cmd: 'init', msgoptions: { gunOptions: { peers: [gunServerUrl] } } });
        }, nextStepTimer);
        nextStepTimer += 1000;

        setTimeout(() => {
            console.log('Step: Alice subscribe')
            workerAlice.postMessage({ cmd: 'subscribe', msgoptions: { key: 'myset' } });
        }, nextStepTimer);
        nextStepTimer += 1000;
    });
    const gun = GUN({ web: server });

} else { // WORKERS <name>
    const { name, options } = workerData;
    const debugOptions = JSON.stringify(options.gunOptions);
    // let gun = GUN(options.gunOptions);
    let gun = null;

    console.info(`${name}: Gun client started with options: ${debugOptions}`);
    parentPort.on('message', ({ cmd, msgoptions }) => {
        console.info(`${name}: Received command: ${cmd} with options: ${JSON.stringify(msgoptions)}`);
        switch (cmd) {
            case 'init': {
                // Block scope keeps the const declaration local to this case.
                const clientOptions = { ...createGunClientDefaultOptions(name), ...msgoptions.gunOptions }
                gun = GUN(clientOptions);
                break;
            }
            case 'peers':
                gun.opt({ peers: msgoptions.peers });
                gun.get('key').get('heartbeat').put('heartbeat')
                break;
            case 'subscribe':
                gun.get(msgoptions.key).on((data, key) => {
                    parentPort.postMessage(`received data: ${JSON.stringify(data)} with key: ${key}`);
                });
                break;
            case 'set':
                gun.get(msgoptions.key).set(msgoptions.value);
                break;
            case 'put':
                gun.get(msgoptions.key).put(msgoptions.value);
                break;

            default:
                // `cmd` is the destructured command name; `type` was never defined.
                console.error(`Unknown command "${cmd}"`);
        }
    })
}

Successfully merging this pull request may close these issues.

I have to refresh one browser to trigger all browser to update
3 participants