
Commit 173d76e

andrewd-zededa authored and eriknordmark committed
Kubevirt: Defer eve reboot/shutdown/update until drain completes
As part of kubevirt-eve we have multiple cluster nodes, each hosting app workloads and volume replicas. This change defers EVE management config operations that would take storage replicas offline while a cluster volume is running on only a single replica.

An example:
1. Node 1 suffers an outage and recovers.
2. Before the volumes finish rebuilding on node 1, node 2 suffers an outage and recovers.
3. Volumes begin rebuilding replicas on nodes 1 and 2; the only available rebuild source is on node 3.
4. A user-initiated request arrives to reboot/shutdown/update eve-os on node 3.
5. That config request is deferred until the replicas are rebuilt on the other nodes.

At a high level the eve-side workflow looks like this:
1. EVE config is received requesting a reboot/shutdown/baseos-image-change on node 1.
2. A drain is requested for node 1.
3. zedkube cordons node 1 so that new workloads are blocked from scheduling on that node.
4. zedkube initiates a kubernetes drain of that node, removing workloads.
5. As part of the drain, the PDB (Pod Disruption Budget) at the longhorn level determines that the local replica is the last one online.
6. The drain waits for volume replicas to rebuild across the cluster.
7. The drain completes and a NodeDrainStatus message is sent to continue the original config request.
8. On the next boot event, zedkube nodeOnBootHealthStatusWatcher() waits until the local kubernetes node comes online/ready for the first time on each boot and then uncordons it, allowing workloads to be scheduled.

Note: For eve baseos image updates this path waits until the new baseos image is fully available locally (LOADED or INSTALLED) and activated before beginning the drain.

Signed-off-by: Andrew Durbin <[email protected]>
1 parent 8a92a7f commit 173d76e

22 files changed: +1291 −8 lines
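The excerpts below exchange two pubsub messages defined in pkg/pillar/kubeapi: NodeDrainRequest (published by the agent that has to defer its operation) and NodeDrainStatus (published by zedkube as the cordon/drain progresses). The following is a minimal sketch of those shapes as the handlers in this commit use them; the iota ordering, the NONE value, and the Context field name are assumptions and are not taken from the actual kubeapi source.

// Assumed shapes only: constants and fields mirror their usage in this commit;
// the authoritative definitions live in pkg/pillar/kubeapi.
package kubeapi

// DrainRequester identifies which subsystem asked for the drain.
type DrainRequester uint8

const (
    NONE     DrainRequester = iota // hypothetical zero value, not used in this diff
    UPDATE                         // eve-os baseos image update (baseosmgr)
    DEVICEOP                       // device reboot/shutdown path (nodeagent)
)

// DrainStatus is assumed to increase through the drain lifecycle, which is why
// nodeagent below can test ">= REQUESTED && < COMPLETE" for "in progress".
type DrainStatus uint8

const (
    UNKNOWN       DrainStatus = iota // zedkube not up yet
    NOTSUPPORTED                     // not a kubevirt/clustered EVE node
    NOTREQUESTED
    REQUESTED
    STARTING
    CORDONED
    FAILEDCORDON
    DRAINRETRYING
    FAILEDDRAIN
    COMPLETE
)

// NodeDrainRequest is published under the key "global" by the agent deferring
// its operation (baseosmgr for updates, the device-op path for reboot/shutdown).
type NodeDrainRequest struct {
    RequestedBy DrainRequester
    Context     string // hypothetical field: the ID of the deferred operation
}

// NodeDrainStatus is published by zedkube as the cordon/drain progresses.
type NodeDrainStatus struct {
    Status      DrainStatus
    RequestedBy DrainRequester
}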

docs/CONFIG-PROPERTIES.md

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@
 | goroutine.leak.detection.check.window.minutes | integer (minutes) | 10 | Interval in minutes for which the leak analysis is performed. It should contain at least 10 measurements, so no less than 10 × goroutine.leak.detection.check.interval.minutes. |
 | goroutine.leak.detection.keep.stats.hours | integer (hours) | 24 | Amount of hours to keep the stats for leak detection. We keep more stats than the check window to be able to react to settings with a bigger check window via configuration. |
 | goroutine.leak.detection.cooldown.minutes | integer (minutes) | 5 | Cooldown period in minutes after the leak detection is triggered. During this period, no stack traces are collected; only warning messages are logged. |
+| kubevirt.drain.timeout | integer | 24 | hours to allow kubernetes to drain a node |

 ## Log levels
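The code that consumes this property is not shown on this page. As a rough illustration of its semantics only, a drain bounded by a configured hour count could look like the sketch below; the helper is illustrative and is not the actual zedkube implementation.

// Illustrative only: bound a drain operation by kubevirt.drain.timeout (hours).
package main

import (
    "context"
    "fmt"
    "time"
)

// drainWithTimeout runs the supplied drain function under a deadline derived
// from the configured hour count.
func drainWithTimeout(drainTimeoutHours uint32, drain func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(context.Background(),
        time.Duration(drainTimeoutHours)*time.Hour)
    defer cancel()
    return drain(ctx)
}

func main() {
    err := drainWithTimeout(24, func(ctx context.Context) error {
        // Placeholder for the real kubernetes drain call.
        select {
        case <-time.After(time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    })
    fmt.Println("drain result:", err)
}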

pkg/pillar/cmd/baseosmgr/baseosmgr.go

Lines changed: 7 additions & 0 deletions
@@ -50,6 +50,9 @@ type baseOsMgrContext struct {
 	subContentTreeStatus pubsub.Subscription
 	subNodeAgentStatus pubsub.Subscription
 	subZedAgentStatus pubsub.Subscription
+	subNodeDrainStatus pubsub.Subscription
+	pubNodeDrainRequest pubsub.Publication
+	deferredBaseOsID string
 	rebootReason string // From last reboot
 	rebootTime time.Time // From last reboot
 	rebootImage string // Image from which the last reboot happened
@@ -103,6 +106,7 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 	initializeNodeAgentHandles(ps, &ctx)
 	initializeZedagentHandles(ps, &ctx)
 	initializeVolumemgrHandles(ps, &ctx)
+	initializeNodeDrainHandles(ps, &ctx)

 	// publish initial zboot partition status
 	updateAndPublishZbootStatusAll(&ctx)
@@ -157,6 +161,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 	case change := <-ctx.subZedAgentStatus.MsgChan():
 		ctx.subZedAgentStatus.ProcessChange(change)

+	case change := <-ctx.subNodeDrainStatus.MsgChan():
+		ctx.subNodeDrainStatus.ProcessChange(change)
+
 	case res := <-ctx.worker.MsgChan():
 		res.Process(&ctx, true)

pkg/pillar/cmd/baseosmgr/handlebaseos.go

Lines changed: 11 additions & 0 deletions
@@ -32,6 +32,17 @@ func baseOsHandleStatusUpdateUUID(ctx *baseOsMgrContext, id string) {
 		return
 	}

+	// We want to wait to drain until we're sure we actually have a usable image locally.
+	// The eve baseos image must be downloaded, verified, available and, most importantly,
+	// activated before node downtime/reboot is initiated, so check whether the operation
+	// needs to be deferred until the node drain completes.
+	if ((status.State == types.LOADED) || (status.State == types.INSTALLED)) && config.Activate && !status.Activated {
+		log.Tracef("baseOsHandleStatusUpdateUUID() image just activated id:%s config:%v status:%v state:%s", id, config, status, status.State)
+		deferUpdate := shouldDeferForNodeDrain(ctx, id, config, status)
+		if deferUpdate {
+			return
+		}
+	}
+
 	// handle the change event for this base os config
 	baseOsHandleStatusUpdate(ctx, config, status)
 }
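The gate above can be read as a single predicate. The hypothetical helper below restates it; the function name is illustrative and not part of this commit.

package baseosmgr

import "github.com/lf-edge/eve/pkg/pillar/types"

// imageReadyForDrainCheck restates the condition used in
// baseOsHandleStatusUpdateUUID above: only consider deferring for a node drain
// once the new baseos image is usable locally (LOADED or INSTALLED) and
// activation has been requested but not yet performed.
func imageReadyForDrainCheck(config *types.BaseOsConfig, status *types.BaseOsStatus) bool {
    usable := status.State == types.LOADED || status.State == types.INSTALLED
    return usable && config.Activate && !status.Activated
}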
New file. Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package baseosmgr

import (
	"github.com/lf-edge/eve/pkg/pillar/kubeapi"
	"github.com/lf-edge/eve/pkg/pillar/pubsub"
	"github.com/lf-edge/eve/pkg/pillar/types"
)

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
	configArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
	configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
	configArg interface{}, _ interface{}) {
	newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
	if !ok {
		log.Errorf("handleNodeDrainStatusImpl invalid type in configArg: %v", configArg)
		return
	}
	ctx, ok := ctxArg.(*baseOsMgrContext)
	if !ok {
		log.Errorf("handleNodeDrainStatusImpl invalid type in ctxArg: %v", ctxArg)
		return
	}

	if newStatus.RequestedBy != kubeapi.UPDATE {
		return
	}

	log.Functionf("handleNodeDrainStatusImpl to:%v", newStatus)
	if (newStatus.Status == kubeapi.FAILEDCORDON) ||
		(newStatus.Status == kubeapi.FAILEDDRAIN) {
		log.Errorf("handleNodeDrainStatusImpl nodedrain-step:drain-failed-handler unpublish NodeDrainRequest due to NodeDrainStatus:%v", newStatus)
		if err := ctx.pubNodeDrainRequest.Unpublish("global"); err != nil {
			log.Errorf("Unable to remove NodeDrainRequest object:%v", err)
		}
	}
	if newStatus.Status == kubeapi.COMPLETE {
		id := ctx.deferredBaseOsID
		if id != "" {
			log.Noticef("handleNodeDrainStatusImpl nodedrain-step:drain-complete-handler, continuing baseosstatus update id:%s", id)
			baseOsHandleStatusUpdateUUID(ctx, id)
		}
	}
}

func handleNodeDrainStatusDelete(_ interface{}, _ string,
	_ interface{}) {
	log.Function("handleNodeDrainStatusDelete")
}

func initializeNodeDrainHandles(ps *pubsub.PubSub, ctx *baseOsMgrContext) {
	subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      false,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreate,
		ModifyHandler: handleNodeDrainStatusModify,
		DeleteHandler: handleNodeDrainStatusDelete,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
		return
	}
	if err := subNodeDrainStatus.Activate(); err != nil {
		log.Fatalf("initNodeDrainPubSub can't activate sub:%v", err)
	}

	pubNodeDrainRequest, err := ps.NewPublication(
		pubsub.PublicationOptions{
			AgentName: agentName,
			TopicType: kubeapi.NodeDrainRequest{},
		})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub pubNodeDrainRequest err:%v", err)
	}
	ctx.subNodeDrainStatus = subNodeDrainStatus
	ctx.pubNodeDrainRequest = pubNodeDrainRequest
}

// shouldDeferForNodeDrain will return true if this BaseOsStatus update will be handled later
func shouldDeferForNodeDrain(ctx *baseOsMgrContext, id string, config *types.BaseOsConfig, status *types.BaseOsStatus) bool {
	drainStatus := kubeapi.GetNodeDrainStatus(ctx.subNodeDrainStatus, log)
	if drainStatus.Status == kubeapi.NOTSUPPORTED {
		return false
	}
	if drainStatus.Status == kubeapi.UNKNOWN {
		log.Error("shouldDeferForNodeDrain EARLY boot request, zedkube not up yet")
		return false
	}

	log.Noticef("shouldDeferForNodeDrain drainCheck id:%s state:%d baseOsConfig:%v baseOsStatus:%v drainStatus:%d",
		id, status.State, config, status, drainStatus.Status)
	// To allow switching baseos version mid-drain, keep this general to all
	// cases of: restarting-failed-drain, starting-fresh-drain
	ctx.deferredBaseOsID = id

	if drainStatus.Status == kubeapi.NOTREQUESTED ||
		drainStatus.Status == kubeapi.FAILEDCORDON ||
		drainStatus.Status == kubeapi.FAILEDDRAIN {
		log.Noticef("shouldDeferForNodeDrain nodedrain-step:request requester:eve-os-update ctx:%s", id)
		err := kubeapi.RequestNodeDrain(ctx.pubNodeDrainRequest, kubeapi.UPDATE, id)
		if err != nil {
			log.Errorf("shouldDeferForNodeDrain: can't request node drain: %v", err)
		}
		return true
	}
	if drainStatus.Status == kubeapi.REQUESTED ||
		drainStatus.Status == kubeapi.STARTING ||
		drainStatus.Status == kubeapi.CORDONED ||
		drainStatus.Status == kubeapi.DRAINRETRYING {
		log.Functionf("shouldDeferForNodeDrain drain in-progress or in error, still defer")
		return true
	}

	if drainStatus.Status != kubeapi.COMPLETE {
		log.Errorf("shouldDeferForNodeDrain unhandled NodeDrainStatus:%v", drainStatus)
	}

	log.Noticef("shouldDeferForNodeDrain nodedrain-step:handle-complete requester:eve-os-update ctx:%s", id)
	return false
}
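shouldDeferForNodeDrain hands the request to zedkube through kubeapi.RequestNodeDrain, which is not shown on this page. A plausible sketch of that helper, assuming the NodeDrainRequest shape outlined after the commit header, is:

package kubeapi

import "github.com/lf-edge/eve/pkg/pillar/pubsub"

// Plausible sketch only: publish a single NodeDrainRequest under the "global"
// key (the same key the failure handler above unpublishes) so that zedkube
// picks it up and starts the cordon/drain sequence. The real helper in
// pkg/pillar/kubeapi may carry additional fields.
func RequestNodeDrain(pubNodeDrainRequest pubsub.Publication,
    requester DrainRequester, context string) error {
    req := NodeDrainRequest{
        RequestedBy: requester,
        Context:     context,
    }
    return pubNodeDrainRequest.Publish("global", req)
}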

pkg/pillar/cmd/diag/diag.go

Lines changed: 8 additions & 0 deletions
@@ -69,6 +69,7 @@ type diagContext struct {
 	appInstanceSummary types.AppInstanceSummary
 	subAppInstanceStatus pubsub.Subscription
 	subDownloaderStatus pubsub.Subscription
+	subNodeDrainStatus pubsub.Subscription
 	zedcloudMetrics *zedcloud.AgentMetrics
 	gotBC bool
 	gotDNS bool
@@ -381,6 +382,8 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 	ctx.subDownloaderStatus = subDownloaderStatus
 	subDownloaderStatus.Activate()

+	initDrainSub(ps, &ctx)
+
 	cloudPingMetricPub, err := ps.NewPublication(
 		pubsub.PublicationOptions{
 			AgentName: agentName,
@@ -429,6 +432,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar

 		case change := <-subDownloaderStatus.MsgChan():
 			subDownloaderStatus.ProcessChange(change)
+
+		case change := <-ctx.subNodeDrainStatus.MsgChan():
+			ctx.subNodeDrainStatus.ProcessChange(change)
 		}
 		// Is this the first time we have all the info to print?
 		if !gotAll && ctx.gotBC && ctx.gotDNS && ctx.gotDPCList {
@@ -1063,6 +1069,8 @@ func printOutput(ctx *diagContext, caller string) {
 				ds.Name, ds.ImageSha256, ds.Progress, ds.TotalSize)
 		}
 	}
+
+	printNodeDrainStatus(ctx)
 	ctx.ph.Flush()
 }

New file. Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package diag

import (
	"github.com/lf-edge/eve/pkg/pillar/kubeapi"
	"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

func initDrainSub(ps *pubsub.PubSub, ctx *diagContext) {
	subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      true,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreate,
		ModifyHandler: handleNodeDrainStatusModify,
		DeleteHandler: handleNodeDrainStatusDelete,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctx.subNodeDrainStatus = subNodeDrainStatus
	ctx.subNodeDrainStatus.Activate()
}

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
	configArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
	configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
	_ interface{}, _ interface{}) {
	ctx := ctxArg.(*diagContext)
	triggerPrintOutput(ctx, "NodeDrain")
}

func printNodeDrainStatus(ctx *diagContext) {
	items := ctx.subNodeDrainStatus.GetAll()
	for _, item := range items {
		nds := item.(kubeapi.NodeDrainStatus)

		sev := ""
		switch nds.Status {
		case kubeapi.UNKNOWN:
		case kubeapi.NOTSUPPORTED:
			// not kubevirt-EVE or not clustered, skipping unnecessary logging
		case kubeapi.NOTREQUESTED:
			fallthrough
		case kubeapi.REQUESTED:
			fallthrough
		case kubeapi.STARTING:
			fallthrough
		case kubeapi.CORDONED:
			sev = "INFO"
			break
		case kubeapi.FAILEDCORDON:
			sev = "ERROR"
		case kubeapi.DRAINRETRYING:
			sev = "WARNING"
		case kubeapi.FAILEDDRAIN:
			sev = "ERROR"
		case kubeapi.COMPLETE:
			sev = "INFO"
		}
		ctx.ph.Print("%s: Node Drain -> %s\n", sev, nds.Status.String())
	}
}

func handleNodeDrainStatusDelete(ctxArg interface{}, key string,
	statusArg interface{}) {
}
New file. Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

package nodeagent

import (
	"github.com/lf-edge/eve/pkg/pillar/kubeapi"
	"github.com/lf-edge/eve/pkg/pillar/pubsub"
)

func handleNodeDrainStatusCreate(ctxArg interface{}, key string,
	configArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, nil)
}

func handleNodeDrainStatusModify(ctxArg interface{}, key string,
	configArg interface{}, oldConfigArg interface{}) {
	handleNodeDrainStatusImpl(ctxArg, key, configArg, oldConfigArg)
}

func handleNodeDrainStatusImpl(ctxArg interface{}, _ string,
	configArg interface{}, _ interface{}) {
	ctx, ok := ctxArg.(*nodeagentContext)
	if !ok {
		log.Errorf("handleNodeDrainStatusImpl invalid type in ctxArg:%v", ctxArg)
		return
	}
	newStatus, ok := configArg.(kubeapi.NodeDrainStatus)
	if !ok {
		log.Errorf("handleNodeDrainStatusImpl invalid type in configArg:%v", configArg)
		return
	}

	if newStatus.RequestedBy != kubeapi.DEVICEOP {
		return
	}

	log.Noticef("handleNodeDrainStatusImpl to:%v", newStatus)
	// NodeDrainStatus failures here should keep drainInProgress set,
	// since that sets DrainInProgress on NodeAgentStatus and keeps zedagent
	// from allowing the deferred operation to continue.
	if (newStatus.Status >= kubeapi.REQUESTED) && (newStatus.Status < kubeapi.COMPLETE) {
		log.Noticef("handleNodeDrainStatusImpl nodedrain-step:drain-inprogress-handler NodeDrainStatus:%v", newStatus)
		ctx.waitDrainInProgress = true
		publishNodeAgentStatus(ctx)
	}
	if newStatus.Status == kubeapi.COMPLETE {
		log.Notice("handleNodeDrainStatusImpl nodedrain-step:drain-complete-handler notify zedagent")
		ctx.waitDrainInProgress = false
		publishNodeAgentStatus(ctx)
	}
}

func handleNodeDrainStatusDelete(_ interface{}, _ string,
	_ interface{}) {
	log.Functionf("handleNodeDrainStatusDelete")
}

func initNodeDrainPubSub(ps *pubsub.PubSub, ctx *nodeagentContext) {
	subNodeDrainStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{
		AgentName:     "zedkube",
		MyAgentName:   agentName,
		TopicImpl:     kubeapi.NodeDrainStatus{},
		Persistent:    false,
		Activate:      false,
		Ctx:           ctx,
		CreateHandler: handleNodeDrainStatusCreate,
		ModifyHandler: handleNodeDrainStatusModify,
		DeleteHandler: handleNodeDrainStatusDelete,
		WarningTime:   warningTime,
		ErrorTime:     errorTime,
	})
	if err != nil {
		log.Fatalf("initNodeDrainPubSub subNodeDrainStatus err:%v", err)
		return
	}
	if err := subNodeDrainStatus.Activate(); err != nil {
		log.Fatalf("initNodeDrainPubSub activate err:%v", err)
	}
	ctx.subNodeDrainStatus = subNodeDrainStatus
}
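On the consuming side, the comment above says this flag surfaces as DrainInProgress on NodeAgentStatus so that zedagent holds the deferred reboot/shutdown. A hedged sketch of that check follows; the helper name is illustrative and the actual zedagent wiring is not shown on this page.

package zedagent

import "github.com/lf-edge/eve/pkg/pillar/types"

// Illustrative only: hold a deferred device operation (reboot/shutdown) while
// nodeagent still reports a drain in progress. DrainInProgress is the
// NodeAgentStatus field referenced by the nodeagent comment above.
func canRunDeferredDeviceOp(status types.NodeAgentStatus) bool {
    return !status.DrainInProgress
}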
