Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/robustness: cleanup errgroup usage #17858

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
25 changes: 13 additions & 12 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package robustness

import (
"context"
"sync"
"testing"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/tests/v3/framework"
"go.etcd.io/etcd/tests/v3/framework/e2e"
Expand Down Expand Up @@ -108,15 +108,17 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (reports []report.ClientReport) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g := errgroup.Group{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't use the errors, but the usage of errgroup is more concise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am okay with keeping it in as well! I usually just prefer not using things in exp if the language or stdlib can handle it.

wg := sync.WaitGroup{}
var operationReport, watchReport []report.ClientReport
failpointInjected := make(chan failpoint.InjectionReport, 1)

// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime := time.Now()
ids := identity.NewIDProvider()
g.Go(func() error {
wg.Add(3)
go func() {
defer wg.Done()
defer close(failpointInjected)
// Give some time for traffic to reach qps target before injecting failpoint.
time.Sleep(time.Second)
Expand All @@ -130,22 +132,21 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
if fr != nil {
failpointInjected <- *fr
}
return nil
})
}()
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
go func() {
defer wg.Done()
defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, failpointInjected, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
return nil
})
g.Go(func() error {
}()
go func() {
defer wg.Done()
watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids)
return nil
})
g.Wait()
}()
wg.Wait()
return append(operationReport, watchReport...)
}

Expand Down
25 changes: 14 additions & 11 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math/rand"
"sync"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -62,16 +61,18 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
kc := &kubernetesClient{client: c}
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
g := errgroup.Group{}
wg := sync.WaitGroup{}
readLimit := t.averageKeyCount

g.Go(func() error {
wg.Add(2)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return ctx.Err()
return
Comment on lines -72 to +73
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not returning an error here since we check for it anyway higher up in the call stack:

case <-ctx.Done():

Also, this way it is more consistent with how the etcdTraffic implementation handles this.

case <-finish:
return nil
return
default:
}
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix, readLimit)
Expand All @@ -80,15 +81,16 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
}
t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1)
}
})
g.Go(func() error {
}()
go func() {
defer wg.Done()
lastWriteFailed := false
for {
select {
case <-ctx.Done():
return ctx.Err()
return
case <-finish:
return nil
return
default:
}
// Avoid multiple failed writes in a row
Expand All @@ -104,8 +106,9 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
continue
}
}
})
g.Wait()
}()

wg.Wait()
}

func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
Expand Down
22 changes: 12 additions & 10 deletions tests/robustness/traffic/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,42 @@
package traffic

import (
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

func TestLimiter(t *testing.T) {
limiter := NewConcurrencyLimiter(3)
counter := &atomic.Int64{}
g := errgroup.Group{}
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
g.Go(func() error {
wg.Add(1)
go func() {
defer wg.Done()
if limiter.Take() {
counter.Add(1)
}
return nil
})
}()
}
g.Wait()
wg.Wait()
assert.Equal(t, 3, int(counter.Load()))
assert.False(t, limiter.Take())

limiter.Return()
counter.Store(0)
for i := 0; i < 10; i++ {
g.Go(func() error {
wg.Add(1)
go func() {
defer wg.Done()
if limiter.Take() {
counter.Add(1)
}
return nil
})
}()
}
g.Wait()
wg.Wait()
assert.Equal(t, 1, int(counter.Load()))
assert.False(t, limiter.Take())

Expand Down