Skip to content

Commit e24664d

Browse files
committed
create tran tranx server handler and client in core/tranx pkg
1 parent 903c779 commit e24664d

File tree

2 files changed

+365
-0
lines changed

2 files changed

+365
-0
lines changed

core/tranx/handlers.go

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
package tranx
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"time"
7+
"encoding/json"
8+
9+
"github.com/gorilla/websocket"
10+
"github.com/abdfnx/tran/tools"
11+
"github.com/abdfnx/tran/constants"
12+
"github.com/abdfnx/tran/models/protocol"
13+
)
14+
15+
// handleEstablishSender returns a websocket handler that communicates with the sender.
16+
func (s *Server) handleEstablishSender() tools.WsHandlerFunc {
17+
return func(wsConn *websocket.Conn) {
18+
// Bind an ID to this communication and send ot to the sender
19+
id := s.ids.Bind()
20+
wsConn.WriteJSON(protocol.TranxMessage{
21+
Type: protocol.TranxToSenderBind,
22+
Payload: protocol.TranxToSenderBindPayload{
23+
ID: id,
24+
},
25+
})
26+
27+
msg := protocol.TranxMessage{}
28+
err := wsConn.ReadJSON(&msg)
29+
30+
if err != nil {
31+
log.Println("message did not follow protocol:", err)
32+
return
33+
}
34+
35+
if !isExpected(msg.Type, protocol.SenderToTranxEstablish) {
36+
return
37+
}
38+
39+
// receive the password (hashed) from the sender.
40+
establishPayload := protocol.PasswordPayload{}
41+
err = tools.DecodePayload(msg.Payload, &establishPayload)
42+
if err != nil {
43+
log.Println("error in SenderToTranxEstablish payload:", err)
44+
45+
return
46+
}
47+
48+
// Allocate a mailbox for this communication.
49+
mailbox := &Mailbox{
50+
Sender: &protocol.TranxSender{
51+
TranxClient: *NewClient(wsConn),
52+
},
53+
CommunicationChannel: make(chan []byte),
54+
Quit: make(chan bool),
55+
}
56+
57+
s.mailboxes.StoreMailbox(establishPayload.Password, mailbox)
58+
_, err = s.mailboxes.GetMailbox(establishPayload.Password)
59+
60+
if err != nil {
61+
log.Println("The created mailbox could not be retrieved")
62+
63+
return
64+
}
65+
66+
// wait for receiver to connect
67+
timeout := time.NewTimer(constants.RECEIVER_CONNECT_TIMEOUT)
68+
69+
select {
70+
case <-timeout.C:
71+
s.ids.Delete(id)
72+
return
73+
74+
case <-mailbox.CommunicationChannel:
75+
// receiver connected
76+
s.ids.Delete(id)
77+
break
78+
}
79+
80+
wsConn.WriteJSON(protocol.TranxMessage{
81+
Type: protocol.TranxToSenderReady,
82+
})
83+
84+
msg = protocol.TranxMessage{}
85+
err = wsConn.ReadJSON(&msg)
86+
87+
if err != nil {
88+
log.Println("message did not follow protocol:", err)
89+
90+
return
91+
}
92+
93+
if !isExpected(msg.Type, protocol.SenderToTranxPAKE) {
94+
return
95+
}
96+
97+
pakePayload := protocol.PakePayload{}
98+
err = tools.DecodePayload(msg.Payload, &pakePayload)
99+
100+
if err != nil {
101+
log.Println("error in SenderToTranxPAKE payload:", err)
102+
return
103+
}
104+
105+
// send PAKE bytes to receiver
106+
mailbox.CommunicationChannel <- pakePayload.Bytes
107+
// respond with receiver PAKE bytes
108+
wsConn.WriteJSON(protocol.TranxMessage{
109+
Type: protocol.TranxToSenderPAKE,
110+
Payload: protocol.PakePayload{
111+
Bytes: <-mailbox.CommunicationChannel,
112+
},
113+
})
114+
115+
msg = protocol.TranxMessage{}
116+
err = wsConn.ReadJSON(&msg)
117+
118+
if err != nil {
119+
log.Println("message did not follow protocol:", err)
120+
121+
return
122+
}
123+
124+
if !isExpected(msg.Type, protocol.SenderToTranxSalt) {
125+
return
126+
}
127+
128+
saltPayload := protocol.SaltPayload{}
129+
err = tools.DecodePayload(msg.Payload, &saltPayload)
130+
131+
if err != nil {
132+
log.Println("error in SenderToTranxSalt payload:", err)
133+
return
134+
}
135+
136+
// Send the salt to the receiver.
137+
mailbox.CommunicationChannel <- saltPayload.Salt
138+
// Start the relay of messgaes between the sender and receiver handlers.
139+
startRelay(s, wsConn, mailbox, establishPayload.Password)
140+
}
141+
}
142+
143+
// handleEstablishReceiver returns a websocket handler that that communicates with the sender.
144+
func (s *Server) handleEstablishReceiver() tools.WsHandlerFunc {
145+
return func(wsConn *websocket.Conn) {
146+
// Establish receiver.
147+
msg := protocol.TranxMessage{}
148+
err := wsConn.ReadJSON(&msg)
149+
150+
if err != nil {
151+
log.Println("message did not follow protocol:", err)
152+
return
153+
}
154+
155+
if !isExpected(msg.Type, protocol.ReceiverToTranxEstablish) {
156+
return
157+
}
158+
159+
establishPayload := protocol.PasswordPayload{}
160+
err = tools.DecodePayload(msg.Payload, &establishPayload)
161+
if err != nil {
162+
log.Println("error in ReceiverToTranxEstablish payload:", err)
163+
return
164+
}
165+
166+
mailbox, err := s.mailboxes.GetMailbox(establishPayload.Password)
167+
168+
if err != nil {
169+
log.Println("failed to get mailbox:", err)
170+
return
171+
}
172+
173+
if mailbox.Receiver != nil {
174+
log.Println("mailbox already has a receiver:", err)
175+
return
176+
}
177+
178+
// this reveiver was first, reserve this mailbox for it to receive
179+
mailbox.Receiver = NewClient(wsConn)
180+
s.mailboxes.StoreMailbox(establishPayload.Password, mailbox)
181+
182+
// notify sender we are connected
183+
mailbox.CommunicationChannel <- nil
184+
// send back received sender PAKE bytes
185+
wsConn.WriteJSON(protocol.TranxMessage{
186+
Type: protocol.TranxToReceiverPAKE,
187+
Payload: protocol.PakePayload{
188+
Bytes: <-mailbox.CommunicationChannel,
189+
},
190+
})
191+
192+
msg = protocol.TranxMessage{}
193+
err = wsConn.ReadJSON(&msg)
194+
195+
if err != nil {
196+
log.Println("message did not follow protocol:", err)
197+
return
198+
}
199+
200+
if !isExpected(msg.Type, protocol.ReceiverToTranxPAKE) {
201+
return
202+
}
203+
204+
receiverPakePayload := protocol.PakePayload{}
205+
err = tools.DecodePayload(msg.Payload, &receiverPakePayload)
206+
207+
if err != nil {
208+
log.Println("error in ReceiverToTranxPAKE payload:", err)
209+
return
210+
}
211+
212+
mailbox.CommunicationChannel <- receiverPakePayload.Bytes
213+
wsConn.WriteJSON(protocol.TranxMessage{
214+
Type: protocol.TranxToReceiverSalt,
215+
Payload: protocol.SaltPayload{
216+
Salt: <-mailbox.CommunicationChannel,
217+
},
218+
})
219+
220+
startRelay(s, wsConn, mailbox, establishPayload.Password)
221+
}
222+
}
223+
224+
// starts the relay service, closing it on request (if i.e. clients can communicate directly)
225+
func startRelay(s *Server, wsConn *websocket.Conn, mailbox *Mailbox, mailboxPassword string) {
226+
relayForwardCh := make(chan []byte)
227+
// listen for incoming websocket messages from currently handled client
228+
go func() {
229+
for {
230+
_, p, err := wsConn.ReadMessage()
231+
232+
if err != nil {
233+
log.Println("error when listening to incoming client messages:", err)
234+
fmt.Printf("closed by: %s\n", wsConn.RemoteAddr())
235+
mailbox.Quit <- true
236+
237+
return
238+
}
239+
240+
relayForwardCh <- p
241+
}
242+
}()
243+
244+
for {
245+
select {
246+
case relayReceivePayload := <-mailbox.CommunicationChannel:
247+
wsConn.WriteMessage(websocket.BinaryMessage, relayReceivePayload)
248+
249+
// received payload from __currently handled__ client, relay it to other client
250+
case relayForwardPayload := <-relayForwardCh:
251+
msg := protocol.TranxMessage{}
252+
err := json.Unmarshal(relayForwardPayload, &msg)
253+
// failed to unmarshal, we are in (encrypted) relay-mode, forward message directly to client
254+
if err != nil {
255+
mailbox.CommunicationChannel <- relayForwardPayload
256+
} else {
257+
// close the relay service if sender requested it
258+
if isExpected(msg.Type, protocol.ReceiverToTranxClose) {
259+
mailbox.Quit <- true
260+
261+
return
262+
}
263+
}
264+
265+
// deallocate mailbox and quit
266+
case <-mailbox.Quit:
267+
s.mailboxes.Delete(mailboxPassword)
268+
269+
return
270+
}
271+
}
272+
}
273+
274+
// isExpected is a convience helper function that checks message types and logs errors.
275+
func isExpected(actual protocol.TranxMessageType, expected protocol.TranxMessageType) bool {
276+
wasExpected := actual == expected
277+
278+
if !wasExpected {
279+
log.Printf("Expected message of type: %d. Got type %d\n", expected, actual)
280+
}
281+
282+
return wasExpected
283+
}

