From 131033caf515f3c338e090bb5c9217e70f4ad951 Mon Sep 17 00:00:00 2001 From: naoki-take Date: Tue, 25 Jun 2024 08:54:43 +0000 Subject: [PATCH 1/2] Evict 'OnDelete' DaemonSet pods Signed-off-by: naoki-take --- op/reboot.go | 131 ++++++++++++++++++++++++++++++++++++++++ op/reboot_decide.go | 67 +++++++++++++++++--- server/strategy.go | 2 + server/strategy_test.go | 1 + 4 files changed, 192 insertions(+), 9 deletions(-) diff --git a/op/reboot.go b/op/reboot.go index 5cc207e4..7d7ea6f6 100644 --- a/op/reboot.go +++ b/op/reboot.go @@ -203,6 +203,137 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure // +type rebootEvictDaemonSetPodOp struct { + finished bool + + entries []*cke.RebootQueueEntry + config *cke.Reboot + apiserver *cke.Node + + mu sync.Mutex + failedNodes []string +} + +func RebootEvictDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator { + return &rebootEvictDaemonSetPodOp{ + entries: entries, + config: config, + apiserver: apiserver, + } +} + +type rebootEvictDaemonSetPodCommand struct { + entries []*cke.RebootQueueEntry + protectedNamespaces *metav1.LabelSelector + apiserver *cke.Node + evictAttempts int + evictInterval time.Duration + + notifyFailedNode func(string) +} + +func (o *rebootEvictDaemonSetPodOp) Name() string { + return "reboot-evict-daemonset-pod" +} + +func (o *rebootEvictDaemonSetPodOp) notifyFailedNode(node string) { + o.mu.Lock() + o.failedNodes = append(o.failedNodes, node) + o.mu.Unlock() +} + +func (o *rebootEvictDaemonSetPodOp) Targets() []string { + ipAddresses := make([]string, len(o.entries)) + for i, entry := range o.entries { + ipAddresses[i] = entry.Node + } + return ipAddresses +} + +func (o *rebootEvictDaemonSetPodOp) Info() string { + if len(o.failedNodes) == 0 { + return "" + } + return fmt.Sprintf("failed to evict DaemonSet pods on some nodes: %v", o.failedNodes) +} + +func (o *rebootEvictDaemonSetPodOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + attempts := 1 + if o.config.EvictRetries != nil { + attempts = *o.config.EvictRetries + 1 + } + interval := 0 * time.Second + if o.config.EvictInterval != nil { + interval = time.Second * time.Duration(*o.config.EvictInterval) + } + + return rebootEvictDaemonSetPodCommand{ + entries: o.entries, + protectedNamespaces: o.config.ProtectedNamespaces, + apiserver: o.apiserver, + notifyFailedNode: o.notifyFailedNode, + evictAttempts: attempts, + evictInterval: interval, + } +} + +func (c rebootEvictDaemonSetPodCommand) Command() cke.Command { + ipAddresses := make([]string, len(c.entries)) + for i, entry := range c.entries { + ipAddresses[i] = entry.Node + } + return cke.Command{ + Name: "rebootEvictDaemonSetPodCommand", + Target: strings.Join(ipAddresses, ","), + } +} + +func (c rebootEvictDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + cs, err := inf.K8sClient(ctx, c.apiserver) + if err != nil { + return err + } + + protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces) + if err != nil { + return err + } + + // evict DaemonSet pod on each node + // cordon is unnecessary for DaemonSet pods, so dry-run eviction is also skipped. + for _, entry := range c.entries { + // keep entry.Status as RebootStatusDraining and don't update it here. 
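+		// The node has already been drained and rebootRebootOp follows immediately
+		// after this command (see server/strategy.go), so the status is left as is.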
+
+		log.Info("start eviction of DaemonSet pod", map[string]interface{}{
+			"name": entry.Node,
+		})
+		err := evictOrDeleteOnDeleteDaemonSetPod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
+		if err != nil {
+			log.Warn("eviction of DaemonSet pod failed", map[string]interface{}{
+				"name":      entry.Node,
+				log.FnError: err,
+			})
+			c.notifyFailedNode(entry.Node)
+			err = drainBackOff(ctx, inf, entry, err)
+			if err != nil {
+				return err
+			}
+			continue
+		}
+		log.Info("eviction of DaemonSet pod succeeded", map[string]interface{}{
+			"name": entry.Node,
+		})
+	}
+
+	return nil
+}
+
+//
+
 type rebootRebootOp struct {
 	finished bool
 
diff --git a/op/reboot_decide.go b/op/reboot_decide.go
index c630470d..434a010b 100644
--- a/op/reboot_decide.go
+++ b/op/reboot_decide.go
@@ -9,6 +9,7 @@ import (
 	"github.com/cybozu-go/cke"
 	"github.com/cybozu-go/log"
 	"github.com/cybozu-go/well"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -61,25 +62,75 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
 	return nil
 }
 
+// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
+// It calls podHandler for each target pod.
+// If the handler returns an error, this function returns the error immediately.
+// Note: the caller cannot distinguish an API error from a state-evaluation error returned by podHandler.
+func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
+	podHandler func(pod *corev1.Pod) error) error {
+
+	daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	for _, ds := range daemonSets.Items {
+		if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
+			labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector)
+			pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
+				LabelSelector: labelSelector,
+			})
+			if err != nil {
+				return err
+			}
+
+			for _, pod := range pods.Items {
+				if pod.Spec.NodeName == node {
+					err = podHandler(&pod)
+					if err != nil {
+						return err
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
 // dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed.
 // It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed.
 func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
-	return doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, 0, true)
+	return enumeratePods(ctx, cs, node,
+		doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, time.Duration(0), true),
+		func(pod *corev1.Pod) error {
+			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
+		},
+	)
 }
 
 // evictOrDeleteNodePod evicts or delete Pods on the specified Node.
 // If a running Job Pod exists, this function returns an error.
 func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
-	return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
+	return enumeratePods(ctx, cs, node,
+		doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false),
+		func(pod *corev1.Pod) error {
+			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
+		},
+	)
+}
+
+// evictOrDeleteOnDeleteDaemonSetPod evicts or deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
+func evictOrDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
+	return enumerateOnDeleteDaemonSetPods(ctx, cs, node, doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false))
 }
 
-// doEvictOrDeleteNodePod evicts or delete Pods on the specified Node.
+// doEvictOrDeleteNodePod returns a pod handler that evicts or deletes Pods on the specified Node.
 // It first tries eviction.
 // If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
 // If the eviction failed and the Pod's namespace is protected, it retries after `interval` interval at most `attempts` times.
-// If a running Job Pod exists, this function returns an error.
 // If `dry` is true, it performs dry run and `attempts` and `interval` are ignored.
-func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) error {
+func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) func(pod *corev1.Pod) error {
 	var deleteOptions *metav1.DeleteOptions
 	if dry {
 		deleteOptions = &metav1.DeleteOptions{
@@ -87,7 +138,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
 		}
 	}
 
-	return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
+	return func(pod *corev1.Pod) error {
 		if dry && !protected[pod.Namespace] {
 			// in case of dry-run for Pods in non-protected namespace,
 			// return immediately because its "eviction or deletion" never fails
@@ -158,9 +209,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
 			return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
 		}
 		return nil
-	}, func(pod *corev1.Pod) error {
-		return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
-	})
+	}
 }
 
 // checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
diff --git a/server/strategy.go b/server/strategy.go
index c24fbd05..ca3e0a2e 100644
--- a/server/strategy.go
+++ b/server/strategy.go
@@ -898,6 +898,10 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
 	}
 
 	if len(rebootArgs.DrainCompleted) > 0 {
+		// After eviction of normal pods, evict "OnDelete" daemonset pods.
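+		// An "OnDelete" DaemonSet replaces its pods only after they are removed,
+		// so removing them around the reboot also restarts them with the updated pod template.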
+ ops = append(ops, op.RebootEvictDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) } if len(rebootArgs.NewlyDrained) > 0 { diff --git a/server/strategy_test.go b/server/strategy_test.go index c09a684a..69bd6b5c 100644 --- a/server/strategy_test.go +++ b/server/strategy_test.go @@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) { }, }), ExpectedOps: []opData{ + {"reboot-evict-daemonset-pod", 1}, {"reboot-reboot", 1}, }, }, From 6c9df33ce528086b3a1fd69d0762e137bf5ec5ab Mon Sep 17 00:00:00 2001 From: naoki-take Date: Wed, 26 Jun 2024 06:54:26 +0000 Subject: [PATCH 2/2] Just delete daemonset pods Signed-off-by: naoki-take --- op/reboot.go | 76 +++++++++++++---------------------------- op/reboot_decide.go | 46 ++++++++++++++----------- server/strategy.go | 2 +- server/strategy_test.go | 2 +- 4 files changed, 53 insertions(+), 73 deletions(-) diff --git a/op/reboot.go b/op/reboot.go index 7d7ea6f6..8112e1ac 100644 --- a/op/reboot.go +++ b/op/reboot.go @@ -203,7 +203,7 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure // -type rebootEvictDaemonSetPodOp struct { +type rebootDeleteDaemonSetPodOp struct { finished bool entries []*cke.RebootQueueEntry @@ -214,35 +214,32 @@ type rebootEvictDaemonSetPodOp struct { failedNodes []string } -func RebootEvictDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator { - return &rebootEvictDaemonSetPodOp{ +func RebootDeleteDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator { + return &rebootDeleteDaemonSetPodOp{ entries: entries, config: config, apiserver: apiserver, } } -type rebootEvictDaemonSetPodCommand struct { - entries []*cke.RebootQueueEntry - protectedNamespaces *metav1.LabelSelector - apiserver *cke.Node - evictAttempts int - evictInterval time.Duration +type rebootDeleteDaemonSetPodCommand struct { + entries []*cke.RebootQueueEntry + apiserver *cke.Node notifyFailedNode func(string) } -func (o *rebootEvictDaemonSetPodOp) Name() string { - return "reboot-evict-daemonset-pod" +func (o *rebootDeleteDaemonSetPodOp) Name() string { + return "reboot-delete-daemonset-pod" } -func (o *rebootEvictDaemonSetPodOp) notifyFailedNode(node string) { +func (o *rebootDeleteDaemonSetPodOp) notifyFailedNode(node string) { o.mu.Lock() o.failedNodes = append(o.failedNodes, node) o.mu.Unlock() } -func (o *rebootEvictDaemonSetPodOp) Targets() []string { +func (o *rebootDeleteDaemonSetPodOp) Targets() []string { ipAddresses := make([]string, len(o.entries)) for i, entry := range o.entries { ipAddresses[i] = entry.Node @@ -250,82 +247,57 @@ func (o *rebootEvictDaemonSetPodOp) Targets() []string { return ipAddresses } -func (o *rebootEvictDaemonSetPodOp) Info() string { +func (o *rebootDeleteDaemonSetPodOp) Info() string { if len(o.failedNodes) == 0 { return "" } - return fmt.Sprintf("failed to evict DaemonSet pods on some nodes: %v", o.failedNodes) + return fmt.Sprintf("failed to delete DaemonSet pods on some nodes: %v", o.failedNodes) } -func (o *rebootEvictDaemonSetPodOp) NextCommand() cke.Commander { +func (o *rebootDeleteDaemonSetPodOp) NextCommand() cke.Commander { if o.finished { return nil } o.finished = true - attempts := 1 - if o.config.EvictRetries != nil { - attempts = *o.config.EvictRetries + 1 - } - interval := 0 * time.Second - if o.config.EvictInterval != nil { - interval = time.Second * 
time.Duration(*o.config.EvictInterval)
-	}
-
-	return rebootEvictDaemonSetPodCommand{
-		entries:             o.entries,
-		protectedNamespaces: o.config.ProtectedNamespaces,
-		apiserver:           o.apiserver,
-		notifyFailedNode:    o.notifyFailedNode,
-		evictAttempts:       attempts,
-		evictInterval:       interval,
+	return rebootDeleteDaemonSetPodCommand{
+		entries:          o.entries,
+		apiserver:        o.apiserver,
+		notifyFailedNode: o.notifyFailedNode,
 	}
 }
 
-func (c rebootEvictDaemonSetPodCommand) Command() cke.Command {
+func (c rebootDeleteDaemonSetPodCommand) Command() cke.Command {
 	ipAddresses := make([]string, len(c.entries))
 	for i, entry := range c.entries {
 		ipAddresses[i] = entry.Node
 	}
 	return cke.Command{
-		Name:   "rebootEvictDaemonSetPodCommand",
+		Name:   "rebootDeleteDaemonSetPodCommand",
 		Target: strings.Join(ipAddresses, ","),
 	}
 }
 
-func (c rebootEvictDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
+func (c rebootDeleteDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
 	cs, err := inf.K8sClient(ctx, c.apiserver)
 	if err != nil {
 		return err
 	}
 
-	protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces)
-	if err != nil {
-		return err
-	}
-
-	// evict DaemonSet pod on each node
-	// cordon is unnecessary for DaemonSet pods, so dry-run eviction is also skipped.
+	// delete DaemonSet pod on each node
 	for _, entry := range c.entries {
 		// keep entry.Status as RebootStatusDraining and don't update it here.
 		// The node has already been drained and rebootRebootOp follows immediately
 		// after this command (see server/strategy.go), so the status is left as is.
 
-		log.Info("start eviction of DaemonSet pod", map[string]interface{}{
+		log.Info("start deletion of DaemonSet pod", map[string]interface{}{
 			"name": entry.Node,
 		})
-		err := evictOrDeleteOnDeleteDaemonSetPod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
+		err := deleteOnDeleteDaemonSetPod(ctx, cs, entry.Node)
 		if err != nil {
-			log.Warn("eviction of DaemonSet pod failed", map[string]interface{}{
+			log.Warn("deletion of DaemonSet pod failed", map[string]interface{}{
 				"name":      entry.Node,
 				log.FnError: err,
 			})
 			c.notifyFailedNode(entry.Node)
-			err = drainBackOff(ctx, inf, entry, err)
-			if err != nil {
-				return err
-			}
-			continue
 		}
-		log.Info("eviction of DaemonSet pod succeeded", map[string]interface{}{
-			"name": entry.Node,
-		})
 	}
 
diff --git a/op/reboot_decide.go b/op/reboot_decide.go
index 434a010b..7eeefaf7 100644
--- a/op/reboot_decide.go
+++ b/op/reboot_decide.go
@@ -101,36 +101,27 @@ func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientse
 // dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed.
 // It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed.
 func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
-	return enumeratePods(ctx, cs, node,
-		doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, time.Duration(0), true),
-		func(pod *corev1.Pod) error {
-			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
-		},
-	)
+	return doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, 0, true)
 }
 
 // evictOrDeleteNodePod evicts or delete Pods on the specified Node.
 // If a running Job Pod exists, this function returns an error.
 func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
-	return enumeratePods(ctx, cs, node,
-		doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false),
-		func(pod *corev1.Pod) error {
-			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
-		},
-	)
+	return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
 }
 
-// evictOrDeleteOnDeleteDaemonSetPod evicts or deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
-func evictOrDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
-	return enumerateOnDeleteDaemonSetPods(ctx, cs, node, doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false))
+// deleteOnDeleteDaemonSetPod deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
+func deleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
+	return doDeleteOnDeleteDaemonSetPod(ctx, cs, node)
 }
 
-// doEvictOrDeleteNodePod returns a pod handler that evicts or deletes Pods on the specified Node.
+// doEvictOrDeleteNodePod evicts or deletes Pods on the specified Node.
 // It first tries eviction.
 // If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
 // If the eviction failed and the Pod's namespace is protected, it retries after `interval` interval at most `attempts` times.
+// If a running Job Pod exists, this function returns an error.
 // If `dry` is true, it performs dry run and `attempts` and `interval` are ignored.
-func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) func(pod *corev1.Pod) error {
+func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) error {
 	var deleteOptions *metav1.DeleteOptions
 	if dry {
 		deleteOptions = &metav1.DeleteOptions{
@@ -138,7 +129,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
 		}
 	}
 
-	return func(pod *corev1.Pod) error {
+	return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
 		if dry && !protected[pod.Namespace] {
 			// in case of dry-run for Pods in non-protected namespace,
 			// return immediately because its "eviction or deletion" never fails
@@ -209,7 +200,26 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
 			return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
 		}
 		return nil
-	}
+	}, func(pod *corev1.Pod) error {
+		return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
+	})
 }
 
+// doDeleteOnDeleteDaemonSetPod deletes 'OnDelete' DaemonSet pods on the specified Node.
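+// Unlike eviction, a direct deletion is not blocked by PodDisruptionBudgets, and the
+// DaemonSet controller recreates the deleted pod on the same node.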
+func doDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
+	return enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
+		err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+		if err != nil && !apierrors.IsNotFound(err) {
+			return err
+		}
+		log.Info("deleted daemonset pod", map[string]interface{}{
+			"namespace": pod.Namespace,
+			"name":      pod.Name,
+		})
+		return nil
+	})
+}
+
 // checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
diff --git a/server/strategy.go b/server/strategy.go
index ca3e0a2e..a29b3c3d 100644
--- a/server/strategy.go
+++ b/server/strategy.go
@@ -899,9 +899,9 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
 
 	if len(rebootArgs.DrainCompleted) > 0 {
-		// After eviction of normal pods, evict "OnDelete" daemonset pods.
+		// After eviction of normal pods, delete "OnDelete" daemonset pods.
 		// An "OnDelete" DaemonSet replaces its pods only after they are removed,
 		// so removing them around the reboot also restarts them with the updated pod template.
-		ops = append(ops, op.RebootEvictDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
+		ops = append(ops, op.RebootDeleteDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
 		ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
 	}
 	if len(rebootArgs.NewlyDrained) > 0 {
diff --git a/server/strategy_test.go b/server/strategy_test.go
index 69bd6b5c..a98e9303 100644
--- a/server/strategy_test.go
+++ b/server/strategy_test.go
@@ -2637,7 +2637,7 @@ func TestDecideOps(t *testing.T) {
 			},
 		}),
 		ExpectedOps: []opData{
-			{"reboot-evict-daemonset-pod", 1},
+			{"reboot-delete-daemonset-pod", 1},
 			{"reboot-reboot", 1},
 		},
 	},
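
Below is a minimal, self-contained sketch of the enumeration logic used by
doDeleteOnDeleteDaemonSetPod, for reference. It is not part of the patch: it
takes kubernetes.Interface instead of the concrete *kubernetes.Clientset that
CKE uses, so it can be exercised with a fake clientset, and the DaemonSet,
namespace, pod, and node names are made up for the illustration.

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// listOnDeleteDaemonSetPods mirrors enumerateOnDeleteDaemonSetPods from the patch:
// list all DaemonSets, keep only those with updateStrategy:OnDelete, then collect
// their pods that run on the given node.
func listOnDeleteDaemonSetPods(ctx context.Context, cs kubernetes.Interface, node string) ([]corev1.Pod, error) {
	daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	var matched []corev1.Pod
	for _, ds := range daemonSets.Items {
		// only DaemonSets whose pods are never replaced automatically
		if ds.Spec.UpdateStrategy.Type != appsv1.OnDeleteDaemonSetStrategyType {
			continue
		}
		pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
			LabelSelector: metav1.FormatLabelSelector(ds.Spec.Selector),
		})
		if err != nil {
			return nil, err
		}
		for _, pod := range pods.Items {
			if pod.Spec.NodeName == node {
				matched = append(matched, pod)
			}
		}
	}
	return matched, nil
}

func main() {
	// hypothetical objects; the names do not come from the patch
	ds := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "node-agent", Namespace: "kube-system"},
		Spec: appsv1.DaemonSetSpec{
			Selector:       &metav1.LabelSelector{MatchLabels: map[string]string{"app": "node-agent"}},
			UpdateStrategy: appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType},
		},
	}
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "node-agent-abcde",
			Namespace: "kube-system",
			Labels:    map[string]string{"app": "node-agent"},
		},
		Spec: corev1.PodSpec{NodeName: "worker-1"},
	}

	cs := fake.NewSimpleClientset(ds, pod)
	pods, err := listOnDeleteDaemonSetPods(context.Background(), cs, "worker-1")
	if err != nil {
		panic(err)
	}
	for _, p := range pods {
		fmt.Printf("would delete %s/%s\n", p.Namespace, p.Name)
	}
}

Running it prints "would delete kube-system/node-agent-abcde", which is the set
of pods the new reboot-delete-daemonset-pod op would remove for that node.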