Skip to content

Commit 22b14cf

Browse files
feat(event): handle in advance using source metadata payload
1 parent f277b44 commit 22b14cf

File tree

5 files changed

+81
-30
lines changed

5 files changed

+81
-30
lines changed

api

Submodule api updated 259 files

events-processor/models/event.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@ import (
1010
const HTTP_RUBY string = "http_ruby"
1111

1212
type Event struct {
13-
OrganizationID string `json:"organization_id"`
14-
ExternalSubscriptionID string `json:"external_subscription_id"`
15-
TransactionID string `json:"transaction_id"`
16-
Code string `json:"code"`
17-
Properties map[string]any `json:"properties"`
18-
PreciseTotalAmountCents string `json:"precise_total_amount_cents"`
19-
Source string `json:"source,omotempty"`
20-
Timestamp any `json:"timestamp"`
13+
OrganizationID string `json:"organization_id"`
14+
ExternalSubscriptionID string `json:"external_subscription_id"`
15+
TransactionID string `json:"transaction_id"`
16+
Code string `json:"code"`
17+
Properties map[string]any `json:"properties"`
18+
PreciseTotalAmountCents string `json:"precise_total_amount_cents"`
19+
Source string `json:"source,omotempty"`
20+
Timestamp any `json:"timestamp"`
21+
SourceMetadata *SourceMetadata `json:"source_metadata"`
22+
}
23+
24+
type SourceMetadata struct {
25+
ApiPostProcess bool `json:"api_post_processed"`
2126
}
2227

2328
type EnrichedEvent struct {
@@ -30,9 +35,9 @@ type EnrichedEvent struct {
3035
Properties map[string]any `json:"properties"`
3136
PreciseTotalAmountCents string `json:"precise_total_amount_cents"`
3237
Source string `json:"source,omotempty"`
33-
TimestampStr string `json:"-"`
34-
Timestamp float64 `json:"timestamp"`
3538
Value *string `json:"value"`
39+
Timestamp float64 `json:"timestamp"`
40+
TimestampStr string `json:"-"`
3641
Time time.Time `json:"-"`
3742
}
3843

@@ -71,3 +76,11 @@ func (ev *Event) ToEnrichedEvent() utils.Result[*EnrichedEvent] {
7176

7277
return utils.SuccessResult(er)
7378
}
79+
80+
func (ev *Event) ShouldCheckInAdvanceBilling() bool {
81+
if ev.Source != HTTP_RUBY {
82+
return true
83+
}
84+
85+
return ev.SourceMetadata == nil || !ev.SourceMetadata.ApiPostProcess
86+
}

events-processor/models/event_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,34 @@ func TestToEnrichedEvent(t *testing.T) {
5454
assert.Equal(t, "strconv.ParseFloat: parsing \"2025-03-03T13:03:29Z\": invalid syntax", result.ErrorMsg())
5555
})
5656
}
57+
58+
func TestShouldCheckInAdvanceBilling(t *testing.T) {
59+
t.Run("When event source is not HTTP_RUBY", func(t *testing.T) {
60+
event := Event{
61+
Source: "REDPANDA_CONNECT",
62+
}
63+
64+
assert.True(t, event.ShouldCheckInAdvanceBilling())
65+
})
66+
67+
t.Run("When event source is HTTP_RUBY without source metadata", func(t *testing.T) {
68+
event := Event{
69+
Source: HTTP_RUBY,
70+
}
71+
72+
assert.True(t, event.ShouldCheckInAdvanceBilling())
73+
})
74+
75+
t.Run("When event source is HTTP_RUBY with source metadata", func(t *testing.T) {
76+
event := Event{
77+
Source: HTTP_RUBY,
78+
SourceMetadata: &SourceMetadata{
79+
ApiPostProcess: true,
80+
},
81+
}
82+
assert.False(t, event.ShouldCheckInAdvanceBilling())
83+
84+
event.SourceMetadata.ApiPostProcess = false
85+
assert.True(t, event.ShouldCheckInAdvanceBilling())
86+
})
87+
}

events-processor/processors/events.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,23 @@ func processEvent(event *models.Event) utils.Result[*models.EnrichedEvent] {
7474
}
7575
bm := bmResult.Value()
7676

