Skip to content

Commit a456dab

Browse files
authored
Generate payer reports (#646)
## tl;dr - Defines structs for the off chain representation of Payer Reports - Actually generates Payer Reports - Alters the `unsettled_usage` table to keep track of the last known sequence ID for each originator/payer combo in a given minute. This makes it cheap to look up when generating a Payer Report ## Notes ### Minimum Report Size - To generate a Payer Report you need a minimum of two messages, originated in two distinct minutes. This resolves #645 ## Tickets Implements part of #514 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced new queries to aggregate spending data and retrieve envelope details, enhancing data reporting. - Added a comprehensive mechanism for generating and verifying payer reports, improving overall reporting accuracy. - Improved envelope creation by integrating timestamp support for more precise data handling. - Added new queries for enhanced data retrieval capabilities, including payer reports and gateway envelope details. - Introduced a new field for tracking last sequence IDs in unsettled usage records, enhancing data integrity. - **Bug Fixes** - Adjusted existing queries to return additional data fields, ensuring accurate reporting. - **Tests** - Expanded test coverage to validate various report generation scenarios, ensuring reliable performance. - Added tests for handling out-of-order sequence IDs during database operations, improving robustness. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 2bcc92c commit a456dab

12 files changed

+681
-20
lines changed

pkg/db/gatewayEnvelope.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ func InsertGatewayEnvelopeAndIncrementUnsettledUsage(
3333

3434
var wg sync.WaitGroup
3535
var incrementErr, congestionErr error
36+
// Use the sequence ID from the envelope to set the last sequence ID value
37+
if incrementParams.SequenceID == 0 {
38+
incrementParams.SequenceID = insertParams.OriginatorSequenceID
39+
}
3640

3741
wg.Add(2)
3842

pkg/db/gatewayEnvelope_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ func TestInsertAndIncrement(t *testing.T) {
6464
queries.GetPayerUnsettledUsageParams{PayerID: payerID},
6565
)
6666
require.NoError(t, err)
67-
require.Equal(t, payerSpend, int64(100))
67+
require.Equal(t, payerSpend.TotalSpendPicodollars, int64(100))
68+
require.Equal(t, payerSpend.LastSequenceID, sequenceID)
6869

6970
originatorCongestion, err := querier.SumOriginatorCongestion(
7071
ctx,
@@ -138,7 +139,8 @@ func TestInsertAndIncrementParallel(t *testing.T) {
138139
queries.GetPayerUnsettledUsageParams{PayerID: payerID},
139140
)
140141
require.NoError(t, err)
141-
require.Equal(t, payerSpend, int64(100))
142+
require.Equal(t, payerSpend.TotalSpendPicodollars, int64(100))
143+
require.Equal(t, payerSpend.LastSequenceID, sequenceID)
142144

143145
originatorCongestion, err := querier.SumOriginatorCongestion(
144146
ctx,
@@ -147,3 +149,44 @@ func TestInsertAndIncrementParallel(t *testing.T) {
147149
require.NoError(t, err)
148150
require.Equal(t, originatorCongestion, int64(1))
149151
}
152+
153+
func TestInsertAndIncrementWithOutOfOrderSequenceID(t *testing.T) {
154+
ctx := context.Background()
155+
db, _, cleanup := testutils.NewDB(t, ctx)
156+
defer cleanup()
157+
158+
querier := queries.New(db)
159+
160+
payerID := testutils.CreatePayer(t, db, testutils.RandomAddress().Hex())
161+
originatorID := testutils.RandomInt32()
162+
sequenceID := int64(10)
163+
164+
insertParams, incrementParams := buildParams(payerID, originatorID, sequenceID, 100)
165+
166+
_, err := xmtpd_db.InsertGatewayEnvelopeAndIncrementUnsettledUsage(
167+
ctx,
168+
db,
169+
insertParams,
170+
incrementParams,
171+
)
172+
require.NoError(t, err)
173+
174+
lowerSequenceID := int64(5)
175+
176+
insertParams, incrementParams = buildParams(payerID, originatorID, lowerSequenceID, 100)
177+
178+
_, err = xmtpd_db.InsertGatewayEnvelopeAndIncrementUnsettledUsage(
179+
ctx,
180+
db,
181+
insertParams,
182+
incrementParams,
183+
)
184+
require.NoError(t, err)
185+
186+
payerSpend, err := querier.GetPayerUnsettledUsage(
187+
ctx,
188+
queries.GetPayerUnsettledUsageParams{PayerID: payerID},
189+
)
190+
require.NoError(t, err)
191+
require.Equal(t, payerSpend.LastSequenceID, sequenceID)
192+
}

pkg/db/queries.sql

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,17 @@ ON CONFLICT (address)
181181
id;
182182

183183
-- name: IncrementUnsettledUsage :exec
184-
INSERT INTO unsettled_usage(payer_id, originator_id, minutes_since_epoch, spend_picodollars)
185-
VALUES (@payer_id, @originator_id, @minutes_since_epoch, @spend_picodollars)
184+
INSERT INTO unsettled_usage(payer_id, originator_id, minutes_since_epoch, spend_picodollars, last_sequence_id)
185+
VALUES (@payer_id, @originator_id, @minutes_since_epoch, @spend_picodollars, @sequence_id)
186186
ON CONFLICT (payer_id, originator_id, minutes_since_epoch)
187187
DO UPDATE SET
188-
spend_picodollars = unsettled_usage.spend_picodollars + @spend_picodollars;
188+
spend_picodollars = unsettled_usage.spend_picodollars + @spend_picodollars,
189+
last_sequence_id = GREATEST(unsettled_usage.last_sequence_id, @sequence_id);
189190

190191
-- name: GetPayerUnsettledUsage :one
191192
SELECT
192-
COALESCE(SUM(spend_picodollars), 0)::BIGINT AS total_spend_picodollars
193+
COALESCE(SUM(spend_picodollars), 0)::BIGINT AS total_spend_picodollars,
194+
COALESCE(MAX(last_sequence_id), 0)::BIGINT AS last_sequence_id
193195
FROM
194196
unsettled_usage
195197
WHERE
@@ -261,3 +263,42 @@ WHERE
261263
OR minutes_since_epoch > @minutes_since_epoch_gt::BIGINT)
262264
AND (@minutes_since_epoch_lt::BIGINT = 0
263265
OR minutes_since_epoch < @minutes_since_epoch_lt::BIGINT);
266+
267+
-- name: BuildPayerReport :many
268+
SELECT
269+
payers.address as payer_address,
270+
SUM(spend_picodollars)::BIGINT AS total_spend_picodollars
271+
FROM
272+
unsettled_usage
273+
JOIN payers on payers.id = unsettled_usage.payer_id
274+
WHERE
275+
originator_id = @originator_id
276+
AND minutes_since_epoch > @start_minutes_since_epoch
277+
AND minutes_since_epoch <= @end_minutes_since_epoch
278+
GROUP BY
279+
payers.address;
280+
281+
-- name: GetGatewayEnvelopeByID :one
282+
SELECT * FROM gateway_envelopes
283+
WHERE originator_sequence_id = @originator_sequence_id
284+
-- Include the node ID to take advantage of the primary key index
285+
AND originator_node_id = @originator_node_id;
286+
287+
-- name: GetSecondNewestMinute :one
288+
WITH second_newest_minute
289+
AS
290+
(
291+
SELECT minutes_since_epoch
292+
FROM unsettled_usage
293+
WHERE originator_id = @originator_id
294+
AND unsettled_usage.minutes_since_epoch > @minimum_minutes_since_epoch
295+
GROUP BY unsettled_usage.minutes_since_epoch
296+
ORDER BY unsettled_usage.minutes_since_epoch DESC
297+
LIMIT 1
298+
OFFSET 1)
299+
SELECT coalesce(max(last_sequence_id), 0)::BIGINT as max_sequence_id,
300+
coalesce(max(unsettled_usage.minutes_since_epoch), 0)::INT as minutes_since_epoch
301+
FROM unsettled_usage
302+
JOIN second_newest_minute
303+
ON second_newest_minute.minutes_since_epoch = unsettled_usage.minutes_since_epoch
304+
WHERE unsettled_usage.originator_id = @originator_id;

pkg/db/queries/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/queries/queries.sql.go

Lines changed: 129 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/db/unsettledUsage_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestIncrementUnsettledUsage(t *testing.T) {
3434
},
3535
)
3636
require.NoError(t, err)
37-
require.Equal(t, unsettledUsage, int64(100))
37+
require.Equal(t, unsettledUsage.TotalSpendPicodollars, int64(100))
3838

3939
require.NoError(t, querier.IncrementUnsettledUsage(ctx, queries.IncrementUnsettledUsageParams{
4040
PayerID: payerId,
@@ -50,7 +50,7 @@ func TestIncrementUnsettledUsage(t *testing.T) {
5050
},
5151
)
5252
require.NoError(t, err)
53-
require.Equal(t, unsettledUsage, int64(200))
53+
require.Equal(t, unsettledUsage.TotalSpendPicodollars, int64(200))
5454
}
5555

5656
func TestGetUnsettledUsage(t *testing.T) {
@@ -86,7 +86,7 @@ func TestGetUnsettledUsage(t *testing.T) {
8686
},
8787
)
8888
require.NoError(t, err)
89-
require.Equal(t, unsettledUsage, int64(300))
89+
require.Equal(t, unsettledUsage.TotalSpendPicodollars, int64(300))
9090

9191
unsettledUsage, err = querier.GetPayerUnsettledUsage(
9292
ctx,
@@ -96,5 +96,5 @@ func TestGetUnsettledUsage(t *testing.T) {
9696
},
9797
)
9898
require.NoError(t, err)
99-
require.Equal(t, unsettledUsage, int64(500))
99+
require.Equal(t, unsettledUsage.TotalSpendPicodollars, int64(500))
100100
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE unsettled_usage DROP COLUMN last_sequence_id;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Dropping the table to add a new column with a not null constraint
2+
DROP TABLE unsettled_usage;
3+
4+
CREATE TABLE unsettled_usage(
5+
payer_id INTEGER NOT NULL,
6+
originator_id INTEGER NOT NULL,
7+
minutes_since_epoch INTEGER NOT NULL,
8+
spend_picodollars BIGINT NOT NULL,
9+
last_sequence_id BIGINT NOT NULL,
10+
PRIMARY KEY (payer_id, originator_id, minutes_since_epoch)
11+
);
12+
13+
CREATE INDEX idx_unsettled_usage_originator_id_minutes_since_epoch
14+
ON unsettled_usage(originator_id, minutes_since_epoch DESC);

0 commit comments

Comments
 (0)