diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go index 0107fe7..4e8fa91 100644 --- a/pkg/statistics/statistics.go +++ b/pkg/statistics/statistics.go @@ -26,10 +26,7 @@ import ( "go.opentelemetry.io/otel/metric" ) -func key(key string) string { - return "dataset." + key -} - +// Statistics related to data processing type Statistics struct { buffersEnqueued atomic.Uint64 buffersProcessed atomic.Uint64 @@ -63,6 +60,9 @@ type Statistics struct { hResponseTime metric.Int64Histogram } +// NewStatistics creates structure to keep track of data processing. +// If meter is not nil, then Open Telemetry is used for collecting metrics +// as well. func NewStatistics(meter *metric.Meter) (*Statistics, error) { statistics := &Statistics{ buffersEnqueued: atomic.Uint64{}, @@ -86,6 +86,10 @@ func NewStatistics(meter *metric.Meter) (*Statistics, error) { return statistics, err } +func key(key string) string { + return "dataset." + key +} + func (stats *Statistics) initMetrics(meter *metric.Meter) error { // if there is no meter, there is no need to initialise counters if meter == nil { @@ -275,6 +279,7 @@ func (stats *Statistics) add(counter metric.Int64UpDownCounter, i uint64) { } } +// Export exports statistics related to the processing func (stats *Statistics) Export(processingDur time.Duration) *ExportedStatistics { // log buffer stats bProcessed := stats.buffersProcessed.Load() diff --git a/pkg/statistics/statistics_test.go b/pkg/statistics/statistics_test.go index b0f772b..91c6b41 100644 --- a/pkg/statistics/statistics_test.go +++ b/pkg/statistics/statistics_test.go @@ -209,4 +209,40 @@ func TestExport(t *testing.T) { processingTime: time.Second, }, }, exp) + + assert.Equal(t, exp.Buffers.Enqueued(), uint64(1000)) + assert.Equal(t, exp.Buffers.Processed(), uint64(100)) + assert.Equal(t, exp.Buffers.Dropped(), uint64(10)) + assert.Equal(t, exp.Buffers.Broken(), uint64(1)) + assert.Equal(t, exp.Buffers.Waiting(), uint64(889)) + assert.Equal(t, exp.Buffers.ProcessingTime(), time.Second) + assert.Equal(t, exp.Buffers.SuccessRate(), .89) + + assert.Equal(t, exp.Events.Enqueued(), uint64(2000)) + assert.Equal(t, exp.Events.Processed(), uint64(200)) + assert.Equal(t, exp.Events.Dropped(), uint64(20)) + assert.Equal(t, exp.Events.Broken(), uint64(2)) + assert.Equal(t, exp.Events.Waiting(), uint64(1778)) + assert.Equal(t, exp.Events.ProcessingTime(), time.Second) + assert.Equal(t, exp.Events.SuccessRate(), .89) + + assert.Equal(t, exp.Transfer.BytesSent(), uint64(3000)) + assert.Equal(t, exp.Transfer.BytesAccepted(), uint64(300)) + assert.Equal(t, exp.Transfer.BuffersProcessed(), uint64(100)) + assert.Equal(t, exp.Transfer.SuccessRate(), float64(0.1)) + assert.Equal(t, exp.Transfer.ThroughputBpS(), float64(300)) + assert.Equal(t, exp.Transfer.AvgBufferBytes(), float64(3)) + assert.Equal(t, exp.Transfer.ProcessingTime(), time.Second) +} + +func TestExportNoTraffic(t *testing.T) { + stats, err := NewStatistics(nil) + require.Nil(t, err) + + exp := stats.Export(time.Second) + assert.Equal(t, exp.Events.SuccessRate(), float64(0)) + assert.Equal(t, exp.Buffers.SuccessRate(), float64(0)) + assert.Equal(t, exp.Transfer.SuccessRate(), float64(0)) + assert.Equal(t, exp.Transfer.ThroughputBpS(), float64(0)) + assert.Equal(t, exp.Transfer.AvgBufferBytes(), float64(0)) }