diff --git a/op/reboot.go b/op/reboot.go
index 5cc207e4..8112e1ac 100644
--- a/op/reboot.go
+++ b/op/reboot.go
@@ -203,6 +203,132 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure
 
 //
 
+// rebootDeleteDaemonSetPodOp is an operator that issues a single command to delete
+// the pods of "updateStrategy: OnDelete" DaemonSets on the nodes of the given entries.
+type rebootDeleteDaemonSetPodOp struct {
+	finished bool
+
+	entries   []*cke.RebootQueueEntry
+	config    *cke.Reboot
+	apiserver *cke.Node
+
+	// mu guards failedNodes.
+	mu          sync.Mutex
+	failedNodes []string
+}
+
+// RebootDeleteDaemonSetPodOp returns an operator that deletes the pods of
+// "updateStrategy: OnDelete" DaemonSets running on the nodes listed in entries.
+// apiserver is the node passed to Infrastructure.K8sClient to obtain the client.
+func RebootDeleteDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator {
+	return &rebootDeleteDaemonSetPodOp{
+		entries:   entries,
+		config:    config,
+		apiserver: apiserver,
+	}
+}
+
+// rebootDeleteDaemonSetPodCommand deletes "OnDelete" DaemonSet pods on each entry's node
+// and reports the nodes where the deletion failed through notifyFailedNode.
+type rebootDeleteDaemonSetPodCommand struct {
+	entries   []*cke.RebootQueueEntry
+	apiserver *cke.Node
+
+	notifyFailedNode func(string)
+}
+
+// Name returns the name of this operator.
+func (o *rebootDeleteDaemonSetPodOp) Name() string {
+	return "reboot-delete-daemonset-pod"
+}
+
+// notifyFailedNode records a node on which the deletion failed.
+func (o *rebootDeleteDaemonSetPodOp) notifyFailedNode(node string) {
+	o.mu.Lock()
+	o.failedNodes = append(o.failedNodes, node)
+	o.mu.Unlock()
+}
+
+// Targets returns the Node field of every entry.
+func (o *rebootDeleteDaemonSetPodOp) Targets() []string {
+	ipAddresses := make([]string, len(o.entries))
+	for i, entry := range o.entries {
+		ipAddresses[i] = entry.Node
+	}
+	return ipAddresses
+}
+
+// Info returns a message listing the nodes on which the deletion failed,
+// or an empty string when no failure has been recorded.
+func (o *rebootDeleteDaemonSetPodOp) Info() string {
+	// failedNodes is appended to under mu, so read it under the same lock.
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	if len(o.failedNodes) == 0 {
+		return ""
+	}
+	return fmt.Sprintf("failed to delete DaemonSet pods on some nodes: %v", o.failedNodes)
+}
+
+// NextCommand returns the deletion command on the first call and nil on later calls.
+func (o *rebootDeleteDaemonSetPodOp) NextCommand() cke.Commander {
+	if o.finished {
+		return nil
+	}
+	o.finished = true
+
+	return rebootDeleteDaemonSetPodCommand{
+		entries:          o.entries,
+		apiserver:        o.apiserver,
+		notifyFailedNode: o.notifyFailedNode,
+	}
+}
+
+// Command returns the command name with the comma-separated entry nodes as its target.
+func (c rebootDeleteDaemonSetPodCommand) Command() cke.Command {
+	ipAddresses := make([]string, len(c.entries))
+	for i, entry := range c.entries {
+		ipAddresses[i] = entry.Node
+	}
+	return cke.Command{
+		Name:   "rebootDeleteDaemonSetPodCommand",
+		Target: strings.Join(ipAddresses, ","),
+	}
+}
+
+// Run deletes "OnDelete" DaemonSet pods on the node of every entry.
+// A failure on one node is logged and reported through notifyFailedNode, and the
+// remaining nodes are still processed; only a failure to obtain the Kubernetes
+// client is returned as an error.
+func (c rebootDeleteDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
+	cs, err := inf.K8sClient(ctx, c.apiserver)
+	if err != nil {
+		return err
+	}
+
+	// delete DaemonSet pod on each node
+	for _, entry := range c.entries {
+		// keep entry.Status as RebootStatusDraining and don't update it here.
+
+		log.Info("start deletion of DaemonSet pod", map[string]interface{}{
+			"name": entry.Node,
+		})
+		err := deleteOnDeleteDaemonSetPod(ctx, cs, entry.Node)
+		if err != nil {
+			log.Warn("deletion of DaemonSet pod failed", map[string]interface{}{
+				"name":      entry.Node,
+				log.FnError: err,
+			})
+			c.notifyFailedNode(entry.Node)
+		}
+	}
+
+	return nil
+}
+
+//
+
 type rebootRebootOp struct {
 	finished bool
 
diff --git a/op/reboot_decide.go b/op/reboot_decide.go
index c630470d..7eeefaf7 100644
--- a/op/reboot_decide.go
+++ b/op/reboot_decide.go
@@ -9,6 +9,7 @@ import (
 	"github.com/cybozu-go/cke"
 	"github.com/cybozu-go/log"
 	"github.com/cybozu-go/well"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -61,6 +62,55 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
 	return nil
 }
 
+// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
+// It calls podHandler for each target pod.
+// If the handler returns an error, this function returns the error immediately.
+// Note: This function does not distinguish API errors from errors returned by podHandler.
+func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
+	podHandler func(pod *corev1.Pod) error) error {
+
+	daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	for i := range daemonSets.Items {
+		ds := &daemonSets.Items[i]
+		if ds.Spec.UpdateStrategy.Type != appsv1.OnDeleteDaemonSetStrategyType {
+			continue
+		}
+
+		// metav1.FormatLabelSelector is a display helper that yields placeholder text for
+		// empty or invalid selectors, so convert the selector properly and report errors.
+		selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
+		if err != nil {
+			return err
+		}
+		pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: selector.String(),
+			// let the API server narrow the result down to the target node.
+			FieldSelector: "spec.nodeName=" + node,
+		})
+		if err != nil {
+			return err
+		}
+
+		for j := range pods.Items {
+			pod := &pods.Items[j]
+			// Keep the node check in case the field selector was not applied, and skip
+			// Pods that merely share labels with the DaemonSet but are not controlled by it.
+			if pod.Spec.NodeName != node || !metav1.IsControlledBy(pod, ds) {
+				continue
+			}
+			if err := podHandler(pod); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
 // dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed.
 // It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed.
 func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
@@ -73,6 +123,11 @@ func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node st
 	return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
 }
 
