Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: handle chain stall in peer manager #7508

Open
wants to merge 6 commits into
base: unstable
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/api/src/beacon/routes/lodestar.ts
Original file line number Diff line number Diff line change
@@ -73,6 +73,12 @@ export type StateCacheItem = {

export type LodestarNodePeer = NodePeer & {
agentVersion: string;
status: unknown | null;
metadata: unknown | null;
agentClient: string;
lastReceivedMsgUnixTsMs: number;
lastStatusUnixTsMs: number;
connectedUnixTsMs: number;
};

export type LodestarThreadType = "main" | "network" | "discv5";
27 changes: 19 additions & 8 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import {BeaconConfig} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
import {ForkName} from "@lodestar/params";
import {ResponseIncoming} from "@lodestar/reqresp";
import {Epoch, phase0} from "@lodestar/types";
import {Epoch, phase0, ssz} from "@lodestar/types";
import {fromHex, withTimeout} from "@lodestar/utils";
import {multiaddr} from "@multiformats/multiaddr";
import {formatNodePeer} from "../../api/impl/node/utils.js";
@@ -385,18 +385,29 @@ export class NetworkCore implements INetworkCore {
await this.libp2p.hangUp(peerIdFromString(peerIdStr));
}

private _dumpPeer(peerIdStr: string, connections: Connection[]): routes.lodestar.LodestarNodePeer {
const peerData = this.peersData.connectedPeers.get(peerIdStr);
return {
...formatNodePeer(peerIdStr, connections),
agentVersion: peerData?.agentVersion ?? "NA",
status: peerData?.status ? ssz.phase0.Status.toJson(peerData.status) : null,
metadata: peerData?.metadata ? ssz.altair.Metadata.toJson(peerData.metadata) : null,
agentClient: String(peerData?.agentClient ?? "Unknown"),
lastReceivedMsgUnixTsMs: peerData?.lastReceivedMsgUnixTsMs ?? 0,
lastStatusUnixTsMs: peerData?.lastStatusUnixTsMs ?? 0,
connectedUnixTsMs: peerData?.connectedUnixTsMs ?? 0,
};
}

async dumpPeer(peerIdStr: string): Promise<routes.lodestar.LodestarNodePeer | undefined> {
const connections = this.getConnectionsByPeer().get(peerIdStr);
return connections
? {...formatNodePeer(peerIdStr, connections), agentVersion: this.peersData.getAgentVersion(peerIdStr)}
: undefined;
return connections ? this._dumpPeer(peerIdStr, connections) : undefined;
}

async dumpPeers(): Promise<routes.lodestar.LodestarNodePeer[]> {
return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) => ({
...formatNodePeer(peerIdStr, connections),
agentVersion: this.peersData.getAgentVersion(peerIdStr),
}));
return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) =>
this._dumpPeer(peerIdStr, connections)
);
}

