Skip to content

Commit 6a3ae02

Browse files
authored
fix: add time out to incoming KAD streams (#3167)
Close the stream if the RPC invocation has not completed after a configurable window.
1 parent 57dbdaa commit 6a3ae02

File tree

4 files changed

+52
-30
lines changed

4 files changed

+52
-30
lines changed

packages/kad-dht/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
"it-all": "^3.0.8",
6262
"it-drain": "^3.0.9",
6363
"it-length": "^3.0.8",
64-
"it-length-prefixed": "^10.0.1",
6564
"it-map": "^3.1.3",
6665
"it-merge": "^3.0.11",
6766
"it-parallel": "^3.0.11",
@@ -95,6 +94,7 @@
9594
"execa": "^9.5.3",
9695
"it-filter": "^3.1.3",
9796
"it-last": "^3.0.8",
97+
"it-length-prefixed": "^10.0.1",
9898
"it-pair": "^2.0.6",
9999
"it-stream-types": "^2.0.2",
100100
"lodash.random": "^3.2.0",

packages/kad-dht/src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,14 @@ export interface KadDHTInit {
693693
* @default 10_000
694694
*/
695695
onPeerConnectTimeout?: number
696+
697+
/**
698+
* When acting as a DHT server, all incoming RPC requests must complete within
699+
* this timeout in ms otherwise the stream will be closed.
700+
*
701+
* @default 10_000
702+
*/
703+
incomingMessageTimeout?: number
696704
}
697705

698706
export interface KadDHTComponents {

packages/kad-dht/src/rpc/index.ts

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import * as lp from 'it-length-prefixed'
2-
import { pipe } from 'it-pipe'
1+
import { TimeoutError } from '@libp2p/interface'
2+
import { pbStream } from 'it-protobuf-stream'
33
import { Message, MessageType } from '../message/dht.js'
44
import { AddProviderHandler } from './handlers/add-provider.js'
55
import { FindNodeHandler } from './handlers/find-node.js'
@@ -30,6 +30,7 @@ export interface RPCInit {
3030
metricsPrefix: string
3131
datastorePrefix: string
3232
peerInfoMapper: PeerInfoMapper
33+
incomingMessageTimeout?: number
3334
}
3435

3536
export interface RPCComponents extends GetValueHandlerComponents, PutValueHandlerComponents, FindNodeHandlerComponents, GetProvidersHandlerComponents {
@@ -38,21 +39,22 @@ export interface RPCComponents extends GetValueHandlerComponents, PutValueHandle
3839

3940
export class RPC {
4041
private readonly handlers: Record<string, DHTMessageHandler>
41-
private readonly routingTable: RoutingTable
4242
private readonly log: Logger
4343
private readonly metrics: {
4444
operations?: CounterGroup
4545
errors?: CounterGroup
4646
}
4747

48+
private readonly incomingMessageTimeout: number
49+
4850
constructor (components: RPCComponents, init: RPCInit) {
4951
this.metrics = {
5052
operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_requests_total`),
5153
errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_inbound_rpc_errors_total`)
5254
}
5355

5456
this.log = components.logger.forComponent(`${init.logPrefix}:rpc`)
55-
this.routingTable = init.routingTable
57+
this.incomingMessageTimeout = init.incomingMessageTimeout ?? 10_000
5658
this.handlers = {
5759
[MessageType.GET_VALUE.toString()]: new GetValueHandler(components, init),
5860
[MessageType.PUT_VALUE.toString()]: new PutValueHandler(components, init),
@@ -92,34 +94,46 @@ export class RPC {
9294
* Handle incoming streams on the dht protocol
9395
*/
9496
onIncomingStream (data: IncomingStreamData): void {
95-
let message = 'unknown'
97+
const message = 'unknown'
9698

9799
Promise.resolve().then(async () => {
98100
const { stream, connection } = data
99-
const peerId = connection.remotePeer
100-
101-
const self = this
102-
103-
await pipe(
104-
stream,
105-
(source) => lp.decode(source),
106-
async function * (source) {
107-
for await (const msg of source) {
108-
// handle the message
109-
const desMessage = Message.decode(msg)
110-
message = desMessage.type
111-
self.log('incoming %s from %p', desMessage.type, peerId)
112-
const res = await self.handleMessage(peerId, desMessage)
113-
114-
// Not all handlers will return a response
115-
if (res != null) {
116-
yield Message.encode(res)
117-
}
101+
102+
const abortListener = (): void => {
103+
stream.abort(new TimeoutError())
104+
}
105+
106+
let signal = AbortSignal.timeout(this.incomingMessageTimeout)
107+
signal.addEventListener('abort', abortListener)
108+
109+
const messages = pbStream(stream).pb(Message)
110+
111+
try {
112+
while (true) {
113+
const message = await messages.read({
114+
signal
115+
})
116+
117+
// handle the message
118+
this.log('incoming %s from %p', message.type, connection.remotePeer)
119+
const res = await this.handleMessage(connection.remotePeer, message)
120+
121+
// Not all handlers will return a response
122+
if (res != null) {
123+
await messages.write(res, {
124+
signal
125+
})
118126
}
119-
},
120-
(source) => lp.encode(source),
121-
stream
122-
)
127+
128+
// we have received a message so reset the timeout controller to
129+
// allow the remote to send another
130+
signal.removeEventListener('abort', abortListener)
131+
signal = AbortSignal.timeout(this.incomingMessageTimeout)
132+
signal.addEventListener('abort', abortListener)
133+
}
134+
} catch (err: any) {
135+
stream.abort(err)
136+
}
123137
})
124138
.catch(err => {
125139
this.log.error('error handling %s RPC message from %p - %e', message, data.connection.remotePeer, err)

packages/kad-dht/test/libp2p-routing.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ function createPeer (peerId: PeerId, peer: Partial<Peer> = {}): Peer {
7070
id: peerId,
7171
addresses: [{
7272
isCertified: false,
73-
multiaddr: multiaddr(`/ip4/58.42.62.62/tcp/${Math.random() * (maxPort - minPort) + minPort}`)
73+
multiaddr: multiaddr(`/ip4/58.42.62.62/tcp/${Math.round(Math.random() * (maxPort - minPort) + minPort)}`)
7474
}],
7575
tags: new Map(),
7676
metadata: new Map(),

0 commit comments

Comments
 (0)