Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate sample event column to text for reporting #5503

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
metric.StatusDetail.ErrorDetails.Code,
metric.StatusDetail.ErrorDetails.Message,
sampleResponse,
string(sampleEvent),
sampleEvent,
metric.StatusDetail.EventName,
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/label_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func createMetricObject(eventName, errorMessage string) types.PUReportedMetric {
Count: 3,
StatusCode: 0,
SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
SampleEvent: `{"some-sample-event-key": "some-sample-event-value"}`,
EventName: eventName,
EventType: "some-event-type",
},
Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ func (r *DefaultReporter) getReports(currentMs, aggregationIntervalMin int64, sy
SELECT
%s, MAX(reported_at),
COALESCE(
(ARRAY_AGG(sample_response ORDER BY id DESC) FILTER (WHERE (sample_event != '{}'::jsonb AND sample_event IS NOT NULL) OR (sample_response IS NOT NULL AND sample_response != '')))[1],
(ARRAY_AGG(sample_response ORDER BY id DESC) FILTER (WHERE (sample_event != '{}' AND sample_event IS NOT NULL) OR (sample_response IS NOT NULL AND sample_response != '')))[1],
''
) AS sample_response,
COALESCE(
(ARRAY_AGG(sample_event ORDER BY id DESC) FILTER (WHERE (sample_event != '{}'::jsonb AND sample_event IS NOT NULL) OR (sample_response IS NOT NULL AND sample_response != '')))[1],
'{}'::jsonb
(ARRAY_AGG(sample_event ORDER BY id DESC) FILTER (WHERE (sample_event != '{}' AND sample_event IS NOT NULL) OR (sample_response IS NOT NULL AND sample_response != '')))[1],
'{}'
) AS sample_event,
SUM(count),
SUM(violation_count)
Expand Down Expand Up @@ -715,7 +715,7 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte
metric.StatusDetail.Count, metric.StatusDetail.ViolationCount,
metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU,
metric.StatusDetail.StatusCode,
sampleResponse, string(sampleEvent),
sampleResponse, sampleEvent,
metric.StatusDetail.EventName, metric.StatusDetail.EventType,
metric.StatusDetail.ErrorType,
)
Expand Down
32 changes: 16 additions & 16 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var _ = Describe("Reporting", func() {
Count: 3,
StatusCode: 0,
SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
SampleEvent: `{"some-sample-event-key": "some-sample-event-value"}`,
EventName: "some-event-name",
EventType: "some-event-type",
},
Expand All @@ -61,7 +61,7 @@ var _ = Describe("Reporting", func() {
Count: 3,
StatusCode: 0,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
EventName: "",
EventType: "",
},
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "",
},
},
Expand All @@ -145,7 +145,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand All @@ -172,7 +172,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "",
},
},
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "",
},
{
Expand All @@ -322,7 +322,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "another-error-type",
},
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 5,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "",
},
{
Expand All @@ -430,7 +430,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "some-error-type",
},
},
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestGetAggregatedReports(t *testing.T) {
ViolationCount: 10,
StatusCode: 200,
SampleResponse: "",
SampleEvent: []byte(`{}`),
SampleEvent: `{}`,
ErrorType: "another-error-type",
},
},
Expand Down
20 changes: 3 additions & 17 deletions enterprise/reporting/util_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package reporting

