Skip to content

Commit

Permalink
add tally stats to backend clients
Browse files Browse the repository at this point in the history
  • Loading branch information
zachcheu committed May 8, 2024
1 parent 8f9dd94 commit a8233de
Show file tree
Hide file tree
Showing 26 changed files with 120 additions and 82 deletions.
2 changes: 1 addition & 1 deletion build-index/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Run(flags *Flags, opts ...Option) {
log.Fatalf("Error creating simple store: %s", err)
}

backends, err := backend.NewManager(config.Backends, config.Auth)
backends, err := backend.NewManager(config.Backends, config.Auth, stats)
if err != nil {
log.Fatalf("Error creating backend manager: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion lib/backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"io"

"github.com/uber/kraken/core"
"github.com/uber-go/tally"
)

var _factories = make(map[string]ClientFactory)

// ClientFactory creates backend client given name.
type ClientFactory interface {
Create(config interface{}, authConfig interface{}) (Client, error)
Create(config interface{}, authConfig interface{}, stats tally.Scope) (Client, error)
}

// Register registers new Factory with corresponding backend client name.
Expand Down
4 changes: 3 additions & 1 deletion lib/backend/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.
package backend

import "github.com/uber-go/tally"

// ManagerFixture returns a Manager with no clients for testing purposes.
func ManagerFixture() *Manager {
m, err := NewManager(nil, AuthConfig{})
m, err := NewManager(nil, AuthConfig{}, tally.NoopScope)
if err != nil {
panic(err)
}
Expand Down
12 changes: 7 additions & 5 deletions lib/backend/gcsbackend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"path"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/backenderrors"
Expand All @@ -41,7 +42,7 @@ func init() {
type factory struct{}

func (f *factory) Create(
confRaw interface{}, authConfRaw interface{}) (backend.Client, error) {
confRaw interface{}, authConfRaw interface{}, stats tally.Scope) (backend.Client, error) {

confBytes, err := yaml.Marshal(confRaw)
if err != nil {
Expand All @@ -61,13 +62,14 @@ func (f *factory) Create(
return nil, errors.New("unmarshal gcs auth config")
}

return NewClient(config, userAuth)
return NewClient(config, userAuth, stats)
}

// Client implements a backend.Client for GCS.
type Client struct {
config Config
pather namepath.Pather
stats tally.Scope
gcs GCS
}

Expand All @@ -81,7 +83,7 @@ func WithGCS(gcs GCS) Option {

// NewClient creates a new Client for GCS.
func NewClient(
config Config, userAuth UserAuthConfig, opts ...Option) (*Client, error) {
config Config, userAuth UserAuthConfig, stats tally.Scope, opts ...Option) (*Client, error) {

config.applyDefaults()
if config.Username == "" {
Expand All @@ -106,7 +108,7 @@ func NewClient(

if len(opts) > 0 {
// For mock.
client := &Client{config, pather, nil}
client := &Client{config, pather, stats, nil}
for _, opt := range opts {
opt(client)
}
Expand All @@ -120,7 +122,7 @@ func NewClient(
return nil, fmt.Errorf("invalid gcs credentials: %s", err)
}

client := &Client{config, pather,
client := &Client{config, pather, stats,
NewGCS(ctx, sClient.Bucket(config.Bucket), &config)}

log.Infof("Initalized GCS backend with config: %s", config)
Expand Down
5 changes: 3 additions & 2 deletions lib/backend/gcsbackend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"testing"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/mocks/lib/backend/gcsbackend"
Expand Down Expand Up @@ -62,7 +63,7 @@ func newClientMocks(t *testing.T) (*clientMocks, func()) {
}

func (m *clientMocks) new() *Client {
c, err := NewClient(m.config, m.userAuth, WithGCS(m.gcs))
c, err := NewClient(m.config, m.userAuth, tally.NoopScope, WithGCS(m.gcs))
if err != nil {
panic(err)
}
Expand All @@ -83,7 +84,7 @@ func TestClientFactory(t *testing.T) {
auth.GCS.AccessBlob = "access_blob"
userAuth := UserAuthConfig{"test-user": auth}
f := factory{}
_, err := f.Create(config, userAuth)
_, err := f.Create(config, userAuth, tally.NoopScope)
fmt.Println(err.Error())
require.True(strings.Contains(err.Error(), "invalid gcs credentials"))
}
Expand Down
12 changes: 7 additions & 5 deletions lib/backend/hdfsbackend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"sync"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/hdfsbackend/webhdfs"
Expand All @@ -41,7 +42,7 @@ func init() {
type factory struct{}

func (f *factory) Create(
confRaw interface{}, authConfRaw interface{}) (backend.Client, error) {
confRaw interface{}, authConfRaw interface{}, stats tally.Scope) (backend.Client, error) {

confBytes, err := yaml.Marshal(confRaw)
if err != nil {
Expand All @@ -51,14 +52,15 @@ func (f *factory) Create(
if err := yaml.Unmarshal(confBytes, &config); err != nil {
return nil, errors.New("unmarshal hdfs config")
}
return NewClient(config)
return NewClient(config, stats)
}

// Client is a backend.Client for HDFS.
type Client struct {
config Config
pather namepath.Pather
webhdfs webhdfs.Client
stats tally.Scope
}

// Option allows setting optional Client parameters.
Expand All @@ -70,7 +72,7 @@ func WithWebHDFS(w webhdfs.Client) Option {
}

// NewClient creates a new Client for HDFS.
func NewClient(config Config, opts ...Option) (*Client, error) {
func NewClient(config Config, stats tally.Scope, opts ...Option) (*Client, error) {
config.applyDefaults()
if !path.IsAbs(config.RootDirectory) {
return nil, errors.New("invalid config: root_directory must be absolute path")
Expand All @@ -83,7 +85,7 @@ func NewClient(config Config, opts ...Option) (*Client, error) {
if err != nil {
return nil, err
}
client := &Client{config, pather, webhdfs}
client := &Client{config, pather, webhdfs, stats}
for _, opt := range opts {
opt(client)
}
Expand Down Expand Up @@ -269,5 +271,5 @@ func (c *Client) List(prefix string, opts ...backend.ListOption) (*backend.ListR

return &backend.ListResult{
Names: files,
}, nil
}, nil
}
5 changes: 3 additions & 2 deletions lib/backend/hdfsbackend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path"
"testing"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend/hdfsbackend/webhdfs"
"github.com/uber/kraken/mocks/lib/backend/hdfsbackend/webhdfs"
Expand Down Expand Up @@ -46,7 +47,7 @@ func (m *clientMocks) new() *Client {
RootDirectory: "/root",
NamePath: "identity",
testing: true,
}, WithWebHDFS(m.webhdfs))
}, tally.NoopScope, WithWebHDFS(m.webhdfs))
if err != nil {
panic(err)
}
Expand All @@ -63,7 +64,7 @@ func TestClientFactory(t *testing.T) {
testing: true,
}
f := factory{}
_, err := f.Create(config, nil)
_, err := f.Create(config, nil, tally.NoopScope)
require.NoError(err)
}

Expand Down
10 changes: 6 additions & 4 deletions lib/backend/httpbackend/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"time"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/backenderrors"
Expand All @@ -37,7 +38,7 @@ func init() {
type factory struct{}

func (f *factory) Create(
confRaw interface{}, authConfRaw interface{}) (backend.Client, error) {
confRaw interface{}, authConfRaw interface{}, stats tally.Scope) (backend.Client, error) {

confBytes, err := yaml.Marshal(confRaw)
if err != nil {
Expand All @@ -48,7 +49,7 @@ func (f *factory) Create(
if err := yaml.Unmarshal(confBytes, &config); err != nil {
return nil, errors.New("unmarshal http config")
}
return NewClient(config)
return NewClient(config, stats)
}

// Config defines http post/get upload/download urls
Expand All @@ -64,6 +65,7 @@ type Config struct {
// Client implements downloading/uploading object from/to S3
type Client struct {
config Config
stats tally.Scope
}

func (c Config) applyDefaults() Config {
Expand All @@ -74,8 +76,8 @@ func (c Config) applyDefaults() Config {
}

// NewClient creates a new http Client.
func NewClient(config Config) (*Client, error) {
return &Client{config: config.applyDefaults()}, nil
func NewClient(config Config, stats tally.Scope) (*Client, error) {
return &Client{config: config.applyDefaults(), stats: stats}, nil
}

// Stat always succeeds.
Expand Down
9 changes: 5 additions & 4 deletions lib/backend/httpbackend/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"testing"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend/backenderrors"
"github.com/uber/kraken/utils/memsize"
Expand All @@ -34,7 +35,7 @@ func TestClientFactory(t *testing.T) {

config := Config{}
f := factory{}
_, err := f.Create(config, nil)
_, err := f.Create(config, nil, tally.NoopScope)
require.NoError(err)
}

Expand All @@ -52,7 +53,7 @@ func TestHttpDownloadSuccess(t *testing.T) {
defer stop()

config := Config{DownloadURL: "http://" + addr + "/data/%s"}
client, err := NewClient(config)
client, err := NewClient(config, tally.NoopScope)
require.NoError(err)

var b bytes.Buffer
Expand All @@ -72,7 +73,7 @@ func TestHttpDownloadFileNotFound(t *testing.T) {
defer stop()

config := Config{DownloadURL: "http://" + addr + "/data/%s"}
client, err := NewClient(config)
client, err := NewClient(config, tally.NoopScope)
require.NoError(err)

var b bytes.Buffer
Expand All @@ -87,7 +88,7 @@ func TestDownloadMalformedURLThrowsError(t *testing.T) {
defer stop()

config := Config{DownloadURL: "http://" + addr + "/data"}
client, err := NewClient(config)
client, err := NewClient(config, tally.NoopScope)
require.NoError(err)

var b bytes.Buffer
Expand Down
6 changes: 4 additions & 2 deletions lib/backend/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/uber/kraken/utils/bandwidth"
"github.com/uber/kraken/utils/log"

"github.com/uber-go/tally"
)

// Manager errors.
Expand Down Expand Up @@ -49,7 +51,7 @@ type Manager struct {
}

// NewManager creates a new backend Manager.
func NewManager(configs []Config, auth AuthConfig) (*Manager, error) {
func NewManager(configs []Config, auth AuthConfig, stats tally.Scope) (*Manager, error) {
var backends []*backend
for _, config := range configs {
config = config.applyDefaults()
Expand All @@ -66,7 +68,7 @@ func NewManager(configs []Config, auth AuthConfig) (*Manager, error) {
if err != nil {
return nil, fmt.Errorf("get backend client factory: %s", err)
}
c, err = factory.Create(backendConfig, auth[name])
c, err = factory.Create(backendConfig, auth[name], stats)
if err != nil {
return nil, fmt.Errorf("create backend client: %s", err)
}
Expand Down
5 changes: 3 additions & 2 deletions lib/backend/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package backend_test
import (
"testing"

"github.com/uber-go/tally"
. "github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/namepath"
"github.com/uber/kraken/lib/backend/testfs"
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestManagerNamespaceOrdering(t *testing.T) {
var configs []Config
require.NoError(yaml.Unmarshal([]byte(configStr), &configs))

m, err := NewManager(configs, AuthConfig{})
m, err := NewManager(configs, AuthConfig{}, tally.NoopScope)
require.NoError(err)

for ns, expected := range map[string]string{
Expand Down Expand Up @@ -135,7 +136,7 @@ func TestManagerBandwidth(t *testing.T) {
Backend: map[string]interface{}{
"testfs": testfs.Config{Addr: "test-addr", NamePath: namepath.Identity},
},
}}, AuthConfig{})
}}, AuthConfig{}, tally.NoopScope)
require.NoError(err)

checkBandwidth := func(egress, ingress int64) {
Expand Down
9 changes: 6 additions & 3 deletions lib/backend/registrybackend/blobclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"
"strconv"

"github.com/uber-go/tally"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/backenderrors"
Expand All @@ -38,7 +39,7 @@ func init() {
type blobClientFactory struct{}

func (f *blobClientFactory) Create(
confRaw interface{}, authConfRaw interface{}) (backend.Client, error) {
confRaw interface{}, authConfRaw interface{}, stats tally.Scope) (backend.Client, error) {

confBytes, err := yaml.Marshal(confRaw)
if err != nil {
Expand All @@ -48,7 +49,7 @@ func (f *blobClientFactory) Create(
if err := yaml.Unmarshal(confBytes, &config); err != nil {
return nil, errors.New("unmarshal hdfs config")
}
return NewBlobClient(config)
return NewBlobClient(config, stats)
}

const _layerquery = "http://%s/v2/%s/blobs/sha256:%s"
Expand All @@ -58,10 +59,11 @@ const _manifestquery = "http://%s/v2/%s/manifests/sha256:%s"
type BlobClient struct {
config Config
authenticator security.Authenticator
stats tally.Scope
}

// NewBlobClient creates a new BlobClient.
func NewBlobClient(config Config) (*BlobClient, error) {
func NewBlobClient(config Config, stats tally.Scope) (*BlobClient, error) {
config = config.applyDefaults()
authenticator, err := security.NewAuthenticator(config.Address, config.Security)
if err != nil {
Expand All @@ -70,6 +72,7 @@ func NewBlobClient(config Config) (*BlobClient, error) {
return &BlobClient{
config: config,
authenticator: authenticator,
stats: stats,
}, nil
}

Expand Down

0 comments on commit a8233de

Please sign in to comment.