From cbc0d2e3aa638730be69f22c3e5124f140ff2865 Mon Sep 17 00:00:00 2001 From: Martin Majlis <martin.majlis+github@sentinelone.com> Date: Tue, 14 Nov 2023 09:14:23 +0100 Subject: [PATCH] DSET-4558: Add metrics --- examples/client/go.mod | 6 +- examples/client/go.sum | 4 + examples/readme/go.mod | 6 +- examples/readme/go.sum | 4 + go.mod | 5 + go.sum | 12 ++ pkg/client/add_events.go | 50 ++--- pkg/client/add_events_long_running_test.go | 5 +- pkg/client/add_events_test.go | 72 +++++-- pkg/client/client.go | 115 ++++------- pkg/client/client_test.go | 12 +- pkg/client/statistics.go | 220 ++++++++++++++++++++- 12 files changed, 375 insertions(+), 136 deletions(-) diff --git a/examples/client/go.mod b/examples/client/go.mod index a73b127..a936d38 100644 --- a/examples/client/go.mod +++ b/examples/client/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/scalyr/dataset-go v0.0.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.26.0 ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cskr/pubsub v1.0.2 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect ) diff --git a/examples/client/go.sum b/examples/client/go.sum index 63e98d5..5e91cb1 100644 --- a/examples/client/go.sum +++ b/examples/client/go.sum @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -26,8 +27,11 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= diff --git a/examples/readme/go.mod b/examples/readme/go.mod index 0c0a1da..0159991 100644 --- a/examples/readme/go.mod +++ b/examples/readme/go.mod @@ -4,15 +4,15 @@ go 1.19 require ( github.com/scalyr/dataset-go v0.0.0 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.26.0 ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cskr/pubsub v1.0.2 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.4.0 // indirect go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect ) diff --git a/examples/readme/go.sum b/examples/readme/go.sum index 63e98d5..5e91cb1 100644 --- a/examples/readme/go.sum +++ b/examples/readme/go.sum @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -26,8 +27,11 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= diff --git a/go.mod b/go.mod index ce0f84e..0ecbf5e 100644 --- a/go.mod +++ b/go.mod @@ -21,13 +21,18 @@ require ( github.com/cskr/pubsub v1.0.2 github.com/google/uuid v1.4.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.20.0 + go.opentelemetry.io/otel/metric v1.20.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel/trace v1.20.0 // indirect go.uber.org/multierr v1.10.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b74f93c..5e225ed 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,24 @@ github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc= +go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs= +go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA= +go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM= +go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ= +go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/pkg/client/add_events.go b/pkg/client/add_events.go index 9b5c81e..5571a38 100644 --- a/pkg/client/add_events.go +++ b/pkg/client/add_events.go @@ -83,7 +83,7 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error for _, bundle := range bundles { key := bundle.Key(client.Config.BufferSettings.GroupBy) client.eventBundlePerKeyTopic.Pub(bundle, key) - client.eventsEnqueued.Add(1) + client.statistics.EventsEnqueuedAdd(1) } return nil @@ -154,7 +154,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte zap.String("key", key), zap.Any("msg", msg), ) - client.eventsBroken.Add(1) + client.statistics.EventsBrokenAdd(1) client.lastAcceptedAt.Store(time.Now().UnixNano()) continue } @@ -190,7 +190,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte buf = getBuffer(key) } else { client.Logger.Error("Cannot add bundle", zap.Error(err)) - client.eventsDropped.Add(1) + client.statistics.EventsDroppedAdd(1) continue } } @@ -199,11 +199,11 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte } if added == buffer.TooMuch { client.Logger.Fatal("Bundle was not added for second time!", buf.ZapStats()...) - client.eventsDropped.Add(1) + client.statistics.EventsDroppedAdd(1) continue } } - client.eventsProcessed.Add(1) + client.statistics.EventsProcessedAdd(1) buf.SetStatus(buffer.Ready) // it could happen that the buffer could have been published @@ -218,7 +218,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte zap.String("key", key), zap.Any("msg", msg), ) - client.eventsBroken.Add(1) + client.statistics.EventsBrokenAdd(1) } } } @@ -226,13 +226,13 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte // isProcessingBuffers returns True if there are still some unprocessed buffers. // False otherwise. func (client *DataSetClient) isProcessingBuffers() bool { - return client.buffersEnqueued.Load() > (client.buffersProcessed.Load() + client.buffersDropped.Load() + client.buffersBroken.Load()) + return client.statistics.buffersEnqueued.Load() > (client.statistics.buffersProcessed.Load() + client.statistics.buffersDropped.Load() + client.statistics.buffersBroken.Load()) } // isProcessingEvents returns True if there are still some unprocessed events. // False otherwise. func (client *DataSetClient) isProcessingEvents() bool { - return client.eventsEnqueued.Load() > (client.eventsProcessed.Load() + client.eventsDropped.Load() + client.eventsBroken.Load()) + return client.statistics.eventsEnqueued.Load() > (client.statistics.eventsProcessed.Load() + client.statistics.eventsDropped.Load() + client.statistics.eventsBroken.Load()) } // Shutdown takes care of shutdown of client. It does following steps @@ -273,7 +273,7 @@ func (client *DataSetClient) Shutdown() error { // try (with timeout) to process (add into buffers) events, retryNum := 0 expBackoff.Reset() - initialEventsDropped := client.eventsDropped.Load() + initialEventsDropped := client.statistics.eventsDropped.Load() for client.isProcessingEvents() { // log statistics client.logStatistics() @@ -283,8 +283,8 @@ func (client *DataSetClient) Shutdown() error { "Shutting down - processing events", zap.Int("retryNum", retryNum), zap.Duration("backoffDelay", backoffDelay), - zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()), - zap.Uint64("eventsProcessed", client.eventsProcessed.Load()), + zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()), + zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()), zap.Duration("elapsedTime", time.Since(processingStart)), zap.Duration("maxElapsedTime", maxElapsedTime), ) @@ -325,7 +325,7 @@ func (client *DataSetClient) Shutdown() error { // do wait (with timeout) for all buffers to be sent to the server retryNum = 0 expBackoff.Reset() - initialBuffersDropped := client.buffersDropped.Load() + initialBuffersDropped := client.statistics.buffersDropped.Load() for client.isProcessingBuffers() { // log statistics client.logStatistics() @@ -335,9 +335,9 @@ func (client *DataSetClient) Shutdown() error { "Shutting down - processing buffers", zap.Int("retryNum", retryNum), zap.Duration("backoffDelay", backoffDelay), - zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()), - zap.Uint64("buffersProcessed", client.buffersProcessed.Load()), - zap.Uint64("buffersDropped", client.buffersDropped.Load()), + zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()), + zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()), + zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()), zap.Duration("elapsedTime", time.Since(processingStart)), zap.Duration("maxElapsedTime", maxElapsedTime), ) @@ -352,30 +352,30 @@ func (client *DataSetClient) Shutdown() error { if client.isProcessingEvents() { lastError = fmt.Errorf( "not all events have been processed - %d", - client.eventsEnqueued.Load()-client.eventsProcessed.Load(), + client.statistics.eventsEnqueued.Load()-client.statistics.eventsProcessed.Load(), ) client.Logger.Error( "Shutting down - not all events have been processed", - zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()), - zap.Uint64("eventsProcessed", client.eventsProcessed.Load()), + zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()), + zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()), ) } if client.isProcessingBuffers() { lastError = fmt.Errorf( "not all buffers have been processed - %d", - client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(), + client.statistics.buffersEnqueued.Load()-client.statistics.buffersProcessed.Load()-client.statistics.buffersDropped.Load(), ) client.Logger.Error( "Shutting down - not all buffers have been processed", zap.Int("retryNum", retryNum), - zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()), - zap.Uint64("buffersProcessed", client.buffersProcessed.Load()), - zap.Uint64("buffersDropped", client.buffersDropped.Load()), + zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()), + zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()), + zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()), ) } - eventsDropped := client.eventsDropped.Load() - initialEventsDropped + eventsDropped := client.statistics.eventsDropped.Load() - initialEventsDropped if eventsDropped > 0 { lastError = fmt.Errorf( "some events were dropped during finishing - %d", @@ -387,7 +387,7 @@ func (client *DataSetClient) Shutdown() error { ) } - buffersDropped := client.buffersDropped.Load() - initialBuffersDropped + buffersDropped := client.statistics.buffersDropped.Load() - initialBuffersDropped if buffersDropped > 0 { lastError = fmt.Errorf( "some buffers were dropped during finishing - %d", @@ -446,7 +446,7 @@ func (client *DataSetClient) sendAddEventsBuffer(buf *buffer.Buffer) (*add_event } err = client.apiCall(httpRequest, resp) - client.bytesAPISent.Add(uint64(len(payload))) + client.statistics.bytesAPISent.Add(uint64(len(payload))) if strings.HasPrefix(resp.Status, "error") { client.Logger.Error( diff --git a/pkg/client/add_events_long_running_test.go b/pkg/client/add_events_long_running_test.go index a4a99e8..0c4aa1b 100644 --- a/pkg/client/add_events_long_running_test.go +++ b/pkg/client/add_events_long_running_test.go @@ -30,10 +30,9 @@ import ( "testing" "time" - "github.com/scalyr/dataset-go/pkg/server_host_config" - "github.com/scalyr/dataset-go/pkg/buffer_config" "github.com/scalyr/dataset-go/pkg/config" + "github.com/scalyr/dataset-go/pkg/server_host_config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -102,7 +101,7 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) { }, ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(), } - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} diff --git a/pkg/client/add_events_test.go b/pkg/client/add_events_test.go index 15b6c8d..9e4e111 100644 --- a/pkg/client/add_events_test.go +++ b/pkg/client/add_events_test.go @@ -31,6 +31,9 @@ import ( "testing" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "github.com/scalyr/dataset-go/pkg/server_host_config" "github.com/stretchr/testify/assert" @@ -133,7 +136,7 @@ func TestAddEventsRetry(t *testing.T) { buffer_config.WithRetryInitialInterval(RetryBase), buffer_config.WithRetryMaxInterval(RetryBase), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -225,7 +228,7 @@ func TestAddEventsRetryAfterSec(t *testing.T) { buffer_config.WithRetryMaxInterval(RetryBase), buffer_config.WithRetryShutdownTimeout(10*time.Second), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -316,7 +319,7 @@ func TestAddEventsRetryAfterTime(t *testing.T) { buffer_config.WithRetryMaxInterval(RetryBase), buffer_config.WithRetryShutdownTimeout(10*time.Second), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -407,7 +410,7 @@ func TestAddEventsLargeEvent(t *testing.T) { buffer_config.WithRetryInitialInterval(RetryBase), buffer_config.WithRetryMaxInterval(RetryBase), ), *newDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -507,7 +510,7 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { buffer_config.WithRetryMaxInterval(RetryBase), buffer_config.WithRetryShutdownTimeout(20*time.Second), ), *newDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -531,7 +534,7 @@ func TestAddEventsRejectAfterFinish(t *testing.T) { buffer_config.WithRetryInitialInterval(RetryBase), buffer_config.WithRetryMaxInterval(RetryBase), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) err = sc.Shutdown() assert.Nil(t, err) @@ -579,7 +582,7 @@ func TestAddEventsWithBufferSweeper(t *testing.T) { }, ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(), } - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -613,7 +616,7 @@ func TestAddEventsDoNotRetryForever(t *testing.T) { config := newDataSetConfig(server.URL, *newBufferSettings( buffer_config.WithRetryMaxElapsedTime(time.Duration(5) * time.Second), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -652,7 +655,7 @@ func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) { buffer_config.WithRetryMaxElapsedTime(time.Duration(30)*time.Second), buffer_config.WithRetryShutdownTimeout(time.Duration(6)*time.Second), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -683,7 +686,7 @@ func TestShutdownFinishesWithinExpectedTimeout(t *testing.T) { buffer_config.WithRetryMaxElapsedTime(time.Duration(30)*time.Second), buffer_config.WithRetryShutdownTimeout(time.Duration(retryShutdownTimeout)*time.Second), ), server_host_config.NewDefaultDataSetServerHostSettings()) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -718,7 +721,7 @@ func TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired(t *test buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second), buffer_config.WithRetryRandomizationFactor(0.000000001), ), server_host_config.NewDefaultDataSetServerHostSettings()) - client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -749,7 +752,7 @@ func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *test buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second), buffer_config.WithRetryRandomizationFactor(0.000000001), ), server_host_config.NewDefaultDataSetServerHostSettings()) - client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} @@ -769,6 +772,49 @@ func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *test assert.Errorf(t, err, "AddEvents - reject batch: rejecting - Last HTTP request contains an error: failed to handle previous request") } +func TestAddEventsWithMetrics(t *testing.T) { + meter := otel.Meter("test") + tests := []struct { + name string + meter *metric.Meter + }{ + { + name: "no meter", + meter: nil, + }, + { + name: "with meter", + meter: &meter, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + // GIVEN + attempt.Store(0) + server := mockServerDefaultPayload(t, http.StatusOK) + defer server.Close() + dataSetConfig := newDataSetConfig(server.URL, buffer_config.NewDefaultDataSetBufferSettings(), server_host_config.NewDefaultDataSetServerHostSettings()) + client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, tt.meter) + require.Nil(t, err) + + sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} + client.SessionInfo = sessionInfo + event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} + eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} + + // WHEN + errAdd := client.AddEvents([]*add_events.EventBundle{eventBundle1}) + errShutdown := client.Shutdown() + + // THEN + assert.Nil(t, errAdd) + assert.Nil(t, errShutdown) + lastError := client.LastError() + assert.Nil(t, lastError) + }) + } +} + func TestAddEventsServerHostLogic(t *testing.T) { configServerHost := "global-server-host" ev1ServerHost := "host-1" @@ -1082,7 +1128,7 @@ func TestAddEventsServerHostLogic(t *testing.T) { UseHostName: false, ServerHost: configServerHost, }) - sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} sc.SessionInfo = sessionInfo diff --git a/pkg/client/client.go b/pkg/client/client.go index a8e36e5..eb51167 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -27,8 +27,6 @@ import ( "sync/atomic" "time" - "github.com/scalyr/dataset-go/pkg/server_host_config" - "golang.org/x/exp/slices" "github.com/scalyr/dataset-go/pkg/version" @@ -38,11 +36,13 @@ import ( "github.com/scalyr/dataset-go/pkg/api/add_events" "github.com/scalyr/dataset-go/pkg/buffer" "github.com/scalyr/dataset-go/pkg/config" + "github.com/scalyr/dataset-go/pkg/server_host_config" _ "net/http/pprof" "github.com/cskr/pubsub" "github.com/google/uuid" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) @@ -69,12 +69,8 @@ type DataSetClient struct { Client *http.Client SessionInfo *add_events.SessionInfo // map of known Buffer //TODO introduce cleanup - buffers map[string]*buffer.Buffer - buffersAllMutex sync.Mutex - buffersEnqueued atomic.Uint64 - buffersProcessed atomic.Uint64 - buffersDropped atomic.Uint64 - buffersBroken atomic.Uint64 + buffers map[string]*buffer.Buffer + buffersAllMutex sync.Mutex // Pub/Sub topics of Buffers based on its session BufferPerSessionTopic *pubsub.PubSub LastHttpStatus atomic.Uint32 @@ -84,15 +80,9 @@ type DataSetClient struct { retryAfter time.Time retryAfterMu sync.RWMutex // indicates that client has been shut down and no further processing is possible - finished atomic.Bool - Logger *zap.Logger - eventsEnqueued atomic.Uint64 - eventsProcessed atomic.Uint64 - eventsDropped atomic.Uint64 - eventsBroken atomic.Uint64 - bytesAPISent atomic.Uint64 - bytesAPIAccepted atomic.Uint64 - addEventsMutex sync.Mutex + finished atomic.Bool + Logger *zap.Logger + addEventsMutex sync.Mutex // Pub/Sub topics of EventBundles based on its key eventBundlePerKeyTopic *pubsub.PubSub // map of known Subscription channels of eventBundlePerKeyTopic @@ -106,9 +96,16 @@ type DataSetClient struct { addEventsEndpointUrl string userAgent string serverHost string + statistics *Statistics } -func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logger, userAgentSuffix *string) (*DataSetClient, error) { +func NewClient( + cfg *config.DataSetConfig, + client *http.Client, + logger *zap.Logger, + userAgentSuffix *string, + meter *metric.Meter, +) (*DataSetClient, error) { logger.Info( "Using config: ", zap.String("config", cfg.String()), @@ -156,16 +153,17 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge userAgent = userAgent + ";" + *userAgentSuffix } + statictics, err := NewStatistics(meter) + if err != nil { + return nil, fmt.Errorf("it was not possible to create statistics: %w", err) + } + dataClient := &DataSetClient{ Id: id, Config: cfg, Client: client, SessionInfo: &add_events.SessionInfo{}, buffers: make(map[string]*buffer.Buffer), - buffersEnqueued: atomic.Uint64{}, - buffersProcessed: atomic.Uint64{}, - buffersDropped: atomic.Uint64{}, - buffersBroken: atomic.Uint64{}, buffersAllMutex: sync.Mutex{}, BufferPerSessionTopic: pubsub.New(0), LastHttpStatus: atomic.Uint32{}, @@ -174,12 +172,6 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge lastErrorMu: sync.RWMutex{}, Logger: logger, finished: atomic.Bool{}, - eventsEnqueued: atomic.Uint64{}, - eventsProcessed: atomic.Uint64{}, - eventsDropped: atomic.Uint64{}, - eventsBroken: atomic.Uint64{}, - bytesAPIAccepted: atomic.Uint64{}, - bytesAPISent: atomic.Uint64{}, addEventsMutex: sync.Mutex{}, eventBundlePerKeyTopic: pubsub.New(0), eventBundleSubscriptionChannels: make(map[string]chan interface{}), @@ -188,6 +180,7 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge addEventsEndpointUrl: addEventsEndpointUrl, userAgent: userAgent, serverHost: serverHost, + statistics: statictics, } // run buffer sweeper if requested @@ -285,16 +278,16 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch zap.String("session", session), zap.Any("msg", msg), ) - client.buffersBroken.Add(1) + client.statistics.BuffersBrokenAdd(1) client.lastAcceptedAt.Store(time.Now().UnixNano()) continue } client.Logger.Debug("Received Buffer from channel", zap.String("session", session), zap.Int("processedMsgCnt", processedMsgCnt), - zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()), - zap.Uint64("buffersProcessed", client.buffersProcessed.Load()), - zap.Uint64("buffersDropped", client.buffersDropped.Load()), + zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()), + zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()), + zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()), ) buf, bufferReadSuccess := msg.(*buffer.Buffer) if bufferReadSuccess { @@ -304,11 +297,11 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch } if !buf.HasEvents() { client.Logger.Warn("Buffer is empty, skipping", buf.ZapStats()...) - client.buffersProcessed.Add(1) + client.statistics.BuffersProcessedAdd(1) continue } if client.sendBufferWithRetryPolicy(buf) { - client.buffersProcessed.Add(1) + client.statistics.BuffersProcessedAdd(1) } } else { client.Logger.Error( @@ -316,7 +309,7 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch zap.String("session", session), zap.Any("msg", msg), ) - client.buffersBroken.Add(1) + client.statistics.BuffersBrokenAdd(1) } client.lastAcceptedAt.Store(time.Now().UnixNano()) } @@ -375,7 +368,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) bool if isOkStatus(lastHttpStatus) { // everything was fine, there is no need for retries - client.bytesAPIAccepted.Add(uint64(payloadLen)) + client.statistics.BytesAPIAcceptedAdd(uint64(payloadLen)) return true // exit loop (buffer sent) } @@ -422,7 +415,7 @@ func (client *DataSetClient) statisticsSweeper() { } // Statistics returns statistics about events, buffers processing from the start time -func (client *DataSetClient) Statistics() *Statistics { +func (client *DataSetClient) Statistics() *ExportedStatistics { // for how long are events being processed firstAt := time.Unix(0, client.firstReceivedAt.Load()) lastAt := time.Unix(0, client.lastAcceptedAt.Load()) @@ -434,49 +427,7 @@ func (client *DataSetClient) Statistics() *Statistics { return nil } - // log buffer stats - bProcessed := client.buffersProcessed.Load() - bEnqueued := client.buffersEnqueued.Load() - bDropped := client.buffersDropped.Load() - bBroken := client.buffersBroken.Load() - - buffersStats := QueueStats{ - bEnqueued, - bProcessed, - bDropped, - bBroken, - processingDur, - } - - // log events stats - eProcessed := client.eventsProcessed.Load() - eEnqueued := client.eventsEnqueued.Load() - eDropped := client.eventsDropped.Load() - eBroken := client.eventsBroken.Load() - - eventsStats := QueueStats{ - eEnqueued, - eProcessed, - eDropped, - eBroken, - processingDur, - } - - // log transferred stats - bAPISent := client.bytesAPISent.Load() - bAPIAccepted := client.bytesAPIAccepted.Load() - transferStats := TransferStats{ - bAPISent, - bAPIAccepted, - bProcessed, - processingDur, - } - - return &Statistics{ - Buffers: buffersStats, - Events: eventsStats, - Transfer: transferStats, - } + return client.statistics.Export(processingDur) } func (client *DataSetClient) logStatistics() { @@ -613,7 +564,7 @@ func (client *DataSetClient) publishBuffer(buf *buffer.Buffer) { client.Logger.Debug("publishing buffer", buf.ZapStats()...) // publish buffer so it can be sent - client.buffersEnqueued.Add(+1) + client.statistics.BuffersEnqueuedAdd(+1) client.BufferPerSessionTopic.Pub(buf, buf.Session) } @@ -709,7 +660,7 @@ func (client *DataSetClient) setRetryAfter(t time.Time) { } func (client *DataSetClient) onBufferDrop(buf *buffer.Buffer, status uint32, err error) { - client.buffersDropped.Add(1) + client.statistics.BuffersDroppedAdd(1) client.Logger.Error("Dropping buffer", buf.ZapStats( zap.Uint32("httpStatus", status), diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index f269dbe..8f3f7ff 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -47,7 +47,7 @@ func TestNewClient(t *testing.T) { t.Setenv("SCALYR_READCONFIG_TOKEN", "readconfig") cfg, err := config.New(config.FromEnv()) assert.Nil(t, err) - sc4, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil) + sc4, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) assert.Equal(t, sc4.Config.Tokens.ReadLog, "readlog") assert.Equal(t, sc4.Config.Tokens.WriteLog, "writelog") @@ -67,7 +67,7 @@ func TestClientBuffer(t *testing.T) { Tokens: config.DataSetTokens{WriteLog: token}, BufferSettings: buffer_config.NewDefaultDataSetBufferSettings(), ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(), - }, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + }, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) sessionInfo := add_events.SessionInfo{ @@ -199,7 +199,7 @@ func TestAddEventsEndpointUrlWithoutTrailingSlash(t *testing.T) { t.Setenv("SCALYR_SERVER", "https://app.scalyr.com") cfg, err := config.New(config.FromEnv()) assert.Nil(t, err) - sc, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil) + sc, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) assert.Equal(t, sc.addEventsEndpointUrl, "https://app.scalyr.com/api/addEvents") } @@ -208,7 +208,7 @@ func TestAddEventsEndpointUrlWithTrailingSlash(t *testing.T) { t.Setenv("SCALYR_SERVER", "https://app.scalyr.com/") cfg2, err := config.New(config.FromEnv()) assert.Nil(t, err) - sc2, err := NewClient(cfg2, nil, zap.Must(zap.NewDevelopment()), nil) + sc2, err := NewClient(cfg2, nil, zap.Must(zap.NewDevelopment()), nil, nil) require.Nil(t, err) assert.Equal(t, sc2.addEventsEndpointUrl, "https://app.scalyr.com/api/addEvents") } @@ -219,7 +219,7 @@ func TestUserAgent(t *testing.T) { numCpu := fmt.Sprint(runtime.NumCPU()) cfg, err := config.New(config.FromEnv()) assert.Nil(t, err) - client, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), &libraryConsumerUserAgentSuffix) + client, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), &libraryConsumerUserAgentSuffix, nil) clientId := client.Id.String() require.Nil(t, err) assert.Equal(t, client.userAgent, "dataset-go;"+version.Version+";"+version.ReleasedDate+";"+clientId+";"+runtime.GOOS+";"+runtime.GOARCH+";"+numCpu+";"+libraryConsumerUserAgentSuffix) @@ -230,7 +230,7 @@ func TestUserAgentWithoutCollectorAttrs(t *testing.T) { numCpu := fmt.Sprint(runtime.NumCPU()) cfg, err := config.New(config.FromEnv()) assert.Nil(t, err) - client, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil) + client, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment()), nil, nil) clientId := client.Id.String() require.Nil(t, err) assert.Equal(t, client.userAgent, "dataset-go;"+version.Version+";"+version.ReleasedDate+";"+clientId+";"+runtime.GOOS+";"+runtime.GOARCH+";"+numCpu) diff --git a/pkg/client/statistics.go b/pkg/client/statistics.go index 06500b0..1fe1ab6 100644 --- a/pkg/client/statistics.go +++ b/pkg/client/statistics.go @@ -1,9 +1,227 @@ package client import ( + "context" + "sync/atomic" "time" + + "go.opentelemetry.io/otel/metric" ) +func key(key string) string { + return "dataset." + key +} + +type Statistics struct { + buffersEnqueued atomic.Uint64 + buffersProcessed atomic.Uint64 + buffersDropped atomic.Uint64 + buffersBroken atomic.Uint64 + + eventsEnqueued atomic.Uint64 + eventsProcessed atomic.Uint64 + eventsDropped atomic.Uint64 + eventsBroken atomic.Uint64 + + bytesAPISent atomic.Uint64 + bytesAPIAccepted atomic.Uint64 + + meter *metric.Meter + + cBuffersEnqueued metric.Int64UpDownCounter + cBuffersProcessed metric.Int64UpDownCounter + cBuffersDropped metric.Int64UpDownCounter + cBuffersBroken metric.Int64UpDownCounter + + cEventsEnqueued metric.Int64UpDownCounter + cEventsProcessed metric.Int64UpDownCounter + cEventsDropped metric.Int64UpDownCounter + cEventsBroken metric.Int64UpDownCounter + + cBytesAPISent metric.Int64UpDownCounter + cBytesAPIAccepted metric.Int64UpDownCounter +} + +func NewStatistics(meter *metric.Meter) (*Statistics, error) { + statistics := &Statistics{ + buffersEnqueued: atomic.Uint64{}, + buffersProcessed: atomic.Uint64{}, + buffersDropped: atomic.Uint64{}, + buffersBroken: atomic.Uint64{}, + + eventsEnqueued: atomic.Uint64{}, + eventsProcessed: atomic.Uint64{}, + eventsDropped: atomic.Uint64{}, + eventsBroken: atomic.Uint64{}, + + bytesAPIAccepted: atomic.Uint64{}, + bytesAPISent: atomic.Uint64{}, + + meter: meter, + } + + err := statistics.initMetrics(meter) + + return statistics, err +} + +func (stats *Statistics) initMetrics(meter *metric.Meter) error { + // if there is no meter, there is no need to initialise counters + if meter == nil { + return nil + } + + stats.meter = meter + + err := error(nil) + stats.cBuffersEnqueued, err = (*meter).Int64UpDownCounter(key("buffersEnqueued")) + if err != nil { + return err + } + stats.cBuffersProcessed, err = (*meter).Int64UpDownCounter(key("buffersProcessed")) + if err != nil { + return err + } + stats.cBuffersDropped, err = (*meter).Int64UpDownCounter(key("buffersDropped")) + if err != nil { + return err + } + stats.cBuffersBroken, err = (*meter).Int64UpDownCounter(key("buffersBroken")) + if err != nil { + return err + } + + stats.cEventsEnqueued, err = (*meter).Int64UpDownCounter(key("eventsEnqueued")) + if err != nil { + return err + } + stats.cEventsProcessed, err = (*meter).Int64UpDownCounter(key("eventsProcessed")) + if err != nil { + return err + } + stats.cEventsDropped, err = (*meter).Int64UpDownCounter(key("eventsDropped")) + if err != nil { + return err + } + stats.cEventsBroken, err = (*meter).Int64UpDownCounter(key("eventsBroken")) + if err != nil { + return err + } + + stats.cBytesAPISent, err = (*meter).Int64UpDownCounter(key("bytesAPISent")) + if err != nil { + return err + } + stats.cBytesAPIAccepted, err = (*meter).Int64UpDownCounter(key("bytesAPIAccepted")) + if err != nil { + return err + } + + return err +} + +func (stats *Statistics) BuffersEnqueuedAdd(i uint64) { + stats.buffersEnqueued.Add(i) + stats.add(stats.cBuffersEnqueued, i) +} + +func (stats *Statistics) BuffersProcessedAdd(i uint64) { + stats.buffersProcessed.Add(i) + stats.add(stats.cBuffersProcessed, i) +} + +func (stats *Statistics) BuffersDroppedAdd(i uint64) { + stats.buffersDropped.Add(i) + stats.add(stats.cBuffersDropped, i) +} + +func (stats *Statistics) BuffersBrokenAdd(i uint64) { + stats.buffersBroken.Add(i) + stats.add(stats.cBuffersBroken, i) +} + +func (stats *Statistics) EventsEnqueuedAdd(i uint64) { + stats.eventsEnqueued.Add(i) + stats.add(stats.cEventsEnqueued, i) +} + +func (stats *Statistics) EventsProcessedAdd(i uint64) { + stats.eventsProcessed.Add(i) + stats.add(stats.cEventsProcessed, i) +} + +func (stats *Statistics) EventsDroppedAdd(i uint64) { + stats.eventsDropped.Add(i) + stats.add(stats.cEventsDropped, i) +} + +func (stats *Statistics) EventsBrokenAdd(i uint64) { + stats.eventsBroken.Add(i) + stats.add(stats.cEventsBroken, i) +} + +func (stats *Statistics) BytesAPISentAdd(i uint64) { + stats.bytesAPISent.Add(i) + stats.add(stats.cBytesAPISent, i) +} + +func (stats *Statistics) BytesAPIAcceptedAdd(i uint64) { + stats.bytesAPIAccepted.Add(i) + stats.add(stats.cBytesAPIAccepted, i) +} + +func (stats *Statistics) add(counter metric.Int64UpDownCounter, i uint64) { + if counter != nil { + counter.Add(context.Background(), int64(i)) + } +} + +func (stats *Statistics) Export(processingDur time.Duration) *ExportedStatistics { + // log buffer stats + bProcessed := stats.buffersProcessed.Load() + bEnqueued := stats.buffersEnqueued.Load() + bDropped := stats.buffersDropped.Load() + bBroken := stats.buffersBroken.Load() + + buffersStats := QueueStats{ + bEnqueued, + bProcessed, + bDropped, + bBroken, + processingDur, + } + + // log events stats + eProcessed := stats.eventsProcessed.Load() + eEnqueued := stats.eventsEnqueued.Load() + eDropped := stats.eventsDropped.Load() + eBroken := stats.eventsBroken.Load() + + eventsStats := QueueStats{ + eEnqueued, + eProcessed, + eDropped, + eBroken, + processingDur, + } + + // log transferred stats + bAPISent := stats.bytesAPISent.Load() + bAPIAccepted := stats.bytesAPIAccepted.Load() + transferStats := TransferStats{ + bAPISent, + bAPIAccepted, + bProcessed, + processingDur, + } + + return &ExportedStatistics{ + Buffers: buffersStats, + Events: eventsStats, + Transfer: transferStats, + } +} + // QueueStats stores statistics related to the queue processing type QueueStats struct { // enqueued is number of items that has been accepted for processing @@ -120,7 +338,7 @@ func (stats TransferStats) ProcessingTime() time.Duration { // Statistics store statistics about queues and transferred data // These are statistics from the beginning of the processing -type Statistics struct { +type ExportedStatistics struct { // Events stores statistics about processing events Events QueueStats `mapstructure:"events"` // Buffers stores statistics about processing buffers