Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: event_payload column can be JSONB or TEXT #5372

Merged
merged 25 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
58a72f5
chore: event_payload column can be JSONB, BYTEA or TEXT
Sidddddarth Dec 13, 2024
c5c21e3
fixup! chore: event_payload column can be JSONB, BYTEA or TEXT
Sidddddarth Dec 13, 2024
08c358a
chore: accommodate jobsdb startup to create/update lastDS - only w, r…
Sidddddarth Dec 17, 2024
4129e5e
fixup! chore: accommodate jobsdb startup to create/update lastDS - on…
Sidddddarth Dec 17, 2024
9894405
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 17, 2024
de47b4e
fixup! chore: accommodate jobsdb startup to create/update lastDS - on…
Sidddddarth Dec 17, 2024
0674995
chore: jobsdb_sanitizeJSON test
Sidddddarth Dec 17, 2024
3b802a9
chore: address tests for text payload column
Sidddddarth Dec 18, 2024
ef0001a
fixup! chore: address tests for text payload column
Sidddddarth Dec 18, 2024
8524933
fixup! chore: address tests for text payload column
Sidddddarth Dec 19, 2024
37133be
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 19, 2024
394a9ed
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 24, 2024
5a4adff
fixup! chore: address tests for text payload column
Sidddddarth Dec 26, 2024
571ee38
chore: update batchrouter isolation test to use inline json payload
Sidddddarth Dec 26, 2024
1e913a7
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 26, 2024
9f0c4b7
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 27, 2024
ee2c12a
chore: some review comments work
Sidddddarth Dec 28, 2024
ece9675
fixup! chore: some review comments work
Sidddddarth Dec 28, 2024
b55615a
Merge branch 'master' into chore.jobsdbPayloadColumnType
Sidddddarth Dec 28, 2024
a03976d
chore: table tests for jobsdb sanitizeJson - pending fuzz
Sidddddarth Dec 30, 2024
df7901a
chore: set default to jsonb
Sidddddarth Jan 3, 2025
3bc24b4
chore: clean up code
cisse21 Jan 7, 2025
c32baa3
Merge branch 'master' into chore.jobsdbPayloadColumnType
cisse21 Jan 7, 2025
935f63c
chore: ad fuzz tests
cisse21 Jan 7, 2025
41f8474
Merge branch 'master' into chore.jobsdbPayloadColumnType
cisse21 Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 274 additions & 73 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,92 +1024,293 @@ func requireSequential(t *testing.T, jobs []*JobT) {
}
}

func TestJobsDB_SanitizeJSON(t *testing.T) {
_ = startPostgres(t)
jobDB := Handle{config: config.New()}
func TestJobsdbSanitizeJSON(t *testing.T) {
type testCase struct {
in, out string
err error
}
ch := func(n int) string {
return strings.Repeat("�", n)
}
toValidUTF8Tests := []struct {
in string
out string
err error
byteaInvalidInputSyntaxError := errors.New("pq: invalid input syntax for type bytea")
UTF8Tests := []struct {
payloadColumnType string
cases []testCase
}{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", nil},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil},
{"\xC0\xAF", ch(2), nil},
{"\xE0\x80\xAF", ch(3), nil},
{"\xed\xa0\x80", ch(3), nil},
{"\xed\xbf\xbf", ch(3), nil},
{"\xF0\x80\x80\xaf", ch(4), nil},
{"\xF8\x80\x80\x80\xAF", ch(5), nil},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), nil},

// {"\ud800", ""},
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
{
string(JSONB),
[]testCase{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", nil},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil},
{"\xC0\xAF", ch(2), nil},
{"\xE0\x80\xAF", ch(3), nil},
{"\xed\xa0\x80", ch(3), nil},
{"\xed\xbf\xbf", ch(3), nil},
{"\xF0\x80\x80\xaf", ch(4), nil},
{"\xF8\x80\x80\x80\xAF", ch(5), nil},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), nil},

// {"\ud800", ""},
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
},
},
{
string(TEXT),
[]testCase{
{`\u0000`, `\u0000`, nil},
{`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", `a\ufffdb`, nil},
{"a\xffb\uFFFD", `a\ufffdb�`, nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil},
{"\xC0\xAF", `\ufffd\ufffd`, nil},
{"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil},
{"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil},
{"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil},
{"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil},
{"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil},
{"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil},

// {"\ud800", ""},
// 15
{`\ud800`, `\ud800`, nil},
{`\uDEAD`, `\uDEAD`, nil},

{`\uD83D\ub000`, `\uD83D\ub000`, nil},
{`\uD83D\ude04`, `\uD83D\ude04`, nil},

{`\u4e2d\u6587`, `\u4e2d\u6587`, nil},
{`\ud83d\udc4a`, `\ud83d\udc4a`, nil},

// 21
{`\U0001f64f`, `\U0001f64f`, nil},
{`\uD83D\u00`, `\uD83D\u00`, nil},
},
},
{
string(BYTEA),
[]testCase{
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError},
{"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError},
{"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError},
{"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError},
{"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError},
{"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError},
{"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError},

// {"\ud800", ""},
// 15
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
},
},
}

err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5)))
require.NoError(t, err)
defer jobDB.TearDown()
for _, tCase := range UTF8Tests {
t.Run(tCase.payloadColumnType, func(t *testing.T) {
_ = startPostgres(t)
conf := config.New()
conf.Set("JobsDB.payloadColumnType", tCase.payloadColumnType)
jobDB := Handle{config: conf}
err := jobDB.Setup(ReadWrite, true, tCase.payloadColumnType+"_"+strings.ToLower(rand.String(5)))
require.NoError(t, err, tCase.payloadColumnType)
eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`)
for i, tt := range tCase.cases {
customVal := fmt.Sprintf("TEST_%d", i)
jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: customVal,
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}
err := jobDB.Store(context.Background(), jobs)
if tt.err != nil {
require.Error(t, err, "should error", tCase.payloadColumnType, i)
require.Contains(t, err.Error(), tt.err.Error(), "should contain error", tCase.payloadColumnType, i)
continue
}

eventPayload := []byte(`{"batch": [{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`)
for i, tt := range toValidUTF8Tests {
require.NoError(t, err, tCase.payloadColumnType, i)

unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 10,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")

require.Len(t, unprocessedJob.Jobs, 1, tCase.payloadColumnType, i)

if tCase.payloadColumnType == string(TEXT) { // some can't be valid json
require.Equal(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[0].EventPayload),
tCase.payloadColumnType, i,
)
} else {
require.JSONEq(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[0].EventPayload),
tCase.payloadColumnType, i,
)
}
}
jobDB.TearDown()
})
}
}

