@@ -7,7 +7,7 @@ use rc::util;
7
7
8
8
use std:: io;
9
9
use std:: io:: { Read , Write } ;
10
- use std:: net :: ToSocketAddrs ;
10
+ use std:: collections :: HashMap ;
11
11
use std:: sync:: mpsc;
12
12
use std:: { env, net, thread, time} ;
13
13
@@ -18,12 +18,19 @@ const GETADDR_CMD: [u8; 12] =
18
18
[ 0x67 , 0x65 , 0x74 , 0x61 , 0x64 , 0x64 , 0x72 , 0 , 0 , 0 , 0 , 0 ] ;
19
19
const PING_CMD : [ u8 ; 12 ] = [ 0x70 , 0x69 , 0x6e , 0x67 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ;
20
20
21
+ struct Stream {
22
+ stream : net:: TcpStream ,
23
+ ts : u64 ,
24
+ is_outbound : bool ,
25
+ }
26
+
21
27
struct NetworkState {
22
28
port : String ,
23
29
listener : net:: TcpListener ,
24
30
blocks : Vec < Block > ,
25
31
step : usize ,
26
- active_nodes : Vec < net:: TcpStream > ,
32
+ streams : Vec < Stream > ,
33
+ outbound_addrs : HashMap < net:: SocketAddr , bool > ,
27
34
}
28
35
29
36
impl NetworkState {
@@ -41,9 +48,10 @@ impl NetworkState {
41
48
listener : net:: TcpListener :: bind ( format ! ( "0.0.0.0:{}" , & port) )
42
49
. unwrap ( ) ,
43
50
port : port,
44
- active_nodes : Vec :: new ( ) ,
51
+ streams : Vec :: new ( ) ,
45
52
blocks : fetch_blocks ( ) ,
46
53
step : 1 ,
54
+ outbound_addrs : HashMap :: new ( ) ,
47
55
} ;
48
56
ns. listener
49
57
. set_nonblocking ( true )
@@ -58,33 +66,58 @@ impl NetworkState {
58
66
59
67
if ns. port != "8333" . to_string ( ) {
60
68
ns. add_stream_from_addr ( & known_node) ;
61
- if ns. active_nodes . len ( ) > 0 {
69
+ if ns. streams . len ( ) > 0 {
70
+ let addr = Addr {
71
+ address : ns. listener . local_addr ( ) . unwrap ( ) ,
72
+ ts : current_epoch ( ) ,
73
+ } ;
74
+ let addr = Addr :: msg_from_addrs ( & vec ! [ addr] ) ;
62
75
let getaddr = Message :: from_command ( GETADDR_CMD ) ;
63
- ns. active_nodes [ 0 ] . write ( & getaddr. serialize ( ) ) . unwrap ( ) ;
76
+ ns. streams [ 0 ] . stream . write ( & addr. serialize ( ) ) . unwrap ( ) ;
77
+ ns. streams [ 0 ] . stream . write ( & getaddr. serialize ( ) ) . unwrap ( ) ;
64
78
}
65
79
}
66
80
ns
67
81
}
68
82
69
83
fn add_stream_from_addr ( & mut self , addr : & net:: SocketAddr ) {
70
- match net:: TcpStream :: connect ( addr) {
71
- Ok ( stream) => {
72
- self . add_stream ( stream) ;
73
- }
74
- Err ( err) => {
75
- println ! (
76
- "{}: Failed to connect to addr {:?} with err {}" ,
77
- self . port, addr, err
78
- ) ;
84
+ match self . outbound_addrs . get ( addr) {
85
+ Some ( _) => { } ,
86
+ None => {
87
+ self . outbound_addrs . insert ( * addr, true ) ;
88
+ match net:: TcpStream :: connect ( addr) {
89
+ Ok ( stream) => {
90
+ self . add_stream ( stream, true ) ;
91
+ }
92
+ Err ( err) => {
93
+ println ! (
94
+ "{}: Failed to connect to addr {:?} with err {}" ,
95
+ self . port, addr, err
96
+ ) ;
97
+ }
98
+ }
79
99
}
80
100
}
81
101
}
82
102
83
- fn add_stream ( & mut self , stream : net:: TcpStream ) {
103
+ fn add_stream ( & mut self , stream : net:: TcpStream , is_outbound : bool ) {
84
104
stream
85
105
. set_nonblocking ( true )
86
106
. expect ( "set_nonblocking call failed" ) ;
87
- self . active_nodes . push ( stream) ;
107
+ self . streams . push ( Stream {
108
+ stream : stream,
109
+ is_outbound : is_outbound,
110
+ ts : current_epoch ( ) ,
111
+ } ) ;
112
+ }
113
+
114
+ fn send_to_all_outbound_stream ( & self , buf : & [ u8 ] ) {
115
+ for mut stream in & self . streams {
116
+ if stream. is_outbound {
117
+ let mut stream = & stream. stream ;
118
+ stream. write ( buf) . unwrap ( ) ;
119
+ }
120
+ }
88
121
}
89
122
90
123
}
@@ -171,13 +204,13 @@ pub fn start_node() {
171
204
172
205
loop {
173
206
let hundo_millis = time:: Duration :: from_millis ( 1000 ) ;
174
- println ! ( "{}: looped {}" , ns. port, ns. active_nodes . len( ) ) ;
207
+ println ! ( "{}: looped {}" , ns. port, ns. streams . len( ) ) ;
175
208
thread:: sleep ( hundo_millis) ;
176
209
177
210
match ns. listener . accept ( ) {
178
211
Ok ( ( socket, addr) ) => {
179
- println ! ( "{}: {} {}" , ns. port, "Connected to new node" , addr) ;
180
- ns. add_stream ( socket) ;
212
+ println ! ( "{}: {} {}" , ns. port, "New inbound node connected " , addr) ;
213
+ ns. add_stream ( socket, false ) ;
181
214
}
182
215
Err ( err) => match err. kind ( ) {
183
216
io:: ErrorKind :: WouldBlock => { }
@@ -188,31 +221,32 @@ pub fn start_node() {
188
221
}
189
222
190
223
let mut addr_to_add: Vec < net:: SocketAddr > = Vec :: new ( ) ;
191
- let mut step = ns. step ;
192
- for mut node in & ns. active_nodes {
193
- println ! ( "{}: {}, {}" , ns. port, node. local_addr( ) . unwrap( ) , node. peer_addr( ) . unwrap( ) ) ;
194
- let mut buf = [ 0 ; 2_000_000 ] ;
195
- match node. read ( & mut buf) {
224
+ let step = ns. step ;
225
+ for mut stream in & ns. streams {
226
+ let mut stream = & stream. stream ;
227
+
228
+ let mut buf = [ 0 ; 2_000_000 ] ;
229
+ match stream. read ( & mut buf) {
196
230
Ok ( _amt) => {
197
231
let mut message = Message :: deserialize ( & mut buf. to_vec ( ) ) ;
198
232
let command = String :: from_utf8_lossy ( & message. command ) ;
199
233
println ! (
200
234
"{}: Command \" {}\" from {}" ,
201
235
ns. port,
202
236
command,
203
- node . peer_addr( ) . unwrap( )
237
+ stream . peer_addr( ) . unwrap( )
204
238
) ;
205
239
let pong = Message {
206
240
payload : Vec :: new ( ) ,
207
241
command : [ 0x70 , 0x6f , 0x6e , 0x67 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ,
208
242
} ;
209
243
match command. as_ref ( ) {
210
244
"ping\u{0} \u{0} \u{0} \u{0} \u{0} \u{0} \u{0} \u{0} " => {
211
- node . write ( & pong. serialize ( ) ) . unwrap ( ) ;
245
+ stream . write ( & pong. serialize ( ) ) . unwrap ( ) ;
212
246
}
213
247
"getaddr\u{0} \u{0} \u{0} \u{0} \u{0} " => {
214
- let addr = Addr :: serialize ( & ns. active_nodes ) ;
215
- node . write ( & addr. serialize ( ) ) . unwrap ( ) ;
248
+ let addr = Addr :: msg_from_streams ( & ns. streams ) ;
249
+ stream . write ( & addr. serialize ( ) ) . unwrap ( ) ;
216
250
}
217
251
"addr\u{0} \u{0} \u{0} \u{0} \u{0} \u{0} \u{0} \u{0} " => {
218
252
let addresses = Addr :: deserialize ( & mut message. payload ) ;
@@ -226,9 +260,6 @@ pub fn start_node() {
226
260
ns. port, address. address
227
261
) ;
228
262
addr_to_add. push ( address. address ) ;
229
- if ns. active_nodes . len ( ) >= 3 {
230
- step = 2usize ;
231
- }
232
263
}
233
264
}
234
265
}
@@ -249,7 +280,7 @@ pub fn start_node() {
249
280
} )
250
281
}
251
282
if inv. inv_vectors . len ( ) > 0 {
252
- node . write ( & inv. serialize ( ) ) . unwrap ( ) ;
283
+ stream . write ( & inv. serialize ( ) ) . unwrap ( ) ;
253
284
}
254
285
// referse for
255
286
}
@@ -274,7 +305,7 @@ pub fn start_node() {
274
305
275
306
ns. step = step;
276
307
for addr in & addr_to_add {
277
- ns. add_stream_from_addr ( addr)
308
+ ns. add_stream_from_addr ( & addr)
278
309
} ;
279
310
280
311
// check if there's a new mined block
@@ -289,10 +320,8 @@ pub fn start_node() {
289
320
hash : block. hash ( ) ,
290
321
} ) ;
291
322
ns. blocks . push ( block) ;
292
- for mut node in & ns. active_nodes {
293
- println ! ( "{}: Sent new block to {}" , ns. port, & node. peer_addr( ) . unwrap( ) ) ;
294
- node. write ( & inv. serialize ( ) ) . unwrap ( ) ;
295
- }
323
+ println ! ( "{}: Sent new blocks inv" , ns. port) ;
324
+ ns. send_to_all_outbound_stream ( & inv. serialize ( ) ) ;
296
325
}
297
326
Err ( err) => {
298
327
match err {
@@ -307,23 +336,17 @@ pub fn start_node() {
307
336
if ns. step == 2usize && last_sent_getblocks. elapsed ( ) . as_secs ( ) > 10 {
308
337
let last_hash = ns. blocks [ ns. blocks . len ( ) - 1 ] . hash ( ) ;
309
338
getblocks. payload = last_hash. to_vec ( ) ;
310
- for mut node in & ns. active_nodes {
311
- println ! ( "{}: Sent getblocks to {}" , ns. port, & node. peer_addr( ) . unwrap( ) ) ;
312
- node. write ( & getblocks. serialize ( ) ) . unwrap ( ) ;
313
- }
339
+ println ! ( "{}: Sent getblocks" , ns. port) ;
340
+ ns. send_to_all_outbound_stream ( & getblocks. serialize ( ) ) ;
314
341
last_sent_getblocks = time:: Instant :: now ( ) ;
315
342
}
316
343
if last_sent_pings. elapsed ( ) . as_secs ( ) > 10 {
317
344
if ns. step == 1usize {
318
- for mut node in & ns. active_nodes {
319
- node. write ( & getaddr. serialize ( ) ) . unwrap ( ) ;
320
- println ! ( "{}: Sent getaddr to {}" , ns. port, & node. peer_addr( ) . unwrap( ) ) ;
321
- }
322
- }
323
- for mut node in & ns. active_nodes {
324
- println ! ( "{}: Sent ping to {}" , ns. port, & node. peer_addr( ) . unwrap( ) ) ;
325
- node. write ( & ping. serialize ( ) ) . unwrap ( ) ;
345
+ println ! ( "{}: Sent getaddr" , ns. port) ;
346
+ ns. send_to_all_outbound_stream ( & getaddr. serialize ( ) ) ;
326
347
}
348
+ println ! ( "{}: Sent ping" , ns. port) ;
349
+ ns. send_to_all_outbound_stream ( & ping. serialize ( ) ) ;
327
350
last_sent_pings = time:: Instant :: now ( ) ;
328
351
}
329
352
}
@@ -345,27 +368,37 @@ struct Addr {
345
368
}
346
369
347
370
impl Addr {
348
- fn serialize ( active_nodes : & Vec < net :: TcpStream > ) -> Message {
371
+ fn msg_from_addrs ( addrs : & Vec < Addr > ) -> Message {
349
372
let addr_ascii = [ 0x61 , 0x64 , 0x64 , 0x72 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ;
350
- let addrs : Vec < u8 > = Vec :: new ( ) ;
351
- let mut addr = Message {
373
+ let payload : Vec < u8 > = Vec :: new ( ) ;
374
+ let mut msg = Message {
352
375
command : addr_ascii,
353
- payload : addrs ,
376
+ payload : payload ,
354
377
} ;
355
- ( active_nodes. len ( ) as u16 ) . serialize ( & mut addr. payload ) ;
356
- for stream in active_nodes {
357
- // TODO: actually track timestamps
378
+ ( addrs. len ( ) as u16 ) . serialize ( & mut msg. payload ) ;
379
+ for addr in addrs {
358
380
let ts = current_epoch ( ) ;
359
- let peer_addr = stream. peer_addr ( ) . unwrap ( ) ;
360
- let ip_octets = match peer_addr. ip ( ) {
381
+ let ip_octets = match addr. address . ip ( ) {
361
382
net:: IpAddr :: V4 ( ip) => ip. octets ( ) ,
362
383
net:: IpAddr :: V6 ( _) => continue ,
363
384
} ;
364
- addr . payload . extend_from_slice ( & ip_octets) ;
365
- peer_addr . port ( ) . serialize ( & mut addr . payload ) ;
366
- ts. serialize ( & mut addr . payload ) ;
385
+ msg . payload . extend_from_slice ( & ip_octets) ;
386
+ addr . address . port ( ) . serialize ( & mut msg . payload ) ;
387
+ ts. serialize ( & mut msg . payload ) ;
367
388
}
368
- addr
389
+ msg
390
+ }
391
+ fn msg_from_streams ( streams : & Vec < Stream > ) -> Message {
392
+ let mut addrs: Vec < Addr > = Vec :: new ( ) ;
393
+ for stream in streams {
394
+ if stream. is_outbound {
395
+ addrs. push ( Addr {
396
+ address : stream. stream . peer_addr ( ) . unwrap ( ) ,
397
+ ts : stream. ts ,
398
+ } )
399
+ }
400
+ }
401
+ Addr :: msg_from_addrs ( & addrs)
369
402
}
370
403
fn deserialize ( payload : & mut Vec < u8 > ) -> Vec < Addr > {
371
404
let len: u16 = Encodable :: deserialize ( payload) ;
@@ -426,6 +459,7 @@ struct InvVector {
426
459
}
427
460
428
461
impl Message {
462
+
429
463
fn from_command ( command : [ u8 ; 12 ] ) -> Message {
430
464
Message {
431
465
payload : Vec :: new ( ) ,
0 commit comments