Skip to content

Commit

Permalink
query statement filters by family name
Browse files Browse the repository at this point in the history
  • Loading branch information
Hongyu Zhou committed Feb 21, 2024
1 parent 519bf36 commit 6f62de3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 18 deletions.
30 changes: 18 additions & 12 deletions pkg/cmd/ctlstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type reflectorCliConfig struct {
UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"`
UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"`
UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"`
UpstreamShardingFamily string `conf:"upstream-sharding-family" help:"Sharding family(s) reflector is targeting"`
UpstreamShardingTable string `conf:"upstream-sharding-table" help:"Sharding tables(s) reflector is targeting"`
BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"`
BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"`
PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"`
Expand Down Expand Up @@ -573,18 +575,20 @@ func multiReflector(ctx context.Context, args []string) {

func defaultReflectorCLIConfig(isSupervisor bool) reflectorCliConfig {
config := reflectorCliConfig{
LDBPath: "",
ChangelogPath: "",
ChangelogSize: 1 * 1024 * 1024,
UpstreamDriver: "",
UpstreamDSN: "",
UpstreamLedgerTable: "ctlstore_dml_ledger",
BootstrapURL: "",
PollInterval: 1 * time.Second,
PollJitterCoefficient: 0.25,
QueryBlockSize: 100,
Dogstatsd: defaultDogstatsdConfig(),
PollTimeout: 5 * time.Second,
LDBPath: "",
ChangelogPath: "",
ChangelogSize: 1 * 1024 * 1024,
UpstreamDriver: "",
UpstreamDSN: "",
UpstreamLedgerTable: "ctlstore_dml_ledger",
UpstreamShardingFamily: "flagon2,cob",
UpstreamShardingTable: "flagon2___flags,cob___kvs",
BootstrapURL: "",
PollInterval: 1 * time.Second,
PollJitterCoefficient: 0.25,
QueryBlockSize: 100,
Dogstatsd: defaultDogstatsdConfig(),
PollTimeout: 5 * time.Second,
LedgerHealth: ledgerHealthConfig{
Disable: false,
MaxHealthyLatency: time.Minute,
Expand Down Expand Up @@ -654,6 +658,8 @@ func newReflector(cliCfg reflectorCliConfig, isSupervisor bool, i int) (*reflect
Driver: cliCfg.UpstreamDriver,
DSN: cliCfg.UpstreamDSN,
LedgerTable: cliCfg.UpstreamLedgerTable,
ShardingFamily: cliCfg.UpstreamShardingFamily,
ShardingTable: cliCfg.UpstreamShardingTable,
PollInterval: cliCfg.PollInterval,
PollJitterCoefficient: cliCfg.PollJitterCoefficient,
QueryBlockSize: cliCfg.QueryBlockSize,
Expand Down
31 changes: 27 additions & 4 deletions pkg/reflector/dml_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -29,6 +30,8 @@ type sqlDmlSource struct {
db *sql.DB
lastSequence schema.DMLSequence
ledgerTableName string
shardingFamily string
shardingTable string
queryBlockSize int
buffer []schema.DMLStatement
scanLoopCallBack func()
Expand All @@ -44,10 +47,8 @@ func (source *sqlDmlSource) Next(ctx context.Context) (statement schema.DMLState
blocksize = defaultQueryBlockSize
}

// table layout is: seq, leader_ts, statement
qs := sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? ORDER BY seq LIMIT $2",
source.ledgerTableName,
fmt.Sprintf("%d", blocksize))
// table layout is: seq, leader_ts, statement, family_name, table_name
qs := generateSQLQuery(source.ledgerTableName, source.shardingFamily, source.shardingTable, blocksize)

// HMM: do we lean too hard on the LIMIT here? in the loop below
// we'll end up spinning if the DB keeps feeding us data
Expand Down Expand Up @@ -126,3 +127,25 @@ func (source *sqlDmlSource) Next(ctx context.Context) (statement schema.DMLState
err = errNoNewStatements
return
}

// Helper function to generate the SQL query
func generateSQLQuery(ledgerTableName, shardingFamily, shardingTable string, blocksize int) string {
if shardingFamily != "" {
familiesStr := prepareFamilyString(shardingFamily)
return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? AND family IN $2 ORDER BY seq LIMIT $4",
ledgerTableName,
familiesStr,
shardingTable,
fmt.Sprintf("%d", blocksize))
} else {
return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? ORDER BY seq LIMIT $3",
ledgerTableName,
shardingTable,
fmt.Sprintf("%d", blocksize))
}
}

// Helper function to prepare the family string for SQL query
func prepareFamilyString(families string) string {
return "('" + strings.ReplaceAll(families, ",", "', '") + "')"
}
33 changes: 33 additions & 0 deletions pkg/reflector/dml_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,36 @@ func TestSqlDmlSource(t *testing.T) {
t.Fatal("Expected a context error or an interrupted error")
}
}

func TestPrepareFamilyString(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "Single family",
input: "family1",
expected: "('family1')",
},
{
name: "Multiple families",
input: "family1,family2,family3",
expected: "('family1', 'family2', 'family3')",
},
{
name: "No families",
input: "",
expected: "('')",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
output := prepareFamilyString(tt.input)
if output != tt.expected {
t.Errorf("prepareFamilyString(%q) = %q, want %q", tt.input, output, tt.expected)
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/reflector/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type UpstreamConfig struct {
Driver string
DSN string
LedgerTable string
ShardingFamily string
ShardingTable string
QueryBlockSize int
PollInterval time.Duration
PollTimeout time.Duration
Expand Down Expand Up @@ -254,6 +256,8 @@ func ReflectorFromConfig(config ReflectorConfig) (*Reflector, error) {
db: upstreamdb,
lastSequence: lastSeq,
ledgerTableName: config.Upstream.LedgerTable,
shardingFamily: config.Upstream.ShardingFamily,
shardingTable: config.Upstream.ShardingTable,
queryBlockSize: config.Upstream.QueryBlockSize,
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schema/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewTestDMLStatementWithSharding(statement string, familyName FamilyName, ta
Statement: statement,
Sequence: nextTestDmlSeq(),
Timestamp: time.Now(),
FamilyName: FamilyName(familyName),
TableName: TableName(tableName),
FamilyName: familyName,
TableName: tableName,
}

}
Expand Down

0 comments on commit 6f62de3

Please sign in to comment.