-
Notifications
You must be signed in to change notification settings - Fork 464
Implement the PodTermination controller to gracefully handle "stuck" pods
#7312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
b4d8577
c8e14ce
e5e01fd
c1c3739
d18e6ba
3c8a8ae
4576252
9aa810a
ec481a2
d379ac4
92c4e3a
178119f
21ead2e
86e4bc8
b2c6e58
00254ab
a1a9346
b494149
68e1f58
b981205
1044989
3f11131
606dcc6
f5f1d2d
e0024de
2b70919
e266319
bee8767
20d4563
c3a5c37
d15bb80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| /* | ||
| Copyright The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package failurerecovery | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| corev1 "k8s.io/api/core/v1" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/utils/clock" | ||
| ctrl "sigs.k8s.io/controller-runtime" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
|
|
||
| "sigs.k8s.io/kueue/pkg/controller/constants" | ||
| utilpod "sigs.k8s.io/kueue/pkg/util/pod" | ||
| utiltaints "sigs.k8s.io/kueue/pkg/util/taints" | ||
| ) | ||
|
|
||
| // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch | ||
| // +kubebuilder:rbac:groups="",resources=pods/status,verbs=get;patch | ||
| // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch | ||
|
|
||
| var ( | ||
| realClock = clock.RealClock{} | ||
| ) | ||
|
|
||
| type TerminatingPodReconciler struct { | ||
| client client.Client | ||
| clock clock.Clock | ||
| forcefulTerminationGracePeriod time.Duration | ||
| } | ||
|
|
||
| type TerminatingPodReconcilerOptions struct { | ||
| clock clock.Clock | ||
| forcefulTerminationGracePeriod time.Duration | ||
| } | ||
|
|
||
| type TerminatingPodReconcilerOption func(*TerminatingPodReconcilerOptions) | ||
|
|
||
| func WithClock(c clock.Clock) TerminatingPodReconcilerOption { | ||
| return func(o *TerminatingPodReconcilerOptions) { | ||
| o.clock = c | ||
| } | ||
| } | ||
|
|
||
| func WithForcefulTerminationGracePeriod(t time.Duration) TerminatingPodReconcilerOption { | ||
| return func(o *TerminatingPodReconcilerOptions) { | ||
| o.forcefulTerminationGracePeriod = t | ||
| } | ||
| } | ||
|
|
||
| var defaultOptions = TerminatingPodReconcilerOptions{ | ||
| clock: realClock, | ||
| forcefulTerminationGracePeriod: time.Minute, | ||
| } | ||
|
|
||
| func NewTerminatingPodReconciler( | ||
| client client.Client, | ||
| opts ...TerminatingPodReconcilerOption, | ||
| ) (*TerminatingPodReconciler, error) { | ||
| options := defaultOptions | ||
| for _, opt := range opts { | ||
| opt(&options) | ||
| } | ||
|
|
||
| return &TerminatingPodReconciler{ | ||
| client: client, | ||
| clock: options.clock, | ||
| forcefulTerminationGracePeriod: options.forcefulTerminationGracePeriod, | ||
| }, nil | ||
| } | ||
|
|
||
| func (r *TerminatingPodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { | ||
| pod := &corev1.Pod{} | ||
| if err := r.client.Get(ctx, req.NamespacedName, pod); err != nil { | ||
| return ctrl.Result{}, client.IgnoreNotFound(err) | ||
| } | ||
|
|
||
| // Pod did not opt-in to be forcefully terminated | ||
| annotationValue, hasAnnotation := pod.Annotations[constants.SafeToForcefullyTerminateAnnotationKey] | ||
| if !hasAnnotation || annotationValue != constants.SafeToForcefullyTerminateAnnotationValue { | ||
| return ctrl.Result{}, nil | ||
| } | ||
|
|
||
| // Pod was not marked for termination | ||
| if pod.DeletionTimestamp.IsZero() { | ||
| return ctrl.Result{}, nil | ||
| } | ||
|
|
||
| // Pod is not in a running phase | ||
| if utilpod.IsTerminated(pod) { | ||
| return ctrl.Result{}, nil | ||
| } | ||
kshalot marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| node := &corev1.Node{} | ||
| nodeKey := types.NamespacedName{Name: pod.Spec.NodeName} | ||
| if err := r.client.Get(ctx, nodeKey, node); err != nil { | ||
| return ctrl.Result{}, err | ||
| } | ||
| // Pod is not scheduled on an unreachable node | ||
| if !utiltaints.TaintKeyExists(node.Spec.Taints, corev1.TaintNodeUnreachable) { | ||
| return ctrl.Result{}, nil | ||
| } | ||
|
|
||
| now := r.clock.Now() | ||
| forcefulTerminationThreshold := pod.DeletionTimestamp.Add(r.forcefulTerminationGracePeriod) | ||
| if now.Before(forcefulTerminationThreshold) { | ||
| remainingTime := forcefulTerminationThreshold.Sub(now) | ||
| return ctrl.Result{RequeueAfter: remainingTime}, nil | ||
| } | ||
kshalot marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| podPatch := pod.DeepCopy() | ||
| podPatch.Status.Phase = corev1.PodFailed | ||
kshalot marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if err := r.client.Status().Patch(ctx, podPatch, client.MergeFrom(pod)); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This patch may potentially override some conditions or status changes done concurrently by another controller. To avoid that use our helper in clientutil which supports "strict" mode which compares the ResrouceVersion. |
||
| return ctrl.Result{}, err | ||
| } | ||
|
|
||
| return ctrl.Result{}, nil | ||
| } | ||
|
|
||
| func (r *TerminatingPodReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
| return ctrl.NewControllerManagedBy(mgr). | ||
| For(&corev1.Pod{}). | ||
| Complete(r) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| /* | ||
| Copyright The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package failurerecovery | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/google/go-cmp/cmp" | ||
| "github.com/google/go-cmp/cmp/cmpopts" | ||
| corev1 "k8s.io/api/core/v1" | ||
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||
| testingclock "k8s.io/utils/clock/testing" | ||
| ctrl "sigs.k8s.io/controller-runtime" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| "sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
|
|
||
| "sigs.k8s.io/kueue/pkg/controller/constants" | ||
| utiltesting "sigs.k8s.io/kueue/pkg/util/testing" | ||
| testingnode "sigs.k8s.io/kueue/pkg/util/testingjobs/node" | ||
| testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" | ||
| ) | ||
|
|
||
| var ( | ||
| podCmpOpts = cmp.Options{ | ||
| cmpopts.EquateEmpty(), | ||
| cmpopts.IgnoreFields( | ||
| corev1.Pod{}, "ObjectMeta.ResourceVersion", "ObjectMeta.DeletionTimestamp", | ||
| ), | ||
| } | ||
| ) | ||
|
|
||
| func TestReconciler(t *testing.T) { | ||
| now := time.Now() | ||
| nowSecondPrecision := metav1.NewTime(now).Rfc3339Copy() | ||
| beforeGracePeriod := now.Add(-time.Minute * 3) | ||
| fakeClock := testingclock.NewFakeClock(now) | ||
|
|
||
| unreachableNode := testingnode.MakeNode("unreachable-node"). | ||
| Taints(corev1.Taint{Key: corev1.TaintNodeUnreachable}).Obj() | ||
| healthyNode := testingnode.MakeNode("healthy-node").Obj() | ||
| podToForcefullyTerminate := testingpod.MakePod("pod", ""). | ||
| StatusPhase(corev1.PodRunning). | ||
| Annotation(constants.SafeToForcefullyTerminateAnnotationKey, constants.SafeToForcefullyTerminateAnnotationValue). | ||
| NodeName(unreachableNode.Name). | ||
| DeletionTimestamp(beforeGracePeriod). | ||
| KueueFinalizer() | ||
|
|
||
| cases := map[string]struct { | ||
| testPod *corev1.Pod | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's split this into Yes, this is more lines of test code, but very declarative in nature
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lgtm |
||
| wantResult ctrl.Result | ||
| wantErr error | ||
| wantPod *corev1.Pod | ||
| }{ | ||
| "pod is not found": { | ||
| testPod: testingpod.MakePod("pod2", "").Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "pod did not opt-in with annotation": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| Annotation(constants.SafeToForcefullyTerminateAnnotationKey, "false"). | ||
| Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "pod is not marked for termination": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| DeletionTimestamp(time.Time{}). | ||
| Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "pod is in failed phase": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| StatusPhase(corev1.PodFailed). | ||
| Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "pod is in succeeded phase": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| StatusPhase(corev1.PodSucceeded). | ||
| Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "pod is not scheduled on an unreachable node": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| NodeName(healthyNode.Name). | ||
| Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| }, | ||
| "forceful termination grace period did not elapse for pod": { | ||
| testPod: podToForcefullyTerminate. | ||
| Clone(). | ||
| DeletionTimestamp(now). | ||
| Obj(), | ||
| wantResult: ctrl.Result{RequeueAfter: nowSecondPrecision.Add(time.Minute).Sub(now)}, | ||
| wantErr: nil, | ||
| }, | ||
| "forceful termination grace period elapsed for pod": { | ||
| testPod: podToForcefullyTerminate.Clone().Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: nil, | ||
| wantPod: podToForcefullyTerminate.Clone().StatusPhase(corev1.PodFailed).Obj(), | ||
| }, | ||
| "pod is scheduled on a node that does not exist": { | ||
| testPod: podToForcefullyTerminate.Clone().NodeName("missing-node").Obj(), | ||
| wantResult: ctrl.Result{}, | ||
| wantErr: apierrors.NewNotFound(schema.GroupResource{Group: corev1.GroupName, Resource: "nodes"}, "missing-node"), | ||
| }, | ||
| } | ||
|
|
||
| for name, tc := range cases { | ||
| t.Run(name, func(t *testing.T) { | ||
| objs := []client.Object{tc.testPod, healthyNode, unreachableNode} | ||
| clientBuilder := utiltesting.NewClientBuilder().WithObjects(objs...) | ||
| cl := clientBuilder.Build() | ||
| reconciler, err := NewTerminatingPodReconciler(cl, WithClock(fakeClock)) | ||
| if err != nil { | ||
| t.Fatalf("could not create reconciler: %v", err) | ||
| } | ||
|
|
||
| ctxWithLogger, _ := utiltesting.ContextWithLog(t) | ||
| ctx, ctxCancel := context.WithCancel(ctxWithLogger) | ||
| defer ctxCancel() | ||
|
|
||
| gotResult, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.testPod)}) | ||
|
|
||
| if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" { | ||
| fmt.Println(tc.testPod.DeletionTimestamp.Add(time.Minute).Sub(now)) | ||
| t.Errorf("unexpected reconcile result (-want/+got):\n%s", diff) | ||
| } | ||
|
|
||
| if diff := cmp.Diff(tc.wantErr, gotError); diff != "" { | ||
| t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff) | ||
| } | ||
|
|
||
| gotPod := podToForcefullyTerminate.Clone().Obj() | ||
| if err := cl.Get(ctx, client.ObjectKeyFromObject(tc.testPod), gotPod); err != nil { | ||
| t.Fatalf("could not get pod after reconcile") | ||
| } | ||
| wantPod := tc.wantPod | ||
| if wantPod == nil { | ||
| wantPod = tc.testPod | ||
| } | ||
| if diff := cmp.Diff(wantPod, gotPod, podCmpOpts...); diff != "" { | ||
| t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.