Skip to content

Commit

Permalink
Run Member tests in parallel
Browse files Browse the repository at this point in the history
Introduce port allocator and remove unused MemberNumber.

On my local machine it brings down execution time from 5m to 32s.

Issue: etcd-io#18983
Signed-off-by: Aleksander Mistewicz <[email protected]>
  • Loading branch information
AwesomePatrol committed Dec 12, 2024
1 parent f03dee9 commit 46570f6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 15 deletions.
6 changes: 6 additions & 0 deletions tests/common/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func TestMemberList(t *testing.T) {

for _, tc := range clusterTestCases() {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
Expand Down Expand Up @@ -113,6 +115,8 @@ func TestMemberAdd(t *testing.T) {
for _, quorumTc := range quorumTcs {
for _, clusterTc := range clusterTestCases() {
t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctxTimeout := 10 * time.Second
if quorumTc.waitForQuorum {
ctxTimeout += etcdserver.HealthInterval
Expand Down Expand Up @@ -198,6 +202,8 @@ func TestMemberRemove(t *testing.T) {
continue
}
t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
defer cancel()
c := clusterTc.config
Expand Down
17 changes: 13 additions & 4 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,6 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
if cfg.Logger == nil {
cfg.Logger = zaptest.NewLogger(t)
}
if cfg.BasePort == 0 {
cfg.BasePort = EtcdProcessBasePort
}
if cfg.ServerConfig.SnapshotCount == 0 {
cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount
}
Expand Down Expand Up @@ -518,6 +515,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peer2Port := port + 3
clientHTTPPort := port + 4

var allocatedPorts []int
if cfg.BasePort == 0 {
clientPort = uniquePorts.Alloc()
peerPort = uniquePorts.Alloc()
metricsPort = uniquePorts.Alloc()
peer2Port = uniquePorts.Alloc()
clientHTTPPort = uniquePorts.Alloc()
allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
}

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
Expand Down Expand Up @@ -639,7 +646,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
gofailPort = uniquePorts.Alloc()
allocatedPorts = append(allocatedPorts, gofailPort)
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

Expand All @@ -662,6 +670,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
AllocatedPorts: allocatedPorts,
}
}

Expand Down
14 changes: 10 additions & 4 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {

Name string

PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
AllocatedPorts []int

InitialToken string
InitialCluster string
Expand Down Expand Up @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
return os.RemoveAll(ep.cfg.DataDirPath)
}

for _, port := range ep.cfg.AllocatedPorts {
uniquePorts.Free(port)
}
ep.cfg.AllocatedPorts = nil
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions tests/framework/e2e/port_alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import "sync"

// uniquePorts is a global instance of testPorts.
var uniquePorts *testPorts

func init() {
uniquePorts = newTestPorts(11000, 19000)
}

// testPorts is used to allocate listen ports for etcd instance in tests
// in a safe way for concurrent use (i.e. running tests in parallel).
type testPorts struct {
mux sync.Mutex
unused map[int]bool
}

// newTestPorts keeps track of unused ports in the specified range.
func newTestPorts(start, end int) *testPorts {
m := make(map[int]bool, end-start)
for i := start; i < end; i++ {
m[i] = true
}
return &testPorts{unused: m}
}

// Alloc allocates a new port or panics if none is available.
func (pa *testPorts) Alloc() int {
pa.mux.Lock()
defer pa.mux.Unlock()
for port := range pa.unused {
delete(pa.unused, port)
return port
}
panic("all ports are used")
}

// Free makes port available for allocation through Alloc.
func (pa *testPorts) Free(port int) {
pa.mux.Lock()
defer pa.mux.Unlock()
pa.unused[port] = true
}
10 changes: 3 additions & 7 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
}

func (c *Cluster) mustNewMember(t testutil.TB) *Member {
memberNumber := c.LastMemberNum
uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
c.LastMemberNum++

m := MustNewMember(t,
MemberConfig{
Name: fmt.Sprintf("m%v", memberNumber),
MemberNumber: memberNumber,
Name: fmt.Sprintf("m%v", uniqueNumber),
AuthToken: c.Cfg.AuthToken,
PeerTLS: c.Cfg.PeerTLS,
ClientTLS: c.Cfg.ClientTLS,
Expand Down Expand Up @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
type Member struct {
config.ServerConfig
UniqNumber int
MemberNumber int
Port string
PeerListeners, ClientListeners []net.Listener
GRPCListener net.Listener
Expand Down Expand Up @@ -591,7 +589,6 @@ type Member struct {
type MemberConfig struct {
Name string
UniqNumber int64
MemberNumber int
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
AuthToken string
Expand Down Expand Up @@ -624,8 +621,7 @@ type MemberConfig struct {
func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
var err error
m := &Member{
MemberNumber: mcfg.MemberNumber,
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
}

peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
Expand Down

0 comments on commit 46570f6

Please sign in to comment.