77-
if event.Source != models.HTTP_RUBY {
78-
subResult := apiStore.FetchSubscription(event.OrganizationID, event.ExternalSubscriptionID, enrichedEvent.Time)
79-
if subResult.Failure() {
80-
return failedResult(subResult, "fetch_subscription", "Error fetching subscription")
81-
}
82-
sub := subResult.Value()
77+
subResult := apiStore.FetchSubscription(event.OrganizationID, event.ExternalSubscriptionID, enrichedEvent.Time)
78+
if subResult.Failure() {
79+
return failedResult(subResult, "fetch_subscription", "Error fetching subscription")
80+
}
81+
sub := subResult.Value()
8382

84-
expressionResult := evaluateExpression(enrichedEvent, bm)
85-
if expressionResult.Failure() {
86-
return failedResult(expressionResult, "evaluate_expression", "Error evaluating custom expression")
87-
}
83+
expressionResult := evaluateExpression(enrichedEvent, bm)
84+
if expressionResult.Failure() {
85+
return failedResult(expressionResult, "evaluate_expression", "Error evaluating custom expression")
86+
}
87+
88+
var value = fmt.Sprintf("%v", event.Properties[bm.FieldName])
89+
enrichedEvent.Value = &value
8890

91+
go produceEnrichedEvent(enrichedEvent)
92+
93+
if event.ShouldCheckInAdvanceBilling() {
8994
hasInAdvanceChargeResult := apiStore.AnyInAdvanceCharge(sub.PlanID, bm.ID)
9095
if hasInAdvanceChargeResult.Failure() {
9196
return failedResult(hasInAdvanceChargeResult, "fetch_in_advance_charges", "Error fetching in advance charges")
@@ -96,10 +101,6 @@ func processEvent(event *models.Event) utils.Result[*models.EnrichedEvent] {
96101
}
97102
}
98103

99-
var value = fmt.Sprintf("%v", event.Properties[bm.FieldName])
100-
enrichedEvent.Value = &value
101-
go produceEnrichedEvent(enrichedEvent)
102-
103104
return utils.SuccessResult(enrichedEvent)
104105
}
105106

events-processor/processors/events_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestProcessEvent(t *testing.T) {
7575
assert.Equal(t, "Error fetching billable metric", result.ErrorMessage())
7676
})
7777

78-
t.Run("When event source is HTTP_RUBY", func(t *testing.T) {
78+
t.Run("When event source is post processed on API", func(t *testing.T) {
7979
sqlmock, delete := setupTestEnv(t)
8080
defer delete()
8181

@@ -90,6 +90,9 @@ func TestProcessEvent(t *testing.T) {
9090
Timestamp: 1741007009,
9191
Source: models.HTTP_RUBY,
9292
Properties: properties,
93+
SourceMetadata: &models.SourceMetadata{
94+
ApiPostProcess: true,
95+
},
9396
}
9497

9598
bm := models.BillableMetric{
@@ -103,6 +106,9 @@ func TestProcessEvent(t *testing.T) {
103106
}
104107
mockBmLookup(sqlmock, &bm)
105108

109+
sub := models.Subscription{ID: "sub123"}
110+
mockSubscriptionLookup(sqlmock, &sub)
111+
106112
enrichedProducer := tests.MockMessageProducer{}
107113
eventsEnrichedProducer = &enrichedProducer
108114

@@ -117,7 +123,7 @@ func TestProcessEvent(t *testing.T) {
117123
assert.Equal(t, 1, enrichedProducer.ExecutionCount)
118124
})
119125

120-
t.Run("When event source is not HTTP_RUBY when timestamp is invalid", func(t *testing.T) {
126+
t.Run("When event source is not post process on API when timestamp is invalid", func(t *testing.T) {
121127
sqlmock, delete := setupTestEnv(t)
122128
defer delete()
123129

@@ -147,7 +153,7 @@ func TestProcessEvent(t *testing.T) {
147153
assert.Equal(t, "Error while converting event to enriched event", result.ErrorMessage())
148154
})
149155

150-
t.Run("When event source is not HTTP_RUBY when no subscriptions are found", func(t *testing.T) {
156+
t.Run("When event source is not post process on API when no subscriptions are found", func(t *testing.T) {
151157
sqlmock, delete := setupTestEnv(t)
152158
defer delete()
153159

@@ -179,7 +185,7 @@ func TestProcessEvent(t *testing.T) {
179185
assert.Equal(t, "Error fetching subscription", result.ErrorMessage())
180186
})
181187

182-
t.Run("When event source is not HTTP_RUBY when expression failed to evaluate", func(t *testing.T) {
188+
t.Run("When event source is not post process on API when expression failed to evaluate", func(t *testing.T) {
183189
sqlmock, delete := setupTestEnv(t)
184190
defer delete()
185191

@@ -217,7 +223,7 @@ func TestProcessEvent(t *testing.T) {
217223
assert.Equal(t, "Error evaluating custom expression", result.ErrorMessage())
218224
})
219225

220-
t.Run("When event source is not HTTP_RUBY and events belongs to an in advance charge", func(t *testing.T) {
226+
t.Run("When event source is not post process on API and events belongs to an in advance charge", func(t *testing.T) {
221227
sqlmock, delete := setupTestEnv(t)
222228
defer delete()
223229

0 commit comments

Comments
 (0)