Skip to content

Commit 1db00a5

Browse files
committed
Redo the compact peer types
1 parent 6b27e14 commit 1db00a5

File tree

10 files changed

+329
-126
lines changed

10 files changed

+329
-126
lines changed

client.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ import (
3535
"github.com/anacrolix/torrent/dht"
3636
"github.com/anacrolix/torrent/internal/pieceordering"
3737
"github.com/anacrolix/torrent/iplist"
38-
"github.com/anacrolix/torrent/logonce"
3938
"github.com/anacrolix/torrent/metainfo"
4039
"github.com/anacrolix/torrent/mse"
4140
pp "github.com/anacrolix/torrent/peer_protocol"
4241
"github.com/anacrolix/torrent/tracker"
43-
. "github.com/anacrolix/torrent/util"
4442
)
4543

4644
var (
@@ -1403,12 +1401,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connect
14031401
return
14041402
}
14051403

1406-
type peerExchangeMessage struct {
1407-
Added CompactPeers `bencode:"added"`
1408-
AddedFlags []byte `bencode:"added.f"`
1409-
Dropped CompactPeers `bencode:"dropped"`
1410-
}
1411-
14121404
// Extracts the port as an integer from an address string.
14131405
func addrPort(addr net.Addr) int {
14141406
return AddrPort(addr)
@@ -2434,8 +2426,8 @@ newAnnounce:
24342426
for trIndex, tr := range tier {
24352427
numTrackersTried++
24362428
err := cl.announceTorrentSingleTracker(tr, &req, t)
2437-
if err != nil {
2438-
logonce.Stderr.Printf("%s: error announcing to %s: %s", t, tr, err)
2429+
if err != nil && missinggo.CryHeard() {
2430+
log.Printf("%s: error announcing to %s: %s", t, tr, err)
24392431
continue
24402432
}
24412433
// Float the successful announce to the top of the tier. If

dht/announce.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/willf/bloom"
1212

1313
"github.com/anacrolix/torrent/logonce"
14-
"github.com/anacrolix/torrent/util"
1514
)
1615

1716
// Maintains state for an ongoing Announce operation. An Announce is started
@@ -204,8 +203,8 @@ func (me *Announce) getPeers(addr dHTAddr) error {
204203
// peers that a node has reported as being in the swarm for a queried info
205204
// hash.
206205
type PeersValues struct {
207-
Peers []util.CompactPeer // Peers given in get_peers response.
208-
NodeInfo // The node that gave the response.
206+
Peers []Peer // Peers given in get_peers response.
207+
NodeInfo // The node that gave the response.
209208
}
210209

211210
// Stop the announce.

dht/dht.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"math/rand"
1919
"net"
2020
"os"
21+
"strconv"
2122
"time"
2223

2324
"github.com/anacrolix/missinggo"
@@ -1028,19 +1029,25 @@ func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err er
10281029
return
10291030
}
10301031

1032+
type Peer struct {
1033+
IP net.IP
1034+
Port int
1035+
}
1036+
1037+
func (me *Peer) String() string {
1038+
return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
1039+
}
1040+
10311041
// In a get_peers response, the addresses of torrent clients involved with the
10321042
// queried info-hash.
1033-
func (m Msg) Values() (vs []util.CompactPeer) {
1034-
r, ok := m["r"]
1035-
if !ok {
1036-
return
1037-
}
1038-
rd, ok := r.(map[string]interface{})
1039-
if !ok {
1040-
return
1041-
}
1042-
v, ok := rd["values"]
1043-
if !ok {
1043+
func (m Msg) Values() (vs []Peer) {
1044+
v := func() interface{} {
1045+
defer func() {
1046+
recover()
1047+
}()
1048+
return m["r"].(map[string]interface{})["values"]
1049+
}()
1050+
if v == nil {
10441051
return
10451052
}
10461053
vl, ok := v.([]interface{})
@@ -1050,19 +1057,21 @@ func (m Msg) Values() (vs []util.CompactPeer) {
10501057
}
10511058
return
10521059
}
1053-
vs = make([]util.CompactPeer, 0, len(vl))
1060+
vs = make([]Peer, 0, len(vl))
10541061
for _, i := range vl {
10551062
s, ok := i.(string)
10561063
if !ok {
10571064
panic(i)
10581065
}
1066+
// Because it's a list of strings, we can let the length of the string
1067+
// determine the IP version of the compact peer.
10591068
var cp util.CompactPeer
10601069
err := cp.UnmarshalBinary([]byte(s))
10611070
if err != nil {
10621071
log.Printf("error decoding values list element: %s", err)
10631072
continue
10641073
}
1065-
vs = append(vs, cp)
1074+
vs = append(vs, Peer{cp.IP[:], int(cp.Port)})
10661075
}
10671076
return
10681077
}

pex.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package torrent
2+
3+
import "github.com/anacrolix/torrent/util"
4+
5+
type peerExchangeMessage struct {
6+
Added util.CompactIPv4Peers `bencode:"added"`
7+
AddedFlags []byte `bencode:"added.f"`
8+
Dropped util.CompactIPv4Peers `bencode:"dropped"`
9+
}

pex_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package torrent
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/anacrolix/torrent/bencode"
9+
)
10+
11+
func TestUnmarshalPex(t *testing.T) {
12+
var pem peerExchangeMessage
13+
err := bencode.Unmarshal([]byte("d5:added12:\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0ce"), &pem)
14+
require.NoError(t, err)
15+
require.EqualValues(t, 2, len(pem.Added))
16+
require.EqualValues(t, 1286, pem.Added[0].Port)
17+
require.EqualValues(t, 0x100*0xb+0xc, pem.Added[1].Port)
18+
}

tracker/http.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ func (r *response) UnmarshalPeers() (ret []Peer, err error) {
4343
err = fmt.Errorf("unsupported peers value type: %T", r.Peers)
4444
return
4545
}
46-
cp := make(util.CompactPeers, 0, len(s)/6)
47-
err = cp.UnmarshalBinary([]byte(s))
46+
cp, err := util.UnmarshalIPv4CompactPeers([]byte(s))
4847
if err != nil {
4948
return
5049
}

tracker/server.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package tracker
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"fmt"
7+
"math/rand"
8+
"net"
9+
10+
"github.com/anacrolix/torrent/util"
11+
)
12+
13+
type torrent struct {
14+
Leechers int32
15+
Seeders int32
16+
Peers util.CompactIPv4Peers
17+
}
18+
19+
type server struct {
20+
pc net.PacketConn
21+
conns map[int64]struct{}
22+
t map[[20]byte]torrent
23+
}
24+
25+
func marshal(parts ...interface{}) (ret []byte, err error) {
26+
var buf bytes.Buffer
27+
for _, p := range parts {
28+
err = binary.Write(&buf, binary.BigEndian, p)
29+
if err != nil {
30+
return
31+
}
32+
}
33+
ret = buf.Bytes()
34+
return
35+
}
36+
37+
func (me *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) {
38+
b, err := marshal(append([]interface{}{rh}, parts...)...)
39+
if err != nil {
40+
return
41+
}
42+
_, err = me.pc.WriteTo(b, addr)
43+
return
44+
}
45+
46+
func (me *server) newConn() (ret int64) {
47+
ret = rand.Int63()
48+
if me.conns == nil {
49+
me.conns = make(map[int64]struct{})
50+
}
51+
me.conns[ret] = struct{}{}
52+
return
53+
}
54+
55+
func (me *server) serveOne() (err error) {
56+
b := make([]byte, 0x10000)
57+
n, addr, err := me.pc.ReadFrom(b)
58+
if err != nil {
59+
return
60+
}
61+
r := bytes.NewReader(b[:n])
62+
var h RequestHeader
63+
err = readBody(r, &h)
64+
if err != nil {
65+
return
66+
}
67+
switch h.Action {
68+
case Connect:
69+
if h.ConnectionId != connectRequestConnectionId {
70+
return
71+
}
72+
connId := me.newConn()
73+
err = me.respond(addr, ResponseHeader{
74+
Connect,
75+
h.TransactionId,
76+
}, ConnectionResponse{
77+
connId,
78+
})
79+
return
80+
case Announce:
81+
if _, ok := me.conns[h.ConnectionId]; !ok {
82+
me.respond(addr, ResponseHeader{
83+
TransactionId: h.TransactionId,
84+
Action: Error,
85+
}, []byte("not connected"))
86+
return
87+
}
88+
var ar AnnounceRequest
89+
err = readBody(r, &ar)
90+
if err != nil {
91+
return
92+
}
93+
t := me.t[ar.InfoHash]
94+
b, err = t.Peers.MarshalBinary()
95+
if err != nil {
96+
panic(err)
97+
}
98+
err = me.respond(addr, ResponseHeader{
99+
TransactionId: h.TransactionId,
100+
Action: Announce,
101+
}, AnnounceResponseHeader{
102+
Interval: 900,
103+
Leechers: t.Leechers,
104+
Seeders: t.Seeders,
105+
}, b)
106+
return
107+
default:
108+
err = fmt.Errorf("unhandled action: %d", h.Action)
109+
me.respond(addr, ResponseHeader{
110+
TransactionId: h.TransactionId,
111+
Action: Error,
112+
}, []byte("unhandled action"))
113+
return
114+
}
115+
}

tracker/udp.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"net/url"
1212
"time"
1313

14+
"github.com/anacrolix/missinggo"
15+
1416
"github.com/anacrolix/torrent/util"
1517
)
1618

