Skip to content

Commit

Permalink
Merge pull request #17889 from serathius/robustness-operations-failpo…
Browse files Browse the repository at this point in the history
…ints

Robustness operations failpoints
  • Loading branch information
serathius authored May 8, 2024
2 parents 570370f + c4e3b61 commit bb398a0
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package traffic
package client

import (
"context"
Expand All @@ -33,7 +33,7 @@ import (
// clientv3.Client) that records all the requests and responses made. Doesn't
// allow for concurrent requests to confirm to model.AppendableHistory requirements.
type RecordingClient struct {
id int
ID int
client clientv3.Client
// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
Expand All @@ -51,7 +51,7 @@ type TimedWatchEvent struct {
Time time.Duration
}

func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
Expand All @@ -62,7 +62,7 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
return nil, err
}
return &RecordingClient{
id: ids.NewClientID(),
ID: ids.NewClientID(),
client: *cc,
kvOperations: model.NewAppendableHistory(ids),
baseTime: baseTime,
Expand All @@ -75,7 +75,7 @@ func (c *RecordingClient) Close() error {

func (c *RecordingClient) Report() report.ClientReport {
return report.ClientReport{
ClientID: c.id,
ClientID: c.ID,
KeyValue: c.kvOperations.History.Operations(),
Watch: c.watchOperations,
}
Expand Down Expand Up @@ -190,6 +190,65 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
return resp, err
}

func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.Compact(ctx, rev)
return resp, err
}
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberList(ctx, opts...)
return resp, err
}

func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberAdd(ctx, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberRemove(ctx, id)
return resp, err
}

func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberUpdate(ctx, id, peerAddrs)
return resp, err
}

func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.MemberPromote(ctx, id)
return resp, err
}

func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
resp, err := c.client.Status(ctx, endpoint)
return resp, err
}

func (c *RecordingClient) Endpoints() []string {
return c.client.Endpoints()
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
request := model.WatchRequest{
Key: key,
Expand Down
36 changes: 17 additions & 19 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

var (
Expand All @@ -36,26 +39,21 @@ var (

type memberReplace struct{}

func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
memberID := uint64(rand.Int() % len(clus.Procs))
member := clus.Procs[memberID]
var endpoints []string
for i := 1; i < len(clus.Procs); i++ {
endpoints = append(endpoints, clus.Procs[(int(memberID)+i)%len(clus.Procs)].EndpointsGRPC()...)
}
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 50 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
cc, err := client.NewRecordingClient(endpoints, ids, baseTime)
if err != nil {
return err
return nil, err
}
defer cc.Close()
memberID, found, err := getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if !found {
t.Fatal("Member not found")
Expand All @@ -65,11 +63,11 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
lg.Info("Removing member", zap.String("member", member.Config().Name))
_, err = cc.MemberRemove(ctx, memberID)
if err != nil {
return err
return nil, err
}
_, found, err = getID(ctx, cc, member.Config().Name)
if err != nil {
return err
return nil, err
}
if found {
t.Fatal("Expected member to be removed")
Expand All @@ -83,21 +81,21 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
err = member.Wait(ctx)
if err != nil && !strings.Contains(err.Error(), "unexpected exit code") {
lg.Info("Failed to kill the process", zap.Error(err))
return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
}
}
lg.Info("Removing member data", zap.String("member", member.Config().Name))
err = os.RemoveAll(member.Config().DataDirPath)
if err != nil {
return err
return nil, err
}

lg.Info("Adding member back", zap.String("member", member.Config().Name))
removedMemberPeerURL := member.Config().PeerURL.String()
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
reqCtx, cancel := context.WithTimeout(ctx, time.Second)
Expand All @@ -109,17 +107,17 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
}
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
if err != nil {
return err
return nil, err
}
lg.Info("Starting member", zap.String("member", member.Config().Name))
err = member.Start(ctx)
if err != nil {
return err
return nil, err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
default:
}
_, found, err := getID(ctx, cc, member.Config().Name)
Expand All @@ -130,7 +128,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
break
}
}
return nil
return nil, nil
}

func (f memberReplace) Name() string {
Expand All @@ -141,7 +139,7 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, _ e2e.Etcd
return config.ClusterSize > 1
}

func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
func getID(ctx context.Context, cc clientv3.Cluster, name string) (id uint64, found bool, err error) {
resp, err := cc.MemberList(ctx)
if err != nil {
return 0, false, err
Expand Down
26 changes: 18 additions & 8 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

const (
Expand Down Expand Up @@ -75,7 +77,7 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
Expand All @@ -85,7 +87,7 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
}
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
start := time.Since(baseTime)
err = failpoint.Inject(ctx, t, lg, clus)
clientReport, err := failpoint.Inject(ctx, t, lg, clus, baseTime, ids)
if err != nil {
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
Expand All @@ -96,14 +98,22 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &InjectionReport{
Start: start,
End: end,
Name: failpoint.Name(),
return &FailpointReport{
Injection: Injection{
Start: start,
End: end,
Name: failpoint.Name(),
},
Client: clientReport,
}, nil
}

type InjectionReport struct {
type FailpointReport struct {
Injection
Client []report.ClientReport
}

type Injection struct {
Start, End time.Duration
Name string
}
Expand Down Expand Up @@ -137,7 +147,7 @@ func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProces
}

type Failpoint interface {
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error)
Name() string
AvailabilityChecker
}
Expand Down
Loading

0 comments on commit bb398a0

Please sign in to comment.