Skip to content

Commit

Permalink
DSET-4558: Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-majlis-s1 committed Nov 14, 2023
1 parent 5f915d0 commit cbc0d2e
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 136 deletions.
6 changes: 3 additions & 3 deletions examples/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ go 1.19

require (
github.com/scalyr/dataset-go v0.0.0
go.uber.org/zap v1.24.0
go.uber.org/zap v1.26.0
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
)

Expand Down
4 changes: 4 additions & 0 deletions examples/client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -26,8 +27,11 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
Expand Down
6 changes: 3 additions & 3 deletions examples/readme/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ go 1.19

require (
github.com/scalyr/dataset-go v0.0.0
go.uber.org/zap v1.24.0
go.uber.org/zap v1.26.0
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
)

Expand Down
4 changes: 4 additions & 0 deletions examples/readme/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -26,8 +27,11 @@ go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ require (
github.com/cskr/pubsub v1.0.2
github.com/google/uuid v1.4.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/metric v1.20.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.20.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@ github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc=
go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs=
go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA=
go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM=
go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ=
go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
Expand Down
50 changes: 25 additions & 25 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
for _, bundle := range bundles {
key := bundle.Key(client.Config.BufferSettings.GroupBy)
client.eventBundlePerKeyTopic.Pub(bundle, key)
client.eventsEnqueued.Add(1)
client.statistics.EventsEnqueuedAdd(1)
}

return nil
Expand Down Expand Up @@ -154,7 +154,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
zap.String("key", key),
zap.Any("msg", msg),
)
client.eventsBroken.Add(1)
client.statistics.EventsBrokenAdd(1)
client.lastAcceptedAt.Store(time.Now().UnixNano())
continue
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
buf = getBuffer(key)
} else {
client.Logger.Error("Cannot add bundle", zap.Error(err))
client.eventsDropped.Add(1)
client.statistics.EventsDroppedAdd(1)
continue
}
}
Expand All @@ -199,11 +199,11 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
}
if added == buffer.TooMuch {
client.Logger.Fatal("Bundle was not added for second time!", buf.ZapStats()...)
client.eventsDropped.Add(1)
client.statistics.EventsDroppedAdd(1)
continue
}
}
client.eventsProcessed.Add(1)
client.statistics.EventsProcessedAdd(1)

buf.SetStatus(buffer.Ready)
// it could happen that the buffer could have been published
Expand All @@ -218,21 +218,21 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
zap.String("key", key),
zap.Any("msg", msg),
)
client.eventsBroken.Add(1)
client.statistics.EventsBrokenAdd(1)
}
}
}

// isProcessingBuffers returns True if there are still some unprocessed buffers.
// False otherwise.
func (client *DataSetClient) isProcessingBuffers() bool {
return client.buffersEnqueued.Load() > (client.buffersProcessed.Load() + client.buffersDropped.Load() + client.buffersBroken.Load())
return client.statistics.buffersEnqueued.Load() > (client.statistics.buffersProcessed.Load() + client.statistics.buffersDropped.Load() + client.statistics.buffersBroken.Load())
}

// isProcessingEvents returns True if there are still some unprocessed events.
// False otherwise.
func (client *DataSetClient) isProcessingEvents() bool {
return client.eventsEnqueued.Load() > (client.eventsProcessed.Load() + client.eventsDropped.Load() + client.eventsBroken.Load())
return client.statistics.eventsEnqueued.Load() > (client.statistics.eventsProcessed.Load() + client.statistics.eventsDropped.Load() + client.statistics.eventsBroken.Load())
}

