Skip to content

Commit b9c72b9

Browse files
committed
add forwad implementation + tests
1 parent 2cb3b86 commit b9c72b9

File tree

15 files changed

+421
-13
lines changed

15 files changed

+421
-13
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ require (
111111
require (
112112
github.com/docker/go-units v0.5.0
113113
github.com/go-chi/chi/v5 v5.2.1
114+
github.com/go-playground/validator/v10 v10.11.1
114115
github.com/hashicorp/go-version v1.7.0
115116
)
116117

@@ -257,6 +258,8 @@ require (
257258
github.com/go-logfmt/logfmt v0.6.0 // indirect
258259
github.com/go-logr/logr v1.4.2 // indirect
259260
github.com/go-logr/stdr v1.2.2 // indirect
261+
github.com/go-playground/locales v0.14.0 // indirect
262+
github.com/go-playground/universal-translator v0.18.0 // indirect
260263
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
261264
github.com/go-toolsmith/astcast v1.1.0 // indirect
262265
github.com/go-toolsmith/astcopy v1.1.0 // indirect
@@ -343,6 +346,7 @@ require (
343346
github.com/ldez/grignotin v0.9.0 // indirect
344347
github.com/ldez/tagliatelle v0.7.1 // indirect
345348
github.com/ldez/usetesting v0.4.2 // indirect
349+
github.com/leodido/go-urn v1.2.1 // indirect
346350
github.com/leonklingele/grouper v1.1.2 // indirect
347351
github.com/lib/pq v1.10.9 // indirect
348352
github.com/linxGnu/grocksdb v1.8.14 // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
673673
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
674674
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
675675
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
676+
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
676677
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
677678
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
678679
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
@@ -1056,6 +1057,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
10561057
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
10571058
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
10581059
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
1060+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
1061+
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
10591062
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
10601063
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
10611064
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -1360,6 +1363,8 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc
13601363
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
13611364
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
13621365
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
1366+
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
1367+
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
13631368
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
13641369
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
13651370
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
@@ -1631,6 +1636,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
16311636
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
16321637
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
16331638
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
1639+
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
16341640
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
16351641
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
16361642
golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g=
@@ -1736,6 +1742,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
17361742
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
17371743
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
17381744
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
1745+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
17391746
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
17401747
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
17411748
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=

makefiles/relay.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,5 @@ relayminer_forward_http_rest: ## Forward request to the rest service.
6565
.PHONY: relayminer_forward_request_websocket_anvilws
6666
relayminer_forward_request_websocket_anvilws: ## Forward websocket request to the anvilws service.
6767
@websocat ws://localhost:10001/services/anvilws/forward \
68-
-H "token: 8cc09793290cd64d8a9bc80eaae4fbeef5f7cf797b0c70e078d2a5b81d74f12c"
68+
-H "token: 8cc09793290cd64d8a9bc80eaae4fbeef5f7cf797b0c70e078d2a5b81d74f12c" \
69+
-H "Rpc-Type: 2" # RPCType_WEBSOCKET

pkg/relayer/forward_server.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package relayer
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
9+
"github.com/go-chi/chi/v5"
10+
)
11+
12+
// ServeForward exposes a forward HTTP server for administrators to send request to
13+
// specific service.
14+
func (rel *relayMiner) ServeForward(ctx context.Context, network, addr, token string) error {
15+
ln, err := net.Listen(network, addr)
16+
if err != nil {
17+
return fmt.Errorf("net listen: %w", err)
18+
}
19+
20+
muxRouter := chi.NewRouter()
21+
muxRouter.HandleFunc("/services/{service_id}/forward", rel.newForwardHandlerFn(ctx, token))
22+
23+
go func() {
24+
if err := http.Serve(ln, muxRouter); err != nil {
25+
rel.logger.Error().Err(err).
26+
Msg("unexpected error occurred while serving forward server")
27+
return
28+
}
29+
}()
30+
31+
return nil
32+
}
33+
34+
func (rel *relayMiner) newForwardHandlerFn(ctx context.Context, token string) http.HandlerFunc {
35+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
36+
reqToken := r.Header.Get("token")
37+
if reqToken != token {
38+
w.WriteHeader(http.StatusUnauthorized)
39+
return
40+
}
41+
42+
serviceID := chi.URLParam(r, "service_id")
43+
if serviceID == "" {
44+
rel.logger.Error().Msg("service id not found in URL while forwarding request")
45+
w.WriteHeader(http.StatusBadRequest)
46+
return
47+
}
48+
49+
rel.logger.Debug().Str("service_id", serviceID).
50+
Msg("forwarding request to supplier...")
51+
52+
if err := rel.relayerProxy.Forward(ctx, serviceID, w, r); err != nil {
53+
rel.logger.Error().Err(err).
54+
Msg("unable to forward request")
55+
w.WriteHeader(http.StatusInternalServerError)
56+
return
57+
}
58+
})
59+
}

