Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reporting: Serialize batches as they're constructed #1227

Merged
merged 3 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions pkg/agent/billing/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created HTTP client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "http",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: jsonMarshalEvents, // note: NOT gzipped.
Name: "http",
Base: client,
BaseConfig: c.BaseClientConfig,
NewBatchBuilder: jsonArrayBatch(reporting.NewByteBuffer), // note: NOT gzipped.
})

}
Expand All @@ -66,10 +66,10 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created Azure Blob Storage client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
NewBatchBuilder: jsonArrayBatch(reporting.NewGZIPBuffer),
})
}
if c := cfg.S3; c != nil {
Expand All @@ -81,22 +81,20 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created S3 client for billing events", zap.Any("config", c))

clients = append(clients, billingClient{
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: reporting.WrapSerialize(reporting.GZIPCompress, jsonMarshalEvents),
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
NewBatchBuilder: jsonArrayBatch(reporting.NewGZIPBuffer),
})
}

return clients, nil
}

func jsonMarshalEvents(events []*IncrementalEvent) ([]byte, reporting.SimplifiableError) {
obj := struct {
Events []*IncrementalEvent `json:"events"`
}{Events: events}

return reporting.JSONMarshalBatch(&obj)
// jsonArrayBatch returns a constructor for BatchBuilders that serialize billing events as a JSON
// array nested under an "events" key, i.e. {"events":[...]}.
//
// Each call to the returned function creates a fresh builder writing into a new buffer produced by
// buf — e.g. reporting.NewByteBuffer for plaintext or reporting.NewGZIPBuffer for gzip compression.
func jsonArrayBatch[B reporting.IOBuffer](buf func() B) func() reporting.BatchBuilder[*IncrementalEvent] {
	return func() reporting.BatchBuilder[*IncrementalEvent] {
		return reporting.NewJSONArrayBuilder[*IncrementalEvent](buf(), "events")
	}
}

// Returns a function to generate keys for the placement of billing events data into blob storage.
Expand Down
22 changes: 14 additions & 8 deletions pkg/agent/scalingevents/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created Azure Blob Storage client for scaling events", zap.Any("config", c))

clients = append(clients, eventsClient{
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch),
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
NewBatchBuilder: jsonLinesBatch(reporting.NewGZIPBuffer),
})
}
if c := cfg.S3; c != nil {
Expand All @@ -57,16 +57,22 @@ func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) (
logger.Info("Created S3 client for scaling events", zap.Any("config", c))

clients = append(clients, eventsClient{
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch),
Name: "s3",
Base: client,
BaseConfig: c.BaseClientConfig,
NewBatchBuilder: jsonLinesBatch(reporting.NewGZIPBuffer),
})
}

return clients, nil
}

// jsonLinesBatch returns a constructor for BatchBuilders that serialize scaling events in the
// "JSON lines" format (one JSON object per line).
//
// Each call to the returned function creates a fresh builder writing into a new buffer produced by
// buf — e.g. reporting.NewByteBuffer for plaintext or reporting.NewGZIPBuffer for gzip compression.
func jsonLinesBatch[B reporting.IOBuffer](buf func() B) func() reporting.BatchBuilder[ScalingEvent] {
	return func() reporting.BatchBuilder[ScalingEvent] {
		return reporting.NewJSONLinesBuilder[ScalingEvent](buf())
	}
}

// Returns a function to generate keys for the placement of scaling events data into blob storage.
//
// Example: prefix/2024/10/31/23/events_{uuid}.ndjson.gz (11pm on halloween, UTC)
Expand Down
83 changes: 83 additions & 0 deletions pkg/reporting/batch_jsonarray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package reporting

import (
"encoding/json"
"fmt"
)

// Compile-time check that JSONArrayBuilder implements BatchBuilder.
var _ BatchBuilder[int] = (*JSONArrayBuilder[int])(nil)

// JSONArrayBuilder is a BatchBuilder where all the events in a batch are serialized as a single
// large JSON array.
type JSONArrayBuilder[E any] struct {
	// buf receives the serialized bytes; the IOBuffer implementation may transform them
	// (e.g. gzip compression) before Collect returns the final output.
	buf IOBuffer
	// started is true once at least one event has been added, so Add knows whether a
	// ','-separator must be written before the next element.
	started bool
	// nestingCount is the number of wrapping JSON objects opened by the constructor's
	// nestedFields, and therefore the number of closing '}' that Finish must emit.
	nestingCount int
}