import (
"encoding/json"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -226,7 +225,7 @@ func TestGetAggregationBucket(t *testing.T) {
}

func TestGetSampleWithEventSamplingWithNilEventSampler(t *testing.T) {
inputSampleEvent := json.RawMessage(`{"event": "1"}`)
inputSampleEvent := `{"event": "1"}`
inputSampleResponse := "response"
metric := types.PUReportedMetric{
StatusDetail: &types.StatusDetail{
Expand Down Expand Up @@ -269,9 +268,9 @@ func TestFloorFactor(t *testing.T) {
}

func TestGetSampleWithEventSampling(t *testing.T) {
sampleEvent := json.RawMessage(`{"event": "2"}`)
sampleEvent := `{"event": "2"}`
sampleResponse := "sample response"
emptySampleEvent := json.RawMessage(`{}`)
emptySampleEvent := `{}`
emptySampleResponse := ""

tests := []struct {
Expand All @@ -284,19 +283,6 @@ func TestGetSampleWithEventSampling(t *testing.T) {
shouldPut bool
putError error
}{
{
name: "Nil sample event",
metric: types.PUReportedMetric{
StatusDetail: &types.StatusDetail{
SampleEvent: nil,
},
},
wantMetric: types.PUReportedMetric{
StatusDetail: &types.StatusDetail{
SampleEvent: nil,
},
},
},
{
name: "Empty sample event",
metric: types.PUReportedMetric{
Expand Down
10 changes: 5 additions & 5 deletions enterprise/reporting/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package reporting

import (
"encoding/json"
"sort"
"strings"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -45,15 +45,15 @@ func getAggregationBucketMinute(timeMs, intervalMs int64) (int64, int64) {
return bucketStart, bucketEnd
}

func getSampleWithEventSampling(metric types.PUReportedMetric, reportedAt int64, eventSampler event_sampler.EventSampler, eventSamplingEnabled bool, eventSamplingDuration int64) (sampleEvent json.RawMessage, sampleResponse string, err error) {
func getSampleWithEventSampling(metric types.PUReportedMetric, reportedAt int64, eventSampler event_sampler.EventSampler, eventSamplingEnabled bool, eventSamplingDuration int64) (sampleEvent, sampleResponse string, err error) {
sampleEvent = metric.StatusDetail.SampleEvent
sampleResponse = metric.StatusDetail.SampleResponse

if !eventSamplingEnabled || eventSampler == nil {
return sampleEvent, sampleResponse, nil
}

isValidSample := (sampleEvent != nil && string(sampleEvent) != "{}") || sampleResponse != ""
isValidSample := (sampleEvent != misc.EmptyPayloadString || sampleResponse != "")

if isValidSample {
sampleEventBucket, _ := getAggregationBucketMinute(reportedAt, eventSamplingDuration)
Expand All @@ -66,7 +66,7 @@ func getSampleWithEventSampling(metric types.PUReportedMetric, reportedAt int64,
}

if found {
sampleEvent = json.RawMessage(`{}`)
sampleEvent = misc.EmptyPayloadString
sampleResponse = ""
} else {
err = eventSampler.Put(hash)
Expand All @@ -80,7 +80,7 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t
for _, col := range piiColumns {
switch col {
case "sample_event":
metric.StatusDetail.SampleEvent = []byte(`{}`)
metric.StatusDetail.SampleEvent = misc.EmptyPayloadString
case "sample_response":
metric.StatusDetail.SampleResponse = ""
case "event_name":
Expand Down
30 changes: 15 additions & 15 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,20 +1147,20 @@
)

for _, message := range messages {
proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, pu, func() json.RawMessage {
proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, pu, func() string {
if pu != types.TRACKINGPLAN_VALIDATOR {
return []byte(`{}`)
return misc.EmptyPayloadString
}
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
return misc.EmptyPayloadString

Check warning on line 1155 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1155

Added line #L1155 was not covered by tests
}

sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getDestTransformerEvents] Failed to unmarshal first element in transformed events: %v`, err)
sampleEvent = []byte(`{}`)
}
return sampleEvent
return string(sampleEvent)

Check warning on line 1163 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1163

Added line #L1163 was not covered by tests
},
nil)
}
Expand Down Expand Up @@ -1218,7 +1218,7 @@
statusDetailsMap map[string]map[string]*types.StatusDetail,
event *transformer.TransformerResponse,
status, stage string,
payload func() json.RawMessage,
payload func() string,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) {
if !proc.isReportingEnabled() {
Expand Down Expand Up @@ -1450,16 +1450,16 @@
failedEvent,
state,
pu,
func() json.RawMessage {
func() string {
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
return misc.EmptyPayloadString

Check warning on line 1455 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1455

Added line #L1455 was not covered by tests
}
sampleEvent, err := jsonfast.Marshal(message)
if err != nil {
proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err)
sampleEvent = []byte(`{}`)
}
return sampleEvent
return string(sampleEvent)
},
eventsByMessageID)
}
Expand Down Expand Up @@ -1613,7 +1613,7 @@
StatusDetail: &types.StatusDetail{
Status: types.DiffStatus,
Count: diff,
SampleEvent: []byte(`{}`),
SampleEvent: misc.EmptyPayloadString,
EventName: eventName,
EventType: eventType,
},
Expand Down Expand Up @@ -1938,14 +1938,14 @@
transformerEvent,
jobsdb.Succeeded.State,
types.GATEWAY,
func() json.RawMessage {
func() string {
if sourceIsTransient {
return []byte(`{}`)
return misc.EmptyPayloadString

Check warning on line 1943 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1943

Added line #L1943 was not covered by tests
}
if payload := event.payloadFunc(); payload != nil {
return payload
return string(payload)
}
return []byte("{}")
return misc.EmptyPayloadString

Check warning on line 1948 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L1948

Added line #L1948 was not covered by tests
},
nil,
)
Expand Down Expand Up @@ -1988,7 +1988,7 @@
groupedEventsBySourceId[SourceIDT(sourceId)] = append(groupedEventsBySourceId[SourceIDT(sourceId)], shallowEventCopy)

if proc.isReportingEnabled() {
proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, transformerEvent, jobsdb.Succeeded.State, types.DESTINATION_FILTER, func() json.RawMessage { return []byte(`{}`) }, nil)
proc.updateMetricMaps(inCountMetadataMap, outCountMap, connectionDetailsMap, destFilterStatusDetailMap, transformerEvent, jobsdb.Succeeded.State, types.DESTINATION_FILTER, func() string { return misc.EmptyPayloadString }, nil)
}
}

Expand Down Expand Up @@ -2946,7 +2946,7 @@
successCountMap := make(map[string]int64)
for i := range response.Events {
// Update metrics maps
proc.updateMetricMaps(nil, successCountMap, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, types.DEST_TRANSFORMER, func() json.RawMessage { return []byte(`{}`) }, nil)
proc.updateMetricMaps(nil, successCountMap, connectionDetailsMap, statusDetailsMap, &response.Events[i], jobsdb.Succeeded.State, types.DEST_TRANSFORMER, func() string { return misc.EmptyPayloadString }, nil)
}
types.AssertSameKeys(connectionDetailsMap, statusDetailsMap)

Expand Down
Loading
Loading