async dumpPeerScoreStats(): Promise<PeerScoreStats> {
33 changes: 30 additions & 3 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import {BitArray} from "@chainsafe/ssz";
import {Connection, PeerId} from "@libp2p/interface";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {Metadata, altair, phase0} from "@lodestar/types";
import {withTimeout} from "@lodestar/utils";
import {GOODBYE_KNOWN_CODES, GoodByeReasonCode, Libp2pEvent} from "../../constants/index.js";
@@ -53,6 +53,11 @@ const PEER_RELEVANT_TAG = "relevant";
/** Tag value of PEER_RELEVANT_TAG */
const PEER_RELEVANT_TAG_VALUE = 100;

/** Change pruning behavior once the head falls behind */
const STARVATION_THRESHOLD_SLOTS = SLOTS_PER_EPOCH * 2;
/** Percentage of peers to attempt to prune when starvation threshold is met */
const STARVATION_PRUNE_RATIO = 0.05;

/**
* Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lodestar.
*/
@@ -140,6 +145,7 @@ export class PeerManager {
private readonly discovery: PeerDiscovery | null;
private readonly networkEventBus: INetworkEventBus;
private readonly statusCache: StatusCache;
private lastStatus: phase0.Status;

// A single map of connected peers with all necessary data to handle PINGs, STATUS, and metrics
private connectedPeers: Map<PeerIdStr, PeerData>;
@@ -173,6 +179,8 @@ export class PeerManager {
this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect);
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);

this.lastStatus = this.statusCache.get();

// On start-up will connected to existing peers in libp2p.peerStore, same as autoDial behaviour
this.heartbeat();
this.intervals = [
@@ -341,7 +349,10 @@ export class PeerManager {
private onStatus(peer: PeerId, status: phase0.Status): void {
// reset the to-status timer of this peer
const peerData = this.connectedPeers.get(peer.toString());
if (peerData) peerData.lastStatusUnixTsMs = Date.now();
if (peerData) {
peerData.lastStatusUnixTsMs = Date.now();
peerData.status = status;
}

let isIrrelevant: boolean;
try {
@@ -449,12 +460,21 @@ export class PeerManager {
}
}

const status = this.statusCache.get();
const starved =
// while syncing progress is happening, we aren't starved
this.lastStatus.headSlot === status.headSlot &&
// if the head falls behind the threshold, we are starved
this.clock.currentSlot - status.headSlot > STARVATION_THRESHOLD_SLOTS;
this.lastStatus = status;

const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries} = prioritizePeers(
connectedHealthyPeers.map((peer) => {
const peerData = this.connectedPeers.get(peer.toString());
return {
id: peer,
direction: peerData?.direction ?? null,
status: peerData?.status ?? null,
attnets: peerData?.metadata?.attnets ?? null,
syncnets: peerData?.metadata?.syncnets ?? null,
score: this.peerRpcScores.getScore(peer),
@@ -463,7 +483,13 @@ export class PeerManager {
// Collect subnets which we need peers for in the current slot
this.attnetsService.getActiveSubnets(),
this.syncnetsService.getActiveSubnets(),
this.opts
{
...this.opts,
status,
starved,
starvationPruneRatio: STARVATION_PRUNE_RATIO,
starvationThresholdSlots: STARVATION_THRESHOLD_SLOTS,
}
);

const queriesMerged: SubnetDiscvQueryMs[] = [];
@@ -597,6 +623,7 @@ export class PeerManager {
relevantStatus: RelevantPeerStatus.Unknown,
direction,
peerId: remotePeer,
status: null,
metadata: null,
agentVersion: null,
agentClient: null,
5 changes: 3 additions & 2 deletions packages/beacon-node/src/network/peers/peersData.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {PeerId} from "@libp2p/interface";
import {Peer, PeerId} from "@libp2p/interface";
import {Encoding} from "@lodestar/reqresp";
import {altair} from "@lodestar/types";
import {altair, phase0} from "@lodestar/types";
import {ClientKind} from "./client.js";

type PeerIdStr = string;
@@ -18,6 +18,7 @@ export type PeerData = {
relevantStatus: RelevantPeerStatus;
direction: "inbound" | "outbound";
peerId: PeerId;
status: phase0.Status | null;
metadata: altair.Metadata | null;
agentVersion: string | null;
agentClient: ClientKind | null;
Original file line number Diff line number Diff line change
@@ -40,9 +40,40 @@ const syncnetsZero = BitArray.fromBitLen(SYNC_COMMITTEE_SUBNET_COUNT);

type SubnetDiscvQuery = {subnet: SubnetID; toSlot: number; maxPeersToDiscover: number};

enum StatusScore {
/** The peer is far behind our chain */
FAR_BEHIND = -2,
/** The peer is close to our chain */
CLOSE_TO_US = -1,
/** The peer is far ahead of chain */
FAR_AHEAD = 0,
}

function computeStatusScore(ours: phase0.Status, theirs: phase0.Status | null, opts: PrioritizePeersOpts): StatusScore {
if (theirs === null) {
return StatusScore.CLOSE_TO_US;
}

if (theirs.finalizedEpoch > ours.finalizedEpoch) {
return StatusScore.FAR_AHEAD;
}

if (theirs.headSlot > ours.headSlot + opts.starvationThresholdSlots) {
return StatusScore.FAR_AHEAD;
}

// It seems dangerous to downscore peers that are far behind.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we ever return FAR_BEHIND, then lodestar nodes will most likely disconnect peers that want to sync from us which is not great for the network. Should we remove FAR_BEHIND completely? or add more info for it

if we delete FAR_BEHIND enum then there is only 2 options: CLOSE_TO_US and FAR_AHEAD. At synced time, this function will always return CLOSE_TO_US. Hence the main usage for this function is for syncing time

please add all of these to the function description for later reference

// if (ours.headSlot > theirs.headSlot + opts.starvationThresholdSlots) {
// return StatusScore.FAR_BEHIND;
// }

return StatusScore.CLOSE_TO_US;
}

type PeerInfo = {
id: PeerId;
direction: Direction | null;
statusScore: StatusScore;
attnets: phase0.AttestationSubnets;
syncnets: altair.SyncSubnets;
attnetsTrueBitIndices: number[];
@@ -53,6 +84,10 @@ type PeerInfo = {
export interface PrioritizePeersOpts {
targetPeers: number;
maxPeers: number;
status: phase0.Status;
starved: boolean;
starvationPruneRatio: number;
starvationThresholdSlots: number;
outboundPeersRatio?: number;
targetSubnetPeers?: number;
}
@@ -67,6 +102,7 @@ export enum ExcessPeerDisconnectReason {
/**
* Prioritize which peers to disconect and which to connect. Conditions:
* - Reach `targetPeers`
* - If we're starved for data, prune additional peers
* - Don't exceed `maxPeers`
* - Ensure there are enough peers per active subnet
* - Prioritize peers with good score
@@ -75,6 +111,7 @@ export function prioritizePeers(
connectedPeersInfo: {
id: PeerId;
direction: Direction | null;
status: phase0.Status | null;
attnets: phase0.AttestationSubnets | null;
syncnets: altair.SyncSubnets | null;
score: number;
@@ -98,6 +135,7 @@ export function prioritizePeers(
(peer): PeerInfo => ({
id: peer.id,
direction: peer.direction,
statusScore: computeStatusScore(opts.status, peer.status, opts),
attnets: peer.attnets ?? attnetsZero,
syncnets: peer.syncnets ?? syncnetsZero,
attnetsTrueBitIndices: peer.attnets?.getTrueBitIndexes() ?? [],
@@ -254,6 +292,11 @@ function pruneExcessPeers(
return false;
}

// Peers far ahead when we're starved for data are not eligible for pruning
if (opts.starved && peer.statusScore === StatusScore.FAR_AHEAD) {
return false;
}

// outbound peers up to OUTBOUND_PEER_RATIO sorted by highest score and not eligible for pruning
if (peer.direction === "outbound") {
if (outboundPeers - outboundPeersEligibleForPruning > outboundPeersTarget) {
@@ -269,7 +312,9 @@ function pruneExcessPeers(
let peersToDisconnectCount = 0;
const noLongLivedSubnetPeersToDisconnect: PeerId[] = [];

const peersToDisconnectTarget = connectedPeerCount - targetPeers;
const peersToDisconnectTarget =
// if we're starved for data, prune additional peers
connectedPeerCount - targetPeers + (opts.starved ? targetPeers * opts.starvationPruneRatio : 0);

// 1. Lodestar prefers disconnecting peers that does not have long lived subnets
// See https://github.com/ChainSafe/lodestar/issues/3940
@@ -396,6 +441,10 @@ export function sortPeersToPrune(connectedPeers: PeerInfo[], dutiesByPeer: Map<P
const dutiedSubnet1 = dutiesByPeer.get(p1) ?? 0;
const dutiedSubnet2 = dutiesByPeer.get(p2) ?? 0;
if (dutiedSubnet1 === dutiedSubnet2) {
const statusScore = p1.statusScore - p2.statusScore;
if (statusScore !== 0) {
return statusScore;
}
const [longLivedSubnets1, longLivedSubnets2] = [p1, p2].map(
(p) => p.attnetsTrueBitIndices.length + p.syncnetsTrueBitIndices.length
);