Skip to content

Commit

Permalink
Update and unify cluster/resource paused handling
Browse files Browse the repository at this point in the history
- Ports predicates for handling paused annotations on resources from cluster-api-provider-aws
  - Includes logging of predicate handling (level v4)
- Adds predicatee helpers for:
  - Aggregating predicates
  - adding a Watch to a controller for triggering updates on Cluster.Spec.Paused transitioning to false
    - Includes logging for predicate handling (level v4)
    - Avoids triggering events for Delete/Generic events
  - adding a Watch to a controller for triggering updates on both Cluster.Spec.Paused and Cluster.Status.InfrastructureReady
    - Includes logging for predicate handling (level v4)
    - Avoids triggering events for Delete/Generic events
    - Ensures that controllers get prompt updates for Cluster.Status.InfrastructureReady becoming true where needed
- Updates in-tree controllers to standardize on the use of the above helpers
- Unifies other aspects of paused handling among in-tree controllers
  • Loading branch information
detiber committed May 4, 2020
1 parent 0bb5cbf commit 30c377c
Show file tree
Hide file tree
Showing 22 changed files with 623 additions and 115 deletions.
32 changes: 23 additions & 9 deletions bootstrap/kubeadm/controllers/kubeadmconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import (
expv1 "sigs.k8s.io/cluster-api/exp/api/v1alpha3"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/util/secret"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -90,35 +92,42 @@ func (r *KubeadmConfigReconciler) SetupWithManager(mgr ctrl.Manager, option cont

r.scheme = mgr.GetScheme()

builder := ctrl.NewControllerManagedBy(mgr).
b := ctrl.NewControllerManagedBy(mgr).
For(&bootstrapv1.KubeadmConfig{}).
WithOptions(option).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Watches(
&source.Kind{Type: &clusterv1.Machine{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(r.MachineToBootstrapMapFunc),
},
).
Watches(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmConfigs),
},
)

if feature.Gates.Enabled(feature.MachinePool) {
builder = builder.Watches(
b = b.Watches(
&source.Kind{Type: &expv1.MachinePool{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(r.MachinePoolToBootstrapMapFunc),
},
)
}

if err := builder.Complete(r); err != nil {
c, err := b.Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}

err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmConfigs),
},
predicates.ClusterUnpausedAndInfrastructureReady(r.Log),
)
if err != nil {
return errors.Wrap(err, "failed adding Watch for Clusters to controller manager")
}

return nil
}

Expand Down Expand Up @@ -168,6 +177,11 @@ func (r *KubeadmConfigReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, re
return ctrl.Result{}, err
}

if annotations.IsPaused(cluster, config) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}

scope := &Scope{
Logger: log,
Config: config,
Expand Down
5 changes: 4 additions & 1 deletion controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
"sigs.k8s.io/cluster-api/controllers/metrics"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/util/secret"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -77,6 +79,7 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager, options controlle
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.controlPlaneMachineToCluster)},
).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)

if err != nil {
Expand Down Expand Up @@ -109,7 +112,7 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, reterr e
}

// Return early if the object or Cluster is paused.
if util.IsPaused(cluster, cluster) {
if annotations.IsPaused(cluster, cluster) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion controllers/cluster_controller_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sigs.k8s.io/cluster-api/controllers/external"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/cluster-api/util/kubeconfig"
"sigs.k8s.io/cluster-api/util/patch"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (r *ClusterReconciler) reconcileExternal(ctx context.Context, cluster *clus
}

// if external ref is paused, return error.
if util.IsPaused(cluster, obj) {
if annotations.IsPaused(cluster, obj) {
logger.V(3).Info("External object referenced is paused")
return external.ReconcileOutput{Paused: true}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions controllers/external/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handl
err := o.Controller.Watch(
&source.Kind{Type: u},
handler,
predicates.ResourceNotPaused(log),
)
if err != nil {
o.m.Delete(obj)
Expand Down
27 changes: 19 additions & 8 deletions controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ import (
capierrors "sigs.k8s.io/cluster-api/errors"
kubedrain "sigs.k8s.io/cluster-api/third_party/kubernetes-drain"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
Expand Down Expand Up @@ -77,22 +80,30 @@ type MachineReconciler struct {
}

func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
clusterToMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme())
if err != nil {
return err
}

controller, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.Machine{}).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)

if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}

// Add a watch on clusterv1.Cluster object for paused notifications.
clusterToMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme())
err = controller.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: clusterToMachines,
},
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
predicates.ClusterUnpaused(r.Log),
)
if err != nil {
return err
}
if err := util.WatchOnClusterPaused(controller, clusterToMachines); err != nil {
return err
return errors.Wrap(err, "failed to add Watch for Clusters to controller manager")
}

