Skip to content

Commit

Permalink
feat(graphqlmetrics): allow measurement of total request count per day (
Browse files Browse the repository at this point in the history
  • Loading branch information
Noroth authored Nov 15, 2024
1 parent 9558ece commit d29b462
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 103 deletions.
171 changes: 110 additions & 61 deletions graphqlmetrics/core/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/dgraph-io/ristretto"
"github.com/wundergraph/cosmo/graphqlmetrics/pkg/batchprocessor"
"sort"
"strconv"
"strings"
Expand All @@ -16,7 +14,9 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/avast/retry-go"
"github.com/dgraph-io/ristretto"
graphqlmetricsv1 "github.com/wundergraph/cosmo/graphqlmetrics/gen/proto/wg/cosmo/graphqlmetrics/v1"
"github.com/wundergraph/cosmo/graphqlmetrics/pkg/batchprocessor"
utils "github.com/wundergraph/cosmo/graphqlmetrics/pkg/utils"
"go.uber.org/zap"
)
Expand All @@ -28,8 +28,9 @@ var (
// SchemaUsageRequestItem is a struct which holds information about the schema usage
// and the JWT claims of a request.
type SchemaUsageRequestItem struct {
SchemaUsage []*graphqlmetricsv1.SchemaUsageInfo
Claims *utils.GraphAPITokenClaims
SchemaUsage []*graphqlmetricsv1.SchemaUsageInfo
Claims *utils.GraphAPITokenClaims
TotalRequestCount uint64
}

type ProcessorConfig struct {
Expand Down Expand Up @@ -100,8 +101,9 @@ func (s *MetricsService) PublishGraphQLMetrics(
}

s.processor.Push(SchemaUsageRequestItem{
SchemaUsage: req.Msg.SchemaUsage,
Claims: claims,
SchemaUsage: req.Msg.SchemaUsage,
Claims: claims,
TotalRequestCount: 1,
})

return res, nil
Expand All @@ -119,8 +121,11 @@ func (s *MetricsService) PublishAggregatedGraphQLMetrics(ctx context.Context, re
return res, nil
}

var totalRequestCount uint64

schemaUsage := make([]*graphqlmetricsv1.SchemaUsageInfo, len(req.Msg.Aggregation))
for i, agg := range req.Msg.Aggregation {
totalRequestCount += agg.RequestCount
for j := range agg.SchemaUsage.ArgumentMetrics {
agg.SchemaUsage.ArgumentMetrics[j].Count = agg.RequestCount
}
Expand All @@ -134,8 +139,9 @@ func (s *MetricsService) PublishAggregatedGraphQLMetrics(ctx context.Context, re
}

s.processor.Push(SchemaUsageRequestItem{
SchemaUsage: schemaUsage,
Claims: claims,
SchemaUsage: schemaUsage,
Claims: claims,
TotalRequestCount: totalRequestCount,
})

return res, nil
Expand All @@ -152,19 +158,42 @@ func (s *MetricsService) Shutdown(timeout time.Duration) {
}
}

// prepareClickhouseBatches prepares the operation and metric batches for the given schema usage.
// It returns the operation and metric batch. Either batch can be nil if there is nothing to write.
// prepareClickhouseBatches prepares the clickhouse batches for the given batch
// of schema usage items. It returns the operation, metric and request count batches.
// If there is nothing to be processed, the corresponding batch will be nil.
func (s *MetricsService) prepareClickhouseBatches(
ctx context.Context, insertTime time.Time, batch []SchemaUsageRequestItem,
) (driver.Batch, driver.Batch) {
) (driver.Batch, driver.Batch, driver.Batch) {
var (
err error
operationBatch, metricBatch driver.Batch
err error
operationBatch driver.Batch
metricBatch driver.Batch
requestCountBatch driver.Batch

hasMetrics = false
)

if len(batch) == 0 {
return nil, nil, nil
}

if requestCountBatch, err = s.conn.PrepareBatch(ctx, `INSERT INTO gql_metrics_router_requests`); err != nil {
s.logger.Error("Failed to prepare request count batch", zap.Error(err))
}

for _, item := range batch {
if requestCountBatch != nil {
err = requestCountBatch.Append(
insertTime,
item.Claims.OrganizationID,
item.Claims.FederatedGraphID,
item.TotalRequestCount,
)
if err != nil {
s.logger.Error("Failed to append request count to batch", zap.Error(err))
}
}

for _, su := range item.SchemaUsage {
// We will take care of metrics later, but we can already check if there are any
// metrics to process to save some time later.
Expand Down Expand Up @@ -206,7 +235,7 @@ func (s *MetricsService) prepareClickhouseBatches(

// If we do not have any metrics to process, we can return early.
if !hasMetrics {
return operationBatch, nil
return operationBatch, nil, requestCountBatch
}

for _, item := range batch {
Expand All @@ -232,7 +261,7 @@ func (s *MetricsService) prepareClickhouseBatches(
}
}

return operationBatch, metricBatch
return operationBatch, metricBatch, requestCountBatch
}

func (s *MetricsService) appendUsageMetrics(
Expand Down Expand Up @@ -342,71 +371,91 @@ func (s *MetricsService) appendUsageMetrics(
return nil
}

func (s *MetricsService) processBatch(ctx context.Context, batch []SchemaUsageRequestItem) {
var (
storedOperations, storedMetrics int
)
func (s *MetricsService) processBatch(_ context.Context, batch []SchemaUsageRequestItem) {
var storedOperations, storedMetrics int

insertTime := time.Now()
insertCtx := context.Background()

operationsBatch, metricsBatch := s.prepareClickhouseBatches(insertCtx, insertTime, batch)
operationsBatch, metricsBatch, requestCountBatch := s.prepareClickhouseBatches(insertCtx, insertTime, batch)

var wg sync.WaitGroup

if operationsBatch != nil {
wg.Add(1)
go func() {
defer wg.Done()
wg.Add(1)
go func() {
defer wg.Done()

err := retryOnError(insertCtx, s.logger.With(zap.String("component", "operations")), func(ctx context.Context) error {
if err := operationsBatch.Send(); err != nil {
return fmt.Errorf("failed to send operation batch: %w", err)
}
if operationsBatch == nil {
return
}

err := retryOnError(insertCtx, s.logger.With(zap.String("component", "operations")), func(ctx context.Context) error {
if err := operationsBatch.Send(); err != nil {
return fmt.Errorf("failed to send operation batch: %w", err)
}

for _, item := range batch {
for _, su := range item.SchemaUsage {
// Add the operation to the cache once it has been written
// We use a TTL of 30 days to prevent caching of operations that are not in our database
// due to storage retention policies
s.opGuardCache.SetWithTTL(su.OperationInfo.Hash, struct{}{}, 1, 30*24*time.Hour)
}
for _, item := range batch {
for _, su := range item.SchemaUsage {
// Add the operation to the cache once it has been written
// We use a TTL of 30 days to prevent caching of operations that are not in our database
// due to storage retention policies
s.opGuardCache.SetWithTTL(su.OperationInfo.Hash, struct{}{}, 1, 30*24*time.Hour)
}
}

s.opGuardCache.Wait()

s.opGuardCache.Wait()
storedOperations += operationsBatch.Rows()
return nil
})

storedOperations += operationsBatch.Rows()
return nil
})
if err != nil {
s.logger.Error("Failed to write operations", zap.Error(err))
}
}()

if err != nil {
s.logger.Error("Failed to write operations", zap.Error(err))
}
}()
}
wg.Add(1)
go func() {
defer wg.Done()

if metricsBatch != nil {
wg.Add(1)
go func() {
defer wg.Done()
if metricsBatch == nil {
return
}

if metricsBatch == nil {
return
err := retryOnError(insertCtx, s.logger.With(zap.String("component", "metrics")), func(ctx context.Context) error {
if err := metricsBatch.Send(); err != nil {
return fmt.Errorf("failed to send metrics batch: %w", err)
}

err := retryOnError(insertCtx, s.logger.With(zap.String("component", "metrics")), func(ctx context.Context) error {
if err := metricsBatch.Send(); err != nil {
return fmt.Errorf("failed to send metrics batch: %w", err)
}
storedMetrics += metricsBatch.Rows()
return nil
})

if err != nil {
s.logger.Error("Failed to write metrics", zap.Error(err))
}
}()

storedMetrics += metricsBatch.Rows()
return nil
})
wg.Add(1)
go func() {
defer wg.Done()

if err != nil {
s.logger.Error("Failed to write metrics", zap.Error(err))
if requestCountBatch == nil {
return
}

err := retryOnError(insertCtx, s.logger.With(zap.String("component", "total_requests")), func(ctx context.Context) error {
if err := requestCountBatch.Send(); err != nil {
return fmt.Errorf("failed to send total request batch: %w", err)
}
}()
}

return nil
})

if err != nil {
s.logger.Error("Failed to write total requests", zap.Error(err))
}
}()

wg.Wait()

Expand Down
Loading

0 comments on commit d29b462

Please sign in to comment.