Skip to content

Commit b33f5ae

Browse files
"max-replacement-nodes" feature to avoid cluster ballooning during upgrades. (keikoproj#328)
* add feature max-replacement-nodes Signed-off-by: sbadiger <[email protected]>
1 parent 51fef36 commit b33f5ae

File tree

7 files changed

+181
-38
lines changed

7 files changed

+181
-38
lines changed

controllers/helpers_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ func createRollingUpgradeReconciler(t *testing.T) *RollingUpgradeReconciler {
5151
ScriptRunner: ScriptRunner{
5252
Logger: logger,
5353
},
54-
DrainGroupMapper: &sync.Map{},
55-
DrainErrorMapper: &sync.Map{},
54+
DrainGroupMapper: &sync.Map{},
55+
DrainErrorMapper: &sync.Map{},
56+
ReplacementNodesMap: &sync.Map{},
5657
}
5758
return reconciler
5859

@@ -72,8 +73,10 @@ func createRollingUpgradeContext(r *RollingUpgradeReconciler) *RollingUpgradeCon
7273
DrainErrors: drainErrs.(chan error),
7374
DrainGroup: drainGroup.(*sync.WaitGroup),
7475
},
75-
RollingUpgrade: rollingUpgrade,
76-
metricsMutex: &sync.Mutex{},
76+
RollingUpgrade: rollingUpgrade,
77+
metricsMutex: &sync.Mutex{},
78+
ReplacementNodesMap: r.ReplacementNodesMap,
79+
MaxReplacementNodes: r.MaxReplacementNodes,
7780
}
7881

7982
}

controllers/rollingupgrade_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type RollingUpgradeReconciler struct {
5555
ReconcileMap *sync.Map
5656
DrainTimeout int
5757
IgnoreDrainFailures bool
58+
ReplacementNodesMap *sync.Map
59+
MaxReplacementNodes int
5860
}
5961

6062
// RollingUpgradeAuthenticator has the clients for providers
@@ -172,6 +174,8 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
172174

173175
DrainTimeout: r.DrainTimeout,
174176
IgnoreDrainFailures: r.IgnoreDrainFailures,
177+
ReplacementNodesMap: r.ReplacementNodesMap,
178+
MaxReplacementNodes: r.MaxReplacementNodes,
175179
}
176180

177181
// process node rotation
@@ -246,6 +250,14 @@ func (r *RollingUpgradeReconciler) UpdateStatus(rollingUpgrade *v1alpha1.Rolling
246250
}
247251
}
248252

