Skip to content

Commit

Permalink
fixup! chore: some review comments work
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 28, 2024
1 parent ee2c12a commit ece9675
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 25 deletions.
6 changes: 3 additions & 3 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ func requireSequential(t *testing.T, jobs []*JobT) {
func TestJobsDB_SanitizeJSON(t *testing.T) {
_ = startPostgres(t)
conf := config.New()
conf.Set("JobsDB.payloadColumnType", 0)
conf.Set("JobsDB.payloadColumnType", "jsonb")
jobDB := Handle{config: conf}
ch := func(n int) string {
return strings.Repeat("�", n)
Expand Down Expand Up @@ -1114,7 +1114,7 @@ func TestJobsDB_SanitizeJSON(t *testing.T) {
}
jobDB.TearDown()

conf.Set("JobsDB.payloadColumnType", 2)
conf.Set("JobsDB.payloadColumnType", "text")
textDB := &Handle{config: conf}
require.NoError(t, textDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5))))

Expand Down Expand Up @@ -1197,7 +1197,7 @@ func TestJobsDB_SanitizeJSON(t *testing.T) {
}
textDB.TearDown()

conf.Set("JobsDB.payloadColumnType", 1)
conf.Set("JobsDB.payloadColumnType", "bytea")
byteaDB := &Handle{config: conf}
require.NoError(t, byteaDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5))))

Expand Down
28 changes: 15 additions & 13 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ type payloadColumnType string

const (
JSONB payloadColumnType = "jsonb"
BYTEA = "bytea"
TEXT = "text"
// JSON // Explore afterwards?
BYTEA payloadColumnType = "bytea"
TEXT payloadColumnType = "text"
// JSON ??
)

// QueryConditions holds jobsdb query conditions
Expand Down Expand Up @@ -720,13 +720,13 @@ func WithStats(s stats.Stats) OptsFunc {

func WithBinaryPayload() OptsFunc {
return func(jd *Handle) {
jd.conf.payloadColumnType = "bytea"
jd.conf.payloadColumnType = BYTEA
}
}

func WithTextPayload() OptsFunc {
return func(jd *Handle) {
jd.conf.payloadColumnType = "text"
jd.conf.payloadColumnType = TEXT
}
}

Expand Down Expand Up @@ -798,8 +798,8 @@ func (jd *Handle) init() {
jd.config = config.Default
}

if jd.conf.payloadColumnType == "" {
jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(TEXT, "JobsDB.payloadColumnType"))
if string(jd.conf.payloadColumnType) == "" {
jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(TEXT), "JobsDB.payloadColumnType"))
}

if jd.stats == nil {
Expand Down Expand Up @@ -1461,14 +1461,16 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error {
}

func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
var payloadColumnType string
var columnType payloadColumnType
switch jd.conf.payloadColumnType {
case JSONB:
payloadColumnType = "JSONB"
columnType = JSONB
case BYTEA:
payloadColumnType = "BYTEA"
case TEXT:
payloadColumnType = "TEXT"
columnType = BYTEA
// case TEXT:
// columnType = TEXT
default:
columnType = TEXT
}
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q (
job_id BIGSERIAL PRIMARY KEY,
Expand All @@ -1477,7 +1479,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT
user_id TEXT NOT NULL,
parameters JSONB NOT NULL,
custom_val VARCHAR(64) NOT NULL,
event_payload `+payloadColumnType+` NOT NULL,
event_payload `+string(columnType)+` NOT NULL,
event_count INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, newDS.JobTable)); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
ctx,
`select table_name, data_type
from information_schema.columns
where table_name IN (?, ?) and column_name='event_payload';`,
where table_name IN ($1, $2) and column_name='event_payload';`,
srcDS.JobTable, destDS.JobTable,
)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions jobsdb/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,33 +537,33 @@ func TestPayloadLiteral(t *testing.T) {
byteJD := Handle{
config: c,
}
byteJD.conf.payloadColumnType = "bytea"
byteJD.conf.payloadColumnType = BYTEA
require.NoError(t, byteJD.Setup(
ReadWrite,
true,
"bytea",
string(BYTEA),
))
defer byteJD.TearDown()

jsonbJD := Handle{
config: c,
}
jsonbJD.conf.payloadColumnType = "jsonb"
jsonbJD.conf.payloadColumnType = JSONB
require.NoError(t, jsonbJD.Setup(
ReadWrite,
true,
"jsonb",
string(JSONB),
))
defer jsonbJD.TearDown()

textJD := Handle{
config: c,
}
textJD.conf.payloadColumnType = "text"
textJD.conf.payloadColumnType = TEXT
require.NoError(t, textJD.Setup(
ReadWrite,
true,
"text",
string(TEXT),
))
defer textJD.TearDown()

Expand All @@ -573,7 +573,7 @@ func TestPayloadLiteral(t *testing.T) {
require.NoError(t, textJD.Store(ctx, jobs))
require.NoError(t, jsonbJD.Store(ctx, jobs))

prefixes := []string{"text", "jsonb", "bytea"}
prefixes := []string{string(TEXT), string(JSONB), string(BYTEA)}
for i := range prefixes {
_, err := db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_job_status_1 DROP CONSTRAINT fk_%[1]s_job_status_1_job_id`, prefixes[i]))
require.NoError(t, err)
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestPayloadLiteral(t *testing.T) {
Index: "1",
},
)
require.NoError(t, err)
require.NoError(t, err, src, dest)
}
}
require.NoError(t, txn.Commit())
Expand Down

0 comments on commit ece9675

Please sign in to comment.