Skip to content

Commit e7d8c38

Browse files
committed
Fix bug where tasks weren't deleted after exhausted retries / SkipRetry.
1 parent a094680 commit e7d8c38

File tree

2 files changed

+92
-2
lines changed

2 files changed

+92
-2
lines changed

nanoq.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,9 @@ func (p *Processor) process(ctx context.Context) error {
350350
if err := p.client.UpdateTask(ctx, tx, t); err != nil {
351351
return fmt.Errorf("update task %v: %w", t.ID, err)
352352
}
353-
}
354353

355-
return nil
354+
return nil
355+
}
356356
}
357357

358358
if err := p.client.DeleteTask(ctx, tx, t); err != nil {

nanoq_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package nanoq_test
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"slices"
78
"testing"
89
"time"
@@ -184,3 +185,92 @@ func TestProcessor_Run(t *testing.T) {
184185
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
185186
}
186187
}
188+
189+
func TestProcessor_Run_RetriesExhausted(t *testing.T) {
190+
db, mock, _ := sqlmock.New()
191+
defer db.Close()
192+
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
193+
processor := nanoq.NewProcessor(client, zerolog.Nop())
194+
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
195+
return errors.New("temporary error")
196+
})
197+
errorHandlerCalled := 0
198+
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
199+
errorHandlerCalled++
200+
})
201+
202+
// First task claim and retry.
203+
mock.ExpectBegin()
204+
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
205+
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
206+
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)
207+
208+
mock.ExpectExec("UPDATE tasks SET retries = (.+), scheduled_at = (.+) WHERE id = (.+)").WithArgs(1, sqlmock.AnyArg(), "01HQJHTZCAT5WDCGVTWJ640VMM").
209+
WillReturnResult(sqlmock.NewResult(0, 1))
210+
mock.ExpectCommit()
211+
212+
// Second task claim and deletion (due to exhausted retries).
213+
mock.ExpectBegin()
214+
rows = sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
215+
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "1", "1", time.Now(), time.Now())
216+
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)
217+
218+
mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
219+
WillReturnResult(sqlmock.NewResult(0, 1))
220+
mock.ExpectCommit()
221+
222+
ctx, cancel := context.WithCancel(context.Background())
223+
go processor.Run(ctx, 1, 1*time.Second)
224+
time.Sleep(1 * time.Second)
225+
cancel()
226+
227+
err := mock.ExpectationsWereMet()
228+
if err != nil {
229+
t.Error(err)
230+
}
231+
232+
if errorHandlerCalled != 2 {
233+
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 2)
234+
}
235+
}
236+
237+
func TestProcessor_Run_SkipRetry(t *testing.T) {
238+
db, mock, _ := sqlmock.New()
239+
defer db.Close()
240+
client := nanoq.NewClient(sqlx.NewDb(db, "sqlmock"))
241+
processor := nanoq.NewProcessor(client, zerolog.Nop())
242+
processor.Handle("my-type", func(ctx context.Context, task nanoq.Task) error {
243+
return fmt.Errorf("something terrible happened: %w", nanoq.ErrSkipRetry)
244+
})
245+
errorHandlerCalled := 0
246+
processor.OnError(func(ctx context.Context, task nanoq.Task, err error) {
247+
if !errors.Is(err, nanoq.ErrSkipRetry) {
248+
t.Errorf("error handler called with unexpected error: %v", err)
249+
}
250+
errorHandlerCalled++
251+
})
252+
253+
// Task claim and deletion.
254+
mock.ExpectBegin()
255+
rows := sqlmock.NewRows([]string{"id", "fingerprint", "type", "payload", "retries", "max_retries", "created_at", "scheduled_at"}).
256+
AddRow("01HQJHTZCAT5WDCGVTWJ640VMM", "25c084d0", "my-type", "{}", "0", "1", time.Now(), time.Now())
257+
mock.ExpectQuery(`SELECT \* FROM tasks WHERE(.+)`).WillReturnRows(rows)
258+
259+
mock.ExpectExec("DELETE FROM tasks WHERE id = (.+)").WithArgs("01HQJHTZCAT5WDCGVTWJ640VMM").
260+
WillReturnResult(sqlmock.NewResult(0, 1))
261+
mock.ExpectCommit()
262+
263+
ctx, cancel := context.WithCancel(context.Background())
264+
go processor.Run(ctx, 1, 1*time.Second)
265+
time.Sleep(1 * time.Second)
266+
cancel()
267+
268+
err := mock.ExpectationsWereMet()
269+
if err != nil {
270+
t.Error(err)
271+
}
272+
273+
if errorHandlerCalled != 1 {
274+
t.Errorf("erorr handler called %v times instead of %v", errorHandlerCalled, 1)
275+
}
276+
}

0 commit comments

Comments
 (0)