pkg/relayer/interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package relayer
77

88
import (
99
"context"
10+
"net/http"
1011

1112
"github.com/pokt-network/smt"
1213

@@ -59,6 +60,9 @@ type RelayerProxy interface {
5960

6061
// PingAll tests the connectivity between all the managed relay servers and their respective backend URLs.
6162
PingAll(ctx context.Context) error
63+
64+
// Forward sends a request to the relay server for the given service ID.
65+
Forward(ctx context.Context, serviceID string, w http.ResponseWriter, r *http.Request) error
6266
}
6367

6468
type RelayerProxyOption func(RelayerProxy)
@@ -101,6 +105,9 @@ type RelayServer interface {
101105

102106
// Ping tests the connection between the relay server and its backend URL.
103107
Ping(ctx context.Context) error
108+
109+
// Forward sends a request to the supplier service for the given service ID.
110+
Forward(ctx context.Context, serviceID string, w http.ResponseWriter, r *http.Request) error
104111
}
105112

106113
// RelayServers aggregates a slice of RelayServer interface.

pkg/relayer/proxy/async.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package proxy
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
67

78
"github.com/gorilla/websocket"
89

910
"github.com/pokt-network/poktroll/pkg/polylog"
11+
"github.com/pokt-network/poktroll/pkg/relayer/config"
1012
proxyws "github.com/pokt-network/poktroll/pkg/relayer/proxy/websockets"
1113
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
1214
)
@@ -121,3 +123,62 @@ func (server *relayMinerHTTPServer) handleAsyncConnection(
121123

122124
return nil
123125
}
126+
127+
// forwardAsyncConnection instantiates two websocket connections that:
128+
// - receive and forward message from the client to the supplier (backend URL).
129+
// - receive and forward message from the supplier (backend URL) to the client.
130+
func (server *relayMinerHTTPServer) forwardAsyncConnection(ctx context.Context, supplierConfig *config.RelayMinerSupplierConfig, w http.ResponseWriter, req *http.Request) error {
131+
upgrader := websocket.Upgrader{
132+
CheckOrigin: func(r *http.Request) bool { return true },
133+
}
134+
135+
clientConn, err := upgrader.Upgrade(w, req, nil)
136+
if err != nil {
137+
return fmt.Errorf("client connection upgrade client to ws: %w", err)
138+
}
139+
140+
serviceConn, err := proxyws.ConnectServiceBackend(supplierConfig.ServiceConfig.BackendUrl, supplierConfig.ServiceConfig.GetHeadersHTTP())
141+
if err != nil {
142+
return fmt.Errorf("service connection upgrade to ws: %w", err)
143+
}
144+
145+
forwardFn := func(from, to *websocket.Conn) {
146+
defer from.Close()
147+
defer to.Close()
148+
149+
isNormalCloseConnection := func(err error) bool {
150+
return websocket.IsCloseError(err,
151+
websocket.CloseNormalClosure,
152+
websocket.CloseGoingAway,
153+
websocket.CloseAbnormalClosure)
154+
}
155+
156+
for {
157+
msgType, msg, err := from.ReadMessage()
158+
if err != nil {
159+
if isNormalCloseConnection(err) {
160+
return
161+
}
162+
163+
server.logger.Error().
164+
Msgf("from read message: %w", err)
165+
return
166+
}
167+
168+
if err := to.WriteMessage(msgType, msg); err != nil {
169+
if isNormalCloseConnection(err) {
170+
return
171+
}
172+
173+
server.logger.Error().
174+
Msgf("to write message: %w", err)
175+
return
176+
}
177+
}
178+
}
179+
180+
go forwardFn(clientConn, serviceConn)
181+
forwardFn(serviceConn, clientConn)
182+
183+
return nil
184+
}