// NewJSONArrayBuilder creates a new JSONArrayBuilder using the underlying IOBuffer to potentially
// process the JSON encoding -- either with ByteBuffer for plaintext or GZIPBuffer for gzip
// compression.
//
// Each name in nestedFields opens one wrapping JSON object around the array, so for
// nestedFields = ["a", "b"] the final output has the shape {"a":{"b":[ ... ]}}; Finish writes the
// matching closing braces.
func NewJSONArrayBuilder[E any](buf IOBuffer, nestedFields ...string) *JSONArrayBuilder[E] {
	// mustWrite panics on write errors; the IOBuffer implementations are in-memory, so a failed
	// write indicates a programming error rather than a recoverable condition.
	mustWrite := func(p []byte) {
		if _, err := buf.Write(p); err != nil {
			panic(fmt.Sprintf("failed to write: %s", err))
		}
	}

	for _, fieldName := range nestedFields {
		// note: use a discrete json.Marshal here instead of json.Encoder because encoder adds a
		// newline at the end, and that'll make the formatting weird for us.
		encodedField, err := json.Marshal(fieldName)
		if err != nil {
			panic(fmt.Sprintf("failed to JSON encode: %s", fieldName))
		}

		mustWrite([]byte{'{'})
		mustWrite(encodedField)
		mustWrite([]byte{':'})
	}
	// open the array:
	mustWrite([]byte{'['})

	return &JSONArrayBuilder[E]{
		buf:          buf,
		started:      false,
		nestingCount: len(nestedFields),
	}
}

// Add serializes event and appends it as the next element of the in-progress JSON array.
func (b *JSONArrayBuilder[E]) Add(event E) {
	// Every element after the first is preceded by a "\n\t," separator, keeping the array valid
	// JSON while remaining somewhat human-readable.
	if b.started {
		if _, err := b.buf.Write([]byte("\n\t,")); err != nil {
			panic(fmt.Sprintf("failed to write: %s", err))
		}
	}

	// note: a plain json.Marshal is used instead of json.Encoder because the encoder appends a
	// trailing newline, and that'll make the formatting weird for us.
	encoded, err := json.Marshal(event)
	if err != nil {
		panic(fmt.Sprintf("failed to JSON encode: %s", err))
	}
	if _, err := b.buf.Write(encoded); err != nil {
		panic(fmt.Sprintf("failed to write: %s", err))
	}

	b.started = true
}

// Finish closes the JSON array (plus any wrapping objects opened by the constructor) and returns
// the fully serialized batch from the underlying buffer.
func (b *JSONArrayBuilder[E]) Finish() []byte {
	mustWrite := func(p []byte) {
		if _, err := b.buf.Write(p); err != nil {
			panic(fmt.Sprintf("failed to write: %s", err))
		}
	}

	mustWrite([]byte("\n]"))
	// one closing brace per nested field opened in NewJSONArrayBuilder:
	for i := 0; i < b.nestingCount; i++ {
		mustWrite([]byte("}"))
	}

	return b.buf.Collect()
}
38 changes: 38 additions & 0 deletions pkg/reporting/batch_jsonlines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package reporting

import (
"encoding/json"
"fmt"
)

// Compile-time check that JSONLinesBuilder implements BatchBuilder.
var _ BatchBuilder[int] = (*JSONLinesBuilder[int])(nil)

// JSONLinesBuilder is a BatchBuilder where each event in the batch is serialized as a separate JSON
// object on its own line, adhering to the "JSON lines"/"jsonl" format.
type JSONLinesBuilder[E any] struct {
	// buf receives the serialized bytes; the IOBuffer implementation may transform them
	// (e.g. gzip compression) before Collect returns the final output.
	buf IOBuffer
}

// NewJSONLinesBuilder creates a new JSONLinesBuilder using the underlying IOBuffer to potentially
// process the JSON encoding -- either with ByteBuffer for plaintext or GZIPBuffer for gzip
// compression.
func NewJSONLinesBuilder[E any](buf IOBuffer) *JSONLinesBuilder[E] {
	return &JSONLinesBuilder[E]{
		buf: buf,
	}
}

