@@ -5,11 +5,11 @@ import (
5
5
"fmt"
6
6
"sync"
7
7
8
- host "github.com/libp2p/go-libp2p/core/host"
9
8
network "github.com/libp2p/go-libp2p/core/network"
10
9
peer "github.com/libp2p/go-libp2p/core/peer"
11
10
pstore "github.com/libp2p/go-libp2p/core/peerstore"
12
11
tpt "github.com/libp2p/go-libp2p/core/transport"
12
+ "github.com/libp2p/go-libp2p/p2p/net/swarm"
13
13
ma "github.com/multiformats/go-multiaddr"
14
14
mafmt "github.com/multiformats/go-multiaddr-fmt"
15
15
"github.com/pkg/errors"
@@ -50,9 +50,8 @@ type ProximityTransport interface {
50
50
}
51
51
52
52
type proximityTransport struct {
53
- host host. Host
53
+ swarm * swarm. Swarm
54
54
upgrader tpt.Upgrader
55
- rcmgr network.ResourceManager
56
55
57
56
connMap map [string ]* Conn
58
57
connMapMutex sync.RWMutex
@@ -64,7 +63,7 @@ type proximityTransport struct {
64
63
ctx context.Context
65
64
}
66
65
67
- func NewTransport (ctx context.Context , l * zap.Logger , driver ProximityDriver ) func (h host. Host , u tpt.Upgrader , rcmgr network. ResourceManager ) (* proximityTransport , error ) {
66
+ func NewTransport (ctx context.Context , l * zap.Logger , driver ProximityDriver ) func (swarm * swarm. Swarm , u tpt.Upgrader ) (* proximityTransport , error ) {
68
67
if l == nil {
69
68
l = zap .NewNop ()
70
69
}
@@ -76,12 +75,12 @@ func NewTransport(ctx context.Context, l *zap.Logger, driver ProximityDriver) fu
76
75
driver = & NoopProximityDriver {}
77
76
}
78
77
79
- return func (h host.Host , u tpt.Upgrader , rcmgr network.ResourceManager ) (* proximityTransport , error ) {
78
+ l .Debug ("remi: transport.go: new Transport" )
79
+ return func (swarm * swarm.Swarm , u tpt.Upgrader ) (* proximityTransport , error ) {
80
80
l .Debug ("NewTransport called" , zap .String ("driver" , driver .ProtocolName ()))
81
81
transport := & proximityTransport {
82
- host : h ,
82
+ swarm : swarm ,
83
83
upgrader : u ,
84
- rcmgr : rcmgr ,
85
84
connMap : make (map [string ]* Conn ),
86
85
cache : NewRingBufferMap (l , 128 ),
87
86
driver : driver ,
@@ -141,7 +140,7 @@ func (t *proximityTransport) CanDial(remoteMa ma.Multiaddr) bool {
141
140
func (t * proximityTransport ) Listen (localMa ma.Multiaddr ) (tpt.Listener , error ) {
142
141
// localAddr is supposed to be equal to the localPID
143
142
// or to DefaultAddr since multiaddr == /<protocol>/<peerID>
144
- localPID := t .host . ID ().String ()
143
+ localPID := t .swarm . LocalPeer ().String ()
145
144
localAddr , err := localMa .ValueForProtocol (t .driver .ProtocolCode ())
146
145
if err != nil || (localMa .String () != t .driver .DefaultAddr () && localAddr != localPID ) {
147
146
return nil , errors .Wrap (err , "error: proximityTransport.Listen: wrong multiaddr" )
@@ -246,7 +245,7 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
246
245
t .lock .RUnlock ()
247
246
248
247
// Adds peer to peerstore.
249
- t .host .Peerstore ().AddAddr (remotePID , remoteMa ,
248
+ t .swarm .Peerstore ().AddAddr (remotePID , remoteMa ,
250
249
pstore .TempAddrTTL )
251
250
252
251
// Delete previous cache if it exists
@@ -259,13 +258,13 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
259
258
// Needed to read and write during the connect handshake.
260
259
go func () {
261
260
// Need to use listener than t.listener here to not have to check valid value of t.listener
262
- err := t .host . Connect (listener .ctx , peer.AddrInfo {
261
+ err := t .connect (listener .ctx , peer.AddrInfo {
263
262
ID : remotePID ,
264
263
Addrs : []ma.Multiaddr {remoteMa },
265
264
})
266
265
if err != nil {
267
266
t .logger .Error ("HandleFoundPeer: async connect error" , zap .Error (err ))
268
- t .host .Peerstore ().SetAddr (remotePID , remoteMa , - 1 )
267
+ t .swarm .Peerstore ().SetAddr (remotePID , remoteMa , - 1 )
269
268
t .driver .CloseConnWithPeer (sRemotePID )
270
269
}
271
270
}()
@@ -287,6 +286,24 @@ func (t *proximityTransport) HandleFoundPeer(sRemotePID string) bool {
287
286
}
288
287
}
289
288
289
+ // Adapted from https://github.com/libp2p/go-libp2p/blob/v0.38.1/p2p/host/basic/basic_host.go#L795
290
+ func (t * proximityTransport ) connect (ctx context.Context , pi peer.AddrInfo ) error {
291
+ // absorb addresses into peerstore
292
+ t .swarm .Peerstore ().AddAddrs (pi .ID , pi .Addrs , pstore .TempAddrTTL )
293
+
294
+ forceDirect , _ := network .GetForceDirectDial (ctx )
295
+ canUseLimitedConn , _ := network .GetAllowLimitedConn (ctx )
296
+ if ! forceDirect {
297
+ connectedness := t .swarm .Connectedness (pi .ID )
298
+ if connectedness == network .Connected || (canUseLimitedConn && connectedness == network .Limited ) {
299
+ return nil
300
+ }
301
+ }
302
+
303
+ _ , err := t .swarm .DialPeer (ctx , pi .ID )
304
+ return err
305
+ }
306
+
290
307
// HandleLostPeer is called by the native driver when the connection with the peer is lost.
291
308
// Closes connections with the peer.
292
309
func (t * proximityTransport ) HandleLostPeer (sRemotePID string ) {
@@ -304,10 +321,10 @@ func (t *proximityTransport) HandleLostPeer(sRemotePID string) {
304
321
}
305
322
306
323
// Remove peer's address to peerstore.
307
- t .host .Peerstore ().SetAddr (remotePID , remoteMa , - 1 )
324
+ t .swarm .Peerstore ().SetAddr (remotePID , remoteMa , - 1 )
308
325
309
326
// Close the peer connection
310
- conns := t .host . Network () .ConnsToPeer (remotePID )
327
+ conns := t .swarm .ConnsToPeer (remotePID )
311
328
for _ , conn := range conns {
312
329
if conn .RemoteMultiaddr ().Equal (remoteMa ) {
313
330
conn .Close ()
0 commit comments