pkg/relayer/proxy/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ var (
2020
ErrRelayerProxyResponseLimitExceeded = sdkerrors.Register(codespace, 12, "response limit exceed")
2121
ErrRelayerProxyRequestLimitExceeded = sdkerrors.Register(codespace, 13, "request limit exceed")
2222
ErrRelayerProxyUnmarshalingRelayRequest = sdkerrors.Register(codespace, 14, "failed to unmarshal relay request")
23+
ErrRelayerProxyServiceIDNotFound = sdkerrors.Register(codespace, 15, "service id not found")
2324
)

pkg/relayer/proxy/http_server.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"net"
66
"net/http"
7-
"strconv"
87
"time"
98

109
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
@@ -14,7 +13,6 @@ import (
1413
"github.com/pokt-network/poktroll/pkg/relayer"
1514
"github.com/pokt-network/poktroll/pkg/relayer/config"
1615
"github.com/pokt-network/poktroll/x/service/types"
17-
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
1816
)
1917

2018
// rpcTypeHeader is the header key for the RPC type, provided by the client.
@@ -137,11 +135,27 @@ func (server *relayMinerHTTPServer) Start(ctx context.Context) error {
137135
return server.server.Serve(listener)
138136
}
139137

138+
// Forward sends request to the appropriate service.
139+
// - It checks if the service id is managed by the relayminer.
140140
// Stop terminates the service server and returns an error if it fails.
141141
func (server *relayMinerHTTPServer) Stop(ctx context.Context) error {
142142
return server.server.Shutdown(ctx)
143143
}
144144

145+
// - It checks wether it needs to forward a websocket connection or send a http request.
146+
func (server *relayMinerHTTPServer) Forward(ctx context.Context, serviceID string, w http.ResponseWriter, req *http.Request) error {
147+
supplierConfig, ok := server.serverConfig.SupplierConfigsMap[serviceID]
148+
if !ok {
149+
return ErrRelayerProxyServiceIDNotFound.Wrapf("service ID: %s", serviceID)
150+
}
151+
152+
if isWebSocketRequest(req) {
153+
return server.forwardAsyncConnection(ctx, supplierConfig, w, req)
154+
} else {
155+
return server.forwardHTTP(ctx, supplierConfig, w, req)
156+
}
157+
}
158+
145159
// ServeHTTP listens for incoming relay requests. It implements the respective
146160
// method of the http.Handler interface. It is called by http.ListenAndServe()
147161
// when relayMinerHTTPServer is used as an http.Handler with an http.Server.
@@ -156,13 +170,6 @@ func (server *relayMinerHTTPServer) ServeHTTP(writer http.ResponseWriter, reques
156170
"remote_addr", request.RemoteAddr,
157171
)
158172

159-
// isWebSocketRequest checks if the request is trying to upgrade to WebSocket.
160-
isWebSocketRequest := func(r *http.Request) bool {
161-
// The request must have the "Rpc-Type" header set to "websocket".
162-
// This will be handled in the client, likely a PATH gateway.
163-
return r.Header.Get(RPCTypeHeader) == strconv.Itoa(int(sharedtypes.RPCType_WEBSOCKET))
164-
}
165-
166173
// Determine whether the request is upgrading to websocket.
167174
if isWebSocketRequest(request) {
168175
logger.ProbabilisticDebugInfo(relayProbabilisticDebugProb).Msg("🔍 detected asynchronous relay request")

0 commit comments

Comments
 (0)