Skip to content

Commit

Permalink
chore: jobsdb benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Feb 21, 2025
1 parent 9f8d6ad commit 17554e4
Show file tree
Hide file tree
Showing 14 changed files with 780 additions and 129 deletions.
10 changes: 10 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
drain_config "github.com/rudderlabs/rudder-server/internal/drain-config"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/bench"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
Expand Down Expand Up @@ -395,5 +396,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return nil
})

if config.GetBool("JobsDB.Bench.enabled", false) {
g.Go(func() error {
b, err := bench.New(config, statsFactory, a.log.Child("jobsdb.benchmark"), dbPool)
if err != nil {
return fmt.Errorf("creating jobsdb benchmarker: %w", err)
}
return b.Run(ctx)
})
}
return g.Wait()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ require (
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.21
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/sftp v1.13.7 // indirect
Expand Down
57 changes: 57 additions & 0 deletions gzip/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gzip

import (
"compress/gzip"
"io"
"sync"
)

var Gzip GzipPool

// GzipPool manages a pool of gzip.Writer.
// The pool uses sync.Pool internally.
type GzipPool struct {
readers sync.Pool
writers sync.Pool
}

// GetReader returns gzip.Reader from the pool, or creates a new one
// if the pool is empty.
func (pool *GzipPool) GetReader(src io.Reader) (reader *gzip.Reader, err error) {
if r := pool.readers.Get(); r != nil {
reader = r.(*gzip.Reader)
if err := reader.Reset(src); err != nil {
pool.PutReader(reader)
return nil, err
}
return reader, nil
} else {
return gzip.NewReader(src)
}
}

// PutReader closes and returns a gzip.Reader to the pool
// so that it can be reused via GetReader.
func (pool *GzipPool) PutReader(reader *gzip.Reader) {
reader.Close()
pool.readers.Put(reader)
}

// GetWriter returns gzip.Writer from the pool, or creates a new one
// with gzip.BestCompression if the pool is empty.
func (pool *GzipPool) GetWriter(dst io.Writer) (writer *gzip.Writer, err error) {
if w := pool.writers.Get(); w != nil {
writer = w.(*gzip.Writer)
writer.Reset(dst)
return writer, nil
} else {
return gzip.NewWriterLevel(dst, gzip.BestSpeed)
}
}

// PutWriter closes and returns a gzip.Writer to the pool
// so that it can be reused via GetWriter.
func (pool *GzipPool) PutWriter(writer *gzip.Writer) {
writer.Close()
pool.writers.Put(writer)
}
26 changes: 26 additions & 0 deletions jobsdb/bench/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package bench

import (
"context"
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb/bench/scenario"
)

type Bench interface {
Run(ctx context.Context) error
}

func New(conf *config.Config, stat stats.Stats, log logger.Logger, db *sql.DB) (Bench, error) {
scenarioName := conf.GetStringVar("simple", "JobsDB.Bench.scenario")
switch scenarioName {
case "simple":
return scenario.NewSimple(conf, stat, log, db), nil
default:
return nil, fmt.Errorf("unknown jobsdb bench scenario name: %s", conf.GetStringVar("processor", "JobsDB.bench.scenario"))
}
}
92 changes: 92 additions & 0 deletions jobsdb/bench/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package bench_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
statsmetric "github.com/rudderlabs/rudder-go-kit/stats/metric"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/bench"
)

func TestBench(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)
postgresContainer, err := postgres.Setup(pool, t, postgres.WithOptions(
"max_connections=200",
"shared_buffers=800MB",
"effective_cache_size=2GB",
"work_mem=192MB",
"wal_buffers=26MB",
"effective_io_concurrency=100",
"random_page_cost=1",
"max_wal_size=30GB",
),
postgres.WithTag("17-alpine"),
postgres.WithShmSize(256*bytesize.MB),
)
require.NoError(t, err)
postgresContainer.DB.SetMaxOpenConns(60)
postgresContainer.DB.SetMaxIdleConns(20)

c := config.New()

c.Set("JobsDB.enableWriterQueue", true) // default: true
c.Set("JobsDB.maxWriters", 4) // default: 3

c.Set("JobsDB.enableReaderQueue", true) // default: true
c.Set("JobsDB.maxReaders", 8) // default: 6

c.Set("JobsDB.payloadColumnType", string(jobsdb.TEXT))
jobsdb.GZIP = false
jobsdb.LZ4 = false
c.Set("JobsDB.enableToastOptimizations", false)

c.Set("JobsDB.refreshDSListLoopSleepDuration", 1*time.Second)

c.Set("JobsDB.Bench.payloadSize", 2*bytesize.KB) // default: 1KB
c.Set("JobsDB.Bench.noOfSources", 15) // default: 10
c.Set("JobsDB.Bench.writerConcurrency", 4)
c.Set("JobsDB.Bench.updateConcurrency", 1)
c.Set("JobsDB.Bench.writerBatchSize", 5000)
c.Set("JobsDB.Bench.readerReadSize", 20000)
c.Set("JobsDB.Bench.payloadLimit", 100*bytesize.MB)

if false {
c.Set("JobsDB.payloadColumnType", string(jobsdb.TEXT))
c.Set("JobsDB.enableToastOptimizations", true)
c.Set("JobsDB.refreshDSListLoopSleepDuration", 1*time.Second)
}

l := logger.NewFactory(c)
stat := stats.NewStats(c, l, statsmetric.Instance)
b, err := bench.New(c, stat, l.NewLogger(), postgresContainer.DB)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled")
case <-time.After(5 * time.Second):
cancel()
return nil
}
})
g.Go(func() error {
return b.Run(ctx)
})
require.NoError(t, g.Wait())
}
Loading

0 comments on commit 17554e4

Please sign in to comment.