Skip to content

Commit

Permalink
dataclients/kubernetes: use healthcheck based on Shutdown predicate (#…
Browse files Browse the repository at this point in the history
…1918)

Updates #1899

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Dec 1, 2021
1 parent e5a6fb5 commit 7643317
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 297 deletions.
117 changes: 40 additions & 77 deletions dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,22 @@ import (
"net"
"net/http"
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"text/template"
"time"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/predicates"
)

const (
defaultIngressClass = "skipper"
defaultRouteGroupClass = "skipper"
serviceHostEnvVar = "KUBERNETES_SERVICE_HOST"
servicePortEnvVar = "KUBERNETES_SERVICE_PORT"
healthcheckRouteID = "kube__healthz"
httpRedirectRouteID = "kube__redirect"
healthcheckPath = "/kube-system/healthz"
defaultLoadBalancerAlgorithm = "roundRobin"
defaultEastWestDomain = "skipper.cluster.local"
)
Expand Down Expand Up @@ -192,13 +188,10 @@ type Client struct {
ingress *ingress
routeGroups *routeGroups
provideHealthcheck bool
healthy bool
provideHTTPSRedirect bool
termReceived bool
reverseSourcePredicate bool
httpsRedirectCode int
current map[string]*eskip.Route
sigs chan os.Signal
quit chan struct{}
defaultFiltersDir string
}
Expand Down Expand Up @@ -230,13 +223,6 @@ func New(o Options) (*Client, error) {
o.KubernetesInCluster, apiURL, o.ProvideHealthcheck, ingCls, rgCls, o.KubernetesNamespace,
)

var sigs chan os.Signal
if o.ProvideHealthcheck {
log.Info("register sigterm handler")
sigs = make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
}

if len(o.WhitelistedHealthCheckCIDR) > 0 {
whitelistCIDRS := make([]interface{}, len(o.WhitelistedHealthCheckCIDR))
for i, v := range o.WhitelistedHealthCheckCIDR {
Expand Down Expand Up @@ -278,7 +264,6 @@ func New(o Options) (*Client, error) {
provideHTTPSRedirect: o.ProvideHTTPSRedirect,
httpsRedirectCode: o.HTTPSRedirectCode,
current: make(map[string]*eskip.Route),
sigs: sigs,
reverseSourcePredicate: o.ReverseSourcePredicate,
quit: quit,
defaultFiltersDir: o.DefaultFiltersDir,
Expand Down Expand Up @@ -356,7 +341,17 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
return nil, err
}

return append(ri, rg...), nil
r := append(ri, rg...)

if c.provideHealthcheck {
r = append(r, healthcheckRoutes(c.reverseSourcePredicate)...)
}

if c.provideHTTPSRedirect {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

return r, nil
}

func shuntRoute(r *eskip.Route) {
Expand All @@ -374,53 +369,42 @@ func shuntRoute(r *eskip.Route) {
r.Backend = ""
}

func healthcheckRoute(healthy, reverseSourcePredicate bool) *eskip.Route {
logFilters := []*eskip.Filter{{
Name: filters.StatusName,
Args: []interface{}{http.StatusOK}},
}
if !healthy {
logFilters[0].Args = []interface{}{http.StatusServiceUnavailable}
}
// log if unhealthy or a debug loglevel
if healthy && !log.IsLevelEnabled(log.DebugLevel) {
logFilters = append(logFilters, &eskip.Filter{
Name: filters.DisableAccessLogName,
Args: []interface{}{200},
})
}
func healthcheckRoutes(reverseSourcePredicate bool) []*eskip.Route {
template := template.Must(template.New("healthcheck").Parse(`
kube__healthz_up: Path("/kube-system/healthz") && {{.Source}}({{.SourceCIDRs}}) -> {{.DisableAccessLog}} status(200) -> <shunt>;
kube__healthz_down: Path("/kube-system/healthz") && {{.Source}}({{.SourceCIDRs}}) && Shutdown() -> status(503) -> <shunt>;
`))

params := struct {
Source string
SourceCIDRs string
DisableAccessLog string
}{}

var p []*eskip.Predicate
if reverseSourcePredicate {
p = []*eskip.Predicate{{
Name: predicates.SourceFromLastName,
Args: internalIPs,
}}
params.Source = "SourceFromLast"
} else {
p = []*eskip.Predicate{{
Name: predicates.SourceName,
Args: internalIPs,
}}
params.Source = "Source"
}

return &eskip.Route{
Id: healthcheckRouteID,
Predicates: p,
Path: healthcheckPath,
Filters: logFilters,
Shunt: true,
if !log.IsLevelEnabled(log.DebugLevel) {
params.DisableAccessLog = "disableAccessLog(200) ->"
}
}

func (c *Client) hasReceivedTerm() bool {
select {
case s := <-c.sigs:
log.Infof("shutdown, caused by %s, set health check to be unhealthy", s)
c.termReceived = true
default:
cidrs := new(strings.Builder)
for i, ip := range internalIPs {
if i > 0 {
cidrs.WriteString(", ")
}
cidrs.WriteString(fmt.Sprintf("%q", ip))
}
params.SourceCIDRs = cidrs.String()

out := new(strings.Builder)
_ = template.Execute(out, params)
routes, _ := eskip.Parse(out.String())

return c.termReceived
return routes
}

func (c *Client) LoadAll() ([]*eskip.Route, error) {
Expand All @@ -430,16 +414,6 @@ func (c *Client) LoadAll() ([]*eskip.Route, error) {
return nil, fmt.Errorf("failed to load cluster state: %w", err)
}

// teardown handling: always healthy unless SIGTERM received
if c.provideHealthcheck {
c.healthy = !c.hasReceivedTerm()
r = append(r, healthcheckRoute(c.healthy, c.reverseSourcePredicate))
}

if c.provideHTTPSRedirect {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

c.current = mapRoutes(r)
log.Debugf("all routes loaded and mapped")

Expand Down Expand Up @@ -470,7 +444,7 @@ func (c *Client) LoadUpdate() ([]*eskip.Route, []string, error) {
// TODO: use eskip.Eq()
if r, ok := next[id]; ok && r.String() != c.current[id].String() {
updatedRoutes = append(updatedRoutes, r)
} else if !ok && id != healthcheckRouteID && id != httpRedirectRouteID {
} else if !ok {
deletedIDs = append(deletedIDs, id)
}
}
Expand All @@ -485,17 +459,6 @@ func (c *Client) LoadUpdate() ([]*eskip.Route, []string, error) {
log.Infof("diff taken, inserts/updates: %d, deletes: %d", len(updatedRoutes), len(deletedIDs))
}

// teardown handling: always healthy unless SIGTERM received
if c.provideHealthcheck {
healthy := !c.hasReceivedTerm()
if healthy != c.healthy {
c.healthy = healthy
hc := healthcheckRoute(c.healthy, c.reverseSourcePredicate)
next[healthcheckRouteID] = hc
updatedRoutes = append(updatedRoutes, hc)
}
}

c.current = next
return updatedRoutes, deletedIDs, nil
}
Expand Down
Loading

0 comments on commit 7643317

Please sign in to comment.