15
15
16
16
import static tech .pegasys .teku .networking .eth2 .rpc .core .RpcResponseStatus .INVALID_REQUEST_CODE ;
17
17
18
+ import com .google .common .base .Throwables ;
19
+ import java .nio .channels .ClosedChannelException ;
18
20
import java .util .List ;
19
21
import java .util .Optional ;
20
- import java .util .stream . Collectors ;
22
+ import java .util .concurrent . atomic . AtomicInteger ;
21
23
import org .apache .logging .log4j .LogManager ;
22
24
import org .apache .logging .log4j .Logger ;
23
25
import org .hyperledger .besu .plugin .services .MetricsSystem ;
24
26
import org .hyperledger .besu .plugin .services .metrics .Counter ;
25
27
import org .hyperledger .besu .plugin .services .metrics .LabelledMetric ;
28
+ import tech .pegasys .teku .infrastructure .async .SafeFuture ;
26
29
import tech .pegasys .teku .infrastructure .metrics .TekuMetricCategory ;
27
- import tech .pegasys .teku .infrastructure .ssz .primitive .SszBit ;
28
30
import tech .pegasys .teku .networking .eth2 .peers .Eth2Peer ;
29
31
import tech .pegasys .teku .networking .eth2 .peers .RequestApproval ;
30
32
import tech .pegasys .teku .networking .eth2 .rpc .core .PeerRequiredLocalMessageHandler ;
31
33
import tech .pegasys .teku .networking .eth2 .rpc .core .ResponseCallback ;
32
34
import tech .pegasys .teku .networking .eth2 .rpc .core .RpcException ;
35
+ import tech .pegasys .teku .networking .p2p .rpc .StreamClosedException ;
33
36
import tech .pegasys .teku .spec .Spec ;
34
37
import tech .pegasys .teku .spec .config .SpecConfigEip7805 ;
35
38
import tech .pegasys .teku .spec .datastructures .networking .libp2p .rpc .InclusionListByCommitteeRequestMessage ;
@@ -72,9 +75,10 @@ public Optional<RpcException> validateRequest(
72
75
final SpecConfigEip7805 specConfig =
73
76
SpecConfigEip7805 .required (spec .atSlot (request .getSlot ()).getConfig ());
74
77
78
+ final int requestedCount = request .getCommitteeIndices ().getAllSetBits ().size ();
75
79
final int maxRequestInclusionList = specConfig .getMaxRequestInclusionList ();
76
80
77
- if (request . size () > maxRequestInclusionList ) {
81
+ if (requestedCount > maxRequestInclusionList ) {
78
82
requestCounter .labels ("count_too_big" ).inc ();
79
83
return Optional .of (
80
84
new RpcException (
@@ -86,8 +90,6 @@ public Optional<RpcException> validateRequest(
86
90
return Optional .empty ();
87
91
}
88
92
89
- // TODO EIP7805 review logic, add counter
90
- @ SuppressWarnings ("FutureReturnValueIgnored" )
91
93
@ Override
92
94
protected void onIncomingMessage (
93
95
final String protocolId ,
@@ -96,28 +98,64 @@ protected void onIncomingMessage(
96
98
final ResponseCallback <SignedInclusionList > callback ) {
97
99
98
100
LOG .trace (
99
- "Peer {} requested Inclusion Lists for slot {} with committee indices {}" ,
101
+ "Peer {} requested Inclusion Lists for slot {} for committee indices {}" ,
100
102
peer .getId (),
101
103
message .getSlot (),
102
- message .getCommitteeIndices ().stream ()
103
- .map (SszBit ::toString )
104
- .collect (Collectors .joining ("," )));
104
+ message .getCommitteeIndices ().getAllSetBits ().intStream ());
105
105
106
+ final int requestedCount = message .getCommitteeIndices ().getAllSetBits ().size ();
106
107
final Optional <RequestApproval > inclusionListsRequestApproval =
107
- peer .approveInclusionListsRequest (callback , message . size () );
108
+ peer .approveInclusionListsRequest (callback , requestedCount );
108
109
109
110
if (!peer .approveRequest () || inclusionListsRequestApproval .isEmpty ()) {
110
111
requestCounter .labels ("rate_limited" ).inc ();
111
112
return ;
112
113
}
113
114
114
115
requestCounter .labels ("ok" ).inc ();
115
- totalInclusionListsRequestedCounter .inc (message . size () );
116
+ totalInclusionListsRequestedCounter .inc (requestedCount );
116
117
117
- // TODO EIP7805 review logic / handle errors
118
+ final AtomicInteger sentInclusionLists = new AtomicInteger ( 0 );
118
119
final List <SignedInclusionList > signedInclusionLists =
119
120
inclusionListManager .getInclusionLists (message .getSlot (), message .getCommitteeIndices ());
120
- signedInclusionLists .forEach (callback ::respond );
121
- callback .completeSuccessfully ();
121
+ SafeFuture <Void > future = SafeFuture .COMPLETE ;
122
+ for (SignedInclusionList signedInclusionList : signedInclusionLists ) {
123
+ future =
124
+ future .thenCompose (
125
+ __ ->
126
+ callback
127
+ .respond (signedInclusionList )
128
+ .thenRun (sentInclusionLists ::incrementAndGet ));
129
+ }
130
+ future .finish (
131
+ () -> {
132
+ if (sentInclusionLists .get () != requestedCount ) {
133
+ peer .adjustInclusionListsRequest (
134
+ inclusionListsRequestApproval .get (), sentInclusionLists .get ());
135
+ }
136
+ callback .completeSuccessfully ();
137
+ },
138
+ err -> {
139
+ peer .adjustInclusionListsRequest (inclusionListsRequestApproval .get (), 0 );
140
+ handleError (callback , err );
141
+ });
142
+ }
143
+
144
+ private void handleError (
145
+ final ResponseCallback <SignedInclusionList > callback , final Throwable error ) {
146
+ final Throwable rootCause = Throwables .getRootCause (error );
147
+ if (rootCause instanceof RpcException ) {
148
+ LOG .trace (
149
+ "Rejecting inclusion lists by committee indices request" , error ); // Keep full context
150
+ callback .completeWithErrorResponse ((RpcException ) rootCause );
151
+ } else {
152
+ if (rootCause instanceof StreamClosedException
153
+ || rootCause instanceof ClosedChannelException ) {
154
+ LOG .trace ("Stream closed while sending requested inclusion lists" , error );
155
+ } else {
156
+ LOG .error ("Failed to process inclusion lists by committee indices request" , error );
157
+ }
158
+ callback .completeWithUnexpectedError (error );
159
+ }
122
160
}
123
161
}
0 commit comments