Skip to content

Commit 9f8d16a

Browse files
authored
Merge pull request #85 from alitto/fix/AD/group-metrics
fix(taskgroup): failed group tasks not counted
2 parents 543ed3a + 688b02c commit 9f8d16a

File tree

2 files changed

+59
-2
lines changed

2 files changed

+59
-2
lines changed

group.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,15 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) {
104104

105105
g.taskWaitGroup.Add(1)
106106

107-
err := g.pool.Go(func() {
107+
err := g.pool.dispatcher.Write(func() error {
108108
defer g.taskWaitGroup.Done()
109109

110110
// Check if the context has been cancelled to prevent running tasks that are not needed
111111
if err := g.future.Context().Err(); err != nil {
112112
g.futureResolver(index, &result[O]{
113113
Err: err,
114114
}, err)
115-
return
115+
return err
116116
}
117117

118118
// Invoke the task
@@ -122,6 +122,8 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) {
122122
Output: output,
123123
Err: err,
124124
}, err)
125+
126+
return err
125127
})
126128

127129
if err != nil {

group_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,3 +323,58 @@ func TestTaskGroupDone(t *testing.T) {
323323

324324
assert.Equal(t, int32(5), executedCount.Load())
325325
}
326+
327+
func TestTaskGroupMetrics(t *testing.T) {
328+
pool := NewPool(1)
329+
330+
group := pool.NewGroup()
331+
332+
for i := 0; i < 9; i++ {
333+
group.Submit(func() {
334+
time.Sleep(1 * time.Millisecond)
335+
})
336+
}
337+
338+
// The last task will return an error
339+
sampleErr := errors.New("sample error")
340+
group.SubmitErr(func() error {
341+
time.Sleep(1 * time.Millisecond)
342+
return sampleErr
343+
})
344+
345+
err := group.Wait()
346+
347+
time.Sleep(10 * time.Millisecond)
348+
349+
assert.Equal(t, sampleErr, err)
350+
assert.Equal(t, uint64(10), pool.SubmittedTasks())
351+
assert.Equal(t, uint64(9), pool.SuccessfulTasks())
352+
assert.Equal(t, uint64(1), pool.FailedTasks())
353+
}
354+
355+
func TestTaskGroupMetricsWithCancelledContext(t *testing.T) {
356+
pool := NewPool(1)
357+
358+
ctx, cancel := context.WithCancel(context.Background())
359+
defer cancel()
360+
361+
group := pool.NewGroupContext(ctx)
362+
363+
for i := 0; i < 10; i++ {
364+
i := i
365+
group.Submit(func() {
366+
time.Sleep(20 * time.Millisecond)
367+
if i == 4 {
368+
cancel()
369+
}
370+
})
371+
}
372+
err := group.Wait()
373+
374+
time.Sleep(10 * time.Millisecond)
375+
376+
assert.Equal(t, err, context.Canceled)
377+
assert.Equal(t, uint64(10), pool.SubmittedTasks())
378+
assert.Equal(t, uint64(5), pool.SuccessfulTasks())
379+
assert.Equal(t, uint64(5), pool.FailedTasks())
380+
}

0 commit comments

Comments
 (0)