Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] chore: jobsdb benchmark #5535

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
drain_config "github.com/rudderlabs/rudder-server/internal/drain-config"
"github.com/rudderlabs/rudder-server/internal/pulsar"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/bench"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
Expand Down Expand Up @@ -395,5 +396,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return nil
})

if config.GetBool("JobsDB.Bench.enabled", false) {
g.Go(func() error {
b, err := bench.New(config, statsFactory, a.log.Child("jobsdb.benchmark"), dbPool)
if err != nil {
return fmt.Errorf("creating jobsdb benchmarker: %w", err)
}
return b.Run(ctx)
})
}
return g.Wait()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ require (
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.21
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/sftp v1.13.7 // indirect
Expand Down
57 changes: 57 additions & 0 deletions gzip/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gzip

import (
"compress/gzip"
"io"
"sync"
)

var Gzip GzipPool

// GzipPool manages a pool of gzip.Writer.
// The pool uses sync.Pool internally.
type GzipPool struct {
readers sync.Pool
writers sync.Pool
}

// GetReader returns gzip.Reader from the pool, or creates a new one
// if the pool is empty.
func (pool *GzipPool) GetReader(src io.Reader) (reader *gzip.Reader, err error) {
if r := pool.readers.Get(); r != nil {
reader = r.(*gzip.Reader)
if err := reader.Reset(src); err != nil {
pool.PutReader(reader)
return nil, err
}
return reader, nil
} else {
return gzip.NewReader(src)
}
}

// PutReader closes and returns a gzip.Reader to the pool
// so that it can be reused via GetReader.
func (pool *GzipPool) PutReader(reader *gzip.Reader) {
reader.Close()
pool.readers.Put(reader)
}

// GetWriter returns gzip.Writer from the pool, or creates a new one
// with gzip.BestCompression if the pool is empty.
func (pool *GzipPool) GetWriter(dst io.Writer) (writer *gzip.Writer, err error) {
if w := pool.writers.Get(); w != nil {
writer = w.(*gzip.Writer)
writer.Reset(dst)
return writer, nil
} else {
return gzip.NewWriterLevel(dst, gzip.BestSpeed)
}
}

// PutWriter closes and returns a gzip.Writer to the pool
// so that it can be reused via GetWriter.
func (pool *GzipPool) PutWriter(writer *gzip.Writer) {
writer.Close()
pool.writers.Put(writer)
}
26 changes: 26 additions & 0 deletions jobsdb/bench/bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package bench

import (
"context"
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/jobsdb/bench/scenario"
)

// Bench is a runnable jobsdb benchmark scenario. Run executes the
// scenario, honoring ctx for cancellation, and returns any error the
// scenario encountered.
type Bench interface {
	Run(ctx context.Context) error
}

// New builds the Bench for the scenario configured under
// "JobsDB.Bench.scenario" (default "simple"). It returns an error when
// the configured scenario name is unknown.
func New(conf *config.Config, stat stats.Stats, log logger.Logger, db *sql.DB) (Bench, error) {
	// Read the scenario name once and reuse it below, so the error
	// message always reports the exact value that was inspected
	// (the original re-read the config with a different key casing
	// and a different default).
	scenarioName := conf.GetStringVar("simple", "JobsDB.Bench.scenario")
	switch scenarioName {
	case "simple":
		return scenario.NewSimple(conf, stat, log, db), nil
	default:
		return nil, fmt.Errorf("unknown jobsdb bench scenario name: %s", scenarioName)
	}
}
92 changes: 92 additions & 0 deletions jobsdb/bench/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package bench_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
statsmetric "github.com/rudderlabs/rudder-go-kit/stats/metric"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/jobsdb/bench"
)

// TestBench spins up a tuned Postgres container and runs the configured
// jobsdb bench scenario against it for a fixed wall-clock duration.
func TestBench(t *testing.T) {
	pool, err := dockertest.NewPool("")
	require.NoError(t, err)
	// Postgres tuned for benchmark throughput rather than durability.
	postgresContainer, err := postgres.Setup(pool, t, postgres.WithOptions(
		"max_connections=200",
		"shared_buffers=800MB",
		"effective_cache_size=2GB",
		"work_mem=192MB",
		"wal_buffers=26MB",
		"effective_io_concurrency=100",
		"random_page_cost=1",
		"max_wal_size=30GB",
	),
		postgres.WithTag("17-alpine"),
		postgres.WithShmSize(256*bytesize.MB),
	)
	require.NoError(t, err)
	postgresContainer.DB.SetMaxOpenConns(60)
	postgresContainer.DB.SetMaxIdleConns(20)

	c := config.New()

	c.Set("JobsDB.enableWriterQueue", true) // default: true
	c.Set("JobsDB.maxWriters", 4)           // default: 3

	c.Set("JobsDB.enableReaderQueue", true) // default: true
	c.Set("JobsDB.maxReaders", 8)           // default: 6

	c.Set("JobsDB.payloadColumnType", string(jobsdb.TEXT))
	jobsdb.GZIP = false
	jobsdb.LZ4 = false
	c.Set("JobsDB.enableToastOptimizations", false)

	c.Set("JobsDB.refreshDSListLoopSleepDuration", 1*time.Second)

	c.Set("JobsDB.Bench.payloadSize", 2*bytesize.KB) // default: 1KB
	c.Set("JobsDB.Bench.noOfSources", 15)            // default: 10
	c.Set("JobsDB.Bench.writerConcurrency", 4)
	c.Set("JobsDB.Bench.updateConcurrency", 1)
	c.Set("JobsDB.Bench.writerBatchSize", 5000)
	c.Set("JobsDB.Bench.readerReadSize", 20000)
	c.Set("JobsDB.Bench.payloadLimit", 100*bytesize.MB)

	l := logger.NewFactory(c)
	stat := stats.NewStats(c, l, statsmetric.Instance)
	b, err := bench.New(c, stat, l.NewLogger(), postgresContainer.DB)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release context resources on every exit path (go vet: lostcancel)
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		// Stop the benchmark after a fixed duration. If the benchmark
		// fails first, the errgroup context unblocks this goroutine and
		// the failure is surfaced with the underlying cause attached.
		select {
		case <-ctx.Done():
			return fmt.Errorf("benchmark ended before timeout: %w", ctx.Err())
		case <-time.After(5 * time.Second):
			cancel()
			return nil
		}
	})
	g.Go(func() error {
		return b.Run(ctx)
	})
	require.NoError(t, g.Wait())
}
Loading
Loading