core/tranx/server.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package tranx
2+
3+
import (
4+
"os"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"time"
9+
"context"
10+
"net/http"
11+
)
12+
13+
// Server is contains the necessary data to run the tranx server.
14+
type Server struct {
15+
httpServer *http.Server
16+
router *http.ServeMux
17+
mailboxes *Mailboxes
18+
ids *IDs
19+
signal chan os.Signal
20+
}
21+
22+
// NewServer constructs a new Server struct and setups the routes.
23+
func NewServer(port int) *Server {
24+
router := &http.ServeMux{}
25+
26+
s := &Server{
27+
httpServer: &http.Server{
28+
Addr: fmt.Sprintf(":%d", port),
29+
ReadTimeout: 30 * time.Second,
30+
WriteTimeout: 30 * time.Second,
31+
Handler: router,
32+
},
33+
router: router,
34+
mailboxes: &Mailboxes{&sync.Map{}},
35+
ids: &IDs{&sync.Map{}},
36+
}
37+
38+
s.routes()
39+
40+
return s
41+
}
42+
43+
// Start runs the tranx server.
44+
func (s *Server) Start() {
45+
ctx, cancel := context.WithCancel(context.Background())
46+
47+
go func() {
48+
<-s.signal
49+
cancel()
50+
}()
51+
52+
if err := serve(s, ctx); err != nil {
53+
log.Printf("Error serving Tran tranx server: %s\n", err)
54+
}
55+
}
56+
57+
// serve is a helper function providing graceful shutdown of the server.
58+
func serve(s *Server, ctx context.Context) (err error) {
59+
go func() {
60+
if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
61+
log.Fatalf("Serving Tran: %s\n", err)
62+
}
63+
}()
64+
65+
log.Printf("Tran Tranx Server started at \"%s\" \n", s.httpServer.Addr)
66+
<-ctx.Done()
67+
68+
ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
69+
defer func() {
70+
cancel()
71+
}()
72+
73+
if err = s.httpServer.Shutdown(ctxShutdown); err != nil {
74+
log.Fatalf("Tran tranx shutdown failed: %s", err)
75+
}
76+
77+
if err == http.ErrServerClosed {
78+
err = nil
79+
}
80+
81+
return err
82+
}

0 commit comments

Comments
 (0)