From 58a72f574847167273e86ebd37700f277bcd4d27 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Dec 2024 15:58:00 +0530 Subject: [PATCH 01/17] chore: event_payload column can be JSONB, BYTEA or TEXT --- jobsdb/jobsdb.go | 65 +++++++- jobsdb/migration.go | 71 ++++++++- jobsdb/migration_test.go | 320 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 453 insertions(+), 3 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 2d728509e7..0035cf86f7 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -73,6 +73,15 @@ const ( pgErrorCodeTableReadonly = "RS001" ) +type payloadColumnType int + +const ( + JSONB payloadColumnType = iota + BYTEA + TEXT + // JSON // Explore afterwards? +) + // QueryConditions holds jobsdb query conditions type QueryConditions struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used @@ -499,6 +508,7 @@ type Handle struct { config *config.Config conf struct { + payloadColumnType payloadColumnType maxTableSize config.ValueLoader[int64] cacheExpiration config.ValueLoader[time.Duration] addNewDSLoopSleepDuration config.ValueLoader[time.Duration] @@ -702,6 +712,18 @@ func WithStats(s stats.Stats) OptsFunc { } } +func WithBinaryPayload() OptsFunc { + return func(jd *Handle) { + jd.conf.payloadColumnType = 1 + } +} + +func WithTextPayload() OptsFunc { + return func(jd *Handle) { + jd.conf.payloadColumnType = 2 + } +} + func WithSkipMaintenanceErr(ignore bool) OptsFunc { return func(jd *Handle) { jd.conf.skipMaintenanceError = ignore @@ -770,6 +792,8 @@ func (jd *Handle) init() { jd.config = config.Default } + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType")) + if jd.stats == nil { jd.stats = stats.Default } @@ -1429,6 +1453,15 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error { } func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error { + var payloadColumnType string + switch jd.conf.payloadColumnType { + case JSONB: + payloadColumnType = "JSONB" + case BYTEA: + payloadColumnType = "BYTEA" + case TEXT: + payloadColumnType = "TEXT" + } if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q ( job_id BIGSERIAL PRIMARY KEY, workspace_id TEXT NOT NULL DEFAULT '', @@ -1436,7 +1469,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT user_id TEXT NOT NULL, parameters JSONB NOT NULL, custom_val VARCHAR(64) NOT NULL, - event_payload JSONB NOT NULL, + event_payload `+payloadColumnType+` NOT NULL, event_count INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, newDS.JobTable)); err != nil { @@ -2215,6 +2248,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param resultsetStates := map[string]struct{}{} for rows.Next() { var job JobT + var payload Payload var jsState sql.NullString var jsAttemptNum sql.NullInt64 var jsExecTime sql.NullTime @@ -2223,13 +2257,14 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param var jsErrorResponse []byte var jsParameters []byte err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal, - &job.EventPayload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize, + &payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, 
&job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize,
 			&jsState, &jsAttemptNum, &jsExecTime, &jsRetryTime,
 			&jsErrorCode, &jsErrorResponse, &jsParameters)
 		if err != nil {
 			return JobsResult{}, false, err
 		}
+		job.EventPayload = payload.PayloadBytes()
 		if jsState.Valid {
 			resultsetStates[jsState.String] = struct{}{}
 			job.LastJobStatus.JobState = jsState.String
@@ -2289,6 +2324,32 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
 	}, true, nil
 }
 
+type Payload struct {
+	S string
+	B []byte
+}
+
+func (p *Payload) PayloadBytes() []byte {
+	if p.B != nil {
+		return p.B
+	}
+	return []byte(p.S)
+}
+
+func (p *Payload) Scan(src interface{}) error {
+	b, ok := src.([]byte)
+	if !ok {
+		s, ok := src.(string)
+		if !ok {
+			return errors.New("neither string nor bytes")
+		}
+		p.S = s
+		return nil
+	}
+	p.B = b
+	return nil
+}
+
 func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) {
 	if len(statusList) == 0 {
 		return
diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index 37594d6175..ea3c8bf825 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -459,18 +459,86 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi
 	return
 }
 
+func getColumnConversion(srcType, destType string) string {
+	if srcType == destType {
+		return "j.event_payload"
+	}
+	switch srcType {
+	case "jsonb":
+		switch destType {
+		case "text":
+			return "j.event_payload::TEXT"
+		case "bytea":
+			return "convert_to(j.event_payload::TEXT, 'UTF8')"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	case "bytea":
+		switch destType {
+		case "text":
+			return "convert_from(j.event_payload, 'UTF8')"
+		case "jsonb":
+			return "convert_from(j.event_payload, 'UTF8')::jsonb"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	case "text":
+		switch destType {
+		case "jsonb":
+			return "j.event_payload::jsonb"
+		case "bytea":
+			return "convert_to(j.event_payload, 'UTF8')"
+		default:
+			panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+		}
+	default:
+		panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
+	}
+}
+
 func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dataSetT) (int, error) {
 	defer jd.getTimerStat(
 		"migration_jobs",
 		&statTags{CustomValFilters: []string{jd.tablePrefix}},
 	).RecordDuration()()
 
+	columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"}
+	// find column types first - to differentiate between `bytea` and `jsonb`
+	rows, err := tx.QueryContext(
+		ctx,
+		fmt.Sprintf(
+			`select table_name, data_type
+			from information_schema.columns
+			where table_name IN ('%[1]s', '%[2]s') and column_name='event_payload';`,
+			srcDS.JobTable, destDS.JobTable,
+		),
+	)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get column types: %w", err)
+	}
+	defer rows.Close()
+	var jobsTable, columnType string
+	for rows.Next() {
+		if err = rows.Scan(&jobsTable, &columnType); err != nil {
+			return 0, fmt.Errorf("failed to scan column types: %w", err)
+		}
+		if columnType != "bytea" && columnType != "jsonb" && columnType != "text" {
+			return 0, fmt.Errorf("unsupported column type %s", columnType)
+		}
+		columnTypeMap[jobsTable] = columnType
+	}
+	if err = rows.Err(); err != nil { 
+ return 0, fmt.Errorf("rows.Err() on column types: %w", err) + } + payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable]) + jd.logger.Info(payloadLiteral) + compactDSQuery := fmt.Sprintf( `with last_status as (select * from "v_last_%[1]s"), inserted_jobs as ( insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at) - (select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id + (select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id ), insertedStatuses as @@ -484,6 +552,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat destDS.JobTable, destDS.JobStatusTable, strings.Join(validNonTerminalStates, ","), + payloadLiteral, ) var numJobsMigrated int64 diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index 8036df455e..b834fe0a5b 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -11,6 +11,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + "github.com/rudderlabs/rudder-server/utils/tx" ) func TestMigration(t *testing.T) { @@ -326,4 +327,323 @@ func TestMigration(t *testing.T) { updatedTableSizes := getTableSizes(jobDB.getDSList()) require.Equal(t, newTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)], updatedTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)]) }) + + t.Run("migration between different table types(jsonb, text, bytea)", func(t *testing.T) { + config.Reset() + c := config.New() + c.Set("JobsDB.maxDSSize", 1) + + _ = startPostgres(t) + + triggerAddNewDS := make(chan time.Time) + triggerMigrateDS := make(chan time.Time) + + jobDB := Handle{ + TriggerAddNewDS: func() <-chan time.Time { + return triggerAddNewDS + }, + TriggerMigrateDS: func() <-chan time.Time { + return triggerMigrateDS + }, + config: c, + } + tablePrefix := strings.ToLower(rand.String(5)) + err := jobDB.Setup( + ReadWrite, + true, + tablePrefix, + ) + require.NoError(t, err) + defer jobDB.TearDown() + + c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms") + + customVal := rand.String(5) + jobs := genJobs(defaultWorkspaceID, customVal, 30, 1) + require.NoError(t, jobDB.Store(context.Background(), jobs[:10])) + + // let 8 jobs succeed, and 2 repeatedly fail + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[:8], "executing"), + []string{customVal}, + []ParameterFilterT{}, + ), + ) + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[:8], "succeeded"), + []string{customVal}, + []ParameterFilterT{}, + ), + ) + + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[8:10], "executing"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 1st DS`, + ) + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[8:10], "failed"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 1st DS`, + ) + require.EqualValues(t, 1, jobDB.GetMaxDSIndex()) + + jobDB.conf.payloadColumnType = 1 + 
triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run + triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish + require.EqualValues(t, 2, jobDB.GetMaxDSIndex()) + + var payloadType string + secondTableName := fmt.Sprintf("%s_jobs_2", tablePrefix) + err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, secondTableName)).Scan(&payloadType) + require.NoError(t, err) + require.EqualValues(t, "bytea", payloadType) + + // add some more jobs to the new DS + require.NoError(t, jobDB.Store(context.Background(), jobs[10:20])) + + // triggerMigrateDS <- time.Now() + // triggerMigrateDS <- time.Now() + // var payloadType_1_1 string + // err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_1_1")).Scan(&payloadType_1_1) + // require.NoError(t, err) + // require.EqualValues(t, "bytea", payloadType_1_1) + + triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run + triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish + require.EqualValues(t, 3, jobDB.GetMaxDSIndex()) + thirdTableName := fmt.Sprintf("%s_jobs_3", tablePrefix) + err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, thirdTableName)).Scan(&payloadType) + require.NoError(t, err) + require.EqualValues(t, "bytea", payloadType) + + // last DS + // should have enough statuses for a clean up to be triggered + // all non-terminal + require.NoError(t, jobDB.Store(context.Background(), jobs[20:30])) + for i := 0; i < 10; i++ { + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[20:30], "executing"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 3rd DS`, + ) + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[20:30], "failed"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 3rd DS`, + ) + } + + c.Set("JobsDB.maxDSSize", 100000) + jobDB.conf.payloadColumnType = 2 + triggerMigrateDS <- time.Now() // trigger migrateDSLoop to run + triggerMigrateDS <- time.Now() // waits for last loop to finish + + // data moved from both jsonb and bytea columns to a text column + + // we should see that in the three DSs we have, + // the first one should only have non-terminal jobs left now(with only the last status) in an jobs_1_1 + // the second one should have all jobs + // the third DS should have all jobs with only the last status per job + + // check that the first DS has only non-terminal jobs + dsList := jobDB.getDSList() + require.Len(t, dsList, 2) // 2_1, 3 + require.Equal(t, `2_1`, dsList[0].Index) + var count int64 + err = jobDB.dbHandle.QueryRow( + fmt.Sprintf( + `SELECT COUNT(*) FROM %[1]s_jobs_2_1 WHERE %[1]s_jobs_2_1.custom_val = $1`, + tablePrefix, + ), + customVal, + ).Scan(&count) + require.NoError(t, err) + require.EqualValues(t, 12, count) + + err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_2_1")).Scan(&payloadType) + require.NoError(t, err) + require.EqualValues(t, "text", payloadType) + + require.Equal(t, 
`3`, dsList[1].Index) + err = jobDB.dbHandle.QueryRow( + fmt.Sprintf( + `SELECT COUNT(*) FROM %[1]s_jobs_3 WHERE %[1]s_jobs_3.custom_val = $1`, + tablePrefix, + ), + customVal, + ).Scan(&count) + require.NoError(t, err) + require.EqualValues(t, 10, count) + + err = jobDB.dbHandle.QueryRow( + fmt.Sprintf( + `SELECT COUNT(*) FROM %[1]s_job_status_3 where job_state = 'failed';`, + tablePrefix, + ), + ).Scan(&count) + require.NoError(t, err) + require.EqualValues(t, 100, count) + + getJobs, err := jobDB.GetToProcess(context.Background(), GetQueryParams{ + IgnoreCustomValFiltersInQuery: true, + EventsLimit: 1, + JobsLimit: 1, + }, nil) + require.NoError(t, err) + require.Equal(t, 1, getJobs.EventsCount) + require.JSONEq( + t, + `{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`, + string(getJobs.Jobs[0].EventPayload), + ) + }) +} + +func TestPayloadLiteral(t *testing.T) { + config.Reset() + c := config.New() + c.Set("JobsDB.maxDSSize", 1) + + pg := startPostgres(t) + db := pg.DB + + byteJD := Handle{ + config: c, + } + byteJD.conf.payloadColumnType = 1 + require.NoError(t, byteJD.Setup( + ReadWrite, + true, + "bytea", + )) + defer byteJD.TearDown() + + jsonbJD := Handle{ + config: c, + } + jsonbJD.conf.payloadColumnType = 0 + require.NoError(t, jsonbJD.Setup( + ReadWrite, + true, + "jsonb", + )) + defer jsonbJD.TearDown() + + textJD := Handle{ + config: c, + } + textJD.conf.payloadColumnType = 2 + require.NoError(t, textJD.Setup( + ReadWrite, + true, + "text", + )) + defer textJD.TearDown() + + ctx := context.Background() + jobs := genJobs("wsid", "cv", 1, 1) + require.NoError(t, byteJD.Store(ctx, jobs)) + require.NoError(t, textJD.Store(ctx, jobs)) + require.NoError(t, jsonbJD.Store(ctx, jobs)) + + prefixes := []string{"text", "jsonb", "bytea"} + for i := range prefixes { + _, err := db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_job_status_1 DROP CONSTRAINT fk_%[1]s_job_status_1_job_id`, prefixes[i])) + require.NoError(t, err) + _, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_jobs_1 DROP CONSTRAINT %[1]s_jobs_1_pkey`, prefixes[i])) + require.NoError(t, err) + } // we drop these two because migrateJobsInTx moved jobIDs too, and we're only interested in moving jobs between two different column types + txn, err := db.Begin() + require.NoError(t, err) + for i := range prefixes { + for j := range prefixes { + if i == j { + continue + } + src := prefixes[i] + dest := prefixes[j] + _, err := textJD.migrateJobsInTx( + ctx, + &tx.Tx{Tx: txn}, 
+ dataSetT{ + JobTable: src + "_jobs_1", + JobStatusTable: src + "_job_status_1", + Index: "1", + }, + dataSetT{ + JobTable: dest + "_jobs_1", + JobStatusTable: dest + "_job_status_1", + Index: "1", + }, + ) + require.NoError(t, err) + } + } + require.NoError(t, txn.Commit()) + + byteJobs, err := byteJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + textJobs, err := textJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + jsonbJobs, err := jsonbJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + require.Equal(t, 4, byteJobs.EventsCount) + require.Equal(t, 7, textJobs.EventsCount) + require.Equal(t, 6, jsonbJobs.EventsCount) + expectedPayload := `{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}` + + for i := range byteJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(byteJobs.Jobs[i].EventPayload), + ) + } + for i := range textJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(textJobs.Jobs[i].EventPayload), + ) + } + for i := range jsonbJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(jsonbJobs.Jobs[i].EventPayload), + ) + } } From c5c21e30819b0756b75c9d692a7d201cd70c410e Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Dec 2024 16:10:00 +0530 Subject: [PATCH 02/17] fixup! 
chore: event_payload column can be JSONB, BYTEA or TEXT --- jobsdb/jobsdb.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 0035cf86f7..23403395a8 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -792,7 +792,9 @@ func (jd *Handle) init() { jd.config = config.Default } - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType")) + if jd.conf.payloadColumnType == 0 { + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB."+jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", "JobsDB."+jd.tablePrefix+".payloadColumnType")) + } if jd.stats == nil { jd.stats = stats.Default From 08c358abf4b7081da3bba1df700a4b8d08cf2b09 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 14:38:09 +0530 Subject: [PATCH 03/17] chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 23403395a8..eff626ee5e 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -793,7 +793,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB."+jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", "JobsDB."+jd.tablePrefix+".payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { @@ -1083,8 +1083,59 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { jd.assertError(jd.doRefreshDSRangeList(l)) // If no DS present, add one - if len(jd.getDSList()) == 0 { + var createDS bool + var updateColumnType bool + dsList := jd.getDSList() + if len(dsList) == 0 { + createDS = true + } else { + // first check column type + var columnType string + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf( + `select data_type + from information_schema.columns + where table_name = '%[1]s' and column_name='event_payload';`, + dsList[len(dsList)-1].JobTable, + ), + ).Scan(&columnType) + jd.assertError(err) + jd.logger.Infow("previous column type", "type", columnType) + if columnType != string(jd.conf.payloadColumnType) { + var jobID int64 + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf(`select job_id from %q order by job_id asc limit 1`, dsList[len(dsList)-1].JobTable), + ).Scan(&jobID) + if errors.Is(err, sql.ErrNoRows) { + updateColumnType = true + } else if err == nil { + createDS = true + } else { + jd.assertError(err) + } + } + } + if createDS { jd.addNewDS(l, newDataSet(jd.tablePrefix, jd.computeNewIdxForAppend(l))) + } else if updateColumnType { + var payloadType string + switch jd.conf.payloadColumnType { + case payloadColumnType(0): + payloadType = "jsonb" + case payloadColumnType(1): + payloadType = "bytea" + case payloadColumnType(2): + payloadType = "text" + default: + jd.assertError(fmt.Errorf("invalid type: %d", jd.conf.payloadColumnType)) + } + _, err := jd.dbHandle.ExecContext( + ctx, + fmt.Sprintf(`alter table %q alter column event_payload type %s`, dsList[len(dsList)-1].JobTable, payloadType), + ) + jd.assertError(err) } jd.backgroundGroup.Go(crash.Wrapper(func() error { From 4129e5e3d44bdd8a66c1f2bf2a9eaae7c24b7450 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 
2024 15:24:15 +0530 Subject: [PATCH 04/17] fixup! chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index eff626ee5e..c7afd5b9c6 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -73,6 +73,12 @@ const ( pgErrorCodeTableReadonly = "RS001" ) +var payloadTypes = map[payloadColumnType]string{ + JSONB: "jsonb", + TEXT: "text", + BYTEA: "bytea", +} + type payloadColumnType int const ( @@ -1102,7 +1108,7 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { ).Scan(&columnType) jd.assertError(err) jd.logger.Infow("previous column type", "type", columnType) - if columnType != string(jd.conf.payloadColumnType) { + if columnType != payloadTypes[jd.conf.payloadColumnType] { var jobID int64 err := jd.dbHandle.QueryRowContext( ctx, From de47b4ee15f03f0f805b86c7255da9aadac63c7b Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 15:54:26 +0530 Subject: [PATCH 05/17] fixup! chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index c7afd5b9c6..148d4194ce 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -799,7 +799,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB.payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(2, 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { From 06749955abbaa75db2d3f418255ce18f89fcc802 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 18:03:49 +0530 Subject: [PATCH 06/17] chore: jobsdb_sanitizeJSON test --- jobsdb/integration_test.go | 175 ++++++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 3 deletions(-) diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go index 085990081d..fabc701b2e 100644 --- a/jobsdb/integration_test.go +++ b/jobsdb/integration_test.go @@ -1026,7 +1026,9 @@ func requireSequential(t *testing.T, jobs []*JobT) { func TestJobsDB_SanitizeJSON(t *testing.T) { _ = startPostgres(t) - jobDB := Handle{config: config.New()} + conf := config.New() + conf.Set("JobsDB.payloadColumnType", 0) + jobDB := Handle{config: conf} ch := func(n int) string { return strings.Repeat("�", n) } @@ -1071,9 +1073,8 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5))) require.NoError(t, err) - defer jobDB.TearDown() - eventPayload := []byte(`{"batch": [{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) + eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) for i, tt := range toValidUTF8Tests { customVal := fmt.Sprintf("TEST_%d", i) @@ -1111,6 +1112,174 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { string(unprocessedJob.Jobs[0].EventPayload), ) } + jobDB.TearDown() + + conf.Set("JobsDB.payloadColumnType", 2) + textDB := &Handle{config: conf} + require.NoError(t, textDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) + + toValidUTF8TestsForText := []struct { + in string + out string + err error + }{ + {`\u0000`, `\u0000`, nil}, + {`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + 
{"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", `a\ufffdb`, nil}, + {"a\xffb\uFFFD", `a\ufffdb�`, nil}, + {"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil}, + {"\xC0\xAF", `\ufffd\ufffd`, nil}, + {"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil}, + {"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, `\ud800`, nil}, + {`\uDEAD`, `\uDEAD`, nil}, + + {`\uD83D\ub000`, `\uD83D\ub000`, nil}, + {`\uD83D\ude04`, `\uD83D\ude04`, nil}, + + {`\u4e2d\u6587`, `\u4e2d\u6587`, nil}, + {`\ud83d\udc4a`, `\ud83d\udc4a`, nil}, + + // 21 + {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + } + for i, tt := range toValidUTF8TestsForText { + + customVal := fmt.Sprintf("TEST_%d", i) + + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: customVal, + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + + err := textDB.Store(context.Background(), jobs) + if tt.err != nil { + require.NoError(t, err, "text column should never error", i) + continue + } + + require.NoError(t, err) + + unprocessedJob, err := textDB.GetUnprocessed(context.Background(), GetQueryParams{ + CustomValFilters: []string{customVal}, + JobsLimit: 10, + ParameterFilters: []ParameterFilterT{}, + }) + require.NoError(t, err, "should not error") + + require.Len(t, unprocessedJob.Jobs, 1) + + require.Equal(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + "testCase", i, + ) + } + textDB.TearDown() + + conf.Set("JobsDB.payloadColumnType", 1) + byteaDB := &Handle{config: conf} + require.NoError(t, byteaDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) + + byteaInvalidInputSyntaxError := errors.New("pq: invalid input syntax for type bytea") + toValidUTF8TestsForBytea := []struct { + in string + out string + err error + }{ + {`\u0000`, "", nil}, + {`\u0000☺\u0000b☺`, "☺b☺", nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + {"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError}, + {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError}, + {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError}, + {"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError}, + {"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError}, + {"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError}, + {"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError}, + {"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, ch(1), nil}, + {`\uDEAD`, ch(1), nil}, + + {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, + {`\uD83D\ude04`, "😄", nil}, + + {`\u4e2d\u6587`, "中文", nil}, + {`\ud83d\udc4a`, 
"\xf0\x9f\x91\x8a", nil}, + + {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + } + for i, tt := range toValidUTF8TestsForBytea { + + customVal := fmt.Sprintf("TEST_%d", i) + + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: customVal, + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + + err := byteaDB.Store(context.Background(), jobs) + if tt.err != nil { + require.Error(t, err, "should error", i) + require.Contains(t, err.Error(), tt.err.Error(), "should contain error", i) + continue + } + + require.NoError(t, err, i) + + unprocessedJob, err := byteaDB.GetUnprocessed(context.Background(), GetQueryParams{ + CustomValFilters: []string{customVal}, + JobsLimit: 10, + ParameterFilters: []ParameterFilterT{}, + }) + require.NoError(t, err, "should not error", i) + + require.Len(t, unprocessedJob.Jobs, 1) + + require.JSONEq(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + i, + ) + } + byteaDB.TearDown() } // BenchmarkJobsdb takes time... keep waiting From 3b802a9910a2f2903f5bf81a82e9245f039fd858 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Wed, 18 Dec 2024 12:30:34 +0530 Subject: [PATCH 07/17] chore: address tests for text payload column --- jobsdb/jobsdb.go | 8 +++- processor/processor_geolocation_test.go | 40 +++++++++---------- .../batchrouter/batchrouter_isolation_test.go | 10 ++++- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 148d4194ce..08584d1a85 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -2392,7 +2392,13 @@ func (p *Payload) PayloadBytes() []byte { if p.B != nil { return p.B } - return []byte(p.S) + // return []byte(p.S) + buffer := new(bytes.Buffer) + if err := json.Compact(buffer, []byte(p.S)); err != nil { + return []byte(`{}`) + } + + return buffer.Bytes() } func (p *Payload) Scan(src interface{}) error { diff --git a/processor/processor_geolocation_test.go b/processor/processor_geolocation_test.go index 17b143528b..a11e687d9a 100644 --- a/processor/processor_geolocation_test.go +++ b/processor/processor_geolocation_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "encoding/json" "fmt" "io" "net/http" @@ -44,8 +43,8 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(true). WithClientIP(boxfordIP). WithContextIP(londonIP). - Run(t, func(t *testing.T, event json.RawMessage) { - require.Empty(t, gjson.GetBytes(event, "context.geo").Raw, "no geolocation information should be present when the feature is disabled") + Run(t, func(t *testing.T, event string) { + require.Empty(t, gjson.Get(event, "context.geo").Raw, "no geolocation information should be present when the feature is disabled") }) }) @@ -55,8 +54,8 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(false). WithClientIP(boxfordIP). WithContextIP(londonIP). 
- Run(t, func(t *testing.T, event json.RawMessage) { - require.Empty(t, gjson.GetBytes(event, "context.geo").Raw, "no geolocation information should be present when geolocation is disabled at source") + Run(t, func(t *testing.T, event string) { + require.Empty(t, gjson.Get(event, "context.geo").Raw, "no geolocation information should be present when geolocation is disabled at source") }) }) @@ -66,10 +65,10 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(true). WithClientIP(boxfordIP). WithContextIP(londonIP). - Run(t, func(t *testing.T, event json.RawMessage) { - require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present") - require.Equal(t, londonIP, gjson.GetBytes(event, "context.geo.ip").String(), "contex.ip should take precedence over clientIP") - require.Equal(t, "London", gjson.GetBytes(event, "context.geo.city").String(), "contex.ip should take precedence over clientIP") + Run(t, func(t *testing.T, event string) { + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.Equal(t, londonIP, gjson.Get(event, "context.geo.ip").String(), "contex.ip should take precedence over clientIP") + require.Equal(t, "London", gjson.Get(event, "context.geo.city").String(), "contex.ip should take precedence over clientIP") }) }) @@ -79,10 +78,10 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(true). WithClientIP(boxfordIP). WithContextIP(""). - Run(t, func(t *testing.T, event json.RawMessage) { - require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present") - require.Equal(t, boxfordIP, gjson.GetBytes(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service") - require.Equal(t, "Boxford", gjson.GetBytes(event, "context.geo.city").String(), "clientIP should be used by the geolocation service") + Run(t, func(t *testing.T, event string) { + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.Equal(t, boxfordIP, gjson.Get(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service") + require.Equal(t, "Boxford", gjson.Get(event, "context.geo.city").String(), "clientIP should be used by the geolocation service") }) }) @@ -92,10 +91,10 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(true). WithClientIP(londonIP). WithContextIP(invalidIP). 
- Run(t, func(t *testing.T, event json.RawMessage) { - require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present") - require.Equal(t, invalidIP, gjson.GetBytes(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") - require.Equal(t, "", gjson.GetBytes(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") + Run(t, func(t *testing.T, event string) { + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.Equal(t, invalidIP, gjson.Get(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") + require.Equal(t, "", gjson.Get(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") }) }) } @@ -127,7 +126,7 @@ func (s *geolocationScenario) WithClientIP(ip string) *geolocationScenario { return s } -func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, event json.RawMessage)) { +func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, event string)) { config.Reset() defer config.Reset() writeKey := "writekey-1" @@ -141,8 +140,8 @@ func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, s.requireJobsCount(t, db, "gw", "succeeded", 1) s.requireJobsCount(t, db, "rt", "aborted", 1) - var payload json.RawMessage - require.NoError(t, db.QueryRow("SELECT event_payload FROM unionjobsdb('rt',1)").Scan(&payload)) + var payload string + require.NoError(t, db.QueryRow("SELECT event_payload FROM rt_jobs_1").Scan(&payload)) verification(t, payload) } @@ -192,6 +191,7 @@ func (s *geolocationScenario) startAll(t *testing.T, writeKey string) (gatewayUr } func (s *geolocationScenario) runRudderServer(ctx context.Context, port int, postgresContainer *postgres.Resource, cbURL, transformerURL, tmpDir string) (err error) { + config.Set("enableStats", false) config.Set("CONFIG_BACKEND_URL", cbURL) config.Set("WORKSPACE_TOKEN", "token") config.Set("DB.host", postgresContainer.Host) diff --git a/router/batchrouter/batchrouter_isolation_test.go b/router/batchrouter/batchrouter_isolation_test.go index d147255f52..13209b92af 100644 --- a/router/batchrouter/batchrouter_isolation_test.go +++ b/router/batchrouter/batchrouter_isolation_test.go @@ -242,6 +242,7 @@ func BatchrouterIsolationScenario(t testing.TB, spec *BrtIsolationScenarioSpec) config.Set("DB.name", postgresContainer.Database) config.Set("DB.password", postgresContainer.Password) config.Set("DB.host", postgresContainer.Host) + config.Set("enableStats", false) config.Set("Warehouse.mode", "off") config.Set("DestinationDebugger.disableEventDeliveryStatusUploads", true) @@ -358,7 +359,14 @@ func BatchrouterIsolationScenario(t testing.TB, spec *BrtIsolationScenarioSpec) } var minExecTime, maxExecTime time.Time - require.NoError(t, postgresContainer.DB.QueryRow("SELECT min(exec_time), max(exec_time) FROM unionjobsdbmetadata('batch_rt',20)").Scan(&minExecTime, &maxExecTime), "it should be able to query the min and max execution times") + require.NoError( + t, + postgresContainer.DB.QueryRowContext( + ctx, + "SELECT min(exec_time), max(exec_time) FROM unionjobsdbmetadata('batch_rt',20)", + ).Scan(&minExecTime, &maxExecTime), + "it should be able to query the min and max execution times", + ) overallDuration = 
maxExecTime.Sub(minExecTime) cancel() From ef0001a98f31d105835c4b8ebef5e886c7338dd9 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Wed, 18 Dec 2024 12:57:58 +0530 Subject: [PATCH 08/17] fixup! chore: address tests for text payload column --- processor/processor_geolocation_test.go | 6 +++--- processor/processor_isolation_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/processor/processor_geolocation_test.go b/processor/processor_geolocation_test.go index a11e687d9a..b4b0e37cab 100644 --- a/processor/processor_geolocation_test.go +++ b/processor/processor_geolocation_test.go @@ -66,7 +66,7 @@ func TestProcessorGeolocation(t *testing.T) { WithClientIP(boxfordIP). WithContextIP(londonIP). Run(t, func(t *testing.T, event string) { - require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present") require.Equal(t, londonIP, gjson.Get(event, "context.geo.ip").String(), "contex.ip should take precedence over clientIP") require.Equal(t, "London", gjson.Get(event, "context.geo.city").String(), "contex.ip should take precedence over clientIP") }) @@ -79,7 +79,7 @@ func TestProcessorGeolocation(t *testing.T) { WithClientIP(boxfordIP). WithContextIP(""). Run(t, func(t *testing.T, event string) { - require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present") require.Equal(t, boxfordIP, gjson.Get(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service") require.Equal(t, "Boxford", gjson.Get(event, "context.geo.city").String(), "clientIP should be used by the geolocation service") }) @@ -92,7 +92,7 @@ func TestProcessorGeolocation(t *testing.T) { WithClientIP(londonIP). WithContextIP(invalidIP). 
Run(t, func(t *testing.T, event string) { - require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present") require.Equal(t, invalidIP, gjson.Get(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") require.Equal(t, "", gjson.Get(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") }) diff --git a/processor/processor_isolation_test.go b/processor/processor_isolation_test.go index 70d4703cf8..891cccfe69 100644 --- a/processor/processor_isolation_test.go +++ b/processor/processor_isolation_test.go @@ -31,6 +31,7 @@ import ( trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand" "github.com/rudderlabs/rudder-server/processor/isolation" "github.com/rudderlabs/rudder-server/runner" + "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/types/deployment" @@ -227,6 +228,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa config.Set("JobsDB.migrateDSLoopSleepDuration", "60m") config.Set("Router.toAbortDestinationIDs", destinationID) config.Set("archival.Enabled", false) + config.Set("enableStats", false) config.Set("Processor.isolationMode", string(spec.isolationMode)) @@ -330,9 +332,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa overallDuration = maxJobTime.Sub(gwMinJobTime) require.Eventually(t, func() bool { - var pendingJobsCount int - require.NoError(t, postgresContainer.DB.QueryRow("SELECT count(*) FROM unionjobsdb('rt',5) WHERE COALESCE(job_state, 'pending') != 'aborted'").Scan(&pendingJobsCount)) - return pendingJobsCount == 0 + return rmetrics.PendingEvents("rt", rmetrics.All, rmetrics.All).IntValue() == 0 }, 100*time.Second, 1*time.Second, "all rt jobs should be aborted") cancel() <-svcDone From 85249336739d3cd0ded179507b777850a3fe9131 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Thu, 19 Dec 2024 15:10:22 +0530 Subject: [PATCH 09/17] fixup! 
chore: address tests for text payload column

---
 jobsdb/migration.go                           |  3 +-
 .../000014_event_payload_column_type.up.sql   | 68 +++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
 create mode 100644 sql/migrations/node/000014_event_payload_column_type.up.sql

diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index ea3c8bf825..ba8e0204a1 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -503,7 +503,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 	).RecordDuration()()
 
 	columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"}
-	// find column types first - to differentiate between `bytea` and `jsonb`
+	// find column types first - to differentiate between `text`, `bytea` and `jsonb`
 	rows, err := tx.QueryContext(
@@ -531,7 +531,6 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 		return 0, fmt.Errorf("rows.Err() on column types: %w", err)
 	}
 	payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable])
-	jd.logger.Info(payloadLiteral)
 
 	compactDSQuery := fmt.Sprintf(
diff --git a/sql/migrations/node/000014_event_payload_column_type.up.sql b/sql/migrations/node/000014_event_payload_column_type.up.sql
new file mode 100644
index 0000000000..d43a6fb139
--- /dev/null
+++ b/sql/migrations/node/000014_event_payload_column_type.up.sql
@@ -0,0 +1,68 @@
+DROP FUNCTION IF EXISTS payloadColumnType(TEXT);
+DROP FUNCTION IF EXISTS payloadColumnConvertToText(TEXT);
+DROP FUNCTION IF EXISTS unionjobsdb(text,integer);
+
+-- returns the payload column type of the given jobs table
+CREATE OR REPLACE FUNCTION payloadColumnType(tableName TEXT)
+RETURNS TEXT
+AS $$
+BEGIN
+RETURN(SELECT data_type FROM information_schema.columns WHERE table_name = tableName and column_name='event_payload' LIMIT 1);
+END;
+$$ LANGUAGE plpgsql;
+
+-- returns the expression that converts the given payload column type to text
+CREATE OR REPLACE FUNCTION payloadColumnConvertToText(columnType TEXT)
+RETURNS TEXT
+AS $$
+DECLARE
+    ret TEXT;
+BEGIN
+CASE
+    WHEN columnType = 'text' THEN
+        ret = 'event_payload';
+    WHEN columnType = 'jsonb' THEN
+        ret = 'event_payload::TEXT';
+    WHEN columnType = 'bytea' THEN
+        ret = 'convert_from(event_payload, ''UTF8'')';
+    ELSE
+        ret = 'invalid';
+END CASE;
+RETURN ret;
+END
+$$ LANGUAGE plpgsql;
+
+
+-- recreate unionjobsdb so event_payload is always returned as text, converted per the table's actual column type
+CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int)
+RETURNS table (
+  t_name text,
+  job_id bigint,
+  workspace_id text,
+  uuid uuid,
+  user_id text,
+  parameters jsonb,
+  custom_val character varying(64),
+  event_payload text,
+  event_count integer,
+  created_at timestamp with time zone,
+  expire_at timestamp with time zone,
+  status_id bigint,
+  job_state character varying(64),
+  attempt smallint,
+  exec_time timestamp with time zone,
+  error_code character varying(32),
+  error_response jsonb
+)
+AS $$
+DECLARE
+    qry text;
+BEGIN
+SELECT string_agg(
+    format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, (%3$s), j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.exec_time, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30), payloadColumnConvertToText(payloadColumnType(alltables.table_name))),
+    ' UNION 
') INTO qry + FROM (select table_name from information_schema.tables +WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables; +RETURN QUERY EXECUTE qry; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file From 5a4adff54578ea3afbb3f3d84c1757f0157744f7 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Thu, 26 Dec 2024 17:33:49 +0530 Subject: [PATCH 10/17] fixup! chore: address tests for text payload column --- jobsdb/jobsdb.go | 36 ++---------------------------------- 1 file changed, 2 insertions(+), 34 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 08584d1a85..c483edf05f 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -2307,7 +2307,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param resultsetStates := map[string]struct{}{} for rows.Next() { var job JobT - var payload Payload + var payload []byte var jsState sql.NullString var jsAttemptNum sql.NullInt64 var jsExecTime sql.NullTime @@ -2323,7 +2323,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param if err != nil { return JobsResult{}, false, err } - job.EventPayload = payload.PayloadBytes() + job.EventPayload = payload if jsState.Valid { resultsetStates[jsState.String] = struct{}{} job.LastJobStatus.JobState = jsState.String @@ -2383,38 +2383,6 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param }, true, nil } -type Payload struct { - S string - B []byte -} - -func (p *Payload) PayloadBytes() []byte { - if p.B != nil { - return p.B - } - // return []byte(p.S) - buffer := new(bytes.Buffer) - if err := json.Compact(buffer, []byte(p.S)); err != nil { - return []byte(`{}`) - } - - return buffer.Bytes() -} - -func (p *Payload) Scan(src interface{}) error { - b, ok := src.([]byte) - if !ok { - s, ok := src.(string) - if !ok { - return errors.New("neither string nor bytes") - } - p.S = s - return nil - } - p.B = b - return nil -} - func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) { if len(statusList) == 0 { return From 571ee382ee351091b390c9f4a9d50869ef023611 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Thu, 26 Dec 2024 23:04:33 +0530 Subject: [PATCH 11/17] chore: update batchrouter isolation test to use inline json payload --- jobsdb/jobsdb.go | 2 +- .../batchrouter/batchrouter_isolation_test.go | 55 +------------------ 2 files changed, 3 insertions(+), 54 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index c483edf05f..d4f625267d 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -799,7 +799,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(2, 1, "JobsDB.payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(int(TEXT), 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { diff --git a/router/batchrouter/batchrouter_isolation_test.go b/router/batchrouter/batchrouter_isolation_test.go index 13209b92af..88a040b3ab 100644 --- a/router/batchrouter/batchrouter_isolation_test.go +++ b/router/batchrouter/batchrouter_isolation_test.go @@ -392,60 +392,9 @@ type brtIsolationJobSpec struct { func (jobSpec *brtIsolationJobSpec) payload() []byte { var template string if jobSpec.destType == "MINIO" { - template = `{ - "userId": %[1]q, - "anonymousId": %[2]q, - "testJobId": %[3]d, - "workspaceID": 
%[4]q, - "destType": %[6]q, - "type": "identify", - "context": - { - "traits": - { - "trait1": "new-val" - }, - "ip": "14.5.67.21", - "library": - { - "name": "http" - } - }, - "timestamp": "2020-02-02T00:23:09.544Z", - "receivedAt": %[5]q - }` + template = `{"userId": %[1]q,"anonymousId": %[2]q,"testJobId": %[3]d,"workspaceID": %[4]q,"destType": %[6]q,"type": "identify","context":{"traits":{"trait1": "new-val"},"ip": "14.5.67.21","library":{"name": "http"}},"timestamp": "2020-02-02T00:23:09.544Z","receivedAt": %[5]q}` } else { - template = `{ - "data": { - "userId": %[1]q, - "anonymousId": %[2]q, - "testJobId": %[3]d, - "workspaceID": %[4]q, - "destType": %[6]q, - "type": "identify", - "context_traits_trait1": "new-val", - "context_ip": "14.5.67.21", - "context_library_name": "http", - "timestamp": "2020-02-02T00:23:09.544Z" - }, - "userId": %[1]q, - "metadata": { - "table": "pages", - "columns": { - "userId": "string", - "anonymousId": "string", - "testJobId": "int", - "workspaceID": "string", - "destType": "string", - "type": "string", - "context_traits_trait1": "string", - "context_ip": "string", - "context_library_name": "string", - "timestamp": "string" - }, - "receivedAt": %[5]q - } - }` + template = `{"data": {"userId": %[1]q,"anonymousId": %[2]q,"testJobId": %[3]d,"workspaceID": %[4]q,"destType": %[6]q,"type": "identify","context_traits_trait1": "new-val","context_ip": "14.5.67.21","context_library_name": "http","timestamp": "2020-02-02T00:23:09.544Z"},"userId": %[1]q,"metadata": {"table": "pages","columns": {"userId": "string","anonymousId": "string","testJobId": "int","workspaceID": "string","destType": "string","type": "string","context_traits_trait1": "string","context_ip": "string","context_library_name": "string","timestamp": "string"},"receivedAt": %[5]q}}` } return []byte(fmt.Sprintf(template, jobSpec.userID, jobSpec.userID, jobSpec.id, jobSpec.workspaceID, time.Now().Format(misc.RFC3339Milli), jobSpec.destType)) } From ee2c12abc5e8f9b59fc2415903a0f55ede2ec0b5 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Sat, 28 Dec 2024 21:04:34 +0530 Subject: [PATCH 12/17] chore: some review comments work --- jobsdb/jobsdb.go | 69 ++++++---------------------------------- jobsdb/migration.go | 40 +++++++++++------------ jobsdb/migration_test.go | 10 +++--- 3 files changed, 33 insertions(+), 86 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index d4f625267d..ddf97c9867 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -79,12 +79,12 @@ var payloadTypes = map[payloadColumnType]string{ BYTEA: "bytea", } -type payloadColumnType int +type payloadColumnType string const ( - JSONB payloadColumnType = iota - BYTEA - TEXT + JSONB payloadColumnType = "jsonb" + BYTEA = "bytea" + TEXT = "text" // JSON // Explore afterwards? 
) @@ -720,13 +720,13 @@ func WithStats(s stats.Stats) OptsFunc { func WithBinaryPayload() OptsFunc { return func(jd *Handle) { - jd.conf.payloadColumnType = 1 + jd.conf.payloadColumnType = "bytea" } } func WithTextPayload() OptsFunc { return func(jd *Handle) { - jd.conf.payloadColumnType = 2 + jd.conf.payloadColumnType = "text" } } @@ -798,8 +798,8 @@ func (jd *Handle) init() { jd.config = config.Default } - if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(int(TEXT), 1, "JobsDB.payloadColumnType")) + if jd.conf.payloadColumnType == "" { + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(TEXT, "JobsDB.payloadColumnType")) } if jd.stats == nil { @@ -1089,59 +1089,8 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { jd.assertError(jd.doRefreshDSRangeList(l)) // If no DS present, add one - var createDS bool - var updateColumnType bool - dsList := jd.getDSList() - if len(dsList) == 0 { - createDS = true - } else { - // first check column type - var columnType string - err := jd.dbHandle.QueryRowContext( - ctx, - fmt.Sprintf( - `select data_type - from information_schema.columns - where table_name = '%[1]s' and column_name='event_payload';`, - dsList[len(dsList)-1].JobTable, - ), - ).Scan(&columnType) - jd.assertError(err) - jd.logger.Infow("previous column type", "type", columnType) - if columnType != payloadTypes[jd.conf.payloadColumnType] { - var jobID int64 - err := jd.dbHandle.QueryRowContext( - ctx, - fmt.Sprintf(`select job_id from %q order by job_id asc limit 1`, dsList[len(dsList)-1].JobTable), - ).Scan(&jobID) - if errors.Is(err, sql.ErrNoRows) { - updateColumnType = true - } else if err == nil { - createDS = true - } else { - jd.assertError(err) - } - } - } - if createDS { + if len(jd.getDSList()) == 0 { jd.addNewDS(l, newDataSet(jd.tablePrefix, jd.computeNewIdxForAppend(l))) - } else if updateColumnType { - var payloadType string - switch jd.conf.payloadColumnType { - case payloadColumnType(0): - payloadType = "jsonb" - case payloadColumnType(1): - payloadType = "bytea" - case payloadColumnType(2): - payloadType = "text" - default: - jd.assertError(fmt.Errorf("invalid type: %d", jd.conf.payloadColumnType)) - } - _, err := jd.dbHandle.ExecContext( - ctx, - fmt.Sprintf(`alter table %q alter column event_payload type %s`, dsList[len(dsList)-1].JobTable, payloadType), - ) - jd.assertError(err) } jd.backgroundGroup.Go(crash.Wrapper(func() error { diff --git a/jobsdb/migration.go b/jobsdb/migration.go index ba8e0204a1..1e4e355ade 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -46,7 +46,7 @@ func (jd *Handle) migrateDSLoop(ctx context.Context) { err := jd.doMigrateDS(timeoutCtx) jd.stats.NewTaggedStat("migration_loop", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix, "error": strconv.FormatBool(err != nil)}).Since(start) if err != nil { - return fmt.Errorf("failed to migrate ds: %w", err) + return fmt.Errorf("migrate ds: %w", err) } return nil } @@ -85,7 +85,7 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { return jd.withDistributedSharedLock(ctx, tx, "schema_migrate", func() error { // cannot run while schema migration is running // Take the lock and run actual migration if !jd.dsMigrationLock.TryLockWithCtx(ctx) { - return fmt.Errorf("failed to acquire lock: %w", ctx.Err()) + return fmt.Errorf("acquire dsMigrationLock: %w", ctx.Err()) } defer jd.dsMigrationLock.Unlock() // repeat the check after the dsMigrationLock is acquired to get correct pending jobs 
count. @@ -122,16 +122,16 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFromDatasets, To: destination}) if err != nil { - return fmt.Errorf("failed to marshal journal payload: %w", err) + return fmt.Errorf("marshal journal payload: %w", err) } opID, err := jd.JournalMarkStartInTx(tx, migrateCopyOperation, opPayload) if err != nil { - return fmt.Errorf("failed to mark journal start: %w", err) + return fmt.Errorf("mark journal start: %w", err) } if err = jd.createDSTablesInTx(ctx, tx, destination); err != nil { - return fmt.Errorf("failed to create dataset tables: %w", err) + return fmt.Errorf("create dataset tables: %w", err) } totalJobsMigrated := 0 @@ -146,7 +146,7 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { ) noJobsMigrated, err = jd.migrateJobsInTx(ctx, tx, source.ds, destination) if err != nil { - return fmt.Errorf("failed to migrate jobs: %w", err) + return fmt.Errorf("migrate jobs: %w", err) } jd.assert( noJobsMigrated == source.numJobsPending, @@ -168,29 +168,29 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { return fmt.Errorf("create %v indices: %w", destination, err) } if err = jd.journalMarkDoneInTx(tx, opID); err != nil { - return fmt.Errorf("failed to mark journal done: %w", err) + return fmt.Errorf("mark journal done: %w", err) } jd.logger.Infof("[[ migrateDSLoop ]]: Total migrated %d jobs", totalJobsMigrated) } opPayload, err := json.Marshal(&journalOpPayloadT{From: migrateFromDatasets}) if err != nil { - return fmt.Errorf("failed to marshal journal payload: %w", err) + return fmt.Errorf("marshal journal payload: %w", err) } opID, err := jd.JournalMarkStartInTx(tx, postMigrateDSOperation, opPayload) if err != nil { - return fmt.Errorf("failed to mark journal start: %w", err) + return fmt.Errorf("mark journal start: %w", err) } // acquire an async lock, as this needs to be released after the transaction commits l, lockChan, err = jd.dsListLock.AsyncLockWithCtx(ctx) if err != nil { - return fmt.Errorf("failed to acquire lock: %w", err) + return fmt.Errorf("acquire lock: %w", err) } if err = jd.postMigrateHandleDS(tx, migrateFromDatasets); err != nil { - return fmt.Errorf("failed to post migrate handle ds: %w", err) + return fmt.Errorf("post migrate handle ds: %w", err) } if err = jd.journalMarkDoneInTx(tx, opID); err != nil { - return fmt.Errorf("failed to mark journal done: %w", err) + return fmt.Errorf("mark journal done: %w", err) } return nil }) @@ -200,7 +200,7 @@ func (jd *Handle) doMigrateDS(ctx context.Context) error { defer func() { lockChan <- l }() if err == nil { if err = jd.doRefreshDSRangeList(l); err != nil { - return fmt.Errorf("failed to refresh ds range list: %w", err) + return fmt.Errorf("refresh ds range list: %w", err) } } @@ -506,21 +506,19 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat // find colummn types first - to differentiate between `text`, `bytea` and `jsonb` rows, err := tx.QueryContext( ctx, - fmt.Sprintf( - `select table_name, data_type - from information_schema.columns - where table_name IN ('%[1]s', '%[2]s') and column_name='event_payload';`, - srcDS.JobTable, destDS.JobTable, - ), + `select table_name, data_type + from information_schema.columns + where table_name IN (?, ?) 
and column_name='event_payload';`, + srcDS.JobTable, destDS.JobTable, ) if err != nil { - return 0, fmt.Errorf("failed to get column types: %w", err) + return 0, fmt.Errorf("get column types: %w", err) } defer rows.Close() var jobsTable, columnType string for rows.Next() { if err = rows.Scan(&jobsTable, &columnType); err != nil { - return 0, fmt.Errorf("failed to scan column types: %w", err) + return 0, fmt.Errorf("scan column types: %w", err) } if columnType != "bytea" && columnType != "jsonb" && columnType != "text" { return 0, fmt.Errorf("unsupported column type %s", columnType) diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index b834fe0a5b..e8c1593224 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -404,7 +404,7 @@ func TestMigration(t *testing.T) { ) require.EqualValues(t, 1, jobDB.GetMaxDSIndex()) - jobDB.conf.payloadColumnType = 1 + jobDB.conf.payloadColumnType = "bytea" triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish require.EqualValues(t, 2, jobDB.GetMaxDSIndex()) @@ -461,7 +461,7 @@ func TestMigration(t *testing.T) { } c.Set("JobsDB.maxDSSize", 100000) - jobDB.conf.payloadColumnType = 2 + jobDB.conf.payloadColumnType = "text" triggerMigrateDS <- time.Now() // trigger migrateDSLoop to run triggerMigrateDS <- time.Now() // waits for last loop to finish @@ -537,7 +537,7 @@ func TestPayloadLiteral(t *testing.T) { byteJD := Handle{ config: c, } - byteJD.conf.payloadColumnType = 1 + byteJD.conf.payloadColumnType = "bytea" require.NoError(t, byteJD.Setup( ReadWrite, true, @@ -548,7 +548,7 @@ func TestPayloadLiteral(t *testing.T) { jsonbJD := Handle{ config: c, } - jsonbJD.conf.payloadColumnType = 0 + jsonbJD.conf.payloadColumnType = "jsonb" require.NoError(t, jsonbJD.Setup( ReadWrite, true, @@ -559,7 +559,7 @@ func TestPayloadLiteral(t *testing.T) { textJD := Handle{ config: c, } - textJD.conf.payloadColumnType = 2 + textJD.conf.payloadColumnType = "text" require.NoError(t, textJD.Setup( ReadWrite, true, From ece96758d412c613902cdb3cba0dff16ba6024e6 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Sat, 28 Dec 2024 22:42:10 +0530 Subject: [PATCH 13/17] fixup! 
chore: some review comments work --- jobsdb/integration_test.go | 6 +++--- jobsdb/jobsdb.go | 28 +++++++++++++++------------- jobsdb/migration.go | 2 +- jobsdb/migration_test.go | 16 ++++++++-------- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go index fabc701b2e..e306aff723 100644 --- a/jobsdb/integration_test.go +++ b/jobsdb/integration_test.go @@ -1027,7 +1027,7 @@ func requireSequential(t *testing.T, jobs []*JobT) { func TestJobsDB_SanitizeJSON(t *testing.T) { _ = startPostgres(t) conf := config.New() - conf.Set("JobsDB.payloadColumnType", 0) + conf.Set("JobsDB.payloadColumnType", "jsonb") jobDB := Handle{config: conf} ch := func(n int) string { return strings.Repeat("�", n) @@ -1114,7 +1114,7 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { } jobDB.TearDown() - conf.Set("JobsDB.payloadColumnType", 2) + conf.Set("JobsDB.payloadColumnType", "text") textDB := &Handle{config: conf} require.NoError(t, textDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) @@ -1197,7 +1197,7 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { } textDB.TearDown() - conf.Set("JobsDB.payloadColumnType", 1) + conf.Set("JobsDB.payloadColumnType", "bytea") byteaDB := &Handle{config: conf} require.NoError(t, byteaDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index ddf97c9867..9bb0bf1161 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -83,9 +83,9 @@ type payloadColumnType string const ( JSONB payloadColumnType = "jsonb" - BYTEA = "bytea" - TEXT = "text" - // JSON // Explore afterwards? + BYTEA payloadColumnType = "bytea" + TEXT payloadColumnType = "text" + // JSON ?? ) // QueryConditions holds jobsdb query conditions @@ -720,13 +720,13 @@ func WithStats(s stats.Stats) OptsFunc { func WithBinaryPayload() OptsFunc { return func(jd *Handle) { - jd.conf.payloadColumnType = "bytea" + jd.conf.payloadColumnType = BYTEA } } func WithTextPayload() OptsFunc { return func(jd *Handle) { - jd.conf.payloadColumnType = "text" + jd.conf.payloadColumnType = TEXT } } @@ -798,8 +798,8 @@ func (jd *Handle) init() { jd.config = config.Default } - if jd.conf.payloadColumnType == "" { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(TEXT, "JobsDB.payloadColumnType")) + if string(jd.conf.payloadColumnType) == "" { + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(TEXT), "JobsDB.payloadColumnType")) } if jd.stats == nil { @@ -1461,14 +1461,16 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error { } func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error { - var payloadColumnType string + var columnType payloadColumnType switch jd.conf.payloadColumnType { case JSONB: - payloadColumnType = "JSONB" + columnType = JSONB case BYTEA: - payloadColumnType = "BYTEA" - case TEXT: - payloadColumnType = "TEXT" + columnType = BYTEA + // case TEXT: + // columnType = TEXT + default: + columnType = TEXT } if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q ( job_id BIGSERIAL PRIMARY KEY, @@ -1477,7 +1479,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT user_id TEXT NOT NULL, parameters JSONB NOT NULL, custom_val VARCHAR(64) NOT NULL, - event_payload `+payloadColumnType+` NOT NULL, + event_payload `+string(columnType)+` NOT NULL, event_count INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), expire_at TIMESTAMP WITH TIME ZONE NOT NULL 
DEFAULT NOW());`, newDS.JobTable)); err != nil { diff --git a/jobsdb/migration.go b/jobsdb/migration.go index 1e4e355ade..7fced243d6 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -508,7 +508,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat ctx, `select table_name, data_type from information_schema.columns - where table_name IN (?, ?) and column_name='event_payload';`, + where table_name IN ($1, $2) and column_name='event_payload';`, srcDS.JobTable, destDS.JobTable, ) if err != nil { diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index e8c1593224..e39ff6b9d3 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -537,33 +537,33 @@ func TestPayloadLiteral(t *testing.T) { byteJD := Handle{ config: c, } - byteJD.conf.payloadColumnType = "bytea" + byteJD.conf.payloadColumnType = BYTEA require.NoError(t, byteJD.Setup( ReadWrite, true, - "bytea", + string(BYTEA), )) defer byteJD.TearDown() jsonbJD := Handle{ config: c, } - jsonbJD.conf.payloadColumnType = "jsonb" + jsonbJD.conf.payloadColumnType = JSONB require.NoError(t, jsonbJD.Setup( ReadWrite, true, - "jsonb", + string(JSONB), )) defer jsonbJD.TearDown() textJD := Handle{ config: c, } - textJD.conf.payloadColumnType = "text" + textJD.conf.payloadColumnType = TEXT require.NoError(t, textJD.Setup( ReadWrite, true, - "text", + string(TEXT), )) defer textJD.TearDown() @@ -573,7 +573,7 @@ func TestPayloadLiteral(t *testing.T) { require.NoError(t, textJD.Store(ctx, jobs)) require.NoError(t, jsonbJD.Store(ctx, jobs)) - prefixes := []string{"text", "jsonb", "bytea"} + prefixes := []string{string(TEXT), string(JSONB), string(BYTEA)} for i := range prefixes { _, err := db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_job_status_1 DROP CONSTRAINT fk_%[1]s_job_status_1_job_id`, prefixes[i])) require.NoError(t, err) @@ -603,7 +603,7 @@ func TestPayloadLiteral(t *testing.T) { Index: "1", }, ) - require.NoError(t, err) + require.NoError(t, err, src, dest) } } require.NoError(t, txn.Commit()) From a03976d37a81d0e2f76e8c17239383357dfeff17 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Mon, 30 Dec 2024 14:13:54 +0530 Subject: [PATCH 14/17] chore: table tests for jobsdb sanitizeJson - pending fuzz --- jobsdb/integration_test.go | 417 +++++++++++++++---------------------- 1 file changed, 171 insertions(+), 246 deletions(-) diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go index e306aff723..7db19c8b67 100644 --- a/jobsdb/integration_test.go +++ b/jobsdb/integration_test.go @@ -1024,262 +1024,187 @@ func requireSequential(t *testing.T, jobs []*JobT) { } } -func TestJobsDB_SanitizeJSON(t *testing.T) { - _ = startPostgres(t) - conf := config.New() - conf.Set("JobsDB.payloadColumnType", "jsonb") - jobDB := Handle{config: conf} +func TestJobsdbSanitizeJSON(t *testing.T) { + type testCase struct { + in, out string + err error + } ch := func(n int) string { return strings.Repeat("�", n) } - toValidUTF8Tests := []struct { - in string - out string - err error - }{ - {`\u0000`, "", nil}, - {`\u0000☺\u0000b☺`, "☺b☺", nil}, - // NOTE: we are not handling the following: - // {"\u0000", ""}, - // {"\u0000☺\u0000b☺", "☺b☺"}, - - {"", "", nil}, - {"abc", "abc", nil}, - {"\uFDDD", "\uFDDD", nil}, - {"a\xffb", "a" + ch(1) + "b", nil}, - {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil}, - {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil}, - {"\xC0\xAF", ch(2), nil}, - {"\xE0\x80\xAF", ch(3), nil}, - {"\xed\xa0\x80", ch(3), nil}, - 
{"\xed\xbf\xbf", ch(3), nil}, - {"\xF0\x80\x80\xaf", ch(4), nil}, - {"\xF8\x80\x80\x80\xAF", ch(5), nil}, - {"\xFC\x80\x80\x80\x80\xAF", ch(6), nil}, - - // {"\ud800", ""}, - {`\ud800`, ch(1), nil}, - {`\uDEAD`, ch(1), nil}, - - {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, - {`\uD83D\ude04`, "😄", nil}, - - {`\u4e2d\u6587`, "中文", nil}, - {`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil}, - - {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, - {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, - } - - err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5))) - require.NoError(t, err) - - eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) - for i, tt := range toValidUTF8Tests { - - customVal := fmt.Sprintf("TEST_%d", i) - - jobs := []*JobT{{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), - UserID: uuid.New().String(), - UUID: uuid.New(), - CustomVal: customVal, - WorkspaceId: defaultWorkspaceID, - EventCount: 1, - }} - - err := jobDB.Store(context.Background(), jobs) - if tt.err != nil { - require.Error(t, err, "should error") - require.Contains(t, err.Error(), tt.err.Error(), "should contain error") - continue - } - - require.NoError(t, err) - - unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{ - CustomValFilters: []string{customVal}, - JobsLimit: 10, - ParameterFilters: []ParameterFilterT{}, - }) - require.NoError(t, err, "should not error") - - require.Len(t, unprocessedJob.Jobs, 1) - - require.JSONEq(t, - string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), - string(unprocessedJob.Jobs[0].EventPayload), - ) - } - jobDB.TearDown() - - conf.Set("JobsDB.payloadColumnType", "text") - textDB := &Handle{config: conf} - require.NoError(t, textDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) - - toValidUTF8TestsForText := []struct { - in string - out string - err error - }{ - {`\u0000`, `\u0000`, nil}, - {`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil}, - // NOTE: we are not handling the following: - // {"\u0000", ""}, - // {"\u0000☺\u0000b☺", "☺b☺"}, - - {"", "", nil}, - {"abc", "abc", nil}, - {"\uFDDD", "\uFDDD", nil}, - {"a\xffb", `a\ufffdb`, nil}, - {"a\xffb\uFFFD", `a\ufffdb�`, nil}, - {"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil}, - {"\xC0\xAF", `\ufffd\ufffd`, nil}, - {"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil}, - {"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil}, - {"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil}, - {"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil}, - {"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, - {"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, - - // {"\ud800", ""}, - // 15 - {`\ud800`, `\ud800`, nil}, - {`\uDEAD`, `\uDEAD`, nil}, - - {`\uD83D\ub000`, `\uD83D\ub000`, nil}, - {`\uD83D\ude04`, `\uD83D\ude04`, nil}, - - {`\u4e2d\u6587`, `\u4e2d\u6587`, nil}, - {`\ud83d\udc4a`, `\ud83d\udc4a`, nil}, - - // 21 - {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, - {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, - } - for i, tt := range toValidUTF8TestsForText { - - customVal := fmt.Sprintf("TEST_%d", i) - - jobs := []*JobT{{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: 
bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), - UserID: uuid.New().String(), - UUID: uuid.New(), - CustomVal: customVal, - WorkspaceId: defaultWorkspaceID, - EventCount: 1, - }} - - err := textDB.Store(context.Background(), jobs) - if tt.err != nil { - require.NoError(t, err, "text column should never error", i) - continue - } - - require.NoError(t, err) - - unprocessedJob, err := textDB.GetUnprocessed(context.Background(), GetQueryParams{ - CustomValFilters: []string{customVal}, - JobsLimit: 10, - ParameterFilters: []ParameterFilterT{}, - }) - require.NoError(t, err, "should not error") - - require.Len(t, unprocessedJob.Jobs, 1) - - require.Equal(t, - string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), - string(unprocessedJob.Jobs[0].EventPayload), - "testCase", i, - ) - } - textDB.TearDown() - - conf.Set("JobsDB.payloadColumnType", "bytea") - byteaDB := &Handle{config: conf} - require.NoError(t, byteaDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) - byteaInvalidInputSyntaxError := errors.New("pq: invalid input syntax for type bytea") - toValidUTF8TestsForBytea := []struct { - in string - out string - err error + UTF8Tests := []struct { + payloadColumnType string + cases []testCase }{ - {`\u0000`, "", nil}, - {`\u0000☺\u0000b☺`, "☺b☺", nil}, - // NOTE: we are not handling the following: - // {"\u0000", ""}, - // {"\u0000☺\u0000b☺", "☺b☺"}, - - {"", "", nil}, - {"abc", "abc", nil}, - {"\uFDDD", "\uFDDD", nil}, - {"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError}, - {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError}, - {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError}, - {"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError}, - {"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError}, - {"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError}, - {"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError}, - {"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError}, - {"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError}, - {"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError}, - - // {"\ud800", ""}, - // 15 - {`\ud800`, ch(1), nil}, - {`\uDEAD`, ch(1), nil}, - - {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, - {`\uD83D\ude04`, "😄", nil}, - - {`\u4e2d\u6587`, "中文", nil}, - {`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil}, - - {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, - {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + { + string(JSONB), + []testCase{ + {`\u0000`, "", nil}, + {`\u0000☺\u0000b☺`, "☺b☺", nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + {"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", "a" + ch(1) + "b", nil}, + {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil}, + {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil}, + {"\xC0\xAF", ch(2), nil}, + {"\xE0\x80\xAF", ch(3), nil}, + {"\xed\xa0\x80", ch(3), nil}, + {"\xed\xbf\xbf", ch(3), nil}, + {"\xF0\x80\x80\xaf", ch(4), nil}, + {"\xF8\x80\x80\x80\xAF", ch(5), nil}, + {"\xFC\x80\x80\x80\x80\xAF", ch(6), nil}, + + // {"\ud800", ""}, + {`\ud800`, ch(1), nil}, + {`\uDEAD`, ch(1), nil}, + + {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, + {`\uD83D\ude04`, "😄", nil}, + + {`\u4e2d\u6587`, "中文", nil}, + {`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil}, + + {`\U0001f64f`, ch(1), 
errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + }, + }, + { + string(TEXT), + []testCase{ + {`\u0000`, `\u0000`, nil}, + {`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil}, + + {"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", `a\ufffdb`, nil}, + {"a\xffb\uFFFD", `a\ufffdb�`, nil}, + {"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil}, + {"\xC0\xAF", `\ufffd\ufffd`, nil}, + {"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil}, + {"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, `\ud800`, nil}, + {`\uDEAD`, `\uDEAD`, nil}, + + {`\uD83D\ub000`, `\uD83D\ub000`, nil}, + {`\uD83D\ude04`, `\uD83D\ude04`, nil}, + + {`\u4e2d\u6587`, `\u4e2d\u6587`, nil}, + {`\ud83d\udc4a`, `\ud83d\udc4a`, nil}, + + // 21 + {`\U0001f64f`, `\U0001f64f`, nil}, + {`\uD83D\u00`, `\uD83D\u00`, nil}, + }, + }, + { + string(BYTEA), + []testCase{ + {`\u0000`, "", nil}, + {`\u0000☺\u0000b☺`, "☺b☺", nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + {"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError}, + {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError}, + {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError}, + {"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError}, + {"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError}, + {"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError}, + {"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError}, + {"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, ch(1), nil}, + {`\uDEAD`, ch(1), nil}, + + {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, + {`\uD83D\ude04`, "😄", nil}, + + {`\u4e2d\u6587`, "中文", nil}, + {`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil}, + + {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + }, + }, } - for i, tt := range toValidUTF8TestsForBytea { - - customVal := fmt.Sprintf("TEST_%d", i) - jobs := []*JobT{{ - Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), - EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), - UserID: uuid.New().String(), - UUID: uuid.New(), - CustomVal: customVal, - WorkspaceId: defaultWorkspaceID, - EventCount: 1, - }} - - err := byteaDB.Store(context.Background(), jobs) - if tt.err != nil { - require.Error(t, err, "should error", i) - require.Contains(t, err.Error(), tt.err.Error(), "should contain error", i) - continue - } - - require.NoError(t, err, i) + for _, tCase := range UTF8Tests { + t.Run(tCase.payloadColumnType, func(t *testing.T) { + _ = startPostgres(t) + conf := config.New() + conf.Set("JobsDB.payloadColumnType", tCase.payloadColumnType) + jobDB := Handle{config: conf} + err := jobDB.Setup(ReadWrite, true, tCase.payloadColumnType+"_"+strings.ToLower(rand.String(5))) + 
require.NoError(t, err, tCase.payloadColumnType) + eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) + for i, tt := range tCase.cases { + customVal := fmt.Sprintf("TEST_%d", i) + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: customVal, + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + err := jobDB.Store(context.Background(), jobs) + if tt.err != nil { + require.Error(t, err, "should error", tCase.payloadColumnType, i) + require.Contains(t, err.Error(), tt.err.Error(), "should contain error", tCase.payloadColumnType, i) + continue + } - unprocessedJob, err := byteaDB.GetUnprocessed(context.Background(), GetQueryParams{ - CustomValFilters: []string{customVal}, - JobsLimit: 10, - ParameterFilters: []ParameterFilterT{}, + require.NoError(t, err, tCase.payloadColumnType, i) + + unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{ + CustomValFilters: []string{customVal}, + JobsLimit: 10, + ParameterFilters: []ParameterFilterT{}, + }) + require.NoError(t, err, "should not error") + + require.Len(t, unprocessedJob.Jobs, 1, tCase.payloadColumnType, i) + + if tCase.payloadColumnType == string(TEXT) { // some can't be valid json + require.Equal(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + tCase.payloadColumnType, i, + ) + } else { + require.JSONEq(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + tCase.payloadColumnType, i, + ) + } + } + jobDB.TearDown() }) - require.NoError(t, err, "should not error", i) - - require.Len(t, unprocessedJob.Jobs, 1) - - require.JSONEq(t, - string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), - string(unprocessedJob.Jobs[0].EventPayload), - i, - ) } - byteaDB.TearDown() } // BenchmarkJobsdb takes time... 
keep waiting From df7901aa20c75cf7e1acd6e8f07d633de9ed10b4 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 3 Jan 2025 16:16:00 +0530 Subject: [PATCH 15/17] chore: set default to jsonb --- jobsdb/jobsdb.go | 2 +- .../000014_event_payload_column_type.up.sql | 35 +------------------ 2 files changed, 2 insertions(+), 35 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index c540f878f8..6210df537e 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -793,7 +793,7 @@ func (jd *Handle) init() { } if string(jd.conf.payloadColumnType) == "" { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(TEXT), "JobsDB.payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(JSONB), "JobsDB.payloadColumnType")) } if jd.stats == nil { diff --git a/sql/migrations/node/000014_event_payload_column_type.up.sql b/sql/migrations/node/000014_event_payload_column_type.up.sql index d43a6fb139..3fd57cbff1 100644 --- a/sql/migrations/node/000014_event_payload_column_type.up.sql +++ b/sql/migrations/node/000014_event_payload_column_type.up.sql @@ -1,38 +1,5 @@ -DROP FUNCTION IF EXISTS payloadColumnType(TEXT); -DROP FUNCTION IF EXISTS payloadColumnConvertToText(TEXT); DROP FUNCTION IF EXISTS unionjobsdb(text,integer); --- returns payload column tyoe of the jobs table -CREATE OR REPLACE FUNCTION payloadColumnType(tableName TEXT) -RETURNS TEXT -AS $$ -BEGIN -RETURN(SELECT data_type FROM information_schema.columns WHERE table_name = tableName and column_name='event_payload' LIMIT 1); -END; -$$ LANGUAGE plpgsql; - --- return payload column type conversion literal -CREATE OR REPLACE FUNCTION payloadColumnConvertToText(columnType TEXT) -RETURNS TEXT -AS $$ -DECLARE - ret TEXT; -BEGIN -CASE - WHEN columnType = 'text' THEN - ret = 'event_payload'; - WHEN columnType = 'jsonb' THEN - ret = 'event_payload::TEXT'; - WHEN columnType = 'bytea' THEN - ret = 'convert_from(event_payload, "UTF8")'; - ELSE - ret = 'invalid'; -END CASE; -RETURN ret; -END -$$ LANGUAGE plpgsql; - - -- change function return table's payload type CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int) RETURNS table ( @@ -62,7 +29,7 @@ SELECT string_agg( format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, (j.event_payload::TEXT), j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.exec_time, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30)), ' UNION ') INTO qry FROM (select table_name from information_schema.tables -WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables; +WHERE table_name LIKE prefix || '_jobs_%' order by split_part(split_part(table_name, '_jobs_', 2), '_', 1)::integer asc LIMIT num) alltables; RETURN QUERY EXECUTE qry; END; $$ LANGUAGE plpgsql; \ No newline at end of file From 3bc24b41609f62765e3a33e425f580f27ca96cb1 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 7 Jan 2025 15:27:10 +0530 Subject: [PATCH 16/17] chore: clean up code --- jobsdb/jobsdb.go | 44 ++++++++---------------- jobsdb/migration.go | 72 ++++++++++++++++++---------------------- jobsdb/migration_test.go | 37 +++++++++++++++++++++ 3 files changed, 83 insertions(+), 70 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 
6210df537e..a82fe6a0cd 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -36,31 +36,26 @@ import ( "time" "unicode/utf8" - "golang.org/x/sync/errgroup" - "golang.org/x/sync/singleflight" - + "github.com/google/uuid" jsoniter "github.com/json-iterator/go" + "github.com/lib/pq" "github.com/samber/lo" "github.com/tidwall/gjson" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" "github.com/rudderlabs/rudder-go-kit/bytesize" - - "github.com/rudderlabs/rudder-go-kit/logger" - - "github.com/rudderlabs/rudder-server/jobsdb/internal/cache" - "github.com/rudderlabs/rudder-server/jobsdb/internal/lock" - "github.com/rudderlabs/rudder-server/utils/crash" - . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck - "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/stats/collectors" + "github.com/rudderlabs/rudder-server/jobsdb/internal/cache" + "github.com/rudderlabs/rudder-server/jobsdb/internal/lock" "github.com/rudderlabs/rudder-server/services/rmetrics" + "github.com/rudderlabs/rudder-server/utils/crash" "github.com/rudderlabs/rudder-server/utils/misc" - - "github.com/google/uuid" - "github.com/lib/pq" + . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck ) var ( @@ -79,7 +74,6 @@ const ( JSONB payloadColumnType = "jsonb" BYTEA payloadColumnType = "bytea" TEXT payloadColumnType = "text" - // JSON ?? ) // QueryConditions holds jobsdb query conditions @@ -681,7 +675,7 @@ func init() { type OptsFunc func(jd *Handle) -// WithClearDB, if set to true it will remove all existing tables +// WithClearDB if set to true it will remove all existing tables func WithClearDB(clearDB bool) OptsFunc { return func(jd *Handle) { jd.conf.clearAll = clearDB @@ -712,18 +706,6 @@ func WithStats(s stats.Stats) OptsFunc { } } -func WithBinaryPayload() OptsFunc { - return func(jd *Handle) { - jd.conf.payloadColumnType = BYTEA - } -} - -func WithTextPayload() OptsFunc { - return func(jd *Handle) { - jd.conf.payloadColumnType = TEXT - } -} - func WithSkipMaintenanceErr(ignore bool) OptsFunc { return func(jd *Handle) { jd.conf.skipMaintenanceError = ignore @@ -1461,10 +1443,10 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT columnType = JSONB case BYTEA: columnType = BYTEA - // case TEXT: - // columnType = TEXT - default: + case TEXT: columnType = TEXT + default: + columnType = JSONB } if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q ( job_id BIGSERIAL PRIMARY KEY, diff --git a/jobsdb/migration.go b/jobsdb/migration.go index 7fced243d6..685ce60f75 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -459,41 +459,32 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi return } -func getColumnConversion(srcType, destType string) string { +func getColumnConversion(srcType, destType string) (string, error) { if srcType == destType { - return "j.event_payload" - } - switch srcType { - case "jsonb": - switch destType { - case "text": - return "j.event_payload::TEXT" - case "bytea": - return "convert_to(j.event_payload::TEXT, 'UTF8')" - default: - panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) + return "j.event_payload", nil + } + + conversions := map[string]map[string]string{ + "jsonb": { + "text": "j.event_payload::TEXT", + "bytea": "convert_to(j.event_payload::TEXT, 'UTF8')", + }, + "bytea": { + "text": 
"convert_from(j.event_payload, 'UTF8')", + "jsonb": "convert_from(j.event_payload, 'UTF8')::jsonb", + }, + "text": { + "jsonb": "j.event_payload::jsonb", + "bytea": "convert_to(j.event_payload, 'UTF8')", + }, + } + var result string + if destMap, ok := conversions[srcType]; ok { + if result, ok = destMap[destType]; !ok { + return "", fmt.Errorf("unsupported payload column types: src-%s, dest-%s", srcType, destType) } - case "bytea": - switch destType { - case "text": - return "convert_from(j.event_payload, 'UTF8')" - case "jsonb": - return "convert_from(j.event_payload, 'UTF8')::jsonb" - default: - panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) - } - case "text": - switch destType { - case "jsonb": - return "j.event_payload::jsonb" - case "bytea": - return "convert_to(j.event_payload, 'UTF8')" - default: - panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) - } - default: - panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) } + return result, nil } func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dataSetT) (int, error) { @@ -502,8 +493,8 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat &statTags{CustomValFilters: []string{jd.tablePrefix}}, ).RecordDuration()() - columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"} - // find colummn types first - to differentiate between `text`, `bytea` and `jsonb` + columnTypeMap := map[string]string{srcDS.JobTable: string(JSONB), destDS.JobTable: string(JSONB)} + // find column types first - to differentiate between `text`, `bytea` and `jsonb` rows, err := tx.QueryContext( ctx, `select table_name, data_type @@ -514,13 +505,13 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat if err != nil { return 0, fmt.Errorf("get column types: %w", err) } - defer rows.Close() + defer func() { _ = rows.Close() }() var jobsTable, columnType string for rows.Next() { if err = rows.Scan(&jobsTable, &columnType); err != nil { return 0, fmt.Errorf("scan column types: %w", err) } - if columnType != "bytea" && columnType != "jsonb" && columnType != "text" { + if columnType != string(BYTEA) && columnType != string(JSONB) && columnType != string(TEXT) { return 0, fmt.Errorf("unsupported column type %s", columnType) } columnTypeMap[jobsTable] = columnType @@ -528,7 +519,10 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat if err = rows.Err(); err != nil { return 0, fmt.Errorf("rows.Err() on column types: %w", err) } - payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable]) + payloadLiteral, err := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable]) + if err != nil { + return 0, err + } compactDSQuery := fmt.Sprintf( `with last_status as (select * from "v_last_%[1]s"), @@ -552,7 +546,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat payloadLiteral, ) - var numJobsMigrated int64 + var numJobsMigrated int if err := tx.QueryRowContext( ctx, compactDSQuery, @@ -562,7 +556,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat if _, err := tx.Exec(fmt.Sprintf(`ANALYZE %q, %q`, destDS.JobTable, destDS.JobStatusTable)); err != nil { return 0, err } - return int(numJobsMigrated), nil + return numJobsMigrated, nil } func (jd *Handle) 
computeNewIdxForIntraNodeMigration(l lock.LockToken, insertBeforeDS dataSetT) (string, error) { // Within the node
diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go
index e39ff6b9d3..f97ddcf022 100644
--- a/jobsdb/migration_test.go
+++ b/jobsdb/migration_test.go
@@ -647,3 +647,40 @@ func TestPayloadLiteral(t *testing.T) {
 		)
 	}
 }
+
+func Test_GetColumnConversion(t *testing.T) {
+	t.Run("bytea to text", func(t *testing.T) {
+		res, err := getColumnConversion(string(BYTEA), string(TEXT))
+		require.NoError(t, err)
+		require.Equal(t, `convert_from(j.event_payload, 'UTF8')`, res)
+	})
+	t.Run("bytea to jsonb", func(t *testing.T) {
+		res, err := getColumnConversion(string(BYTEA), string(JSONB))
+		require.NoError(t, err)
+		require.Equal(t, `convert_from(j.event_payload, 'UTF8')::jsonb`, res)
+	})
+	t.Run("text to bytea", func(t *testing.T) {
+		res, err := getColumnConversion(string(TEXT), string(BYTEA))
+		require.NoError(t, err)
+		require.Equal(t, `convert_to(j.event_payload, 'UTF8')`, res)
+	})
+	t.Run("text to jsonb", func(t *testing.T) {
+		res, err := getColumnConversion(string(TEXT), string(JSONB))
+		require.NoError(t, err)
+		require.Equal(t, `j.event_payload::jsonb`, res)
+	})
+	t.Run("jsonb to bytea", func(t *testing.T) {
+		res, err := getColumnConversion(string(JSONB), string(BYTEA))
+		require.NoError(t, err)
+		require.Equal(t, `convert_to(j.event_payload::TEXT, 'UTF8')`, res)
+	})
+	t.Run("jsonb to text", func(t *testing.T) {
+		res, err := getColumnConversion(string(JSONB), string(TEXT))
+		require.NoError(t, err)
+		require.Equal(t, `j.event_payload::TEXT`, res)
+	})
+	t.Run("invalid conversion", func(t *testing.T) {
+		_, err := getColumnConversion(string(JSONB), "random")
+		require.Error(t, err)
+	})
+}
From 935f63c0bb512a5f7567d6ff99e14c54fe1ba145 Mon Sep 17 00:00:00 2001
From: Rohith BCS 
Date: Tue, 7 Jan 2025 19:22:31 +0530
Subject: [PATCH 17/17] chore: add fuzz tests

---
 jobsdb/integration_test.go | 107 +++++++++++++++++++++++++++++++++++++
 jobsdb/migration_test.go   |   8 +--
 2 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go
index 7db19c8b67..237ffbad3f 100644
--- a/jobsdb/integration_test.go
+++ b/jobsdb/integration_test.go
@@ -1207,6 +1207,113 @@ func TestJobsdbSanitizeJSON(t *testing.T) {
 	}
 }
 
+// Fuzzer represents a test fuzzer for analytics events
+type Fuzzer struct {
+	testData []string
+}
+
+// NewFuzzer creates a new Fuzzer instance
+func NewFuzzer() *Fuzzer {
+	return &Fuzzer{
+		testData: make([]string, 0),
+	}
+}
+
+// Add adds a test case to the fuzzer
+func (f *Fuzzer) Add(data string) {
+	f.testData = append(f.testData, data)
+}
+
+// GenerateFuzzCorpus builds the corpus of event payloads fed to the fuzz store test
+func GenerateFuzzCorpus() []string {
+	f := NewFuzzer()
+
+	// Basic event types
+	f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`)
+	
f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"title":"Home | RudderStack","url":"https://www.rudderstack.com"}}`) + + // Add page views + f.Add(`{"type":"page","name":"Home","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`) + + // Add screen views + f.Add(`{"type":"screen","name":"Main","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`) + + // Add group events + f.Add(`{"type":"group","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","groupId":"groupId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`) + + // Add track events + f.Add(`{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","userProperties":{"rating":3.0,"review_body":"OK for the price. 
It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`) + + // Test column names with special characters and edge cases + columnNames := []string{ + // SQL keywords + "select", "from", "where", "and", "or", "not", "insert", "update", "delete", + // Special characters + "column name", "column-name", "column.name", "column@name", "column#name", + // Unicode characters + "columnñame", "colûmnname", "columnнаме", "列名", "カラム名", + // Very long names + "this_is_a_very_long_column_name_that_exceeds_the_maximum_allowed_length", + } + + for _, columnName := range columnNames { + f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"test","properties":{"%s":"test_value"}}`, columnName)) + } + + // Test event names with special cases + eventNames := []string{ + "omega", "omega v2 ", "9mega", "mega&", "ome$ga", + "select", "drop", "create", "alter", "index", + "name with spaces", "name@with@special@chars", + "序列化", "テーブル", "таблица", + } + + for _, eventName := range eventNames { + f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"%s","properties":{"test":"value"}}`, eventName)) + } + + // Add merge events + f.Add(`{"type":"merge","mergeProperties":[{"type":"email","value":"alex@example.com"},{"type":"mobile","value":"+1-202-555-0146"}]}`) + f.Add(`{"type":"merge"}`) + f.Add(`{"type":"merge", "mergeProperties": "invalid"}`) + + // Add identify events + f.Add(`{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2}}`) + + // Add extract events + f.Add(`{"type":"extract","recordId":"recordID","event":"event","receivedAt":"2021-09-01T00:00:00.000Z","context":{"traits":{"name":"Richard Hendricks","email":"rhedricks@example.com","logins":2},"ip":"1.2.3.4"}}`) + return f.testData +} + +func Test_FuzzTestStore(t *testing.T) { + _ = startPostgres(t) + conf := config.New() + columnTypes := []string{"jsonb", "text", "bytea"} + for _, column := range columnTypes { + t.Run(fmt.Sprintf("Store with %s column type", column), func(t *testing.T) { + conf.Set("JobsDB.payloadColumnType", column) + jobDB := Handle{config: conf} + err := jobDB.Setup(ReadWrite, true, column+"_"+strings.ToLower(rand.String(5))) + require.NoError(t, err) + testPayloads := GenerateFuzzCorpus() + for i, payload := range testPayloads { + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: []byte(payload), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: fmt.Sprintf("TEST_%d", i), + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + err := jobDB.Store(context.Background(), jobs) + require.NoError(t, err) + } + jobDB.TearDown() + }) + } +} + // BenchmarkJobsdb takes time... 
keep waiting func BenchmarkJobsdb(b *testing.B) { // We are intentionally not using b.N, since we want to have a testbench for stress testing jobsdb's behaviour for longer periods of time (5-15 seconds) diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index f97ddcf022..c342d123e0 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -103,7 +103,7 @@ func TestMigration(t *testing.T) { require.EqualValues(t, 3, jobDB.GetMaxDSIndex()) // last DS - // should have enough statuses for a clean up to be triggered + // should have enough statuses for a cleanup to be triggered // all non-terminal require.NoError(t, jobDB.Store(context.Background(), jobs[20:30])) for i := 0; i < 10; i++ { @@ -140,7 +140,7 @@ func TestMigration(t *testing.T) { triggerMigrateDS <- time.Now() // waits for last loop to finish // we should see that in the three DSs we have, - // the first one should only have non-terminal jobs left now(with only the last status) in an jobs_1_1 + // the first one should only have non-terminal jobs left now(with only the last status) in jobs_1_1 // the second one should have all jobs // the third DS should have all jobs with only the last status per job @@ -434,7 +434,7 @@ func TestMigration(t *testing.T) { require.EqualValues(t, "bytea", payloadType) // last DS - // should have enough statuses for a clean up to be triggered + // should have enough statuses for a cleanup to be triggered // all non-terminal require.NoError(t, jobDB.Store(context.Background(), jobs[20:30])) for i := 0; i < 10; i++ { @@ -468,7 +468,7 @@ func TestMigration(t *testing.T) { // data moved from both jsonb and bytea columns to a text column // we should see that in the three DSs we have, - // the first one should only have non-terminal jobs left now(with only the last status) in an jobs_1_1 + // the first one should only have non-terminal jobs left now(with only the last status) in jobs_1_1 // the second one should have all jobs // the third DS should have all jobs with only the last status per job
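
A minimal in-package sketch, distilled from the tests in this series, of how the event_payload column type is selected (the "example" table prefix and the *testing.T in scope are illustrative; "jsonb" is the default after PATCH 15, with "text" and "bytea" as the other supported values):

	conf := config.New()
	// New datasets are created with this column type. When a migration's source
	// and destination tables have different types, migrateJobsInTx rewrites the
	// payload using the SQL expression returned by getColumnConversion, e.g.
	// jsonb -> text migrates via j.event_payload::TEXT.
	conf.Set("JobsDB.payloadColumnType", "text")

	jobDB := Handle{config: conf}
	require.NoError(t, jobDB.Setup(ReadWrite, true, "example"))
	defer jobDB.TearDown()

Converting payloads per dataset at migration time, rather than ALTERing the latest table in place as an earlier revision's writerSetup did, keeps the switch online: existing datasets retain their column type until they are compacted into a new dataset.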