diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
index 3a31cc7f2b7..e6ff41de627 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
@@ -225,20 +225,11 @@ func (mcm *mcmCloudProvider) checkMCMAvailableReplicas() error {
 // Refresh is called before every main loop and can be used to dynamically update cloud provider state.
 // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
 func (mcm *mcmCloudProvider) Refresh() error {
-
 	err := mcm.checkMCMAvailableReplicas()
 	if err != nil {
 		return err
 	}
-
-	for _, machineDeployment := range mcm.machinedeployments {
-		err := mcm.mcmManager.resetPriorityForNotToBeDeletedMachines(machineDeployment.Name)
-		if err != nil {
-			klog.Errorf("failed to reset priority for machines in MachineDeployment %s, err: %v", machineDeployment.Name, err.Error())
-			return err
-		}
-	}
-	return nil
+	return mcm.mcmManager.Refresh()
 }
 
 // GPULabel returns the label added to nodes with GPU resource.
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
index 1ef72a061af..6752c249785 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
@@ -153,7 +153,7 @@ func TestDeleteNodes(t *testing.T) {
 			},
 		},
 		{
-			"should not scale down when machine deployment update call times out",
+			"should not scale down when machine deployment update call times out and should reset priority of the corresponding machine",
 			setup{
 				nodes:    newNodes(2, "fakeID", []bool{true, false}),
 				machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}),
@@ -168,10 +168,10 @@ func TestDeleteNodes(t *testing.T) {
 			},
 			action{node: newNodes(1, "fakeID", []bool{true})[0]},
 			expect{
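+				// the deployment update times out, so DeleteMachines is expected to restore the machine's
+				// priority annotation to "3" and to return the scale-in error joined with the (nil) reset error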
-				machines:   newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machines:   newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
 				mdName:     "machinedeployment-1",
 				mdReplicas: 2,
-				err:        fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %v", mdUpdateErrorMsg),
+				err:        errors.Join(nil, fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %w", errors.New(mdUpdateErrorMsg))),
 			},
 		},
 		{
@@ -332,13 +332,13 @@ func TestDeleteNodes(t *testing.T) {
 			flag := false
 			for _, entryMachineItem := range entry.expect.machines {
 				if entryMachineItem.Name == machine.Name {
-					g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal(entryMachineItem.Annotations[priorityAnnotationKey]))
+					g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(entryMachineItem.Annotations[machinePriorityAnnotation]))
 					flag = true
 					break
 				}
 			}
 			if !flag {
-				g.Expect(machine.Annotations[priorityAnnotationKey]).To(Equal("3"))
+				g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal("3"))
 			}
 		}
 	})
@@ -357,7 +357,6 @@ func TestRefresh(t *testing.T) {
 	}
 	table := []data{
 		{
-
 			"should return an error if MCM has zero available replicas",
 			setup{
 				nodes: newNodes(1, "fakeID", []bool{false}),
@@ -371,7 +370,6 @@ func TestRefresh(t *testing.T) {
 			},
 		},
 		{
-
 			"should return an error if MCM deployment is not found",
 			setup{
 				nodes: newNodes(1, "fakeID", []bool{false}),
@@ -384,8 +382,7 @@ func TestRefresh(t *testing.T) {
 			},
 		},
 		{
-
-			"should reset priority of a machine with node without ToBeDeletedTaint to 3",
+			"should reset priority of a machine to 3 if machine deployment is not scaled in",
 			setup{
 				nodes:    newNodes(1, "fakeID", []bool{false}),
 				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
@@ -399,7 +396,7 @@ func TestRefresh(t *testing.T) {
 			},
 		},
 		{
-			"should not reset priority of a machine to 3 if the node has ToBeDeleted taint",
+			"should reset priority of a machine to 3 if machine deployment is not scaled in even if ToBeDeletedTaint is present on the corresponding node",
 			setup{
 				nodes:    newNodes(1, "fakeID", []bool{true}),
 				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
@@ -407,6 +404,56 @@ func TestRefresh(t *testing.T) {
 				nodeGroups:    []string{nodeGroup2},
 				mcmDeployment: newMCMDeployment(1),
 			},
+			expect{
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
+				err:      nil,
+			},
+		},
+		{
+			"should NOT skip paused machine deployment",
+			setup{
+				nodes:    newNodes(1, "fakeID", []bool{false}),
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
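+				// a paused MachineDeployment reports Progressing=Unknown with reason DeploymentPaused;
+				// isRollingUpdateFinished treats this as finished, so Refresh still processes the deployment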
+				machineDeployments: newMachineDeployments(1, 1, &v1alpha1.MachineDeploymentStatus{
+					Conditions: []v1alpha1.MachineDeploymentCondition{
+						{Type: v1alpha1.MachineDeploymentProgressing, Status: v1alpha1.ConditionUnknown, Reason: machineDeploymentPausedReason},
+					},
+				}, nil, nil),
+				nodeGroups:    []string{nodeGroup2},
+				mcmDeployment: newMCMDeployment(1),
+			},
+			expect{
+				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
+				err:      nil,
+			},
+		},
+		{
+			"should ignore terminating/failed machines in checking if number of annotated machines is more than desired",
+			setup{
+				nodes: newNodes(1, "fakeID", []bool{true}),
+				machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
+					CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
+				}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
+				nodeGroups:         []string{nodeGroup2},
+				mcmDeployment:      newMCMDeployment(1),
+			},
+			expect{
+				machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{
+					CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed},
+				}, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				err: nil,
+			},
+		},
+		{
+			"should not reset priority of a machine to 3 if machine deployment is scaled in",
+			setup{
+				nodes:              newNodes(1, "fakeID", []bool{true}),
+				machines:           newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
+				machineDeployments: newMachineDeployments(1, 0, nil, nil, nil),
+				nodeGroups:         []string{nodeGroup2},
+				mcmDeployment:      newMCMDeployment(1),
+			},
 			expect{
 				machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
 				err:      nil,
@@ -428,7 +475,7 @@ func TestRefresh(t *testing.T) {
 			},
 			expect{
 				machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)},
-				err:      errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg)),
+				err:      errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))),
 			},
 		},
 	}
@@ -461,7 +508,7 @@ func TestRefresh(t *testing.T) {
 		for _, mc := range entry.expect.machines {
 			machine, err := m.machineClient.Machines(m.namespace).Get(context.TODO(), mc.Name, metav1.GetOptions{})
 			g.Expect(err).To(BeNil())
-			g.Expect(mc.Annotations[priorityAnnotationKey]).To(Equal(machine.Annotations[priorityAnnotationKey]))
+			g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(machine.Annotations[machinePriorityAnnotation]))
 		}
 	})
 }
diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
index 0b7d59d3164..399862c9e38 100644
--- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
+++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
@@ -59,7 +59,6 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 	"k8s.io/client-go/discovery"
 	appsinformers "k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers"
@@ -77,8 +76,10 @@ const (
 	// defaultResetAnnotationTimeout is the timeout for resetting the priority annotation of a machine
 	defaultResetAnnotationTimeout = 10 * time.Second
 	// defaultPriorityValue is the default value for the priority annotation used by CA. It is set to 3 because MCM defaults the priority of machine it creates to 3.
-	defaultPriorityValue = "3"
-	minResyncPeriodDefault = 1 * time.Hour
+	defaultPriorityValue = "3"
+	// priorityValueForCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1.
+	priorityValueForCandidateMachines = "1"
+	minResyncPeriodDefault            = 1 * time.Hour
 	// machinePriorityAnnotation is the annotation to set machine priority while deletion
 	machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
 	// kindMachineClass is the kind for generic machine class used by the OOT providers
@@ -91,13 +92,10 @@ const (
 	machineGroup = "machine.sapcloud.io"
 	// machineVersion is the API version used to identify machine API group objects
 	machineVersion = "v1alpha1"
-	// machineDeploymentProgressing tells that deployment is progressing. Progress for a MachineDeployment is considered when a new machine set is created or adopted, and when new machines scale up or old machines scale down.
-	// Progress is not estimated for paused MachineDeployments. It is also updated if progressDeadlineSeconds is not specified(treated as infinite deadline), in which case it would never be updated to "false".
-	machineDeploymentProgressing v1alpha1.MachineDeploymentConditionType = "Progressing"
 	// newISAvailableReason is the reason in "Progressing" condition when machineDeployment rollout is complete
 	newISAvailableReason = "NewMachineSetAvailable"
-	// conditionTrue means the given condition status is true
-	conditionTrue v1alpha1.ConditionStatus = "True"
+	// machineDeploymentPausedReason is the reason in "Progressing" condition when machineDeployment is paused
+	machineDeploymentPausedReason = "DeploymentPaused"
 	// machineDeploymentNameLabel key for Machine Deployment name in machine labels
 	machineDeploymentNameLabel = "name"
 )
@@ -411,9 +409,52 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
 	}, nil
 }
 
-// Refresh does nothing at the moment.
+// Refresh resets, for each machine deployment, the priority of annotated machines if more machines are annotated for deletion than the desired replica count requires.
+// Machines are selected for the reset in descending order of creation timestamp, i.e. newest first.
 func (m *McmManager) Refresh() error {
-	return nil
+	machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything())
+	if err != nil {
+		klog.Errorf("[Refresh] unable to list machine deployments, Error: %v", err)
+		return err
+	}
+	var collectiveError error
+	for _, machineDeployment := range machineDeployments {
+		// ignore the machine deployment if it is in rolling update
+		if !isRollingUpdateFinished(machineDeployment) {
+			klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
+			continue
+		}
+		replicas := machineDeployment.Spec.Replicas
+		// check if the number of annotated machine objects is more than desired and reset the priority annotation value if needed.
+		machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
+		if err != nil {
+			klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
+			collectiveError = errors.Join(collectiveError, err)
+			continue
+		}
+		var machinesMarkedForDeletion []*v1alpha1.Machine
+		for _, machine := range machines {
+			// no need to reset priority for machines already in termination or failed phase
+			if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
+				continue
+			}
+			if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines {
+				machinesMarkedForDeletion = append(machinesMarkedForDeletion, machine)
+			}
+		}
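+		// desired replicas exceeding the machines that are not marked for deletion means some
+		// candidates must survive the scale-in; reset the priority of that surplus, newest first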
+		if int(replicas) > len(machines)-len(machinesMarkedForDeletion) {
+			slices.SortStableFunc(machinesMarkedForDeletion, func(m1, m2 *v1alpha1.Machine) int {
+				return -m1.CreationTimestamp.Compare(m2.CreationTimestamp.Time)
+			})
+			diff := int(replicas) - len(machines) + len(machinesMarkedForDeletion)
+			targetRefs := make([]*Ref, 0, diff)
+			for i := 0; i < min(diff, len(machinesMarkedForDeletion)); i++ {
+				targetRefs = append(targetRefs, &Ref{Name: machinesMarkedForDeletion[i].Name, Namespace: machinesMarkedForDeletion[i].Namespace})
+			}
+			collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs))
+		}
+	}
+	return collectiveError
 }
 
 // Cleanup does nothing at the moment.
@@ -449,7 +490,7 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy
 	return true, err
 }
 
-// DeleteMachines deletes the Machines and also reduces the desired replicas of the MachineDeployment in parallel.
+// DeleteMachines annotates the target machines for deletion and reduces the desired replicas of the MachineDeployment.
 func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 	if len(targetMachineRefs) == 0 {
 		return nil
@@ -468,7 +509,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 		return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
 	}
 	// update priorities of machines to be deleted except the ones already in termination to 1
-	scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs, commonMachineDeployment.Name)
+	scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
 	if err != nil {
 		return err
 	}
@@ -477,33 +518,26 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
 		return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount)
 	}, "MachineDeployment", "update", commonMachineDeployment.Name)
 	if err != nil {
-		klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
-		return fmt.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
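+		// the scale-in failed, so roll the candidate annotations back to the default priority;
+		// otherwise MCM could later delete machines the autoscaler no longer wants removed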
+		klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
+		return errors.Join(err, m.resetPriorityForMachines(targetMachineRefs))
 	}
 	return nil
 }
 
-// resetPriorityForNotToBeDeletedMachines resets the priority of machines with nodes without ToBeDeleted taint to 3
-func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error {
-	allMachinesForMachineDeployment, err := m.getMachinesForMachineDeployment(mdName)
-	if err != nil {
-		return fmt.Errorf("unable to list all machines for node group %s, Error: %v", mdName, err)
-	}
+// resetPriorityForMachines resets the priority of the given machines to defaultPriorityValue
+func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
 	var collectiveError error
-	for _, machine := range allMachinesForMachineDeployment {
+	for _, mcRef := range mcRefs {
+		machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
+		if err != nil {
+			collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
+			continue
+		}
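+		// each reset runs under its own deadline so that a single slow update cannot stall the rest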
 		ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout))
-		err := func() error {
+		err = func() error {
 			defer cancelFn()
 			val, ok := machine.Annotations[machinePriorityAnnotation]
 			if ok && val != defaultPriorityValue {
-				nodeName := machine.Labels[v1alpha1.NodeLabelKey]
-				node, err := m.nodeLister.Get(nodeName)
-				if err != nil && !kube_errors.IsNotFound(err) {
-					return fmt.Errorf("unable to get Node object %s for machine %s, Error: %v", nodeName, machine.Name, err)
-				} else if err == nil && taints.HasToBeDeletedTaint(node) {
-					// Don't update priority annotation if the taint is present on the node
-					return nil
-				}
 				_, err = m.updateAnnotationOnMachine(ctx, machine.Name, machinePriorityAnnotation, defaultPriorityValue)
 				return err
 			}
@@ -518,7 +552,7 @@ func (m *McmManager) resetPriorityForNotToBeDeletedMachines(mdName string) error
 }
 
 // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
-func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref, mdName string) (int, error) {
+func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (int, error) {
 	var expectedToTerminateMachineNodePairs = make(map[string]string)
 	for _, machineRef := range targetMachineRefs {
 		// Trying to update the priority of machineRef till m.maxRetryTimeout
@@ -536,7 +570,7 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref, mdN
 				return false, nil
 			}
 			expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
-			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, "1")
+			return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
 		}, "Machine", "update", machineRef.Name); err != nil {
 			klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
 			return 0, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
@@ -570,7 +604,7 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
 	}
 	_, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
 	if err == nil {
-		klog.Infof("Machine %s marked with priority 1 successfully", mcName)
+		klog.Infof("Machine %s marked with priority %s successfully", mcName, val)
 	}
 	return true, err
 }
@@ -594,7 +628,7 @@ func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName stri
 	mdclone.Spec.Replicas = expectedReplicas
 	_, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(ctx, mdclone, metav1.UpdateOptions{})
 	if err != nil {
-		return true, err
+		return true, fmt.Errorf("unable to scale in machine deployment %s, Error: %w", mdName, err)
 	}
 	klog.V(2).Infof("MachineDeployment %s size decreased to %d ", mdclone.Name, mdclone.Spec.Replicas)
 	return false, nil
@@ -885,9 +919,12 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
 func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
 	for _, cond := range md.Status.Conditions {
 		switch {
-		case cond.Type == machineDeploymentProgressing && cond.Status == conditionTrue && cond.Reason == newISAvailableReason:
+		case cond.Type == v1alpha1.MachineDeploymentProgressing && cond.Status == v1alpha1.ConditionTrue && cond.Reason == newISAvailableReason:
+			return true
+		// NOTE: this check for paused machine deployments is taken from MCM. If the check in MCM changes, this should be updated accordingly.
+		case cond.Type == v1alpha1.MachineDeploymentProgressing && cond.Status == v1alpha1.ConditionUnknown && cond.Reason == machineDeploymentPausedReason:
 			return true
-		case cond.Type == machineDeploymentProgressing:
+		case cond.Type == v1alpha1.MachineDeploymentProgressing:
 			return false
 		}
 	}
diff --git a/cluster-autoscaler/cloudprovider/mcm/test_utils.go b/cluster-autoscaler/cloudprovider/mcm/test_utils.go
index a0d53acb62a..3c3f55e4c69 100644
--- a/cluster-autoscaler/cloudprovider/mcm/test_utils.go
+++ b/cluster-autoscaler/cloudprovider/mcm/test_utils.go
@@ -30,9 +30,8 @@ import (
 )
 
 var (
-	testNamespace         = "test-namespace"
-	priorityAnnotationKey = "machinepriority.machine.sapcloud.io"
-	testTaintValue        = fmt.Sprint(time.Now().Unix())
+	testNamespace  = "test-namespace"
+	testTaintValue = fmt.Sprint(time.Now().Unix())
 )
 
 func newMachineDeployments(
@@ -135,7 +134,7 @@ func newMachines(
 				{Name: msName},
 			},
 			Labels:            map[string]string{machineDeploymentNameLabel: mdName},
-			Annotations:       map[string]string{priorityAnnotationKey: priorityAnnotationValues[i]},
+			Annotations:       map[string]string{machinePriorityAnnotation: priorityAnnotationValues[i]},
 			CreationTimestamp: metav1.Now(),
 		},
 	},