From 5959110f4a44ffcfc18ebbcd465ecae06c04b7a5 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 21 Apr 2024 15:08:05 +0200 Subject: [PATCH] Implement Compaction support in robustness test Signed-off-by: Marek Siarkowicz --- tests/robustness/client/client.go | 4 + tests/robustness/failpoint/trigger.go | 14 +++- tests/robustness/model/describe.go | 7 ++ tests/robustness/model/deterministic.go | 48 +++++++++--- tests/robustness/model/history.go | 27 +++++++ .../options/server_config_options.go | 6 ++ tests/robustness/report/wal.go | 6 +- tests/robustness/scenarios.go | 3 +- tests/robustness/traffic/etcd.go | 15 +++- tests/robustness/traffic/kubernetes.go | 17 +++- tests/robustness/validate/patch_history.go | 2 + tests/robustness/validate/validate_test.go | 8 +- tests/robustness/watch.go | 78 +++++++++++-------- 13 files changed, 178 insertions(+), 57 deletions(-) diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index bd3ccb5b9f3..516e5d5df8d 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + callTime := time.Since(c.baseTime) resp, err := c.client.Compact(ctx, rev) + returnTime := time.Since(c.baseTime) + c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err) return resp, err } + func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() diff --git a/tests/robustness/failpoint/trigger.go b/tests/robustness/failpoint/trigger.go index a099ca312e5..1c1702d24a2 100644 --- a/tests/robustness/failpoint/trigger.go +++ b/tests/robustness/failpoint/trigger.go @@ -78,12 +78,22 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et } _, err = cc.Compact(ctx, rev) if err != nil && !connectionError(err) { - return nil, err + return nil, fmt.Errorf("failed to compact: %w", err) } return []report.ClientReport{cc.Report()}, nil } -func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { +func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool { + // Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy. + // TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause. + if config.PeerProxy { + return false + } + // For multiBatchCompaction we need to guarantee that there are enough revisions between two compaction requests. + // With addition of compaction requests to traffic this might be hard if experimental-compaction-batch-limit is too high. + if t.multiBatchCompaction { + return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10 + } return true } diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 42c23ddd53e..a889b4d2b1b 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin if response.Error != "" { return fmt.Sprintf("err: %q", response.Error) } + if response.ClientError != "" { + return fmt.Sprintf("err: %q", response.ClientError) + } if response.PartialResponse { return fmt.Sprintf("unknown, rev: %d", response.Revision) } @@ -38,6 +41,8 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin return "ok" } return fmt.Sprintf("ok, rev: %d", response.Revision) + case Compact: + return "ok" default: return fmt.Sprintf("", request.Type) } @@ -67,6 +72,8 @@ func describeEtcdRequest(request EtcdRequest) string { return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID) case Defragment: return fmt.Sprintf("defragment()") + case Compact: + return fmt.Sprintf("compact(%d)", request.Compact.Revision) default: return fmt.Sprintf("", request.Type) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index 57da61372a3..49111f66835 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -24,6 +24,8 @@ import ( "sort" "github.com/anishathalye/porcupine" + + "go.etcd.io/etcd/server/v3/storage/mvcc" ) // DeterministicModel assumes a deterministic execution of etcd requests. All @@ -65,10 +67,11 @@ var DeterministicModel = porcupine.Model{ } type EtcdState struct { - Revision int64 - KeyValues map[string]ValueRevision - KeyLeases map[string]int64 - Leases map[int64]EtcdLease + Revision int64 + CompactRevision int64 + KeyValues map[string]ValueRevision + KeyLeases map[string]int64 + Leases map[int64]EtcdLease } func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) { @@ -77,7 +80,10 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd } func (s EtcdState) DeepCopy() EtcdState { - newState := EtcdState{Revision: s.Revision} + newState := EtcdState{ + Revision: s.Revision, + CompactRevision: s.CompactRevision, + } newState.KeyValues = maps.Clone(s.KeyValues) newState.KeyLeases = maps.Clone(s.KeyLeases) @@ -92,10 +98,12 @@ func (s EtcdState) DeepCopy() EtcdState { func freshEtcdState() EtcdState { return EtcdState{ - Revision: 1, - KeyValues: map[string]ValueRevision{}, - KeyLeases: map[string]int64{}, - Leases: map[int64]EtcdLease{}, + Revision: 1, + // Start from CompactRevision equal -1 as etcd allows client to compact revision 0 for some reason. + CompactRevision: -1, + KeyValues: map[string]ValueRevision{}, + KeyLeases: map[string]int64{}, + Leases: map[int64]EtcdLease{}, } } @@ -112,6 +120,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { if request.Range.Revision > newState.Revision { return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()} } + if request.Range.Revision < newState.CompactRevision { + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}} + } return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}} case Txn: failure := false @@ -190,6 +201,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} case Defragment: return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}} + case Compact: + if request.Compact.Revision <= newState.CompactRevision { + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}} + } + newState.CompactRevision = request.Compact.Revision + // Set fake revision as compaction returns non-linearizable revision. + // TODO: Model non-linearizable response revision in model. + return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: -1}} default: panic(fmt.Sprintf("Unknown request type: %v", request.Type)) } @@ -249,6 +268,7 @@ const ( LeaseGrant RequestType = "leaseGrant" LeaseRevoke RequestType = "leaseRevoke" Defragment RequestType = "defragment" + Compact RequestType = "compact" ) type EtcdRequest struct { @@ -258,6 +278,7 @@ type EtcdRequest struct { Range *RangeRequest Txn *TxnRequest Defragment *DefragmentRequest + Compact *CompactRequest } func (r *EtcdRequest) IsRead() bool { @@ -349,6 +370,8 @@ type EtcdResponse struct { LeaseGrant *LeaseGrantReponse LeaseRevoke *LeaseRevokeResponse Defragment *DefragmentResponse + Compact *CompactResponse + ClientError string Revision int64 } @@ -417,3 +440,10 @@ func ToValueOrHash(value string) ValueOrHash { } return v } + +type CompactResponse struct { +} + +type CompactRequest struct { + Revision int64 +} diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index edf2c778f05..c64e1a73aa9 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "strings" "time" "github.com/anishathalye/porcupine" @@ -23,6 +24,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/tests/v3/robustness/identity" ) @@ -259,6 +261,23 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli h.appendSuccessful(request, start, end, defragmentResponse(revision)) } +func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) { + request := compactRequest(rev) + if err != nil { + if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) { + h.appendSuccessful(request, start, end, MaybeEtcdResponse{ + EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}, + }) + return + } + h.appendFailed(request, start, end, err) + return + } + // Set fake revision as compaction returns non-linearizable revision. + // TODO: Model non-linearizable response revision in model. + h.appendSuccessful(request, start, end, compactResponse(-1)) +} + func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) { op := porcupine.Operation{ ClientId: h.streamID, @@ -444,6 +463,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}} } +func compactRequest(rev int64) EtcdRequest { + return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}} +} + +func compactResponse(revision int64) MaybeEtcdResponse { + return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}} +} + type History struct { operations []porcupine.Operation } diff --git a/tests/robustness/options/server_config_options.go b/tests/robustness/options/server_config_options.go index e955784bf62..a0502a1f7dc 100644 --- a/tests/robustness/options/server_config_options.go +++ b/tests/robustness/options/server_config_options.go @@ -26,6 +26,12 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption { } } +func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption { + return func(c *e2e.EtcdProcessClusterConfig) { + c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))] + } +} + func WithSnapshotCatchUpEntries(input ...uint64) e2e.EPClusterOption { return func(c *e2e.EtcdProcessClusterConfig) { c.ServerConfig.SnapshotCatchUpEntries = input[internalRand.Intn(len(input))] diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index 9ef212332b6..5ba0cb37059 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { case raftReq.ClusterVersionSet != nil: return nil, nil case raftReq.Compaction != nil: - return nil, nil + request := model.EtcdRequest{ + Type: model.Compact, + Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision}, + } + return &request, nil case raftReq.Txn != nil: txn := model.TxnRequest{ Conditions: []model.EtcdCondition{}, diff --git a/tests/robustness/scenarios.go b/tests/robustness/scenarios.go index cca05dea49c..e94eab7ae80 100644 --- a/tests/robustness/scenarios.go +++ b/tests/robustness/scenarios.go @@ -91,7 +91,8 @@ func exploratoryScenarios(_ *testing.T) []testScenario { options.WithSnapshotCount(50, 100, 1000), options.WithSubsetOptions(randomizableOptions...), e2e.WithGoFailEnabled(true), - e2e.WithCompactionBatchLimit(100), + // Set low minimal compaction batch limit to allow for triggering multi batch compaction failpoints. + options.WithCompactionBatchLimit(10, 100, 1000), e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond), } diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 44ad21ddf99..a264341b0d9 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -38,13 +38,14 @@ var ( {choice: List, weight: 15}, {choice: StaleGet, weight: 10}, {choice: StaleList, weight: 10}, - {choice: Put, weight: 23}, - {choice: LargePut, weight: 2}, {choice: Delete, weight: 5}, {choice: MultiOpTxn, weight: 5}, {choice: PutWithLease, weight: 5}, {choice: LeaseRevoke, weight: 5}, {choice: CompareAndSet, weight: 5}, + {choice: Put, weight: 15}, + {choice: LargePut, weight: 5}, + {choice: Compact, weight: 5}, }, } EtcdPut = etcdTraffic{ @@ -56,9 +57,10 @@ var ( {choice: List, weight: 15}, {choice: StaleGet, weight: 10}, {choice: StaleList, weight: 10}, - {choice: Put, weight: 40}, {choice: MultiOpTxn, weight: 5}, {choice: LargePut, weight: 5}, + {choice: Put, weight: 35}, + {choice: Compact, weight: 5}, }, } ) @@ -89,6 +91,7 @@ const ( LeaseRevoke etcdRequestType = "leaseRevoke" CompareAndSet etcdRequestType = "compareAndSet" Defragment etcdRequestType = "defragment" + Compact etcdRequestType = "compact" ) func (t etcdTraffic) Name() string { @@ -266,6 +269,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, if resp != nil { rev = resp.Header.Revision } + case Compact: + var resp *clientv3.CompactResponse + resp, err = c.client.Compact(opCtx, lastRev) + if resp != nil { + rev = resp.Header.Revision + } default: panic("invalid choice") } diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 4fbf8d0a73c..e38dcbc3dd9 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -37,9 +37,10 @@ var ( resource: "pods", namespace: "default", writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 90}, + {choice: KubernetesUpdate, weight: 85}, {choice: KubernetesDelete, weight: 5}, {choice: KubernetesCreate, weight: 5}, + {choice: KubernetesCompact, weight: 5}, }, } ) @@ -168,6 +169,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID())) + case KubernetesCompact: + err = kc.Compact(writeCtx, rev) default: panic(fmt.Sprintf("invalid choice: %q", op)) } @@ -213,9 +216,10 @@ func (t kubernetesTraffic) generateKey() string { type KubernetesRequestType string const ( - KubernetesDelete KubernetesRequestType = "delete" - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesCompact KubernetesRequestType = "compact" ) type kubernetesClient struct { @@ -254,6 +258,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error { return k.client.RequestProgress(clientv3.WithRequireLeader(ctx)) } +func (k kubernetesClient) Compact(ctx context.Context, rev int64) error { + _, err := k.client.Compact(ctx, rev) + return err +} + // Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing. // However, if the keys value changed it wants imminently to read it, thus the Get operation on failure. func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) { diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 03e23e5bcc4..c5e1efa2b61 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -193,6 +193,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste } case model.LeaseGrant: case model.LeaseRevoke: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -218,6 +219,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati case model.Range: case model.LeaseGrant: case model.LeaseRevoke: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index 1ebf90750d1..d752b8bb257 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -49,11 +49,9 @@ func TestDataReports(t *testing.T) { } visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute) - if t.Failed() { - err := visualize(filepath.Join(path, "history.html")) - if err != nil { - t.Fatal(err) - } + err = visualize(filepath.Join(path, "history.html")) + if err != nil { + t.Fatal(err) } }) } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index f19fced7374..45869446fbd 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -67,45 +67,59 @@ type watchConfig struct { // watchUntilRevision watches all changes until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. func watchUntilRevision(ctx context.Context, t *testing.T, c *client.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) { var maxRevision int64 - var lastRevision int64 + var lastRevision int64 = 1 ctx, cancel := context.WithCancel(ctx) defer cancel() - watch := c.Watch(ctx, "", 1, true, true, false) +resetWatch: for { - select { - case <-ctx.Done(): - if maxRevision == 0 { - t.Errorf("Client didn't collect all events, max revision not set") - } - if lastRevision < maxRevision { - t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision) - } - return - case revision, ok := <-maxRevisionChan: - if ok { - maxRevision = revision - if lastRevision >= maxRevision { - cancel() - } - } else { - // Only cancel if maxRevision was never set. + watch := c.Watch(ctx, "", lastRevision+1, true, true, false) + for { + select { + case <-ctx.Done(): if maxRevision == 0 { + t.Errorf("Client didn't collect all events, max revision not set") + } + if lastRevision < maxRevision { + t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision) + } + return + case revision, ok := <-maxRevisionChan: + if ok { + maxRevision = revision + if lastRevision >= maxRevision { + cancel() + } + } else { + // Only cancel if maxRevision was never set. + if maxRevision == 0 { + cancel() + } + } + case resp, ok := <-watch: + if !ok { + t.Logf("Watch channel closed") + continue resetWatch + } + if cfg.requestProgress { + c.RequestProgress(ctx) + } + + if resp.Err() != nil { + if resp.Canceled { + if resp.CompactRevision > lastRevision { + lastRevision = resp.CompactRevision + } + continue resetWatch + } + t.Errorf("Watch stream received error, err %v", resp.Err()) + } + if len(resp.Events) > 0 { + lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision + } + if maxRevision != 0 && lastRevision >= maxRevision { cancel() } } - case resp := <-watch: - if cfg.requestProgress { - c.RequestProgress(ctx) - } - if resp.Err() != nil && !resp.Canceled { - t.Errorf("Watch stream received error, err %v", resp.Err()) - } - if len(resp.Events) > 0 { - lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision - } - if maxRevision != 0 && lastRevision >= maxRevision { - cancel() - } } } }