-
Notifications
You must be signed in to change notification settings - Fork 36
/
endpoint_client.go
132 lines (107 loc) · 2.88 KB
/
endpoint_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package gomavlib
import (
"context"
"fmt"
"io"
"net"
"github.com/bluenviron/gomavlib/v3/pkg/reconnector"
"github.com/bluenviron/gomavlib/v3/pkg/timednetconn"
)
// endpointClientConf is the configuration contract shared by the TCP and UDP
// client endpoints declared in this file.
type endpointClientConf interface {
// isUDP reports whether the endpoint dials over UDP (true) or TCP (false).
isUDP() bool
// getAddress returns the address of the server to dial.
getAddress() string
// init builds the endpoint for the given node.
init(*Node) (Endpoint, error)
}
// EndpointTCPClient sets up an endpoint that works with a TCP client.
// TCP is fit for routing frames through the internet, but is not the most
// appropriate way for transferring frames from a UAV to a GCS, since it does
// not allow frame losses.
type EndpointTCPClient struct {
// Address is the domain name or IP of the server to connect to, followed by
// the port, example: 1.2.3.4:5600
Address string
}
// isUDP always reports false: this configuration selects TCP.
func (EndpointTCPClient) isUDP() bool {
return false
}
// getAddress returns the configured server address.
func (conf EndpointTCPClient) getAddress() string {
return conf.Address
}
// init creates the client endpoint for node by delegating to
// initEndpointClient with this configuration.
func (conf EndpointTCPClient) init(node *Node) (Endpoint, error) {
return initEndpointClient(node, conf)
}
// EndpointUDPClient sets up an endpoint that works with a UDP client.
type EndpointUDPClient struct {
// Address is the domain name or IP of the server to connect to, followed by
// the port, example: 1.2.3.4:5600
Address string
}
// isUDP always reports true: this configuration selects UDP.
func (EndpointUDPClient) isUDP() bool {
return true
}
// getAddress returns the configured server address.
func (conf EndpointUDPClient) getAddress() string {
return conf.Address
}
// init creates the client endpoint for node by delegating to
// initEndpointClient with this configuration.
func (conf EndpointUDPClient) init(node *Node) (Endpoint, error) {
return initEndpointClient(node, conf)
}
// endpointClient is the endpoint implementation used by both the TCP and the
// UDP client configurations.
type endpointClient struct {
// conf is the configuration the endpoint was created from.
conf endpointClientConf
// reconnector supplies the connections handed out by provide and is closed
// by close.
reconnector *reconnector.Reconnector
}
// initEndpointClient validates the address in conf and builds the client
// endpoint shared by the TCP and UDP configurations.
//
// It returns an error wrapping the one reported by net.SplitHostPort when the
// address is rejected. The dial logic is handed to reconnector.New as a
// callback: it picks "udp4" or "tcp4" from conf.isUDP(), dials with a timeout
// of node.conf.ReadTimeout and wraps the resulting connection with
// timednetconn using the node's idle and write timeouts.
func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) {
	_, _, err := net.SplitHostPort(conf.getAddress())
	if err != nil {
		// keep the cause so callers can see what is wrong with the address
		return nil, fmt.Errorf("invalid address: %w", err)
	}

	t := &endpointClient{
		conf: conf,
		reconnector: reconnector.New(
			func(ctx context.Context) (io.ReadWriteCloser, error) {
				network := "tcp4"
				if conf.isUDP() {
					network = "udp4"
				}

				// in UDP, the only possible error is a DNS failure
				// in TCP, the handshake must be completed
				timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout)
				defer timedContextClose()

				nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress())
				if err != nil {
					return nil, err
				}

				return timednetconn.New(
					node.conf.IdleTimeout,
					node.conf.WriteTimeout,
					nconn,
				), nil
			},
		),
	}

	return t, nil
}
// isEndpoint is intentionally empty.
// NOTE(review): presumably a marker method required by the Endpoint
// interface — confirm against its declaration.
func (t *endpointClient) isEndpoint() {}
// Conf returns the configuration the endpoint was created from.
func (t *endpointClient) Conf() EndpointConf {
return t.conf
}
// close closes the underlying reconnector.
func (t *endpointClient) close() {
t.reconnector.Close()
}
// oneChannelAtAtime always reports true for client endpoints.
// NOTE(review): presumably tells the node that this endpoint serves a single
// connection at any given time — confirm against the callers.
func (t *endpointClient) oneChannelAtAtime() bool {
return true
}
// provide asks the reconnector for a connection and returns it together with
// the endpoint label. When the reconnector reports that no connection is
// available, it returns errTerminated with an empty label and a nil
// connection.
func (t *endpointClient) provide() (string, io.ReadWriteCloser, error) {
	if rwc, connected := t.reconnector.Reconnect(); connected {
		return t.label(), rwc, nil
	}

	return "", nil, errTerminated
}
// label returns the endpoint name in the form "<protocol>:<address>", where
// protocol is "udp" or "tcp" depending on the configuration.
func (t *endpointClient) label() string {
	proto := "tcp"
	if t.conf.isUDP() {
		proto = "udp"
	}

	return proto + ":" + t.conf.getAddress()
}