Skip to content

Commit

Permalink
Merge pull request #39 from ashwani2k/scale-mcm
Browse files Browse the repository at this point in the history
Enable configuring delays for scaling of dependents during probe
  • Loading branch information
ashwani2k authored Feb 28, 2022
2 parents 190a54b + 4051153 commit 0853384
Show file tree
Hide file tree
Showing 421 changed files with 65,651 additions and 32,091 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ require (
github.com/prometheus/client_golang v1.3.0
github.com/spf13/cobra v0.0.6
github.com/spf13/pflag v1.0.5
golang.org/x/sys v0.0.0-20200523222454-059865788121 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/tools v0.1.9 // indirect
k8s.io/api v0.18.2
k8s.io/apimachinery v0.18.2
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
Expand Down
62 changes: 21 additions & 41 deletions go.sum

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions hack/config-probe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ probes:
kind: Deployment
name: kube-controller-manager
replicas: 1
- scaleRef:
apiVersion: extensions/v1beta1
kind: Deployment
name: machine-controller-manager
scaleRefDependsOn:
- apiVersion: extensions/v1beta1
kind: Deployment
name: kube-controller-manager
replicas: 1
scaleUpDelaySeconds: 10
scaleDownDelaySeconds: 0
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ package main
import "github.com/gardener/dependency-watchdog/cmd"

func main() {
cmd.Execute()
cmd.Execute()
}
2 changes: 2 additions & 0 deletions pkg/multicontext/multicontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (m *Multicontext) Start(stopCh <-chan struct{}) {
return
case cmsg := <-m.ContextCh:
oldCancelFn, ok := m.CancelFns[cmsg.Key]
klog.V(4).Infof("Checking the oldCancelFn for key %v in the multicontext map and recieved ok code %v", cmsg.Key, ok)
if cmsg.CancelFn != nil {
klog.Infof("Registering the context for the key: %s", cmsg.Key)
m.CancelFns[cmsg.Key] = cmsg.CancelFn
Expand All @@ -65,6 +66,7 @@ func (m *Multicontext) Start(stopCh <-chan struct{}) {

func (m *Multicontext) cancelAll() {
for key, cancelFn := range m.CancelFns {
klog.V(4).Infof("Deleting cancelFn for key %s \n", key)
delete(m.CancelFns, key)
cancelFn()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/restarter/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ type Service struct {

// DependantPods struct captures the details needed to identify dependant pods.
type DependantPods struct {
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`
Selector *metav1.LabelSelector `json:"selector"`
}
7 changes: 5 additions & 2 deletions pkg/scaler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type ProbeDetails struct {

// DependantScaleDetails has the details about the dependant scale sub-resource.
type DependantScaleDetails struct {
ScaleRef autoscalingv1.CrossVersionObjectReference `json:"scaleRef"`
Replicas *int32 `json:"replicas"`
ScaleRef autoscalingv1.CrossVersionObjectReference `json:"scaleRef"`
Replicas *int32 `json:"replicas"`
ScaleUpDelaySeconds *int32 `json:"scaleUpDelaySeconds,omitempty"`
ScaleDownDelaySeconds *int32 `json:"scaleDownDelaySeconds,omitempty"`
ScaleRefDependsOn []autoscalingv1.CrossVersionObjectReference `json:"scaleRefDependsOn,omitempty"`
}
70 changes: 67 additions & 3 deletions pkg/scaler/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ func (p *prober) tryAndRun(prepareRun func() (stopCh <-chan struct{}), cancelFn

internalClient, internalSHA, internalErr := p.getClientFromSecret(p.probeDeps.Probe.Internal.KubeconfigSecretName, p.internalSHA)
externalClient, externalSHA, externalErr := p.getClientFromSecret(p.probeDeps.Probe.External.KubeconfigSecretName, p.externalSHA)
klog.V(4).Infof("Secret fetch completed with internalErr: %v", internalErr)
klog.V(4).Infof("Secret fetch completed with externalErr: %v", externalErr)

shootNotReady, err := p.shootNotReady()
if (shootNotReady && err == nil) || apierrors.IsNotFound(internalErr) || apierrors.IsNotFound(externalErr) {
Expand Down Expand Up @@ -459,11 +461,41 @@ func (p *prober) scaleTo(parentContext context.Context, msg string, replicas int
klog.V(4).Infof("%s: skipped because desired=%d and current=%d", prefix, replicas, s.Spec.Replicas)
continue
}
/*
Check if the scaled objects has defined any delays for the operation.
scaleUpDelay is the delay in seconds to wait before initiating scaleUp to ensures that the resource is scaled up after allowing sufficient time for system to recover.
scaleDownDelay is the delay in seconds to wait before initiating scaleDown to ensure that the resource is scaled down after allowing its dependents room to react.
*/
var depChecked bool
// Check for scaleUp delays
if replicas > 0 {
if dsd.ScaleUpDelaySeconds != nil {
klog.V(4).Infof("Delaying scale up of %s by %d seconds \n", dsd.ScaleRef.Name, *dsd.ScaleUpDelaySeconds)
time.Sleep(toDuration(dsd.ScaleUpDelaySeconds, 0))
}
depChecked = p.checkScaleRefDependsOn(parentContext, fmt.Sprintf("Checking dependents of %s before scaleUp", dsd.ScaleRef.Name), dsd.ScaleRefDependsOn, replicas, checkFn)
klog.V(4).Infof("Check for Scaleref depends on returned %t\n", depChecked)

} else if replicas == 0 { // check for scaleDown delays
if dsd.ScaleDownDelaySeconds != nil {
klog.V(4).Infof("Delaying scale down of %s by %d seconds \n", dsd.ScaleRef.Name, *dsd.ScaleDownDelaySeconds)
time.Sleep(toDuration(dsd.ScaleDownDelaySeconds, 0))
}
depChecked = p.checkScaleRefDependsOn(parentContext, fmt.Sprintf("Checking dependents of %s before scaleDown", dsd.ScaleRef.Name), dsd.ScaleRefDependsOn, replicas, checkFn)
klog.V(4).Infof("Check for Scaleref depends on returned %t\n", depChecked)

if err = retry(msg, p.getScalingFn(parentContext, gr, s, replicas), defaultMaxRetries); err != nil {
klog.Errorf("%s: Error scaling : %s", prefix, err)
} else {
klog.Errorf("%s: Replicas has a unsupported value %d\n", prefix, replicas)
}
if depChecked {

if err = retry(msg, p.getScalingFn(parentContext, gr, s, replicas), defaultMaxRetries); err != nil {
klog.Errorf("%s: Error scaling : %s", prefix, err)
}
klog.Infof("%s: replicas=%d: successful", prefix, replicas)
} else {
klog.V(4).Infof("Check for dependents returned false. Skipping scaling")
}
klog.Infof("%s: replicas=%d: successful", prefix, replicas)
} else {
klog.Errorf("%s: Could not get target reference: %s", prefix, err)
klog.Errorf("%s: replicas=%d: failed", prefix, replicas)
Expand All @@ -477,6 +509,7 @@ func (p *prober) getScalingFn(parentContext context.Context, gr schema.GroupReso
return func() error {
s = s.DeepCopy()
s.Spec.Replicas = replicas

timeout := toDuration(p.probeDeps.Probe.TimeoutSeconds, defaultTimeoutSeconds)
_, cancelFn := context.WithTimeout(parentContext, timeout)
defer cancelFn()
Expand Down Expand Up @@ -504,3 +537,34 @@ func (p *prober) scaleUp(ctx context.Context) error {
return n > o // scale to at least n
})
}

// Checks for a given resource considered for scale, if for the respecitve scale operations its dependent deployments are in desired state.
// If availableReplicas is not equal to desired then it fails the check and the scaling fo the parent is stopped
func (p *prober) checkScaleRefDependsOn(ctx context.Context, prefix string, dependsOnScaleRefs []autoscalingapi.CrossVersionObjectReference, replicas int32, checkFn func(oReplicas, nReplicas int32) bool) bool {
// if possible check from the cache if the target needs to be scaled
klog.V(4).Infof("Check scale for dependents with prefix %s and dependendents %v", prefix, dependsOnScaleRefs)
if len(dependsOnScaleRefs) != 0 {
for _, dependsOnScaleRef := range dependsOnScaleRefs {
klog.V(4).Infof("Checking if the dependent scaleRef %v has the desired replicas %d\n ", dependsOnScaleRef, replicas)
if dependsOnScaleRef.APIVersion == appsv1.SchemeGroupVersion.String() && dependsOnScaleRef.Kind == kindDeployment {
dwdGetTargetFromCacheTotal.With(prometheus.Labels{labelResource: resourceDeployments}).Inc()
d, err := p.deploymentsLister.Deployments(p.namespace).Get(dependsOnScaleRef.Name)
if err != nil {
klog.Errorf("%s: Could not find the target reference for %s: %s", prefix, dependsOnScaleRef.Name, err)
return false
}
var availableReplicas = int32(0)
availableReplicas = d.Status.AvailableReplicas //check if available replicas is as desired
if !checkFn(availableReplicas, replicas) {
klog.V(4).Infof("%s: check for dependent %s succeeded as desired=%d and available=%d", prefix, d.Name, replicas, availableReplicas)
return true // can continue with scale operation of the parent
}
klog.V(4).Infof("%s: check for dependent %s failed as desired=%d and available=%d", prefix, d.Name, replicas, availableReplicas)
return false // stop the scale operation of parent as dependent has not yet scaled
}

}
}
klog.V(4).Infof("%s skipped as there are no dependents to process.", prefix)
return true
}
30 changes: 25 additions & 5 deletions pkg/scaler/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func NewController(clientset kubernetes.Interface,
},
})
c.secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueProbe,
AddFunc: func(new interface{}) {
klog.V(4).Infof("Secret Added\n")
c.enqueueProbe(new)
},
UpdateFunc: func(old, new interface{}) {
newSecret := new.(*v1.Secret)
oldSecret := old.(*v1.Secret)
Expand All @@ -97,9 +100,13 @@ func NewController(clientset kubernetes.Interface,
// Two different versions of the same Deployment will always have different RVs.
return
}
klog.V(4).Info("Secret changed")
c.enqueueProbe(new)
},
DeleteFunc: c.enqueueProbe,
DeleteFunc: func(old interface{}) {
klog.V(4).Infof("Secret deleted\n")
c.enqueueProbe(old)
},
})
c.hasSecretsSynced = c.secretsInformer.HasSynced
c.hasDeploymentsSynced = c.deploymentsInformer.HasSynced
Expand Down Expand Up @@ -145,6 +152,7 @@ func (c *Controller) enqueueProbe(obj interface{}) {

// Enqueue for reconciliation only if the secret name is one of the names configured.
if found {
klog.V(4).Infof("Enque probe received in namespace %s for name %s", ns, name)
c.workqueue.AddRateLimited(ns)
}
}
Expand Down Expand Up @@ -196,6 +204,7 @@ func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()

if shutdown {
klog.V(4).Infof("Received shutdown signal from worker queue\n")
return false
}

Expand Down Expand Up @@ -238,6 +247,7 @@ func (c *Controller) processNamespace(key string) error {
return err
}
if c.probeDependantsList.Namespace != "" && namespace != c.probeDependantsList.Namespace {
klog.V(5).Infof("Namespace %s is not in the list probe dependant namespace \n", namespace)
return nil
}

Expand All @@ -255,9 +265,9 @@ func (c *Controller) processNamespace(key string) error {
probeDeps: probeDeps,
}
err := p.tryAndRun(func() <-chan struct{} {
klog.Infof("Starting the probe in the namespace %s: %v", ns, pd)
klog.Infof("Starting the probe in the namespace %s: %v", ns, pd.Name)
ctx, cancelFn := c.newContext(ns, pd)

klog.V(5).Infof("Created the context %v with cancelFun %v\n", ctx, cancelFn)
// Register the context's cancelFn. This also cancels the previous context if any.
c.Multicontext.ContextCh <- &multicontext.ContextMessage{
Key: c.getKey(ns, pd),
Expand All @@ -267,14 +277,18 @@ func (c *Controller) processNamespace(key string) error {
c.registerProber(p)
return ctx.Done()
}, func() {
klog.V(4).Infof("Setting the context nil for ns %s and probe dependent %v\n", ns, probeDeps)
c.Multicontext.ContextCh <- &multicontext.ContextMessage{
Key: c.getKey(ns, pd),
CancelFn: nil,
}
}, func() {
klog.V(4).Infof("Enqueuing with a delay of 10 mins\n")
c.workqueue.AddAfter(ns, 10*time.Minute)
}, func() bool {

_, ok := c.probers[ns]
klog.V(4).Infof("Prober ran with ok code %v\n", ok)
return ok
})

Expand All @@ -301,22 +315,24 @@ func (c *Controller) registerProber(p *prober) *prober {
ns = p.namespace
probeDeps = p.probeDeps
)

if probeDeps == nil {
return nil
}

key := c.getKey(ns, probeDeps)
klog.V(4).Infof("Registering Probe for key %s\n", key)

c.mux.Lock() // serialize access to c.probers
defer c.mux.Unlock()

if c.probers == nil {
klog.V(4).Infof("No probers are running yet. Adding key %s to the probers list\n", key)
c.probers = make(map[string]*prober)
}

pb, ok := c.probers[key]
if ok && pb != nil {
klog.V(4).Infof("Found and existing probe for key %s so using existing probe %v \n", key, pb)
return pb
}

Expand All @@ -328,6 +344,7 @@ func (c *Controller) registerProber(p *prober) *prober {
probeDeps: probeDeps,
}
c.probers[key] = pb
klog.V(4).Infof("Created a new probe and added for key %v \n", key)
return pb
}

Expand All @@ -336,16 +353,19 @@ func (c *Controller) deleteProber(key string) {
defer c.mux.Unlock()

if c.probers == nil {
klog.V(4).Infof("No prober found returning\n")
return
}

delete(c.probers, key)
klog.V(4).Infof("Deleted probe for key %v \n", key)
}

func (c *Controller) newContext(ns string, probeDeps *api.ProbeDependants) (context.Context, context.CancelFunc) {
key := c.getKey(ns, probeDeps)

ctx, cancelFn := context.WithCancel(context.Background())
klog.V(4).Infof("Created new context %v with cancelFn %v \n", ctx, cancelFn)
return ctx, func() {
defer cancelFn()
c.deleteProber(key)
Expand Down
1 change: 1 addition & 0 deletions vendor/golang.org/x/net/context/go17.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/golang.org/x/net/context/go19.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/golang.org/x/net/context/pre_go17.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/golang.org/x/net/context/pre_go19.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/golang.org/x/net/html/const.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0853384

Please sign in to comment.