From 2b54660a04e75ac7b3ae28583e5610380b844942 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Thu, 5 Oct 2023 23:28:33 -0700 Subject: [PATCH 1/4] http health check bug fixes Signed-off-by: Chao Chen --- server/etcdserver/api/etcdhttp/health.go | 26 ++++++++------- server/etcdserver/api/etcdhttp/health_test.go | 32 ++++++++----------- server/proxy/grpcproxy/health.go | 8 +++-- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 3d952426631..27e4d884f3f 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -41,6 +41,7 @@ type ServerHealth interface { serverHealthV2V3 Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) Config() config.ServerConfig + AuthStore() auth.AuthStore } type serverHealthV2V3 interface { @@ -50,33 +51,33 @@ type serverHealthV2V3 interface { // HandleHealth registers metrics and health handlers for v2. func HandleHealthForV2(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } if h := checkLeader(lg, srv, serializable); h.Health != "true" { return h } - return checkV2API(lg, srv) + return checkV2API(ctx, lg, srv) })) } // HandleHealth registers metrics and health handlers. it checks health by using v3 range request // and its corresponding timeout. func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } if h := checkLeader(lg, srv, serializable); h.Health != "true" { return h } - return checkAPI(lg, srv, serializable) + return checkAPI(ctx, lg, srv, serializable) })) } // NewHealthHandler handles '/health' requests. -func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc { +func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) @@ -90,7 +91,7 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(excludedAlarms AlarmSet, Serial // This is useful for probes attempting to validate the liveness of // the etcd process vs readiness of the cluster to serve requests. serializableFlag := getSerializableFlag(r) - h := hfunc(excludedAlarms, serializableFlag) + h := hfunc(r.Context(), excludedAlarms, serializableFlag) defer func() { if h.Health == "true" { healthSuccess.Inc() @@ -197,9 +198,9 @@ func checkLeader(lg *zap.Logger, srv serverHealthV2V3, serializable bool) Health return h } -func checkV2API(lg *zap.Logger, srv etcdserver.ServerV2) Health { +func checkV2API(ctx context.Context, lg *zap.Logger, srv etcdserver.ServerV2) Health { h := Health{Health: "true"} - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) _, err := srv.Do(ctx, pb.Request{Method: "QGET"}) cancel() if err != nil { @@ -212,13 +213,14 @@ func checkV2API(lg *zap.Logger, srv etcdserver.ServerV2) Health { return h } -func checkAPI(lg *zap.Logger, srv ServerHealth, serializable bool) Health { +func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializable bool) Health { h := Health{Health: "true"} cfg := srv.Config() - ctx, cancel := context.WithTimeout(context.Background(), cfg.ReqTimeout()) - _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) + ctx = srv.AuthStore().WithRoot(ctx) + cctx, cancel := context.WithTimeout(ctx, cfg.ReqTimeout()) + _, err := srv.Range(cctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) cancel() - if err != nil && err != auth.ErrUserEmpty && err != auth.ErrPermissionDenied { + if err != nil { h.Health = "false" h.Reason = fmt.Sprintf("RANGE ERROR:%s", err) lg.Warn("serving /health false; Range fails", zap.Error(err)) diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 27ad30be4e0..4d2ebbe82c5 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -9,6 +9,8 @@ import ( "net/http/httptest" "testing" + "go.uber.org/zap/zaptest" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/types" @@ -16,16 +18,17 @@ import ( "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" - "go.uber.org/zap/zaptest" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" ) type fakeHealthServer struct { fakeServer - health string - apiError error + health string + apiError error + authStore auth.AuthStore } -func (s *fakeHealthServer) Range(ctx context.Context, request *pb.RangeRequest) (*pb.RangeResponse, error) { +func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, s.apiError } @@ -39,12 +42,13 @@ func (s *fakeHealthServer) Leader() types.ID { } return types.ID(raft.None) } -func (s *fakeHealthServer) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) { +func (s *fakeHealthServer) Do(_ context.Context, _ pb.Request) (etcdserver.Response, error) { if s.health == "true" { return etcdserver.Response{}, nil } return etcdserver.Response{}, fmt.Errorf("fail health check") } +func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } func TestHealthHandler(t *testing.T) { @@ -108,20 +112,6 @@ func TestHealthHandler(t *testing.T) { expectStatusCode: http.StatusOK, expectHealth: "true", }, - { - name: "Healthy even if authentication failed", - healthCheckURL: "/health", - apiError: auth.ErrUserEmpty, - expectStatusCode: http.StatusOK, - expectHealth: "true", - }, - { - name: "Healthy even if authorization failed", - healthCheckURL: "/health", - apiError: auth.ErrPermissionDenied, - expectStatusCode: http.StatusOK, - expectHealth: "true", - }, { name: "Unhealthy if api is not available", healthCheckURL: "/health", @@ -134,10 +124,14 @@ func TestHealthHandler(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mux := http.NewServeMux() + lg := zaptest.NewLogger(t) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ fakeServer: fakeServer{alarms: tt.alarms}, health: tt.expectHealth, apiError: tt.apiError, + authStore: auth.NewAuthStore(lg, be, nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() diff --git a/server/proxy/grpcproxy/health.go b/server/proxy/grpcproxy/health.go index 882af4b46a8..e4964055596 100644 --- a/server/proxy/grpcproxy/health.go +++ b/server/proxy/grpcproxy/health.go @@ -31,7 +31,9 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkHealth(c) })) + mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + return checkHealth(c) + })) } // HandleProxyHealth registers health handler on '/proxy/health'. @@ -39,7 +41,9 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) })) + mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + return checkProxyHealth(c) + })) } func checkHealth(c *clientv3.Client) etcdhttp.Health { From f5d7f997d668432de9dd45097dc8d63042bd8e33 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 25 Sep 2023 15:44:19 -0700 Subject: [PATCH 2/4] etcdserver: add livez and ready http endpoints for etcd. Add two separate probes, one for liveness and one for readiness. The liveness probe would check that the local individual node is up and running, or else restart the node, while the readiness probe would check that the cluster is ready to serve traffic. This would make etcd health-check fully Kubernetes API complient. Signed-off-by: Siyuan Zhang --- server/etcdserver/api/etcdhttp/health.go | 201 +++++++++++++- server/etcdserver/api/etcdhttp/health_test.go | 252 ++++++++++++++---- server/proxy/grpcproxy/health.go | 4 +- 3 files changed, 384 insertions(+), 73 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index 27e4d884f3f..ba7b78f1fba 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -15,12 +15,16 @@ package etcdhttp import ( + "bytes" "context" "encoding/json" "fmt" "net/http" "time" + "path" + "strings" + "go.uber.org/zap" "github.com/prometheus/client_golang/prometheus" @@ -51,7 +55,7 @@ type serverHealthV2V3 interface { // HandleHealth registers metrics and health handlers for v2. func HandleHealthForV2(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.ServerV2) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } @@ -65,7 +69,7 @@ func HandleHealthForV2(lg *zap.Logger, mux *http.ServeMux, srv etcdserver.Server // HandleHealth registers metrics and health handlers. it checks health by using v3 range request // and its corresponding timeout. func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { - mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms AlarmSet, serializable bool) Health { + mux.Handle(PathHealth, NewHealthHandler(lg, func(ctx context.Context, excludedAlarms StringSet, serializable bool) Health { if h := checkAlarms(lg, srv, excludedAlarms); h.Health != "true" { return h } @@ -74,10 +78,13 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, srv ServerHealth) { } return checkAPI(ctx, lg, srv, serializable) })) + + installLivezEndpoints(lg, mux, srv) + installReadyzEndpoints(lg, mux, srv) } // NewHealthHandler handles '/health' requests. -func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms AlarmSet, Serializable bool) Health) http.HandlerFunc { +func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAlarms StringSet, Serializable bool) Health) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) @@ -85,7 +92,7 @@ func NewHealthHandler(lg *zap.Logger, hfunc func(ctx context.Context, excludedAl lg.Warn("/health error", zap.Int("status-code", http.StatusMethodNotAllowed)) return } - excludedAlarms := getExcludedAlarms(r) + excludedAlarms := getQuerySet(r, "exclude") // Passing the query parameter "serializable=true" ensures that the // health of the local etcd is checked vs the health of the cluster. // This is useful for probes attempting to validate the liveness of @@ -138,20 +145,18 @@ type Health struct { Reason string `json:"reason"` } -type AlarmSet map[string]struct{} - -func getExcludedAlarms(r *http.Request) (alarms AlarmSet) { - alarms = make(map[string]struct{}, 2) - alms, found := r.URL.Query()["exclude"] +func getQuerySet(r *http.Request, query string) StringSet { + querySet := make(map[string]struct{}) + qs, found := r.URL.Query()[query] if found { - for _, alm := range alms { - if len(alm) == 0 { + for _, q := range qs { + if len(q) == 0 { continue } - alarms[alm] = struct{}{} + querySet[q] = struct{}{} } } - return alarms + return querySet } func getSerializableFlag(r *http.Request) bool { @@ -160,7 +165,7 @@ func getSerializableFlag(r *http.Request) bool { // TODO: etcdserver.ErrNoLeader in health API -func checkAlarms(lg *zap.Logger, srv serverHealthV2V3, excludedAlarms AlarmSet) Health { +func checkAlarms(lg *zap.Logger, srv serverHealthV2V3, excludedAlarms StringSet) Health { h := Health{Health: "true"} as := srv.Alarms() if len(as) > 0 { @@ -229,3 +234,171 @@ func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializabl lg.Debug("serving /health true") return h } + +type HealthCheck func(ctx context.Context) error + +type CheckRegistry struct { + path string + checks map[string]HealthCheck +} + +func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)} + reg.Register("serializable_read", serializableReadCheck(server)) + reg.InstallHttpEndpoints(lg, mux) +} + +func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { + reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)} + reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) + reg.Register("serializable_read", serializableReadCheck(server)) + reg.InstallHttpEndpoints(lg, mux) +} + +func (reg *CheckRegistry) Register(name string, check HealthCheck) { + reg.checks[name] = check +} + +func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) { + checkNames := make([]string, 0, len(reg.checks)) + for k := range reg.checks { + checkNames = append(checkNames, k) + } + + // installs the http handler for the root path. + reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...) + for _, checkName := range checkNames { + // installs the http handler for the individual check sub path. + subpath := path.Join(reg.path, checkName) + check := checkName + mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) Health { + return reg.runHealthChecks(r.Context(), check) + })) + } +} + +func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health { + h := Health{Health: "true"} + var individualCheckOutput bytes.Buffer + for _, checkName := range checkNames { + check, found := reg.checks[checkName] + if !found { + panic(fmt.Errorf("Health check: %s not registered", checkName)) + } + if err := check(ctx); err != nil { + fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err) + h.Health = "false" + } else { + fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName) + } + } + h.Reason = individualCheckOutput.String() + return h +} + +// installRootHttpEndpoint installs the http handler for the root path. +func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) { + hfunc := func(r *http.Request) Health { + // extracts the health check names to be excludeList from the query param + excluded := getQuerySet(r, "exclude") + + filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded) + return reg.runHealthChecks(r.Context(), filteredCheckNames...) + } + mux.Handle(path, newHealthHandler(path, lg, hfunc)) +} + +// newHealthHandler generates a http HandlerFunc for a health check function hfunc. +func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + lg.Warn("Health request error", zap.String("path", path), zap.Int("status-code", http.StatusMethodNotAllowed)) + return + } + h := hfunc(r) + // Always returns detailed reason for failed checks. + if h.Health != "true" { + http.Error(w, h.Reason, http.StatusServiceUnavailable) + lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable)) + return + } + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") + // Only writes detailed reason for verbose requests. + if _, found := r.URL.Query()["verbose"]; found { + fmt.Fprint(w, h.Reason) + } + fmt.Fprint(w, "ok\n") + lg.Debug("Health check OK", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusOK)) + } +} + +func filterCheckList(lg *zap.Logger, checks StringSet, excluded StringSet) []string { + filteredList := []string{} + for chk := range checks { + if _, found := excluded[chk]; found { + delete(excluded, chk) + continue + } + filteredList = append(filteredList, chk) + } + if len(excluded) > 0 { + // For version compatibility, excluding non-exist checks would not fail the request. + lg.Warn("some health checks cannot be excluded", zap.String("missing-health-checks", formatQuoted(excluded.List()...))) + } + return filteredList +} + +// formatQuoted returns a formatted string of the health check names, +// preserving the order passed in. +func formatQuoted(names ...string) string { + quoted := make([]string, 0, len(names)) + for _, name := range names { + quoted = append(quoted, fmt.Sprintf("%q", name)) + } + return strings.Join(quoted, ",") +} + +type StringSet map[string]struct{} + +func (s StringSet) List() []string { + keys := make([]string, 0, len(s)) + for k := range s { + keys = append(keys, k) + } + return keys +} + +func listToStringSet(list []string) StringSet { + set := make(map[string]struct{}) + for _, s := range list { + set[s] = struct{}{} + } + return set +} + +// activeAlarmCheck checks if a specific alarm type is active in the server. +func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error { + return func(ctx context.Context) error { + as := srv.Alarms() + for _, v := range as { + if v.Alarm == at { + return fmt.Errorf("alarm activated: %s", at.String()) + } + } + return nil + } +} + +func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { + return func(ctx context.Context) error { + ctx = srv.AuthStore().WithRoot(ctx) + _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true}) + if err != nil { + return fmt.Errorf("range error: %w", err) + } + return nil + } +} diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index 4d2ebbe82c5..d2bd1b197f4 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -2,11 +2,11 @@ package etcdhttp import ( "context" - "encoding/json" "fmt" "io" "net/http" "net/http/httptest" + "strings" "testing" "go.uber.org/zap/zaptest" @@ -17,15 +17,14 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/config" - "go.etcd.io/etcd/server/v3/etcdserver" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" ) type fakeHealthServer struct { fakeServer - health string - apiError error - authStore auth.AuthStore + apiError error + missingLeader bool + authStore auth.AuthStore } func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -37,87 +36,91 @@ func (s *fakeHealthServer) Config() config.ServerConfig { } func (s *fakeHealthServer) Leader() types.ID { - if s.health == "true" { + if !s.missingLeader { return 1 } return types.ID(raft.None) } -func (s *fakeHealthServer) Do(_ context.Context, _ pb.Request) (etcdserver.Response, error) { - if s.health == "true" { - return etcdserver.Response{}, nil - } - return etcdserver.Response{}, fmt.Errorf("fail health check") -} -func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } + +func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore } + func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false } +type healthTestCase struct { + name string + healthCheckURL string + expectStatusCode int + inResult []string + notInResult []string + + alarms []*pb.AlarmMember + apiError error + missingLeader bool +} + func TestHealthHandler(t *testing.T) { // define the input and expected output // input: alarms, and healthCheckURL - tests := []struct { - name string - alarms []*pb.AlarmMember - healthCheckURL string - apiError error - - expectStatusCode int - expectHealth string - }{ + tests := []healthTestCase{ { name: "Healthy if no alarm", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarm is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Healthy if NOSPACE alarm is on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if NOSPACE alarm is excluded", alarms: []*pb.AlarmMember{}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Healthy if multiple NOSPACE alarms are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(1), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(2), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(3), Alarm: pb.AlarmType_NOSPACE}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if NOSPACE alarms is excluded and CORRUPT is on", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE", expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", }, { name: "Unhealthy if both NOSPACE and CORRUPT are on and excluded", alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}, {MemberID: uint64(1), Alarm: pb.AlarmType_CORRUPT}}, healthCheckURL: "/health?exclude=NOSPACE&exclude=CORRUPT", expectStatusCode: http.StatusOK, - expectHealth: "true", }, { name: "Unhealthy if api is not available", healthCheckURL: "/health", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - expectHealth: "false", + }, + { + name: "Unhealthy if no leader", + healthCheckURL: "/health", + expectStatusCode: http.StatusServiceUnavailable, + missingLeader: true, + }, + { + name: "Healthy if no leader and serializable=true", + healthCheckURL: "/health?serializable=true", + expectStatusCode: http.StatusOK, + missingLeader: true, }, } @@ -128,44 +131,179 @@ func TestHealthHandler(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ - fakeServer: fakeServer{alarms: tt.alarms}, - health: tt.expectHealth, - apiError: tt.apiError, - authStore: auth.NewAuthStore(lg, be, nil, 0), + fakeServer: fakeServer{alarms: tt.alarms}, + apiError: tt.apiError, + missingLeader: tt.missingLeader, + authStore: auth.NewAuthStore(lg, be, nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, nil, nil) + }) + } +} - res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+tt.healthCheckURL)}) - if err != nil { - t.Errorf("fail serve http request %s %v", tt.healthCheckURL, err) - } - if res == nil { - t.Errorf("got nil http response with http request %s", tt.healthCheckURL) - return - } - if res.StatusCode != tt.expectStatusCode { - t.Errorf("want statusCode %d but got %d", tt.expectStatusCode, res.StatusCode) +func TestHttpSubPath(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "/readyz/data_corruption ok", + healthCheckURL: "/readyz/data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "/readyz/serializable_read not ok with error", + apiError: fmt.Errorf("Unexpected error"), + healthCheckURL: "/readyz/serializable_read", + expectStatusCode: http.StatusServiceUnavailable, + notInResult: []string{"data_corruption"}, + }, + { + name: "/readyz/non_exist 404", + healthCheckURL: "/readyz/non_exist", + expectStatusCode: http.StatusNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + apiError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } - health, err := parseHealthOutput(res.Body) - if err != nil { - t.Errorf("fail parse health check output %v", err) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + +func TestDataCorruptionCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Live if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/livez", + expectStatusCode: http.StatusOK, + notInResult: []string{"data_corruption"}, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + { + name: "ready if CORRUPT alarm is not on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz", + expectStatusCode: http.StatusOK, + }, + { + name: "ready if CORRUPT alarm is excluded", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}, {MemberID: uint64(0), Alarm: pb.AlarmType_NOSPACE}}, + healthCheckURL: "/readyz?exclude=data_corruption", + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if CORRUPT alarm is on", + alarms: []*pb.AlarmMember{{MemberID: uint64(0), Alarm: pb.AlarmType_CORRUPT}}, + healthCheckURL: "/readyz?exclude=non_exist", + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]data_corruption failed: alarm activated: CORRUPT"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + authStore: auth.NewAuthStore(logger, be, nil, 0), } - if health.Health != tt.expectHealth { - t.Errorf("want health %s but got %s", tt.expectHealth, health.Health) + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + // OK before alarms are activated. + checkHttpResponse(t, ts, tt.healthCheckURL, http.StatusOK, nil, nil) + // Activate the alarms. + s.alarms = tt.alarms + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + }) + } +} + +func TestSerializableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Not alive if range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + apiError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) }) } } -func parseHealthOutput(body io.Reader) (Health, error) { - obj := Health{} - d, derr := io.ReadAll(body) - if derr != nil { - return obj, derr +func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { + res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)}) + + if err != nil { + t.Fatalf("fail serve http request %s %v", url, err) + } + if res.StatusCode != expectStatusCode { + t.Errorf("want statusCode %d but got %d", expectStatusCode, res.StatusCode) + } + defer res.Body.Close() + b, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("Failed to read response for %s", url) + } + result := string(b) + for _, substr := range inResult { + if !strings.Contains(result, substr) { + t.Errorf("Could not find substring : %s, in response: %s", substr, result) + return + } } - if err := json.Unmarshal(d, &obj); err != nil { - return obj, err + for _, substr := range notInResult { + if strings.Contains(result, substr) { + t.Errorf("Do not expect substring : %s, in response: %s", substr, result) + return + } } - return obj, nil } diff --git a/server/proxy/grpcproxy/health.go b/server/proxy/grpcproxy/health.go index e4964055596..8747dc3c57d 100644 --- a/server/proxy/grpcproxy/health.go +++ b/server/proxy/grpcproxy/health.go @@ -31,7 +31,7 @@ func HandleHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { return checkHealth(c) })) } @@ -41,7 +41,7 @@ func HandleProxyHealth(lg *zap.Logger, mux *http.ServeMux, c *clientv3.Client) { if lg == nil { lg = zap.NewNop() } - mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.AlarmSet, serializable bool) etcdhttp.Health { + mux.Handle(etcdhttp.PathProxyHealth, etcdhttp.NewHealthHandler(lg, func(ctx context.Context, excludedAlarms etcdhttp.StringSet, serializable bool) etcdhttp.Health { return checkProxyHealth(c) })) } From 293fc21cd8cc6a57f2a36489c4334edb7de0fb42 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 16 Oct 2023 15:16:42 -0700 Subject: [PATCH 3/4] etcdserver: add metric counters for livez/readyz health checks. Signed-off-by: Siyuan Zhang --- server/etcdserver/api/etcdhttp/health.go | 91 +++++++++++++++---- server/etcdserver/api/etcdhttp/health_test.go | 54 +++++++++++ 2 files changed, 127 insertions(+), 18 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index ba7b78f1fba..a442228012b 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// This file defines the http endpoints for etcd health checks. +// The endpoints include /livez, /readyz and /health. + package etcdhttp import ( @@ -37,8 +40,13 @@ import ( ) const ( - PathHealth = "/health" - PathProxyHealth = "/proxy/health" + PathHealth = "/health" + PathProxyHealth = "/proxy/health" + HealthStatusSuccess string = "success" + HealthStatusError string = "error" + checkTypeLivez = "livez" + checkTypeReadyz = "readyz" + checkTypeHealth = "health" ) type ServerHealth interface { @@ -131,11 +139,29 @@ var ( Name: "health_failures", Help: "The total number of failed health checks", }) + healthCheckGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "healthcheck", + Help: "The result of each kind of healthcheck.", + }, + []string{"type", "name"}, + ) + healthCheckCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "healthchecks_total", + Help: "The total number of each kind of healthcheck.", + }, + []string{"type", "name", "status"}, + ) ) func init() { prometheus.MustRegister(healthSuccess) prometheus.MustRegister(healthFailed) + prometheus.MustRegister(healthCheckGauge) + prometheus.MustRegister(healthCheckCounter) } // Health defines etcd server health status. @@ -145,6 +171,12 @@ type Health struct { Reason string `json:"reason"` } +// HealthStatus is used in new /readyz or /livez health checks instead of the Health struct. +type HealthStatus struct { + Reason string `json:"reason"` + Status string `json:"status"` +} + func getQuerySet(r *http.Request, query string) StringSet { querySet := make(map[string]struct{}) qs, found := r.URL.Query()[query] @@ -238,18 +270,18 @@ func checkAPI(ctx context.Context, lg *zap.Logger, srv ServerHealth, serializabl type HealthCheck func(ctx context.Context) error type CheckRegistry struct { - path string - checks map[string]HealthCheck + checkType string + checks map[string]HealthCheck } func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { - reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)} + reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)} reg.Register("serializable_read", serializableReadCheck(server)) reg.InstallHttpEndpoints(lg, mux) } func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { - reg := CheckRegistry{path: "/readyz", checks: make(map[string]HealthCheck)} + reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)} reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) reg.Register("serializable_read", serializableReadCheck(server)) reg.InstallHttpEndpoints(lg, mux) @@ -259,6 +291,10 @@ func (reg *CheckRegistry) Register(name string, check HealthCheck) { reg.checks[name] = check } +func (reg *CheckRegistry) RootPath() string { + return "/" + reg.checkType +} + func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMux) { checkNames := make([]string, 0, len(reg.checks)) for k := range reg.checks { @@ -266,19 +302,19 @@ func (reg *CheckRegistry) InstallHttpEndpoints(lg *zap.Logger, mux *http.ServeMu } // installs the http handler for the root path. - reg.installRootHttpEndpoint(lg, mux, reg.path, checkNames...) + reg.installRootHttpEndpoint(lg, mux, checkNames...) for _, checkName := range checkNames { // installs the http handler for the individual check sub path. - subpath := path.Join(reg.path, checkName) + subpath := path.Join(reg.RootPath(), checkName) check := checkName - mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) Health { + mux.Handle(subpath, newHealthHandler(subpath, lg, func(r *http.Request) HealthStatus { return reg.runHealthChecks(r.Context(), check) })) } } -func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) Health { - h := Health{Health: "true"} +func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...string) HealthStatus { + h := HealthStatus{Status: HealthStatusSuccess} var individualCheckOutput bytes.Buffer for _, checkName := range checkNames { check, found := reg.checks[checkName] @@ -287,9 +323,11 @@ func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...str } if err := check(ctx); err != nil { fmt.Fprintf(&individualCheckOutput, "[-]%s failed: %v\n", checkName, err) - h.Health = "false" + h.Status = HealthStatusError + recordMetrics(reg.checkType, checkName, HealthStatusError) } else { fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", checkName) + recordMetrics(reg.checkType, checkName, HealthStatusSuccess) } } h.Reason = individualCheckOutput.String() @@ -297,19 +335,20 @@ func (reg *CheckRegistry) runHealthChecks(ctx context.Context, checkNames ...str } // installRootHttpEndpoint installs the http handler for the root path. -func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, path string, checks ...string) { - hfunc := func(r *http.Request) Health { +func (reg *CheckRegistry) installRootHttpEndpoint(lg *zap.Logger, mux *http.ServeMux, checks ...string) { + hfunc := func(r *http.Request) HealthStatus { // extracts the health check names to be excludeList from the query param excluded := getQuerySet(r, "exclude") filteredCheckNames := filterCheckList(lg, listToStringSet(checks), excluded) - return reg.runHealthChecks(r.Context(), filteredCheckNames...) + h := reg.runHealthChecks(r.Context(), filteredCheckNames...) + return h } - mux.Handle(path, newHealthHandler(path, lg, hfunc)) + mux.Handle(reg.RootPath(), newHealthHandler(reg.RootPath(), lg, hfunc)) } // newHealthHandler generates a http HandlerFunc for a health check function hfunc. -func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Health) http.HandlerFunc { +func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) HealthStatus) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) @@ -319,7 +358,7 @@ func newHealthHandler(path string, lg *zap.Logger, hfunc func(*http.Request) Hea } h := hfunc(r) // Always returns detailed reason for failed checks. - if h.Health != "true" { + if h.Status == HealthStatusError { http.Error(w, h.Reason, http.StatusServiceUnavailable) lg.Error("Health check error", zap.String("path", path), zap.String("reason", h.Reason), zap.Int("status-code", http.StatusServiceUnavailable)) return @@ -379,6 +418,22 @@ func listToStringSet(list []string) StringSet { return set } +func recordMetrics(checkType, name string, status string) { + val := 0.0 + if status == HealthStatusSuccess { + val = 1.0 + } + healthCheckGauge.With(prometheus.Labels{ + "type": checkType, + "name": name, + }).Set(val) + healthCheckCounter.With(prometheus.Labels{ + "type": checkType, + "name": name, + "status": status, + }).Inc() +} + // activeAlarmCheck checks if a specific alarm type is active in the server. func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) error { return func(ctx context.Context) error { diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index d2bd1b197f4..bb1fbd5cbf1 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap/zaptest" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -177,6 +178,7 @@ func TestHttpSubPath(t *testing.T) { ts := httptest.NewServer(mux) defer ts.Close() checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "", tt.expectStatusCode) }) } } @@ -275,6 +277,7 @@ func TestSerializableReadCheck(t *testing.T) { ts := httptest.NewServer(mux) defer ts.Close() checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "serializable_read", tt.expectStatusCode) }) } } @@ -307,3 +310,54 @@ func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStat } } } + +func checkMetrics(t *testing.T, url, checkName string, expectStatusCode int) { + defer healthCheckGauge.Reset() + defer healthCheckCounter.Reset() + + typeName := strings.TrimPrefix(strings.Split(url, "?")[0], "/") + if len(checkName) == 0 { + checkName = strings.Split(typeName, "/")[1] + typeName = strings.Split(typeName, "/")[0] + } + + expectedSuccessCount := 1 + expectedErrorCount := 0 + if expectStatusCode != http.StatusOK { + expectedSuccessCount = 0 + expectedErrorCount = 1 + } + + gather, _ := prometheus.DefaultGatherer.Gather() + for _, mf := range gather { + name := *mf.Name + val := 0 + switch name { + case "etcd_server_healthcheck": + val = int(mf.GetMetric()[0].GetGauge().GetValue()) + case "etcd_server_healthcheck_total": + val = int(mf.GetMetric()[0].GetCounter().GetValue()) + default: + continue + } + labelMap := make(map[string]string) + for _, label := range mf.GetMetric()[0].Label { + labelMap[label.GetName()] = label.GetValue() + } + if typeName != labelMap["type"] { + continue + } + if labelMap["name"] != checkName { + continue + } + if statusLabel, found := labelMap["status"]; found && statusLabel == HealthStatusError { + if val != expectedErrorCount { + t.Fatalf("%s got errorCount %d, wanted %d\n", name, val, expectedErrorCount) + } + } else { + if val != expectedSuccessCount { + t.Fatalf("%s got expectedSuccessCount %d, wanted %d\n", name, val, expectedSuccessCount) + } + } + } +} From ebb7e796c3cc83f99e7837e060159f8ff991ee90 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Mon, 20 Nov 2023 09:58:40 -0800 Subject: [PATCH 4/4] etcdserver: add linearizable_read check to readyz. Signed-off-by: Siyuan Zhang --- server/etcdserver/api/etcdhttp/health.go | 21 ++--- server/etcdserver/api/etcdhttp/health_test.go | 76 +++++++++++++++---- 2 files changed, 72 insertions(+), 25 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/health.go b/server/etcdserver/api/etcdhttp/health.go index a442228012b..0a933429122 100644 --- a/server/etcdserver/api/etcdhttp/health.go +++ b/server/etcdserver/api/etcdhttp/health.go @@ -23,10 +23,9 @@ import ( "encoding/json" "fmt" "net/http" - "time" - "path" "strings" + "time" "go.uber.org/zap" @@ -276,14 +275,19 @@ type CheckRegistry struct { func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{checkType: checkTypeLivez, checks: make(map[string]HealthCheck)} - reg.Register("serializable_read", serializableReadCheck(server)) + reg.Register("serializable_read", readCheck(server, true /* serializable */)) reg.InstallHttpEndpoints(lg, mux) } func installReadyzEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) { reg := CheckRegistry{checkType: checkTypeReadyz, checks: make(map[string]HealthCheck)} reg.Register("data_corruption", activeAlarmCheck(server, pb.AlarmType_CORRUPT)) - reg.Register("serializable_read", serializableReadCheck(server)) + // serializable_read checks if local read is ok. + // linearizable_read checks if there is consensus in the cluster. + // Having both serializable_read and linearizable_read helps isolate the cause of problems if there is a read failure. + reg.Register("serializable_read", readCheck(server, true)) + // linearizable_read check would be replaced by read_index check in 3.6 + reg.Register("linearizable_read", readCheck(server, false)) reg.InstallHttpEndpoints(lg, mux) } @@ -447,13 +451,10 @@ func activeAlarmCheck(srv ServerHealth, at pb.AlarmType) func(context.Context) e } } -func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error { +func readCheck(srv ServerHealth, serializable bool) func(ctx context.Context) error { return func(ctx context.Context) error { ctx = srv.AuthStore().WithRoot(ctx) - _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: true}) - if err != nil { - return fmt.Errorf("range error: %w", err) - } - return nil + _, err := srv.Range(ctx, &pb.RangeRequest{KeysOnly: true, Limit: 1, Serializable: serializable}) + return err } } diff --git a/server/etcdserver/api/etcdhttp/health_test.go b/server/etcdserver/api/etcdhttp/health_test.go index bb1fbd5cbf1..0019121755a 100644 --- a/server/etcdserver/api/etcdhttp/health_test.go +++ b/server/etcdserver/api/etcdhttp/health_test.go @@ -23,13 +23,17 @@ import ( type fakeHealthServer struct { fakeServer - apiError error - missingLeader bool - authStore auth.AuthStore + serializableReadError error + linearizableReadError error + missingLeader bool + authStore auth.AuthStore } -func (s *fakeHealthServer) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { - return nil, s.apiError +func (s *fakeHealthServer) Range(_ context.Context, req *pb.RangeRequest) (*pb.RangeResponse, error) { + if req.Serializable { + return nil, s.serializableReadError + } + return nil, s.linearizableReadError } func (s *fakeHealthServer) Config() config.ServerConfig { @@ -132,10 +136,11 @@ func TestHealthHandler(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) HandleHealth(zaptest.NewLogger(t), mux, &fakeHealthServer{ - fakeServer: fakeServer{alarms: tt.alarms}, - apiError: tt.apiError, - missingLeader: tt.missingLeader, - authStore: auth.NewAuthStore(lg, be, nil, 0), + fakeServer: fakeServer{alarms: tt.alarms}, + serializableReadError: tt.apiError, + linearizableReadError: tt.apiError, + missingLeader: tt.missingLeader, + authStore: auth.NewAuthStore(lg, be, nil, 0), }) ts := httptest.NewServer(mux) defer ts.Close() @@ -171,8 +176,8 @@ func TestHttpSubPath(t *testing.T) { mux := http.NewServeMux() logger := zaptest.NewLogger(t) s := &fakeHealthServer{ - apiError: tt.apiError, - authStore: auth.NewAuthStore(logger, be, nil, 0), + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } HandleHealth(logger, mux, s) ts := httptest.NewServer(mux) @@ -255,14 +260,14 @@ func TestSerializableReadCheck(t *testing.T) { healthCheckURL: "/livez", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, }, { name: "Not ready if range api is not available", healthCheckURL: "/readyz", apiError: fmt.Errorf("Unexpected error"), expectStatusCode: http.StatusServiceUnavailable, - inResult: []string{"[-]serializable_read failed: range error: Unexpected error"}, + inResult: []string{"[-]serializable_read failed: Unexpected error"}, }, } for _, tt := range tests { @@ -270,8 +275,8 @@ func TestSerializableReadCheck(t *testing.T) { mux := http.NewServeMux() logger := zaptest.NewLogger(t) s := &fakeHealthServer{ - apiError: tt.apiError, - authStore: auth.NewAuthStore(logger, be, nil, 0), + serializableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), } HandleHealth(logger, mux, s) ts := httptest.NewServer(mux) @@ -282,6 +287,47 @@ func TestSerializableReadCheck(t *testing.T) { } } +func TestLinearizableReadCheck(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + tests := []healthTestCase{ + { + name: "Alive normal", + healthCheckURL: "/livez?verbose", + expectStatusCode: http.StatusOK, + inResult: []string{"[+]serializable_read ok"}, + }, + { + name: "Alive if lineariable range api is not available", + healthCheckURL: "/livez", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusOK, + }, + { + name: "Not ready if range api is not available", + healthCheckURL: "/readyz", + apiError: fmt.Errorf("Unexpected error"), + expectStatusCode: http.StatusServiceUnavailable, + inResult: []string{"[+]serializable_read ok", "[-]linearizable_read failed: Unexpected error"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mux := http.NewServeMux() + logger := zaptest.NewLogger(t) + s := &fakeHealthServer{ + linearizableReadError: tt.apiError, + authStore: auth.NewAuthStore(logger, be, nil, 0), + } + HandleHealth(logger, mux, s) + ts := httptest.NewServer(mux) + defer ts.Close() + checkHttpResponse(t, ts, tt.healthCheckURL, tt.expectStatusCode, tt.inResult, tt.notInResult) + checkMetrics(t, tt.healthCheckURL, "linearizable_read", tt.expectStatusCode) + }) + } +} + func checkHttpResponse(t *testing.T, ts *httptest.Server, url string, expectStatusCode int, inResult []string, notInResult []string) { res, err := ts.Client().Do(&http.Request{Method: http.MethodGet, URL: testutil.MustNewURL(t, ts.URL+url)})