Skip to content

Commit

Permalink
Refactor HTTP clients
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Jul 20, 2021
1 parent 58f43f6 commit f0f399a
Show file tree
Hide file tree
Showing 68 changed files with 2,560 additions and 144 deletions.
2 changes: 1 addition & 1 deletion cmd/slnc/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func getClient() *rpc.Client {
httpHeaders := viper.GetStringSlice("global-http-header")
api := rpc.NewClient(sanitizeAPIURL(viper.GetString("global-rpc-url")))
api := rpc.New(sanitizeAPIURL(viper.GetString("global-rpc-url")))

for i := 0; i < 25; i++ {
if val := os.Getenv(fmt.Sprintf("SLNC_GLOBAL_HTTP_HEADER_%d", i)); val != "" {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/magiconair/properties v1.8.1
github.com/mr-tron/base58 v1.2.0
github.com/onsi/gomega v1.10.1 // indirect
github.com/onsi/gomega v1.10.1
github.com/pkg/errors v0.9.1
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f
github.com/spf13/cobra v1.1.1
Expand All @@ -30,9 +30,9 @@ require (
github.com/stretchr/testify v1.6.1
github.com/teris-io/shortid v0.0.0-20201117134242-e59966efd125 // indirect
github.com/tidwall/gjson v1.6.7
github.com/ybbus/jsonrpc v2.1.2+incompatible
go.opencensus.io v0.22.5 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
Expand Down Expand Up @@ -217,9 +219,11 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1 h1:mFwc4LvZ0xpSvDZ3E+k8Yte0hLOMxXUlP+yXtJqkYfQ=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
Expand Down Expand Up @@ -298,8 +302,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/ybbus/jsonrpc v2.1.2+incompatible h1:V4mkE9qhbDQ92/MLMIhlhMSbz8jNXdagC3xBR5NDwaQ=
github.com/ybbus/jsonrpc v2.1.2+incompatible/go.mod h1:XJrh1eMSzdIYFbM08flv0wp5G35eRniyeGut1z+LSiE=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
Expand All @@ -321,6 +323,8 @@ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
Expand Down Expand Up @@ -559,6 +563,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
2 changes: 1 addition & 1 deletion programs/serum/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestDecoder_EventQueue_Diff(t *testing.T) {
newDataJSONFile := strings.ReplaceAll(newDataFile, ".bin.zst", ".json")

if os.Getenv("TESTDATA_UPDATE") == "true" {
client := rpc.NewClient("http://api.mainnet-beta.solana.com:80/rpc")
client := rpc.New("http://api.mainnet-beta.solana.com:80/rpc")
ctx := context.Background()
account := solana.MustPublicKeyFromBase58("13iGJcA4w5hcJZDjJbJQor1zUiDLE4jv2rMW9HkD5Eo1")

Expand Down
4 changes: 2 additions & 2 deletions programs/serum/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFetchMarket(t *testing.T) {

//

client := rpc.NewClient(rpcURL)
client := rpc.New(rpcURL)
ctx := context.Background()

openOrderAdd, err := solana.PublicKeyFromBase58("jFoHUkNDC767PyK11cZM4zyNcpjLqFnSjaqEYp5GVBr")
Expand All @@ -61,7 +61,7 @@ func TestStreamOpenOrders(t *testing.T) {
t.Skip("Setup 'RPC_URL' to run test i.e. 'ws://api.mainnet-beta.solana.com:80/rpc'")
return
}
client, err := ws.Dial(context.Background(), rpcURL)
client, err := ws.Connect(context.Background(), rpcURL)
require.NoError(t, err)

err = StreamOpenOrders(client)
Expand Down
4 changes: 2 additions & 2 deletions programs/token/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestAccount(t *testing.T) {
func TestMint(t *testing.T) {

addr := solana.MustPublicKeyFromBase58("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v")
cli := rpc.NewClient("http://api.mainnet-beta.solana.com")
cli := rpc.New("http://api.mainnet-beta.solana.com")

var m Mint
err := cli.GetAccountDataIn(context.Background(), addr, &m)
Expand All @@ -85,7 +85,7 @@ func TestMint(t *testing.T) {
func TestRawMint(t *testing.T) {

addr := solana.MustPublicKeyFromBase58("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v")
cli := rpc.NewClient("http://api.mainnet-beta.solana.com")
cli := rpc.New("http://api.mainnet-beta.solana.com")

resp, err := cli.GetAccountInfo(context.Background(), addr)
// handle `err`
Expand Down
135 changes: 123 additions & 12 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,73 @@
package rpc

import (
"context"
"errors"
"net"
"net/http"
"time"

"github.com/ybbus/jsonrpc"
"github.com/gagliardetto/solana-go/rpc/jsonrpc"
"github.com/klauspost/compress/gzhttp"
"go.uber.org/ratelimit"
)

var ErrNotFound = errors.New("not found")
var ErrNotConfirmed = errors.New("not confirmed")

type Client struct {
rpcURL string
rpcClient CallForClientInterface
rpcClient JSONRPCClient
headers http.Header
}

type CallForClientInterface interface {
CallFor(out interface{}, method string, params ...interface{}) error
type JSONRPCClient interface {
CallForInto(ctx context.Context, out interface{}, method string, params []interface{}) error
}

func NewClient(rpcURL string) *Client {
return NewClientWithOpts(rpcURL, nil)
// New creates a new Solana JSON RPC client.
func New(rpcEndpoint string) *Client {
opts := &jsonrpc.RPCClientOpts{
HTTPClient: newHTTP(),
}
rpcClient := jsonrpc.NewClientWithOpts(rpcEndpoint, opts)
return NewWithCustomRPCClient(rpcClient)
}

func NewClientWithOpts(rpcURL string, opts *jsonrpc.RPCClientOpts) *Client {
rpcClient := jsonrpc.NewClientWithOpts(rpcURL, opts)
// NewWithCustomRPCClient creates a new Solana RPC client
// with the provided RPC client.
func NewWithCustomRPCClient(rpcClient JSONRPCClient) *Client {
return &Client{
rpcURL: rpcURL,
rpcClient: rpcClient,
}
}

func NewWithCustomRPCClient(rpcClient CallForClientInterface) *Client {
return &Client{
rpcClient: rpcClient,
var _ JSONRPCClient = &clientWithRateLimiting{}

type clientWithRateLimiting struct {
rpcClient jsonrpc.RPCClient
rateLimiter ratelimit.Limiter
}

func (wr *clientWithRateLimiting) CallForInto(ctx context.Context, out interface{}, method string, params []interface{}) error {
wr.rateLimiter.Take()
return wr.rpcClient.CallForInto(ctx, &out, method, params)
}

// NewWithRateLimit creates a new rate-limitted Solana RPC client.
func NewWithRateLimit(
rpcEndpoint string,
rps int, // requests per second
) JSONRPCClient {
opts := &jsonrpc.RPCClientOpts{
HTTPClient: newHTTP(),
}

rpcClient := jsonrpc.NewClientWithOpts(rpcEndpoint, opts)

return &clientWithRateLimiting{
rpcClient: rpcClient,
rateLimiter: ratelimit.New(rps),
}
}

Expand All @@ -58,3 +91,81 @@ func (c *Client) SetHeader(k, v string) {
}
c.headers.Set(k, v)
}

var (
defaultMaxIdleConnsPerHost = 9
defaultTimeout = 5 * time.Minute
defaultKeepAlive = 180 * time.Second
)

func newHTTPTransport() *http.Transport {
return &http.Transport{
IdleConnTimeout: defaultTimeout,
MaxConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: defaultTimeout,
KeepAlive: defaultKeepAlive,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
// MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second,
// ExpectContinueTimeout: 1 * time.Second,
}
}

// newHTTP returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func newHTTP() *http.Client {
tr := newHTTPTransport()

return &http.Client{
Timeout: defaultTimeout,
Transport: gzhttp.Transport(tr),
}
}

// type wrapped struct {
// beforeHooks []JSONRPCClient
// cl JSONRPCClient
// afterHooks []JSONRPCClient
// }

// func (wr *wrapped) CallForInto(ctx context.Context, out interface{}, method string, params []interface{}) error {
// for _, ware := range wr.beforeHooks {
// err := ware.CallForInto(ctx, &out, method, params)
// if err != nil {
// return err
// }
// }

// err := wr.cl.CallForInto(ctx, &out, method, params)
// if err != nil {
// return err
// }

// for _, ware := range wr.afterHooks {
// err := ware.CallForInto(ctx, &out, method, params)
// if err != nil {
// return err
// }
// }
// return nil
// }

// func (cl *Client) WithBeforeHook(beforeHook JSONRPCClient) *Client {
// /* code */
// return nil
// }

// func (cl *Client) WithAfterHook(afterHook JSONRPCClient) *Client {
// /* code */
// return nil
// }

// // Expose this to make it easily expandable; or maybe don't ???
// func (cl *Client) CallForInto(ctx context.Context, out interface{}, method string, params []interface{}) error {
// return cl.rpcClient.CallForInto(ctx, &out, method, params)
// }
110 changes: 55 additions & 55 deletions rpc/client_test.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions rpc/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (cl *Client) GetConfirmedBlockWithOpts(
}
}

err = cl.rpcClient.CallFor(&out, "getConfirmedBlock", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getConfirmedBlock", params)
return
}

Expand All @@ -106,7 +106,7 @@ func (cl *Client) GetConfirmedBlocks(
params = append(params, M{"commitment": string(commitment)})
}

err = cl.rpcClient.CallFor(&out, "getConfirmedBlocks", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getConfirmedBlocks", params)
return
}

Expand All @@ -126,7 +126,7 @@ func (cl *Client) GetConfirmedBlocksWithLimit(
params = append(params, M{"commitment": string(commitment)})
}

err = cl.rpcClient.CallFor(&out, "getConfirmedBlocksWithLimit", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getConfirmedBlocksWithLimit", params)
return
}

Expand All @@ -143,7 +143,7 @@ func (cl *Client) GetConfirmedSignaturesForAddress2(

params := []interface{}{address, opts}

err = cl.rpcClient.CallFor(&out, "getConfirmedSignaturesForAddress2", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getConfirmedSignaturesForAddress2", params)
return
}

Expand All @@ -154,7 +154,7 @@ func (cl *Client) GetConfirmedTransaction(
) (out *TransactionWithMeta, err error) {
params := []interface{}{signature, "json"}

err = cl.rpcClient.CallFor(&out, "getConfirmedTransaction", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getConfirmedTransaction", params)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/getAccountInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (cl *Client) GetAccountInfoWithOpts(
params = append(params, obj)
}

err = cl.rpcClient.CallFor(&out, "getAccountInfo", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getAccountInfo", params)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/getBalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ func (cl *Client) GetBalance(
params = append(params, M{"commitment": string(commitment)})
}

err = cl.rpcClient.CallFor(&out, "getBalance", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBalance", params)
return
}
2 changes: 1 addition & 1 deletion rpc/getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (cl *Client) GetBlockWithOpts(

params := []interface{}{slot, obj}

err = cl.rpcClient.CallFor(&out, "getBlock", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBlock", params)

if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion rpc/getBlockCommitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (cl *Client) GetBlockCommitment(
block uint64, // block, identified by Slot
) (out *GetBlockCommitmentResult, err error) {
params := []interface{}{block}
err = cl.rpcClient.CallFor(&out, "getBlockCommitment", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBlockCommitment", params)
return
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/getBlockHeight.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ func (cl *Client) GetBlockHeight(
if commitment != "" {
params = append(params, M{"commitment": commitment})
}
err = cl.rpcClient.CallFor(&out, "getBlockHeight", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBlockHeight", params)
return
}
2 changes: 1 addition & 1 deletion rpc/getBlockProduction.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (cl *Client) GetBlockProductionWithOpts(
params = append(params, obj)
}
}
err = cl.rpcClient.CallFor(&out, "getBlockProduction", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBlockProduction", params)

return
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/getBlockTime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ func (cl *Client) GetBlockTime(
block uint64, // block, identified by Slot
) (out *UnixTimeSeconds, err error) {
params := []interface{}{block}
err = cl.rpcClient.CallFor(&out, "getBlockTime", params)
err = cl.rpcClient.CallForInto(ctx, &out, "getBlockTime", params)
return
}
Loading

0 comments on commit f0f399a

Please sign in to comment.