// Add serializes event as a single JSON object and appends it, newline-terminated, to the batch.
func (b *JSONLinesBuilder[E]) Add(event E) {
	encoded, err := json.Marshal(event)
	if err != nil {
		panic(fmt.Sprintf("failed to JSON encode: %s", err))
	}

	// Write the object, then the line terminator required by the jsonl format.
	for _, chunk := range [][]byte{encoded, {'\n'}} {
		if _, err := b.buf.Write(chunk); err != nil {
			panic(fmt.Sprintf("failed to write: %s", err))
		}
	}
}

// Finish completes the batch, returning the accumulated bytes from the underlying buffer
// (potentially transformed, e.g. gzip-compressed, depending on the IOBuffer implementation).
func (b *JSONLinesBuilder[E]) Finish() []byte {
	return b.buf.Collect()
}
44 changes: 32 additions & 12 deletions pkg/reporting/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,25 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// BatchBuilder is an interface for gradually converting []E to []byte, allowing us to construct
// batches of events without buffering them uncompressed, in memory.
//
// Implementations of BatchBuilder are defined in various 'batch_*.go' files.
type BatchBuilder[E any] interface {
	// Add appends an event to the in-progress batch.
	Add(event E)
	// Finish completes the in-progress batch, returning the events serialized as bytes.
	// NOTE(review): callers appear to construct a fresh builder for the next batch rather than
	// reusing one after Finish — confirm before relying on reuse.
	Finish() []byte
}

type eventBatcher[E any] struct {
mu sync.Mutex

targetBatchSize int

ongoing []E
newBatch func() BatchBuilder[E]
ongoing BatchBuilder[E]
ongoingSize int

completed []batch[E]
onComplete func()
Expand All @@ -21,11 +34,13 @@ type eventBatcher[E any] struct {
}

type batch[E any] struct {
events []E
serialized []byte
count int
}

func newEventBatcher[E any](
targetBatchSize int,
newBatch func() BatchBuilder[E],
notifyCompletedBatch func(),
sizeGauge prometheus.Gauge,
) *eventBatcher[E] {
Expand All @@ -34,7 +49,9 @@ func newEventBatcher[E any](

targetBatchSize: targetBatchSize,

ongoing: []E{},
newBatch: newBatch,
ongoing: newBatch(),
ongoingSize: 0,

completed: []batch[E]{},
onComplete: notifyCompletedBatch,
Expand All @@ -52,10 +69,11 @@ func (b *eventBatcher[E]) enqueue(event E) {
b.mu.Lock()
defer b.mu.Unlock()

b.ongoing = append(b.ongoing, event)
b.ongoing.Add(event)
b.ongoingSize += 1
b.updateGauge()

if len(b.ongoing) >= b.targetBatchSize {
if b.ongoingSize >= b.targetBatchSize {
b.finishCurrentBatch()
}
}
Expand All @@ -69,7 +87,7 @@ func (b *eventBatcher[E]) finishOngoing() {
b.mu.Lock()
defer b.mu.Unlock()

if len(b.ongoing) == 0 {
if b.ongoingSize == 0 {
return
}

Expand Down Expand Up @@ -103,26 +121,28 @@ func (b *eventBatcher[e]) dropLatestCompleted() {
b.mu.Lock()
defer b.mu.Unlock()

dropped := b.completed[0]
batch := b.completed[0]
b.completed = b.completed[1:]
b.completedSize -= len(dropped.events)
b.completedSize -= batch.count

b.updateGauge()
}

// NB: must hold mu
func (b *eventBatcher[E]) updateGauge() {
b.sizeGauge.Set(float64(len(b.ongoing) + b.completedSize))
b.sizeGauge.Set(float64(b.ongoingSize + b.completedSize))
}

// NB: must hold mu
func (b *eventBatcher[E]) finishCurrentBatch() {
b.completed = append(b.completed, batch[E]{
events: b.ongoing,
serialized: b.ongoing.Finish(),
count: b.ongoingSize,
})

b.completedSize += len(b.ongoing)
b.ongoing = []E{}
b.completedSize += b.ongoingSize
b.ongoingSize = 0
b.ongoing = b.newBatch()

b.onComplete()
}
Loading
Loading