r.recorder = mgr.GetEventRecorderFor("machine-controller")
Expand Down Expand Up @@ -143,7 +154,7 @@ func (r *MachineReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, reterr e
}

// Return early if the object or Cluster is paused.
if util.IsPaused(cluster, m) {
if annotations.IsPaused(cluster, m) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion controllers/machine_controller_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/pointer"
"sigs.k8s.io/cluster-api/util/annotations"
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -102,7 +103,7 @@ func (r *MachineReconciler) reconcileExternal(ctx context.Context, cluster *clus
}

// if external ref is paused, return error.
if util.IsPaused(cluster, obj) {
if annotations.IsPaused(cluster, obj) {
logger.V(3).Info("External object referenced is paused")
return external.ReconcileOutput{Paused: true}, nil
}
Expand Down
28 changes: 19 additions & 9 deletions controllers/machinedeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -58,27 +60,35 @@ type MachineDeploymentReconciler struct {
}

func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
controller, err := ctrl.NewControllerManagedBy(mgr).
clusterToMachineDeployments, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineDeploymentList{}, mgr.GetScheme())
if err != nil {
return err
}

c, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.MachineDeployment{}).
Owns(&clusterv1.MachineSet{}).
Watches(
&source.Kind{Type: &clusterv1.MachineSet{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.MachineSetToDeployments)},
).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)

if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}

// Add a watch on clusterv1.Cluster object for paused notifications.
clusterToMachineDeployments, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineDeploymentList{}, mgr.GetScheme())
err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: clusterToMachineDeployments,
},
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
predicates.ClusterUnpaused(r.Log),
)
if err != nil {
return err
}
if err := util.WatchOnClusterPaused(controller, clusterToMachineDeployments); err != nil {
return err
return errors.Wrap(err, "failed to add Watch for Clusters to controller manager")
}

r.recorder = mgr.GetEventRecorderFor("machinedeployment-controller")
Expand Down Expand Up @@ -107,7 +117,7 @@ func (r *MachineDeploymentReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result
}

// Return early if the object or Cluster is paused.
if util.IsPaused(cluster, deployment) {
if annotations.IsPaused(cluster, deployment) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
Expand Down
19 changes: 13 additions & 6 deletions controllers/machinehealthcheck_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -76,20 +78,25 @@ type MachineHealthCheckReconciler struct {
func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
controller, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.MachineHealthCheck{}).
Watches(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.clusterToMachineHealthCheck)},
).
Watches(
&source.Kind{Type: &clusterv1.Machine{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.machineToMachineHealthCheck)},
).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)

if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
err = controller.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.clusterToMachineHealthCheck)},
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
predicates.ClusterUnpaused(r.Log),
)
if err != nil {
return errors.Wrap(err, "failed to add Watch for Clusters to controller manager")
}

// Add index to MachineHealthCheck for listing by Cluster Name
if err := mgr.GetCache().IndexField(&clusterv1.MachineHealthCheck{},
Expand Down Expand Up @@ -140,7 +147,7 @@ func (r *MachineHealthCheckReconciler) Reconcile(req ctrl.Request) (_ ctrl.Resul
}

// Return early if the object or Cluster is paused.
if util.IsPaused(cluster, m) {
if annotations.IsPaused(cluster, m) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
Expand Down
26 changes: 17 additions & 9 deletions controllers/machineset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
utilconversion "sigs.k8s.io/cluster-api/util/conversion"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -75,27 +77,33 @@ type MachineSetReconciler struct {
}

func (r *MachineSetReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
controller, err := ctrl.NewControllerManagedBy(mgr).
clusterToMachineSets, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineSetList{}, mgr.GetScheme())
if err != nil {
return err
}

c, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.MachineSet{}).
Owns(&clusterv1.Machine{}).
Watches(
&source.Kind{Type: &clusterv1.Machine{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.MachineToMachineSets)},
).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPaused(r.Log)).
Build(r)

if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}

// Add a watch on clusterv1.Cluster object for paused notifications.
clusterToMachineSets, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineSetList{}, mgr.GetScheme())
err = c.Watch(
&source.Kind{Type: &clusterv1.Cluster{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: clusterToMachineSets},
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
predicates.ClusterUnpaused(r.Log),
)
if err != nil {
return err
}
if err := util.WatchOnClusterPaused(controller, clusterToMachineSets); err != nil {
return err
return errors.Wrap(err, "failed to add Watch for Clusters to controller manager")
}

r.recorder = mgr.GetEventRecorderFor("machineset-controller")
Expand Down Expand Up @@ -124,7 +132,7 @@ func (r *MachineSetReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error)
}

// Return early if the object or Cluster is paused.
if util.IsPaused(cluster, machineSet) {
if annotations.IsPaused(cluster, machineSet) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
Expand Down
Loading

0 comments on commit 30c377c

Please sign in to comment.