Skip to content

Commit 40a3cbd

Browse files
feat: add pull-based gate sync for cross-node message delivery
Nodes behind NAT could push gate messages to relays but had no way to pull messages from OTHER nodes back. The push loop only sends outbound; the public chain sync carries encrypted blobs, but peer-pushed gate events never made it onto the relay's chain.

Adds:
- POST /api/mesh/gate/peer-pull: HMAC-authenticated endpoint that returns gate events a peer is missing (discovery mode returns all gate IDs with counts; per-gate mode returns event batches).
- _http_gate_pull_loop: background thread (30s interval) that pulls new gate events from relay peers into the local gate_store.

This closes the loop: push sends YOUR messages out, pull fetches EVERYONE ELSE's messages back.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b118840 commit 40a3cbd

File tree

1 file changed

+185
-0
lines changed

1 file changed

+185
-0
lines changed

backend/main.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,136 @@ def _http_peer_push_loop() -> None:
777777
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
778778

779779

780+
# ─── Background Gate Message Pull Worker ─────────────────────────────────
781+
# Periodically pulls gate events from relay peers that this node is missing.
782+
# Complements the push loop: push sends OUR events to peers, pull fetches
783+
# THEIR events from peers (needed when this node is behind NAT).
784+
785+
# Seconds the gate pull worker waits on _NODE_SYNC_STOP between pull rounds.
_GATE_PULL_INTERVAL_S = 30
786+
# In-memory pull cursors, keyed by normalized peer URL. Each inner dict maps a
# gate_id to the event count recorded for that peer after the most recent pull.
_gate_pull_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → known count}
787+
788+
789+
def _gate_pull_signed_post(peer_base, peer_key, payload, timeout):
    """POST *payload* to a peer's ``/api/mesh/gate/peer-pull`` endpoint.

    The payload is serialized canonically (sorted keys, compact separators)
    and signed with HMAC-SHA256 using *peer_key*; the hex digest is sent in
    the ``X-Peer-HMAC`` header.

    Returns the decoded JSON object when the peer answers HTTP 200 with a
    truthy ``"ok"`` field, otherwise ``None``. Network and decoding errors
    propagate to the caller.
    """
    import hashlib as _hashlib_pull
    import hmac as _hmac_pull

    import requests as _requests

    body = json_mod.dumps(
        payload,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")
    signature = _hmac_pull.new(peer_key, body, _hashlib_pull.sha256).hexdigest()

    resp = _requests.post(
        f"{peer_base}/api/mesh/gate/peer-pull",
        data=body,
        headers={
            "Content-Type": "application/json",
            "X-Peer-HMAC": signature,
        },
        timeout=timeout,
    )
    if resp.status_code != 200:
        return None
    data = resp.json()
    # The response comes from a remote node; never assume its shape.
    if not isinstance(data, dict) or not data.get("ok"):
        return None
    return data


def _gate_pull_from_peer(gate_store, normalized, peer_key, timeout) -> None:
    """Pull at most one batch per gate from a single peer into *gate_store*."""
    peer_counts = _gate_pull_last_count.setdefault(normalized, {})

    # Step 1: ask the peer which gates it has and how many events each holds.
    discovery = _gate_pull_signed_post(
        normalized, peer_key, {"gate_id": "", "after_count": 0}, timeout
    )
    if discovery is None:
        return
    remote_gates = discovery.get("gates")
    if not isinstance(remote_gates, dict) or not remote_gates:
        return

    # Step 2: for each gate with unseen events, pull the next batch.
    for gate_id, raw_total in remote_gates.items():
        try:
            remote_total = int(raw_total)
        except (TypeError, ValueError):
            # One malformed entry must not abort the remaining gates.
            continue

        # `after_count` is an offset into the PEER's event list (the endpoint
        # slices its own list with it), so only the per-peer cursor is a valid
        # offset. The size of our local list is not: it also counts events we
        # authored or pulled from other peers, and using it would skip events
        # this peer holds that we have never seen. Re-fetched events are
        # reported as duplicates by ingest_peer_events().
        cursor = peer_counts.get(gate_id, 0)
        if cursor >= remote_total:
            continue

        page = _gate_pull_signed_post(
            normalized, peer_key, {"gate_id": gate_id, "after_count": cursor}, timeout
        )
        if page is None:
            continue

        events = page.get("events", [])
        if not events:
            # Nothing returned past our cursor; adopt the advertised total.
            peer_counts[gate_id] = remote_total
            continue

        result = gate_store.ingest_peer_events(gate_id, events)
        accepted = int(result.get("accepted", 0) or 0)
        if accepted > 0:
            logger.info(
                "Gate pull: %d new event(s) for %s from %s",
                accepted, gate_id[:12], normalized[:40],
            )
        peer_counts[gate_id] = cursor + len(events)


def _gate_pull_run_cycle(gate_store) -> None:
    """Run one pull round across every authenticated push peer."""
    if not _participant_node_enabled():
        return

    secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
    if not secret:
        return

    peers = authenticated_push_peer_urls()
    if not peers:
        return

    # Loop-invariant: the same timeout applies to every request this round.
    timeout = int(get_settings().MESH_RELAY_PUSH_TIMEOUT_S or 10)

    for peer_url in peers:
        normalized = normalize_peer_url(peer_url)
        if not normalized:
            continue

        peer_key = _derive_peer_key(secret, normalized)
        if not peer_key:
            continue

        try:
            _gate_pull_from_peer(gate_store, normalized, peer_key, timeout)
        except Exception as exc:
            # Best effort: a failing peer must not block the other peers.
            logger.warning("Gate pull from %s failed: %s", normalized[:40], exc)


def _http_gate_pull_loop() -> None:
    """Background thread: pull new gate messages from HTTP relay peers.

    Every ``_GATE_PULL_INTERVAL_S`` seconds, until ``_NODE_SYNC_STOP`` is set,
    the loop asks each authenticated peer for its per-gate event counts and
    fetches one batch of events per gate past the cursor stored in
    ``_gate_pull_last_count``. A round is skipped when the participant node is
    disabled, no peer secret is configured, or there are no peers. Errors are
    logged and never terminate the thread.
    """
    from services.mesh.mesh_hashchain import gate_store

    while not _NODE_SYNC_STOP.is_set():
        try:
            _gate_pull_run_cycle(gate_store)
        except Exception:
            logger.exception("HTTP gate pull loop error")
        _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S)