// Shutdown takes care of shutdown of client. It does following steps
Expand Down Expand Up @@ -273,7 +273,7 @@ func (client *DataSetClient) Shutdown() error {
// try (with timeout) to process (add into buffers) events,
retryNum := 0
expBackoff.Reset()
initialEventsDropped := client.eventsDropped.Load()
initialEventsDropped := client.statistics.eventsDropped.Load()
for client.isProcessingEvents() {
// log statistics
client.logStatistics()
Expand All @@ -283,8 +283,8 @@ func (client *DataSetClient) Shutdown() error {
"Shutting down - processing events",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
Expand Down Expand Up @@ -325,7 +325,7 @@ func (client *DataSetClient) Shutdown() error {
// do wait (with timeout) for all buffers to be sent to the server
retryNum = 0
expBackoff.Reset()
initialBuffersDropped := client.buffersDropped.Load()
initialBuffersDropped := client.statistics.buffersDropped.Load()
for client.isProcessingBuffers() {
// log statistics
client.logStatistics()
Expand All @@ -335,9 +335,9 @@ func (client *DataSetClient) Shutdown() error {
"Shutting down - processing buffers",
zap.Int("retryNum", retryNum),
zap.Duration("backoffDelay", backoffDelay),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()),
zap.Duration("elapsedTime", time.Since(processingStart)),
zap.Duration("maxElapsedTime", maxElapsedTime),
)
Expand All @@ -352,30 +352,30 @@ func (client *DataSetClient) Shutdown() error {
if client.isProcessingEvents() {
lastError = fmt.Errorf(
"not all events have been processed - %d",
client.eventsEnqueued.Load()-client.eventsProcessed.Load(),
client.statistics.eventsEnqueued.Load()-client.statistics.eventsProcessed.Load(),
)
client.Logger.Error(
"Shutting down - not all events have been processed",
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
zap.Uint64("eventsEnqueued", client.statistics.eventsEnqueued.Load()),
zap.Uint64("eventsProcessed", client.statistics.eventsProcessed.Load()),
)
}

if client.isProcessingBuffers() {
lastError = fmt.Errorf(
"not all buffers have been processed - %d",
client.buffersEnqueued.Load()-client.buffersProcessed.Load()-client.buffersDropped.Load(),
client.statistics.buffersEnqueued.Load()-client.statistics.buffersProcessed.Load()-client.statistics.buffersDropped.Load(),
)
client.Logger.Error(
"Shutting down - not all buffers have been processed",
zap.Int("retryNum", retryNum),
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.buffersDropped.Load()),
zap.Uint64("buffersEnqueued", client.statistics.buffersEnqueued.Load()),
zap.Uint64("buffersProcessed", client.statistics.buffersProcessed.Load()),
zap.Uint64("buffersDropped", client.statistics.buffersDropped.Load()),
)
}

eventsDropped := client.eventsDropped.Load() - initialEventsDropped
eventsDropped := client.statistics.eventsDropped.Load() - initialEventsDropped
if eventsDropped > 0 {
lastError = fmt.Errorf(
"some events were dropped during finishing - %d",
Expand All @@ -387,7 +387,7 @@ func (client *DataSetClient) Shutdown() error {
)
}

buffersDropped := client.buffersDropped.Load() - initialBuffersDropped
buffersDropped := client.statistics.buffersDropped.Load() - initialBuffersDropped
if buffersDropped > 0 {
lastError = fmt.Errorf(
"some buffers were dropped during finishing - %d",
Expand Down Expand Up @@ -446,7 +446,7 @@ func (client *DataSetClient) sendAddEventsBuffer(buf *buffer.Buffer) (*add_event
}

err = client.apiCall(httpRequest, resp)
client.bytesAPISent.Add(uint64(len(payload)))
client.statistics.bytesAPISent.Add(uint64(len(payload)))

if strings.HasPrefix(resp.Status, "error") {
client.Logger.Error(
Expand Down
5 changes: 2 additions & 3 deletions pkg/client/add_events_long_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import (
"testing"
"time"

"github.com/scalyr/dataset-go/pkg/server_host_config"

"github.com/scalyr/dataset-go/pkg/buffer_config"
"github.com/scalyr/dataset-go/pkg/config"
"github.com/scalyr/dataset-go/pkg/server_host_config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
},
ServerHostSettings: server_host_config.NewDefaultDataSetServerHostSettings(),
}
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil)
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil, nil)
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
Expand Down
Loading

0 comments on commit cbc0d2e

Please sign in to comment.