253+
// max number of replacement nodes allowed in a cluster. This will ensure we avoid cluster ballooning.
254+
func (r *RollingUpgradeReconciler) SetMaxReplacementNodes(n int) {
255+
if n >= 1 {
256+
r.Info("setting max replacement nodes", "value", n)
257+
r.MaxReplacementNodes = n
258+
}
259+
}
260+
249261
// extract node objects from syncMap to a slice
250262
func (r *RollingUpgradeReconciler) getClusterNodes() []*corev1.Node {
251263
var clusterNodes []*corev1.Node

controllers/upgrade.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type RollingUpgradeContext struct {
6262
metricsMutex *sync.Mutex
6363
DrainTimeout int
6464
IgnoreDrainFailures bool
65+
ReplacementNodesMap *sync.Map
66+
MaxReplacementNodes int
67+
AllowReplacements bool
6568
}
6669

6770
func (r *RollingUpgradeContext) RotateNodes() error {
@@ -149,6 +152,18 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
149152
batchInstanceIDs, inServiceInstanceIDs := awsprovider.GetInstanceIDs(batch), awsprovider.GetInServiceInstanceIDs(batch)
150153
// Tag and set to StandBy only the InService instances.
151154
if len(inServiceInstanceIDs) > 0 {
155+
156+
// Check if replacement nodes are causing cluster to balloon
157+
clusterIsBallooning, allowedBatchSize := r.ClusterBallooning(len(inServiceInstanceIDs))
158+
if clusterIsBallooning || allowedBatchSize == 0 {
159+
// Allowing more replacement nodes can cause cluster ballooning. Requeue CR.
160+
return true, nil
161+
}
162+
if len(inServiceInstanceIDs) != allowedBatchSize {
163+
r.Info("cluster is about to hit max-replacement-nodes capacity, reducing batchSize", "prevBatchSize", len(inServiceInstanceIDs), "currBatchSize", allowedBatchSize, "name", r.RollingUpgrade.NamespacedName())
164+
inServiceInstanceIDs = inServiceInstanceIDs[:allowedBatchSize]
165+
}
166+
152167
// Add in-progress tag
153168
r.Info("setting instances to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
154169
if err := r.Auth.TagEC2instances(inServiceInstanceIDs, instanceStateTagKey, inProgressTagValue); err != nil {
@@ -361,6 +376,14 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
361376
return true, nil
362377
}
363378

379+
// Once instances are terminated, decrease them from the count.
380+
count, _ := r.ReplacementNodesMap.Load("ReplacementNodes")
381+
if count != nil && count.(int) > 0 {
382+
r.ReplacementNodesMap.Store("ReplacementNodes", count.(int)-1)
383+
r.Info("decrementing replacementNodes count", "ReplacementNodes", count.(int)-1, "name", r.RollingUpgrade.NamespacedName())
384+
r.AllowReplacements = false
385+
}
386+
364387
r.RollingUpgrade.SetLastNodeTerminationTime(&metav1.Time{Time: time.Now()})
365388

366389
// Turns onto NodeRotationTerminate
@@ -662,3 +685,31 @@ func (r *RollingUpgradeContext) SetBatchStandBy(instanceIDs []string) error {
662685
}
663686
return nil
664687
}
688+
689+
// Checks for how many replacement nodes exists across all the IGs in the cluster
690+
func (r *RollingUpgradeContext) ClusterBallooning(batchSize int) (bool, int) {
691+
count, _ := r.ReplacementNodesMap.LoadOrStore("ReplacementNodes", 0)
692+
newReplacementCount := count.(int) + batchSize
693+
partialReplacementCount := r.MaxReplacementNodes - count.(int)
694+
695+
// By default, no limits on replacement nodes.
696+
if r.MaxReplacementNodes == 0 {
697+
return false, batchSize
698+
}
699+
700+
// Handle 3 different cases. 1) When entire batch can have replacement nodes. 2) When partial batch can have replacement nodes 3) When there is no availability for replacement nodes and CR has to re-queue
701+
if newReplacementCount <= r.MaxReplacementNodes {
702+
r.ReplacementNodesMap.Store("ReplacementNodes", newReplacementCount)
703+
r.Info("incrementing replacementNodes count", "ReplacementNodes", newReplacementCount, "name", r.RollingUpgrade.NamespacedName())
704+
r.AllowReplacements = true
705+
} else if partialReplacementCount < batchSize && partialReplacementCount > 0 {
706+
r.ReplacementNodesMap.Store("ReplacementNodes", count.(int)+partialReplacementCount)
707+
r.Info("incrementing replacementNodes count", "ReplacementNodes", count.(int)+partialReplacementCount, "name", r.RollingUpgrade.NamespacedName())
708+
r.AllowReplacements = true
709+
batchSize = partialReplacementCount
710+
} else if !r.AllowReplacements {
711+
r.Info("cluster has hit max-replacement-nodes capacity, requeuing rollingUpgrade CR. ", "replacementNodes", count.(int), "maxReplacementNodes", r.MaxReplacementNodes, "name", r.RollingUpgrade.NamespacedName())
712+
return true, 0
713+
}
714+
return false, batchSize
715+
}

controllers/upgrade_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,103 @@ func TestIgnoreDrainFailuresAndDrainTimeout(t *testing.T) {
517517
}
518518
}
519519
}
520+
521+
func TestClusterBallooning(t *testing.T) {
522+
var tests = []struct {
523+
TestDescription string
524+
Reconciler *RollingUpgradeReconciler
525+
RollingUpgrade *v1alpha1.RollingUpgrade
526+
AsgClient *MockAutoscalingGroup
527+
ClusterNodes []*corev1.Node
528+
BatchSize int
529+
IsClusterBallooning bool
530+
AllowedBatchSize int
531+
}{
532+
{
533+
"ClusterBallooning - maxReplacementNodes is not set, expect no clusterBallooning",
534+
createRollingUpgradeReconciler(t),
535+
createRollingUpgrade(),
536+
createASGClient(),
537+
createNodeSlice(),
538+
3,
539+
false,
540+
3, // because mock-asg-1 has 3 instances
541+
},
542+
{
543+
"ClusterBallooning - cluster has hit maxReplacementNodes capacity, expect clusterBallooning to be true",
544+
func() *RollingUpgradeReconciler {
545+
reconciler := createRollingUpgradeReconciler(t)
546+
reconciler.MaxReplacementNodes = 500
547+
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 500)
548+
return reconciler
549+
}(),
550+
createRollingUpgrade(),
551+
createASGClient(),
552+
createNodeSlice(),
553+
3,
554+
true,
555+
0,
556+
},
557+
{
558+
"ClusterBallooning - cluster is below maxReplacementNodes capacity, expect no clusterBallooning",
559+
func() *RollingUpgradeReconciler {
560+
reconciler := createRollingUpgradeReconciler(t)
561+
reconciler.MaxReplacementNodes = 500
562+
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 100)
563+
return reconciler
564+
}(),
565+
createRollingUpgrade(),
566+
createASGClient(),
567+
createNodeSlice(),
568+
400,
569+
false,
570+
400,
571+
},
572+
{
573+
"ClusterBallooning - cluster is about to hit maxReplacementNodes capacity, expect reduced batchSize",
574+
func() *RollingUpgradeReconciler {
575+
reconciler := createRollingUpgradeReconciler(t)
576+
reconciler.MaxReplacementNodes = 100
577+
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 97)
578+
return reconciler
579+
}(),
580+
createRollingUpgrade(),
581+
createASGClient(),
582+
createNodeSlice(),
583+
100,
584+
false,
585+
3,
586+
},
587+
{
588+
"ClusterBallooning - cluster is about to hit maxReplacementNodes capacity, expect reduced batchSize",
589+
func() *RollingUpgradeReconciler {
590+
reconciler := createRollingUpgradeReconciler(t)
591+
reconciler.MaxReplacementNodes = 5
592+
reconciler.ReplacementNodesMap.Store("ReplacementNodes", 3)
593+
return reconciler
594+
}(),
595+
createRollingUpgrade(),
596+
createASGClient(),
597+
createNodeSlice(),
598+
3,
599+
false,
600+
2,
601+
},
602+
}
603+
for _, test := range tests {
604+
rollupCtx := createRollingUpgradeContext(test.Reconciler)
605+
rollupCtx.RollingUpgrade = test.RollingUpgrade
606+
rollupCtx.Cloud.ScalingGroups = test.AsgClient.autoScalingGroups
607+
rollupCtx.Cloud.ClusterNodes = test.ClusterNodes
608+
rollupCtx.Auth.AmazonClientSet.AsgClient = test.AsgClient
609+
610+
isClusterBallooning, allowedBatchSize := rollupCtx.ClusterBallooning(test.BatchSize)
611+
if isClusterBallooning != test.IsClusterBallooning {
612+
t.Errorf("Test Description: %s \n isClusterBallooning, expected: %v \n actual: %v", test.TestDescription, test.IsClusterBallooning, isClusterBallooning)
613+
}
614+
if allowedBatchSize != test.AllowedBatchSize {
615+
t.Errorf("Test Description: %s \n allowedBatchSize expected: %v \n actual: %v", test.TestDescription, test.AllowedBatchSize, allowedBatchSize)
616+
}
617+
618+
}
619+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ require (
1616
k8s.io/client-go v0.20.4
1717
k8s.io/kubectl v0.20.4
1818
sigs.k8s.io/controller-runtime v0.7.0
19-
19+
2020
)

0 commit comments

Comments
 (0)