908+
909+
780910
# ─── Background Gate Message Push Worker ─────────────────────────────────
781911

782912
_gate_push_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → count}
@@ -1221,6 +1351,7 @@ def _verify_loop():
12211351
threading.Thread(target=_public_infonet_sync_loop, daemon=True).start()
12221352
threading.Thread(target=_http_peer_push_loop, daemon=True).start()
12231353
threading.Thread(target=_http_gate_push_loop, daemon=True).start()
1354+
threading.Thread(target=_http_gate_pull_loop, daemon=True).start()
12241355
global _NODE_PUBLIC_EVENT_HOOK_REGISTERED
12251356
if not _NODE_PUBLIC_EVENT_HOOK_REGISTERED:
12261357
register_public_event_append_hook(_schedule_public_event_propagation)
@@ -4304,6 +4435,60 @@ async def gate_peer_push(request: Request):
43044435
return {"ok": True, "accepted": accepted, "duplicates": duplicates, "rejected": rejected}
43054436

43064437

4438+
@app.post("/api/mesh/gate/peer-pull")
@limiter.limit("30/minute")
async def gate_peer_pull(request: Request):
    """Return gate events a peer is missing (HMAC-authenticated pull sync).

    Body: {"gate_id": "...", "after_count": N}

    * Empty ``gate_id`` (discovery mode): returns
      ``{"ok": True, "gates": {gate_id: event_count}}`` for every known gate.
    * Non-empty ``gate_id``: returns up to ``_PEER_PUSH_BATCH_SIZE`` events
      starting at offset ``after_count`` in this node's list for that gate,
      plus the current ``total``.

    Error responses: 413 when the body exceeds 64 KiB, 403 when the peer HMAC
    is missing or invalid, 400 when the body is not a JSON object.
    """
    max_body_bytes = 65_536
    too_large = Response(
        content='{"ok":false,"detail":"Request body too large"}',
        status_code=413,
        media_type="application/json",
    )

    # Cheap early rejection based on the declared length, when it is usable.
    content_length = request.headers.get("content-length")
    if content_length:
        try:
            if int(content_length) > max_body_bytes:
                return too_large
        except (ValueError, TypeError):
            pass  # Unparsable header: fall through to the real size check.

    from services.mesh.mesh_hashchain import gate_store

    body_bytes = await request.body()
    # The header may be absent or wrong, so also enforce the limit on the
    # bytes actually received.
    if len(body_bytes) > max_body_bytes:
        return too_large

    if not _verify_peer_push_hmac(request, body_bytes):
        return Response(
            content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
            status_code=403,
            media_type="application/json",
        )

    try:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        body = json_mod.loads(body_bytes or b"{}")
    except ValueError:
        body = None
    if not isinstance(body, dict):
        return Response(
            content='{"ok":false,"detail":"Invalid JSON body"}',
            status_code=400,
            media_type="application/json",
        )

    gate_id = str(body.get("gate_id", "") or "").strip().lower()
    # Clamp to zero: a negative offset would be treated as a from-the-end
    # slice index and return the wrong events.
    after_count = max(0, _safe_int(body.get("after_count", 0) or 0))

    if not gate_id:
        # Discovery mode: report every known gate with its event count so the
        # puller knows which gates to sync.
        gate_ids = gate_store.known_gate_ids()
        with gate_store._lock:
            gate_counts: dict[str, int] = {
                gid: len(gate_store._gates.get(gid, [])) for gid in gate_ids
            }
        return {"ok": True, "gates": gate_counts}

    # Snapshot under the lock, slice outside it.
    with gate_store._lock:
        all_events = list(gate_store._gates.get(gate_id, []))
    total = len(all_events)

    # Slicing past the end yields [], which is the "nothing new" answer.
    batch = all_events[after_count : after_count + _PEER_PUSH_BATCH_SIZE]
    return {"ok": True, "events": batch, "total": total, "gate_id": gate_id}
4490+
4491+
43074492
# ---------------------------------------------------------------------------
43084493
# Peer Management API — operator endpoints for adding / removing / listing
43094494
# peers without editing peer_store.json by hand.

0 commit comments

Comments
 (0)