@@ -20,9 +20,9 @@ use tokio::{sync::watch, time::sleep};
 use tracing::{debug, error, info, trace, warn};
 use types::{
     metered_channel::Sender, Batch, BatchDigest, PrimaryToWorker, ReconfigureNotification,
-    RequestBatchRequest, RequestBatchResponse, WorkerBatchMessage, WorkerBatchRequest,
-    WorkerBatchResponse, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
-    WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerClient,
+    RequestBatchRequest, RequestBatchResponse, WorkerBatchMessage, WorkerDeleteBatchesMessage,
+    WorkerOthersBatchMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker,
+    WorkerToWorkerClient,
 };

 #[cfg(test)]
@@ -56,28 +56,11 @@ impl WorkerToWorker for WorkerReceiverHandler {
             .map_err(|e| anemo::rpc::Status::internal(e.to_string()))
     }

-    async fn request_batches(
-        &self,
-        request: anemo::Request<WorkerBatchRequest>,
-    ) -> Result<anemo::Response<WorkerBatchResponse>, anemo::rpc::Status> {
-        let message = request.into_body();
-        // TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources
-        // TODO: Add a limit on number of requested batches
-        let batches: Vec<Batch> = self
-            .store
-            .read_all(message.digests)
-            .await
-            .map_err(|e| anemo::rpc::Status::from_error(Box::new(e)))?
-            .into_iter()
-            .flatten()
-            .collect();
-        Ok(anemo::Response::new(WorkerBatchResponse { batches }))
-    }
-
     async fn request_batch(
         &self,
         request: anemo::Request<RequestBatchRequest>,
     ) -> Result<anemo::Response<RequestBatchResponse>, anemo::rpc::Status> {
+        // TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources
         let batch = request.into_body().batch;
         let batch = self
             .store
@@ -100,10 +83,10 @@ pub struct PrimaryReceiverHandler {
     // The worker information cache.
     pub worker_cache: SharedWorkerCache,
     pub store: Store<BatchDigest, Batch>,
-    // Timeout on RequestBatches RPC.
-    pub request_batches_timeout: Duration,
+    // Timeout on RequestBatch RPC.
+    pub request_batch_timeout: Duration,
     // Number of random nodes to query when retrying batch requests.
-    pub request_batches_retry_nodes: usize,
+    pub request_batch_retry_nodes: usize,
     /// Send reconfiguration update to other tasks.
     pub tx_reconfigure: watch::Sender<ReconfigureNotification>,
 }
@@ -172,9 +155,11 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
             return Ok(anemo::Response::new(()));
         }

-        let batch_request = WorkerBatchRequest {
-            digests: missing.iter().cloned().collect(),
-        };
+        let batch_requests: Vec<_> = missing
+            .iter()
+            .cloned()
+            .map(|batch| RequestBatchRequest { batch })
+            .collect();
         let network = request
             .extensions()
             .get::<anemo::NetworkRef>()
@@ -184,16 +169,15 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
         })?;

         let mut handles = FuturesUnordered::new();
-        let request_batches_fn = |mut client: WorkerToWorkerClient<anemo::Peer>,
-                                  batch_request,
-                                  timeout| {
-            // Wrapper function enables us to move `client` into the future.
-            async move {
-                client
-                    .request_batches(anemo::Request::new(batch_request).with_timeout(timeout))
-                    .await
-            }
-        };
+        let request_batch_fn =
+            |mut client: WorkerToWorkerClient<anemo::Peer>, batch_request, timeout| {
+                // Wrapper function enables us to move `client` into the future.
+                async move {
+                    client
+                        .request_batch(anemo::Request::new(batch_request).with_timeout(timeout))
+                        .await
+                }
+            };
         if first_attempt {
             // Send first sync request to a single node.
             let worker_name = match self.worker_cache.load().worker(&message.target, &self.id) {
@@ -207,14 +191,16 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
             let peer_id = anemo::PeerId(worker_name.0.to_bytes());
             if let Some(peer) = network.peer(peer_id) {
                 debug!(
-                    "Sending WorkerBatchRequest message to {worker_name} for missing batches {:?}",
-                    batch_request.digests
+                    "Sending BatchRequests to {worker_name}: {:?}",
+                    batch_requests
                 );
-                handles.push(request_batches_fn(
-                    WorkerToWorkerClient::new(peer),
-                    batch_request,
-                    self.request_batches_timeout,
-                ));
+                handles.extend(batch_requests.into_iter().map(|request| {
+                    request_batch_fn(
+                        WorkerToWorkerClient::new(peer.clone()),
+                        request,
+                        self.request_batch_timeout,
+                    )
+                }));
             } else {
                 warn!("Unable to reach primary peer {worker_name} on the network");
             }
@@ -229,19 +215,22 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
                 .collect();
             handles.extend(
                 names
-                    .choose_multiple(&mut rand::thread_rng(), self.request_batches_retry_nodes)
+                    .choose_multiple(&mut rand::thread_rng(), self.request_batch_retry_nodes)
                     .filter_map(|name| network.peer(anemo::PeerId(name.0.to_bytes())))
-                    .map(|peer| {
-                        request_batches_fn(
-                            WorkerToWorkerClient::new(peer),
-                            batch_request.clone(),
-                            self.request_batches_timeout,
-                        )
+                    .flat_map(|peer| {
+                        batch_requests.iter().cloned().map(move |request| {
+                            let peer = peer.clone();
+                            request_batch_fn(
+                                WorkerToWorkerClient::new(peer),
+                                request,
+                                self.request_batch_timeout,
+                            )
+                        })
                     }),
             );
             debug!(
-                "Sending WorkerBatchRequest retries to workers {names:?} for missing batches {:?}",
-                batch_request.digests
+                "Sending BatchRequest retries to workers {names:?}: {:?}",
+                batch_requests
             );
         }
@@ -250,7 +239,7 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
         while let Some(result) = handles.next().await {
             match result {
                 Ok(response) => {
-                    for batch in response.into_body().batches {
+                    if let Some(batch) = response.into_body().batch {
                         let digest = batch.digest();
                         if missing.remove(&digest) {
                             self.store.write(digest, batch).await;
@@ -261,9 +250,10 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
                     }
                 }
                 Err(e) => {
-                    // TODO: add info on target peer when anemo supports retrieving it from
-                    // anemo::rpc::Status.
-                    info!("WorkerBatchRequest failed: {e:?}");
+                    info!(
+                        "RequestBatchRequest to worker {:?} failed: {e:?}",
+                        e.peer_id()
+                    )
                 }
             }
         }
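The synchronize handler above now fans out one `request_batch` RPC per missing digest and drains whichever responses arrive first through a `FuturesUnordered`. Below is a minimal, self-contained sketch of that fan-out pattern; `fetch_one` is a hypothetical stand-in for the anemo `WorkerToWorkerClient::request_batch` call, and plain `u64` ids stand in for `BatchDigest`s (assumes the `tokio` and `futures` crates are available).

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::HashSet;

// Hypothetical stand-in for `client.request_batch(...)`: echoes the requested id back.
async fn fetch_one(id: u64) -> Result<u64, String> {
    Ok(id)
}

#[tokio::main]
async fn main() {
    let mut missing: HashSet<u64> = (0..5).collect();

    // One future per missing item, mirroring `batch_requests.into_iter().map(...)`.
    let mut handles: FuturesUnordered<_> = missing.iter().cloned().map(fetch_one).collect();

    // Drain responses as they complete, dropping each digest once it has been recovered.
    while let Some(result) = handles.next().await {
        match result {
            Ok(id) => {
                missing.remove(&id);
            }
            Err(e) => eprintln!("request failed: {e}"),
        }
    }
    assert!(missing.is_empty());
}
```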