diff --git a/packages/api/src/beacon/routes/lodestar.ts b/packages/api/src/beacon/routes/lodestar.ts index e5a0dae93f21..d6b713d22694 100644 --- a/packages/api/src/beacon/routes/lodestar.ts +++ b/packages/api/src/beacon/routes/lodestar.ts @@ -76,6 +76,12 @@ export type StateCacheItem = { export type LodestarNodePeer = NodePeer & { agentVersion: string; + status: unknown | null; + metadata: unknown | null; + agentClient: string; + lastReceivedMsgUnixTsMs: number; + lastStatusUnixTsMs: number; + connectedUnixTsMs: number; }; export type BlacklistedBlock = {root: RootHex; slot: Slot | null}; diff --git a/packages/beacon-node/src/network/core/metrics.ts b/packages/beacon-node/src/network/core/metrics.ts index d2c9c3314746..201159207957 100644 --- a/packages/beacon-node/src/network/core/metrics.ts +++ b/packages/beacon-node/src/network/core/metrics.ts @@ -112,6 +112,10 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) { help: "Peer manager heartbeat function duration in seconds", buckets: [0.001, 0.01, 0.1, 1], }), + starved: register.gauge({ + name: "lodestar_peer_manager_starved_bool", + help: "Whether lodestar is starved of data while syncing", + }), }, leakedConnectionsCount: register.gauge({ name: "lodestar_peer_manager_leaked_connections_count", diff --git a/packages/beacon-node/src/network/core/networkCore.ts b/packages/beacon-node/src/network/core/networkCore.ts index ff4f1ef77ec7..8c83d77abc7f 100644 --- a/packages/beacon-node/src/network/core/networkCore.ts +++ b/packages/beacon-node/src/network/core/networkCore.ts @@ -7,7 +7,7 @@ import {BeaconConfig} from "@lodestar/config"; import type {LoggerNode} from "@lodestar/logger/node"; import {ForkName} from "@lodestar/params"; import {ResponseIncoming} from "@lodestar/reqresp"; -import {Epoch, phase0} from "@lodestar/types"; +import {Epoch, phase0, ssz} from "@lodestar/types"; import {fromHex} from "@lodestar/utils"; import {multiaddr} from "@multiformats/multiaddr"; import {formatNodePeer} from "../../api/impl/node/utils.js"; @@ -387,18 +387,29 @@ export class NetworkCore implements INetworkCore { await this.libp2p.hangUp(peerIdFromString(peerIdStr)); } + private _dumpPeer(peerIdStr: string, connections: Connection[]): routes.lodestar.LodestarNodePeer { + const peerData = this.peersData.connectedPeers.get(peerIdStr); + return { + ...formatNodePeer(peerIdStr, connections), + agentVersion: peerData?.agentVersion ?? "NA", + status: peerData?.status ? ssz.phase0.Status.toJson(peerData.status) : null, + metadata: peerData?.metadata ? ssz.altair.Metadata.toJson(peerData.metadata) : null, + agentClient: String(peerData?.agentClient ?? "Unknown"), + lastReceivedMsgUnixTsMs: peerData?.lastReceivedMsgUnixTsMs ?? 0, + lastStatusUnixTsMs: peerData?.lastStatusUnixTsMs ?? 0, + connectedUnixTsMs: peerData?.connectedUnixTsMs ?? 0, + }; + } + async dumpPeer(peerIdStr: string): Promise { const connections = this.getConnectionsByPeer().get(peerIdStr); - return connections - ? {...formatNodePeer(peerIdStr, connections), agentVersion: this.peersData.getAgentVersion(peerIdStr)} - : undefined; + return connections ? 
this._dumpPeer(peerIdStr, connections) : undefined; } async dumpPeers(): Promise { - return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) => ({ - ...formatNodePeer(peerIdStr, connections), - agentVersion: this.peersData.getAgentVersion(peerIdStr), - })); + return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) => + this._dumpPeer(peerIdStr, connections) + ); } async dumpPeerScoreStats(): Promise { diff --git a/packages/beacon-node/src/network/peers/peerManager.ts b/packages/beacon-node/src/network/peers/peerManager.ts index fb0e1a367ad7..d624ed43211e 100644 --- a/packages/beacon-node/src/network/peers/peerManager.ts +++ b/packages/beacon-node/src/network/peers/peerManager.ts @@ -2,7 +2,7 @@ import {BitArray} from "@chainsafe/ssz"; import {Connection, PeerId, PrivateKey} from "@libp2p/interface"; import {BeaconConfig} from "@lodestar/config"; import {LoggerNode} from "@lodestar/logger/node"; -import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; +import {SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; import {Metadata, altair, phase0} from "@lodestar/types"; import {withTimeout} from "@lodestar/utils"; import {GOODBYE_KNOWN_CODES, GoodByeReasonCode, Libp2pEvent} from "../../constants/index.js"; @@ -53,6 +53,11 @@ const PEER_RELEVANT_TAG = "relevant"; /** Tag value of PEER_RELEVANT_TAG */ const PEER_RELEVANT_TAG_VALUE = 100; +/** Change pruning behavior once the head falls behind */ +const STARVATION_THRESHOLD_SLOTS = SLOTS_PER_EPOCH * 2; +/** Percentage of peers to attempt to prune when starvation threshold is met */ +const STARVATION_PRUNE_RATIO = 0.05; + /** * Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lodestar. */ @@ -141,6 +146,7 @@ export class PeerManager { private readonly discovery: PeerDiscovery | null; private readonly networkEventBus: INetworkEventBus; private readonly statusCache: StatusCache; + private lastStatus: phase0.Status; // A single map of connected peers with all necessary data to handle PINGs, STATUS, and metrics private connectedPeers: Map; @@ -174,6 +180,8 @@ export class PeerManager { this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect); this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest); + this.lastStatus = this.statusCache.get(); + // On start-up will connected to existing peers in libp2p.peerStore, same as autoDial behaviour this.heartbeat(); this.intervals = [ @@ -342,7 +350,10 @@ export class PeerManager { private onStatus(peer: PeerId, status: phase0.Status): void { // reset the to-status timer of this peer const peerData = this.connectedPeers.get(peer.toString()); - if (peerData) peerData.lastStatusUnixTsMs = Date.now(); + if (peerData) { + peerData.lastStatusUnixTsMs = Date.now(); + peerData.status = status; + } let isIrrelevant: boolean; try { @@ -450,12 +461,22 @@ export class PeerManager { } } + const status = this.statusCache.get(); + const starved = + // while syncing progress is happening, we aren't starved + this.lastStatus.headSlot === status.headSlot && + // if the head falls behind the threshold, we are starved + this.clock.currentSlot - status.headSlot > STARVATION_THRESHOLD_SLOTS; + this.lastStatus = status; + this.metrics?.peerManager.starved.set(starved ? 
1 : 0); + const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries} = prioritizePeers( connectedHealthyPeers.map((peer) => { const peerData = this.connectedPeers.get(peer.toString()); return { id: peer, direction: peerData?.direction ?? null, + status: peerData?.status ?? null, attnets: peerData?.metadata?.attnets ?? null, syncnets: peerData?.metadata?.syncnets ?? null, score: this.peerRpcScores.getScore(peer), @@ -464,7 +485,13 @@ export class PeerManager { // Collect subnets which we need peers for in the current slot this.attnetsService.getActiveSubnets(), this.syncnetsService.getActiveSubnets(), - this.opts + { + ...this.opts, + status, + starved, + starvationPruneRatio: STARVATION_PRUNE_RATIO, + starvationThresholdSlots: STARVATION_THRESHOLD_SLOTS, + } ); const queriesMerged: SubnetDiscvQueryMs[] = []; @@ -598,6 +625,7 @@ export class PeerManager { relevantStatus: RelevantPeerStatus.Unknown, direction, peerId: remotePeer, + status: null, metadata: null, agentVersion: null, agentClient: null, diff --git a/packages/beacon-node/src/network/peers/peersData.ts b/packages/beacon-node/src/network/peers/peersData.ts index 1a2619f202cd..a0121c1dd695 100644 --- a/packages/beacon-node/src/network/peers/peersData.ts +++ b/packages/beacon-node/src/network/peers/peersData.ts @@ -1,6 +1,6 @@ -import {PeerId} from "@libp2p/interface"; +import {Peer, PeerId} from "@libp2p/interface"; import {Encoding} from "@lodestar/reqresp"; -import {altair} from "@lodestar/types"; +import {altair, phase0} from "@lodestar/types"; import {ClientKind} from "./client.js"; type PeerIdStr = string; @@ -18,6 +18,7 @@ export type PeerData = { relevantStatus: RelevantPeerStatus; direction: "inbound" | "outbound"; peerId: PeerId; + status: phase0.Status | null; metadata: altair.Metadata | null; agentVersion: string | null; agentClient: ClientKind | null; diff --git a/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts b/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts index 9aa95e8af645..865b9f34d9d5 100644 --- a/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts +++ b/packages/beacon-node/src/network/peers/utils/prioritizePeers.ts @@ -40,9 +40,50 @@ const syncnetsZero = BitArray.fromBitLen(SYNC_COMMITTEE_SUBNET_COUNT); type SubnetDiscvQuery = {subnet: SubnetID; toSlot: number; maxPeersToDiscover: number}; +/** + * Comparison of our status vs a peer's status. + * + * The main usage of this score is to feed into peer priorization during syncing, and especially when the node is having trouble finding data during syncing + * + * For network stability, we DON'T distinguish peers that are far behind us vs peers that are close to us. + */ +enum StatusScore { + /** The peer is close to our chain */ + CLOSE_TO_US = -1, + /** The peer is far ahead of chain */ + FAR_AHEAD = 0, +} + +/** + * In practice, this score only tracks if the peer is far ahead of us or not during syncing. + * When the node is synced, the peer is always CLOSE_TO_US. + */ +function computeStatusScore(ours: phase0.Status, theirs: phase0.Status | null, opts: PrioritizePeersOpts): StatusScore { + if (theirs === null) { + return StatusScore.CLOSE_TO_US; + } + + if (theirs.finalizedEpoch > ours.finalizedEpoch) { + return StatusScore.FAR_AHEAD; + } + + if (theirs.headSlot > ours.headSlot + opts.starvationThresholdSlots) { + return StatusScore.FAR_AHEAD; + } + + // It's dangerous to downscore peers that are far behind. 
+ // This means we'd be more likely to disconnect peers that are attempting to sync, which would affect network stability. + // if (ours.headSlot > theirs.headSlot + opts.starvationThresholdSlots) { + // return StatusScore.FAR_BEHIND; + // } + + return StatusScore.CLOSE_TO_US; +} + type PeerInfo = { id: PeerId; direction: Direction | null; + statusScore: StatusScore; attnets: phase0.AttestationSubnets; syncnets: altair.SyncSubnets; attnetsTrueBitIndices: number[]; @@ -53,6 +94,10 @@ type PeerInfo = { export interface PrioritizePeersOpts { targetPeers: number; maxPeers: number; + status: phase0.Status; + starved: boolean; + starvationPruneRatio: number; + starvationThresholdSlots: number; outboundPeersRatio?: number; targetSubnetPeers?: number; } @@ -67,6 +112,7 @@ export enum ExcessPeerDisconnectReason { /** * Prioritize which peers to disconect and which to connect. Conditions: * - Reach `targetPeers` + * - If we're starved for data, prune additional peers * - Don't exceed `maxPeers` * - Ensure there are enough peers per active subnet * - Prioritize peers with good score @@ -75,6 +121,7 @@ export function prioritizePeers( connectedPeersInfo: { id: PeerId; direction: Direction | null; + status: phase0.Status | null; attnets: phase0.AttestationSubnets | null; syncnets: altair.SyncSubnets | null; score: number; @@ -98,6 +145,7 @@ export function prioritizePeers( (peer): PeerInfo => ({ id: peer.id, direction: peer.direction, + statusScore: computeStatusScore(opts.status, peer.status, opts), attnets: peer.attnets ?? attnetsZero, syncnets: peer.syncnets ?? syncnetsZero, attnetsTrueBitIndices: peer.attnets?.getTrueBitIndexes() ?? [], @@ -254,6 +302,11 @@ function pruneExcessPeers( return false; } + // Peers far ahead when we're starved for data are not eligible for pruning + if (opts.starved && peer.statusScore === StatusScore.FAR_AHEAD) { + return false; + } + // outbound peers up to OUTBOUND_PEER_RATIO sorted by highest score and not eligible for pruning if (peer.direction === "outbound") { if (outboundPeers - outboundPeersEligibleForPruning > outboundPeersTarget) { @@ -269,7 +322,9 @@ function pruneExcessPeers( let peersToDisconnectCount = 0; const noLongLivedSubnetPeersToDisconnect: PeerId[] = []; - const peersToDisconnectTarget = connectedPeerCount - targetPeers; + const peersToDisconnectTarget = + // if we're starved for data, prune additional peers + connectedPeerCount - targetPeers + (opts.starved ? targetPeers * opts.starvationPruneRatio : 0); // 1. Lodestar prefers disconnecting peers that does not have long lived subnets // See https://github.com/ChainSafe/lodestar/issues/3940 @@ -387,7 +442,7 @@ function pruneExcessPeers( /** * Sort peers ascending, peer-0 has the most chance to prune, peer-n has the least. * Shuffling first to break ties. - * prefer sorting by dutied subnets first then number of long lived subnets, + * prefer sorting by status score (applicable during syncing), then dutied subnets, then number of long lived subnets, then peer score * peer score is the last criteria since they are supposed to be in the same score range, * bad score peers are removed by peer manager anyway */ @@ -396,6 +451,10 @@ export function sortPeersToPrune(connectedPeers: PeerInfo[], dutiesByPeer: Map

p.attnetsTrueBitIndices.length + p.syncnetsTrueBitIndices.length ); diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index 7a132cc91821..7103d532e430 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -160,6 +160,12 @@ describe("data serialization through worker boundary", () => { state: "connected", direction: "inbound", agentVersion: "test", + status: null, + metadata: null, + agentClient: "test", + lastReceivedMsgUnixTsMs: 0, + lastStatusUnixTsMs: 0, + connectedUnixTsMs: 0, }; // If return type is void, set to null diff --git a/packages/beacon-node/test/perf/network/peers/util/prioritizePeers.test.ts b/packages/beacon-node/test/perf/network/peers/util/prioritizePeers.test.ts index c79655b4af11..6d1790ea6b83 100644 --- a/packages/beacon-node/test/perf/network/peers/util/prioritizePeers.test.ts +++ b/packages/beacon-node/test/perf/network/peers/util/prioritizePeers.test.ts @@ -2,8 +2,8 @@ import {beforeAll, bench, describe} from "@chainsafe/benchmark"; import {generateKeyPair} from "@libp2p/crypto/keys"; import {PeerId} from "@libp2p/interface"; import {peerIdFromPrivateKey} from "@libp2p/peer-id"; -import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; -import {altair, phase0} from "@lodestar/types"; +import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; +import {altair, phase0, ssz} from "@lodestar/types"; import {defaultNetworkOptions} from "../../../../../src/network/options.js"; import {RequestedSubnet, prioritizePeers} from "../../../../../src/network/peers/utils/index.js"; import {getAttnets, getSyncnets} from "../../../../utils/network.js"; @@ -103,6 +103,7 @@ describe("prioritizePeers", () => { Array.from({length: Math.floor(syncnetPercentage * SYNC_COMMITTEE_SUBNET_COUNT)}, (_, i) => i) ), score: lowestScore + ((highestScore - lowestScore) * i) / defaultNetworkOptions.maxPeers, + status: ssz.phase0.Status.defaultValue(), })); const attnets: RequestedSubnet[] = []; @@ -117,7 +118,13 @@ describe("prioritizePeers", () => { return {connectedPeers, attnets, syncnets}; }, fn: ({connectedPeers, attnets, syncnets}) => { - prioritizePeers(connectedPeers, attnets, syncnets, defaultNetworkOptions); + prioritizePeers(connectedPeers, attnets, syncnets, { + ...defaultNetworkOptions, + status: ssz.phase0.Status.defaultValue(), + starved: false, + starvationPruneRatio: 0.05, + starvationThresholdSlots: SLOTS_PER_EPOCH * 2, + }); }, }); } diff --git a/packages/beacon-node/test/unit/network/peers/priorization.test.ts b/packages/beacon-node/test/unit/network/peers/priorization.test.ts index 84472239b19d..4d393c41e156 100644 --- a/packages/beacon-node/test/unit/network/peers/priorization.test.ts +++ b/packages/beacon-node/test/unit/network/peers/priorization.test.ts @@ -2,7 +2,8 @@ import {BitArray} from "@chainsafe/ssz"; import {generateKeyPair} from "@libp2p/crypto/keys"; import {PeerId} from "@libp2p/interface"; import {peerIdFromPrivateKey} from "@libp2p/peer-id"; -import {ATTESTATION_SUBNET_COUNT} from "@lodestar/params"; +import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH} from "@lodestar/params"; +import {ssz} from "@lodestar/types"; import {describe, expect, it} from "vitest"; import {RequestedSubnet} from "../../../../src/network/peers/utils/index.js"; import { @@ -24,6 +25,16 @@ 
describe("network / peers / priorization", async () => { peers.push(peer); } const none = BitArray.fromBitLen(ATTESTATION_SUBNET_COUNT); + const status = ssz.phase0.Status.defaultValue(); + const defaultOpts: PrioritizePeersOpts = { + targetPeers: 1, + maxPeers: 1, + targetSubnetPeers: 1, + status, + starved: false, + starvationPruneRatio: 0.05, + starvationThresholdSlots: SLOTS_PER_EPOCH * 2, + }; const testCases: { id: string; @@ -38,7 +49,7 @@ describe("network / peers / priorization", async () => { connectedPeers: [], activeAttnets: [3], activeSyncnets: [], - opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1}, + opts: defaultOpts, expectedResult: { peersToDisconnect: new Map(), peersToConnect: 1, @@ -48,10 +59,10 @@ describe("network / peers / priorization", async () => { }, { id: "Don't request a subnet query when enough peers are connected to it", - connectedPeers: [{id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0}], + connectedPeers: [{id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0, status}], activeAttnets: [3], activeSyncnets: [], - opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1}, + opts: defaultOpts, expectedResult: { peersToDisconnect: new Map(), peersToConnect: 0, @@ -62,14 +73,14 @@ describe("network / peers / priorization", async () => { { id: "Disconnect low score peers without duty", connectedPeers: [ - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0}, - {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([5]), score: -5}, - {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([5]), score: -10}, - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([5, 6, 7]), score: -19}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0, status}, + {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([5]), score: -5, status}, + {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([5]), score: -10, status}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([5, 6, 7]), score: -19, status}, ], activeAttnets: [3], activeSyncnets: [], - opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1}, + opts: defaultOpts, expectedResult: { // Peers sorted by score, excluding with future duties peersToDisconnect: new Map([ @@ -84,14 +95,14 @@ describe("network / peers / priorization", async () => { { id: "Disconnect no long-lived-subnet peers without duty", connectedPeers: [ - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0}, - {id: peers[1], direction: null, syncnets: none, attnets: none, score: -0.1}, - {id: peers[2], direction: null, syncnets: none, attnets: none, score: -0.2}, - {id: peers[3], direction: null, syncnets: none, attnets: none, score: -0.3}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([3]), score: 0, status}, + {id: peers[1], direction: null, syncnets: none, attnets: none, score: -0.1, status}, + {id: peers[2], direction: null, syncnets: none, attnets: none, score: -0.2, status}, + {id: peers[3], direction: null, syncnets: none, attnets: none, score: -0.3, status}, ], activeAttnets: [3], activeSyncnets: [], - opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 1}, + opts: defaultOpts, expectedResult: { // Peers sorted by score, excluding with future duties peersToDisconnect: new Map([ @@ -106,17 +117,17 @@ describe("network / peers / priorization", async () => { id: 
"Disconnect no-duty peers that's too grouped in a subnet", connectedPeers: [ // should not drop this peer or duty peers drop below min value - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 3]), score: 0}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 3]), score: 0, status}, // below peers are too grouped into subnet 1 - {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([1, 4, 6]), score: 0}, - {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([1, 4]), score: 0}, - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([1]), score: 0}, + {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([1, 4, 6]), score: 0, status}, + {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([1, 4]), score: 0, status}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([1]), score: 0, status}, // should not remove this peer due or syncnet peers would drop below min value - {id: peers[4], direction: null, syncnets: getSyncnets([2, 3]), attnets: getAttnets([1]), score: 0}, + {id: peers[4], direction: null, syncnets: getSyncnets([2, 3]), attnets: getAttnets([1]), score: 0, status}, ], activeAttnets: [3], activeSyncnets: [2], - opts: {targetPeers: 2, maxPeers: 2, targetSubnetPeers: 1}, + opts: {...defaultOpts, targetPeers: 2, maxPeers: 2}, expectedResult: { // Peers sorted by long lived subnets peersToDisconnect: new Map([ @@ -131,20 +142,20 @@ describe("network / peers / priorization", async () => { id: "Disconnect no-duty peers that's too grouped in a subnet - ignore maxPeersSubnet", connectedPeers: [ // should not drop this peer or duty peers drop below min value - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 3]), score: 0}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 3]), score: 0, status}, // below peers are too grouped into subnet 1 // but cannot remove them due to syncnet requirement - {id: peers[1], direction: null, syncnets: getSyncnets([2]), attnets: getAttnets([1, 4, 6]), score: 0}, - {id: peers[2], direction: null, syncnets: getSyncnets([2]), attnets: getAttnets([1, 4]), score: 0}, + {id: peers[1], direction: null, syncnets: getSyncnets([2]), attnets: getAttnets([1, 4, 6]), score: 0, status}, + {id: peers[2], direction: null, syncnets: getSyncnets([2]), attnets: getAttnets([1, 4]), score: 0, status}, // biggest maxPeerSubnet is 1 (3 peers) but cannot remove all of them // 2nd biggest maxPeerSubnet is 7, should remove peers from that subnet - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([7]), score: 0}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([7]), score: 0, status}, // peer 4 has more long lived subnets than peer 3, should not remove it - {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([7, 8]), score: 0}, + {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([7, 8]), score: 0, status}, ], activeAttnets: [3], activeSyncnets: [2], - opts: {targetPeers: 4, maxPeers: 4, targetSubnetPeers: 1}, + opts: {...defaultOpts, targetPeers: 4, maxPeers: 4}, expectedResult: { // Peers sorted by long lived subnets peersToDisconnect: new Map([ @@ -158,15 +169,15 @@ describe("network / peers / priorization", async () => { { id: "Ensure to prune to target peers", connectedPeers: [ - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 2, 3]), score: 0}, - {id: peers[1], direction: null, syncnets: none, 
attnets: getAttnets([1, 2]), score: -1.9}, - {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([3, 4]), score: -1.8}, - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([4]), score: -1}, - {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([5]), score: -1.5}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([1, 2, 3]), score: 0, status}, + {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([1, 2]), score: -1.9, status}, + {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([3, 4]), score: -1.8, status}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([4]), score: -1, status}, + {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([5]), score: -1.5, status}, ], activeAttnets: [1, 2, 3], activeSyncnets: [], - opts: {targetPeers: 1, maxPeers: 1, targetSubnetPeers: 2}, + opts: {...defaultOpts, targetSubnetPeers: 2}, expectedResult: { peersToDisconnect: new Map([ // the order is based on sortPeers() logic @@ -183,18 +194,18 @@ describe("network / peers / priorization", async () => { // Peers with a least one attnet, distributed such that 1 peer / subnet. // Target to disconnect 4 of them, while keeping 25% outbound = 2. // So should disconnect 4 peers with worse score while keeping 2 outbound with best score. - {id: peers[0], direction: "inbound", syncnets: none, attnets: getAttnets([0]), score: 0}, - {id: peers[1], direction: "inbound", syncnets: none, attnets: getAttnets([1]), score: -10}, - {id: peers[2], direction: "inbound", syncnets: none, attnets: getAttnets([2]), score: -20}, - {id: peers[3], direction: "inbound", syncnets: none, attnets: getAttnets([3]), score: -30}, - {id: peers[4], direction: "outbound", syncnets: none, attnets: getAttnets([4]), score: -40}, - {id: peers[5], direction: "outbound", syncnets: none, attnets: getAttnets([5]), score: -50}, - {id: peers[6], direction: "outbound", syncnets: none, attnets: getAttnets([6]), score: -60}, - {id: peers[7], direction: "outbound", syncnets: none, attnets: getAttnets([7]), score: -70}, + {id: peers[0], direction: "inbound", syncnets: none, attnets: getAttnets([0]), score: 0, status}, + {id: peers[1], direction: "inbound", syncnets: none, attnets: getAttnets([1]), score: -10, status}, + {id: peers[2], direction: "inbound", syncnets: none, attnets: getAttnets([2]), score: -20, status}, + {id: peers[3], direction: "inbound", syncnets: none, attnets: getAttnets([3]), score: -30, status}, + {id: peers[4], direction: "outbound", syncnets: none, attnets: getAttnets([4]), score: -40, status}, + {id: peers[5], direction: "outbound", syncnets: none, attnets: getAttnets([5]), score: -50, status}, + {id: peers[6], direction: "outbound", syncnets: none, attnets: getAttnets([6]), score: -60, status}, + {id: peers[7], direction: "outbound", syncnets: none, attnets: getAttnets([7]), score: -70, status}, ], activeAttnets: [], activeSyncnets: [], - opts: {targetPeers: 4, maxPeers: 4, targetSubnetPeers: 1, outboundPeersRatio: 2 / 8}, + opts: {...defaultOpts, targetPeers: 4, maxPeers: 4, outboundPeersRatio: 2 / 8}, expectedResult: { // Peers sorted by score, excluding with future duties peersToDisconnect: new Map([ @@ -209,18 +220,18 @@ describe("network / peers / priorization", async () => { { id: "Complete example: Disconnect peers and request a subnet query", connectedPeers: [ - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([0, 1, 2]), score: 0}, - {id: peers[1], direction: null, 
syncnets: none, attnets: getAttnets([0, 1, 2]), score: -10}, - {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([0, 1]), score: 0}, - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([0]), score: -10}, - {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([2]), score: 0}, - {id: peers[5], direction: null, syncnets: none, attnets: getAttnets([0, 2]), score: -20}, - {id: peers[6], direction: null, syncnets: none, attnets: getAttnets([1, 2, 3]), score: 0}, - {id: peers[7], direction: null, syncnets: none, attnets: getAttnets([1, 2]), score: -10}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([0, 1, 2]), score: 0, status}, + {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([0, 1, 2]), score: -10, status}, + {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([0, 1]), score: 0, status}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([0]), score: -10, status}, + {id: peers[4], direction: null, syncnets: none, attnets: getAttnets([2]), score: 0, status}, + {id: peers[5], direction: null, syncnets: none, attnets: getAttnets([0, 2]), score: -20, status}, + {id: peers[6], direction: null, syncnets: none, attnets: getAttnets([1, 2, 3]), score: 0, status}, + {id: peers[7], direction: null, syncnets: none, attnets: getAttnets([1, 2]), score: -10, status}, ], activeAttnets: [1, 3], activeSyncnets: [], - opts: {targetPeers: 6, maxPeers: 6, targetSubnetPeers: 2}, + opts: {...defaultOpts, targetPeers: 6, maxPeers: 6, targetSubnetPeers: 2}, expectedResult: { // Peers sorted by score, excluding with future duties peersToDisconnect: new Map([ @@ -232,6 +243,41 @@ describe("network / peers / priorization", async () => { syncnetQueries: [], }, }, + { + id: "Disconnect close to us peers before far ahead peers when starved", + connectedPeers: [ + // CLOSE_TO_US peers + {id: peers[0], direction: null, syncnets: none, attnets: none, score: 0, status: null}, + {id: peers[1], direction: null, syncnets: none, attnets: none, score: -1, status: null}, + // FAR_AHEAD peer + { + id: peers[2], + direction: null, + syncnets: none, + attnets: none, + score: -2, + status: {...status, headSlot: defaultOpts.starvationThresholdSlots + 1}, + }, + ], + activeAttnets: [], + activeSyncnets: [], + opts: { + ...defaultOpts, + targetPeers: 2, + starved: true, + // prune one more peer due to being starved + starvationPruneRatio: 0.5, + }, + expectedResult: { + peersToConnect: 0, + attnetQueries: [], + syncnetQueries: [], + peersToDisconnect: new Map([ + // only the two CLOSE_TO_US peers are disconnected; keep FAR_AHEAD peer + [ExcessPeerDisconnectReason.NO_LONG_LIVED_SUBNET, [peers[1], peers[0]]], + ]), + }, + }, // TODO: Add a test case with syncnets priorization ]; @@ -277,10 +323,10 @@ describe("sortPeersToPrune", async () => { it("should sort peers by dutied subnets then long lived subnets then score", () => { const connectedPeers = [ - {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([0, 4]), score: -1}, - {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([2, 3, 5]), score: 0}, - {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([3, 5]), score: -1}, - {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([6, 7]), score: -1.9}, + {id: peers[3], direction: null, syncnets: none, attnets: getAttnets([0, 4]), score: -1, statusScore: -1}, + {id: peers[2], direction: null, syncnets: none, attnets: getAttnets([2, 3, 5]), score: 0, 
statusScore: -1}, + {id: peers[1], direction: null, syncnets: none, attnets: getAttnets([3, 5]), score: -1, statusScore: -1}, + {id: peers[0], direction: null, syncnets: none, attnets: getAttnets([6, 7]), score: -1.9, statusScore: -1}, ].map((p) => ({ ...p, attnetsTrueBitIndices: p.attnets?.getTrueBitIndexes() ?? [], diff --git a/packages/beacon-node/test/utils/node/p2p.ts b/packages/beacon-node/test/utils/node/p2p.ts index 7f534d3ab4ae..1a6e1d987b29 100644 --- a/packages/beacon-node/test/utils/node/p2p.ts +++ b/packages/beacon-node/test/utils/node/p2p.ts @@ -13,5 +13,11 @@ export function lodestarNodePeer( enr: "", lastSeenP2pAddress: "", agentVersion: "", + status: null, + metadata: null, + agentClient: "", + lastReceivedMsgUnixTsMs: 0, + lastStatusUnixTsMs: 0, + connectedUnixTsMs: 0, }; }
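
The heartbeat change above derives a `starved` flag from two conditions and then enlarges the prune target. A minimal sketch of that logic: `phase0.Status`, `SLOTS_PER_EPOCH` and the two constants are the real imports/values used in the diff, while the standalone helper functions are illustrative rather than the PR's exact methods.

```ts
// Sketch of the starvation heuristic added to PeerManager.heartbeat() and of how it
// enlarges the prune target in pruneExcessPeers. Illustrative helpers, not the PR's code.
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {phase0} from "@lodestar/types";

const STARVATION_THRESHOLD_SLOTS = SLOTS_PER_EPOCH * 2; // head more than two epochs behind the clock
const STARVATION_PRUNE_RATIO = 0.05; // prune an extra 5% of targetPeers while starved

/** Starved: the head did not advance since the previous heartbeat AND it is behind the threshold. */
function isStarved(lastStatus: phase0.Status, status: phase0.Status, currentSlot: number): boolean {
  return lastStatus.headSlot === status.headSlot && currentSlot - status.headSlot > STARVATION_THRESHOLD_SLOTS;
}

/** Prune target used by pruneExcessPeers: grows by targetPeers * STARVATION_PRUNE_RATIO while starved. */
function peersToDisconnectTarget(connectedPeerCount: number, targetPeers: number, starved: boolean): number {
  return connectedPeerCount - targetPeers + (starved ? targetPeers * STARVATION_PRUNE_RATIO : 0);
}
```

For example, with `targetPeers` = 100 and 100 healthy connections, a starved heartbeat targets `100 - 100 + 100 * 0.05 = 5` disconnections, and the new `lodestar_peer_manager_starved_bool` gauge reports 1 for that heartbeat.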
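
While starved, `pruneExcessPeers` also exempts `FAR_AHEAD` peers from pruning, and the updated `sortPeersToPrune` doc-comment states the prune order: status score first, then dutied subnets, then long-lived subnets, then peer score. Since that hunk body is truncated above, the comparator below is only a sketch of the described ordering under the same `StatusScore` values, not the PR's literal code.

```ts
// Sketch of the prune ordering described in sortPeersToPrune's comment: ascending sort,
// index 0 is pruned first. CLOSE_TO_US (-1) sorts before FAR_AHEAD (0), so peers that
// cannot help a starved node sync are the first to go. Illustrative names and types.
enum StatusScore {
  CLOSE_TO_US = -1,
  FAR_AHEAD = 0,
}

type PrunablePeer = {
  statusScore: StatusScore;
  dutiedSubnets: number; // subnets with upcoming duties served by this peer
  longLivedSubnets: number; // attnets + syncnets advertised in metadata
  score: number; // req-resp/gossip peer score
};

function comparePeersToPrune(p1: PrunablePeer, p2: PrunablePeer): number {
  if (p1.statusScore !== p2.statusScore) return p1.statusScore - p2.statusScore;
  if (p1.dutiedSubnets !== p2.dutiedSubnets) return p1.dutiedSubnets - p2.dutiedSubnets;
  if (p1.longLivedSubnets !== p2.longLivedSubnets) return p1.longLivedSubnets - p2.longLivedSubnets;
  return p1.score - p2.score;
}
```

This mirrors the new unit test: with `starved: true`, `targetPeers: 2` and `starvationPruneRatio: 0.5`, the disconnect target is `3 - 2 + 2 * 0.5 = 2`, both `CLOSE_TO_US` peers are dropped, and the peer whose `headSlot` is more than `starvationThresholdSlots` ahead is kept.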
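
Finally, `dumpPeer`/`dumpPeers` now return the extended `LodestarNodePeer` shape (SSZ-JSON `status` and `metadata`, `agentClient`, and Unix-millisecond timestamps). A hypothetical consumer sketch follows; it assumes the `routes` namespace is importable from `@lodestar/api` (as referenced by `_dumpPeer` above) and that the base `NodePeer` type carries a `peerId` string.

```ts
// Hypothetical consumer of the extended peer dump: summarize client, connection age and
// STATUS staleness per peer. Field names come from the LodestarNodePeer type above;
// a timestamp of 0 means the event was never recorded, matching the `?? 0` defaults.
import type {routes} from "@lodestar/api";

function describePeer(peer: routes.lodestar.LodestarNodePeer): string {
  const now = Date.now();
  const connectedFor = peer.connectedUnixTsMs > 0 ? `${Math.round((now - peer.connectedUnixTsMs) / 1000)}s` : "unknown";
  const statusAge = peer.lastStatusUnixTsMs > 0 ? `${Math.round((now - peer.lastStatusUnixTsMs) / 1000)}s ago` : "never";
  return `${peer.peerId} ${peer.agentClient} (${peer.agentVersion}): connected for ${connectedFor}, last STATUS ${statusAge}`;
}
```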