+// deleteOnDeleteDaemonSetPod deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
+func deleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
+	return doDeleteOnDeleteDaemonSetPod(ctx, cs, node)
+}
+
 // doEvictOrDeleteNodePod evicts or delete Pods on the specified Node.
 // It first tries eviction.
 // If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
@@ -163,6 +218,26 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
 	})
 }
 
+// doDeleteOnDeleteDaemonSetPod deletes 'OnDelete' DaemonSet pods on the specified Node.
+// A Pod that no longer exists (NotFound) is not treated as an error.
+func doDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
+	return enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
+		err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+		if apierrors.IsNotFound(err) {
+			// the Pod is already gone; nothing was deleted here, so don't log a deletion.
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		log.Info("deleted daemonset pod", map[string]interface{}{
+			"namespace": pod.Namespace,
+			"name":      pod.Name,
+		})
+		return nil
+	})
+}
+
 // checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
 // If those pods still exist, this function returns an error.
 func checkPodDeletion(ctx context.Context, cs *kubernetes.Clientset, node string) error {
diff --git a/server/strategy.go b/server/strategy.go
index c24fbd05..a29b3c3d 100644
--- a/server/strategy.go
+++ b/server/strategy.go
@@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
 	}
 
 	if len(rebootArgs.DrainCompleted) > 0 {
+		// After eviction of normal pods, delete "OnDelete" daemonset pods.
+		ops = append(ops, op.RebootDeleteDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
 		ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
 	}
 	if len(rebootArgs.NewlyDrained) > 0 {
diff --git a/server/strategy_test.go b/server/strategy_test.go
index c09a684a..a98e9303 100644
--- a/server/strategy_test.go
+++ b/server/strategy_test.go
@@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) {
 				},
 			}),
 			ExpectedOps: []opData{
+				{"reboot-delete-daemonset-pod", 1},
 				{"reboot-reboot", 1},
 			},
 		},