customVal := fmt.Sprintf("TEST_%d", i)
// Fuzzer represents a test fuzzer for analytics events
type Fuzzer struct {
testData []string
}

jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: customVal,
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}

err := jobDB.Store(context.Background(), jobs)
if tt.err != nil {
require.Error(t, err, "should error")
require.Contains(t, err.Error(), tt.err.Error(), "should contain error")
continue
}
// NewFuzzer creates a new Fuzzer instance
func NewFuzzer() *Fuzzer {
return &Fuzzer{
testData: make([]string, 0),
}
}

require.NoError(t, err)
// Add adds a test case to the fuzzer
func (f *Fuzzer) Add(data string) {
f.testData = append(f.testData, data)
}

unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 10,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")
// TestFuzzCorpus is the main test function that sets up the test corpus
func GenerateFuzzCorpus() []string {
f := NewFuzzer()

require.Len(t, unprocessedJob.Jobs, 1)
// Basic event types
f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
f.Add(`{"type":"alias","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","previousId":"previousId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"title":"Home | RudderStack","url":"https://www.rudderstack.com"}}`)

require.JSONEq(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[0].EventPayload),
)
// Add page views
f.Add(`{"type":"page","name":"Home","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add screen views
f.Add(`{"type":"screen","name":"Main","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add group events
f.Add(`{"type":"group","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","groupId":"groupId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","context":{"traits":{"email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Add track events
f.Add(`{"type":"track","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","event":"event","request_ip":"5.6.7.8","userProperties":{"rating":3.0,"review_body":"OK for the price. It works but the material feels flimsy."},"context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)

// Test column names with special characters and edge cases
columnNames := []string{
// SQL keywords
"select", "from", "where", "and", "or", "not", "insert", "update", "delete",
// Special characters
"column name", "column-name", "column.name", "column@name", "column#name",
// Unicode characters
"columnñame", "colûmnname", "columnнаме", "列名", "カラム名",
// Very long names
"this_is_a_very_long_column_name_that_exceeds_the_maximum_allowed_length",
}

for _, columnName := range columnNames {
f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"test","properties":{"%s":"test_value"}}`, columnName))
}

// Test event names with special cases
eventNames := []string{
"omega", "omega v2 ", "9mega", "mega&", "ome$ga",
"select", "drop", "create", "alter", "index",
"name with spaces", "name@with@special@chars",
"序列化", "テーブル", "таблица",
}

for _, eventName := range eventNames {
f.Add(fmt.Sprintf(`{"type":"track","messageId":"messageId","userId":"userId","event":"%s","properties":{"test":"value"}}`, eventName))
}

// Add merge events
f.Add(`{"type":"merge","mergeProperties":[{"type":"email","value":"[email protected]"},{"type":"mobile","value":"+1-202-555-0146"}]}`)
f.Add(`{"type":"merge"}`)
f.Add(`{"type":"merge", "mergeProperties": "invalid"}`)

// Add identify events
f.Add(`{"type":"identify","messageId":"messageId","anonymousId":"anonymousId","userId":"userId","sentAt":"2021-09-01T00:00:00.000Z","timestamp":"2021-09-01T00:00:00.000Z","receivedAt":"2021-09-01T00:00:00.000Z","originalTimestamp":"2021-09-01T00:00:00.000Z","channel":"web","request_ip":"5.6.7.8","traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2}}`)

// Add extract events
f.Add(`{"type":"extract","recordId":"recordID","event":"event","receivedAt":"2021-09-01T00:00:00.000Z","context":{"traits":{"name":"Richard Hendricks","email":"[email protected]","logins":2},"ip":"1.2.3.4"}}`)
return f.testData
}

func Test_FuzzTestStore(t *testing.T) {
_ = startPostgres(t)
conf := config.New()
columnTypes := []string{"jsonb", "text", "bytea"}
for _, column := range columnTypes {
t.Run(fmt.Sprintf("Store with %s column type", column), func(t *testing.T) {
conf.Set("JobsDB.payloadColumnType", column)
jobDB := Handle{config: conf}
err := jobDB.Setup(ReadWrite, true, column+"_"+strings.ToLower(rand.String(5)))
require.NoError(t, err)
testPayloads := GenerateFuzzCorpus()
for i, payload := range testPayloads {
jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: []byte(payload),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: fmt.Sprintf("TEST_%d", i),
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
}}
err := jobDB.Store(context.Background(), jobs)
require.NoError(t, err)
}
jobDB.TearDown()
})
}
}

Expand Down
Loading
Loading