@@ -22,6 +24,8 @@ const (
2224
Scrape
2325
Error
2426

27+
connectRequestConnectionId = 0x41727101980
28+
2529
// BEP 41
2630
optionTypeEndOfOptions = 0
2731
optionTypeNOP = 1
@@ -121,34 +125,29 @@ func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err er
121125
res.Interval = h.Interval
122126
res.Leechers = h.Leechers
123127
res.Seeders = h.Seeders
124-
for {
125-
var p util.CompactPeer
126-
err = binary.Read(b, binary.BigEndian, &p)
127-
switch err {
128-
case nil:
129-
case io.EOF:
130-
err = nil
131-
fallthrough
132-
default:
133-
return
134-
}
128+
cps, err := util.UnmarshalIPv4CompactPeers(b.Bytes())
129+
if err != nil {
130+
return
131+
}
132+
for _, cp := range cps {
135133
res.Peers = append(res.Peers, Peer{
136-
IP: p.IP[:],
137-
Port: int(p.Port),
134+
IP: cp.IP[:],
135+
Port: int(cp.Port),
138136
})
139137
}
138+
return
140139
}
141140

142141
// body is the binary serializable request body. trailer is optional data
143142
// following it, such as for BEP 41.
144143
func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
145-
buf := &bytes.Buffer{}
146-
err = binary.Write(buf, binary.BigEndian, h)
144+
var buf bytes.Buffer
145+
err = binary.Write(&buf, binary.BigEndian, h)
147146
if err != nil {
148147
panic(err)
149148
}
150149
if body != nil {
151-
err = binary.Write(buf, binary.BigEndian, body)
150+
err = binary.Write(&buf, binary.BigEndian, body)
152151
if err != nil {
153152
panic(err)
154153
}
@@ -177,7 +176,7 @@ func write(w io.Writer, data interface{}) error {
177176

178177
// args is the binary serializable request body. trailer is optional data
179178
// following it, such as for BEP 41.
180-
func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Reader, err error) {
179+
func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
181180
tid := newTransactionId()
182181
err = c.write(&RequestHeader{
183182
ConnectionId: c.connectionId,
@@ -218,12 +217,12 @@ func (c *udpClient) request(action Action, args interface{}, options []byte) (re
218217
if h.Action == Error {
219218
err = errors.New(buf.String())
220219
}
221-
responseBody = bytes.NewReader(buf.Bytes())
220+
responseBody = buf
222221
return
223222
}
224223
}
225224

226-
func readBody(r *bytes.Reader, data ...interface{}) (err error) {
225+
func readBody(r io.Reader, data ...interface{}) (err error) {
227226
for _, datum := range data {
228227
err = binary.Read(r, binary.BigEndian, datum)
229228
if err != nil {
@@ -241,9 +240,14 @@ func (c *udpClient) Connect() (err error) {
241240
if c.connected() {
242241
return nil
243242
}
244-
c.connectionId = 0x41727101980
243+
c.connectionId = connectRequestConnectionId
245244
if c.socket == nil {
246-
c.socket, err = net.Dial("udp", c.url.Host)
245+
hmp := missinggo.SplitHostPort(c.url.Host)
246+
if hmp.NoPort {
247+
hmp.NoPort = false
248+
hmp.Port = 80
249+
}
250+
c.socket, err = net.Dial("udp", hmp.String())
247251
if err != nil {
248252
return
249253
}

0 commit comments

Comments
 (0)