diff --git a/bootstrap/kubeadm/controllers/kubeadmconfig_controller.go b/bootstrap/kubeadm/controllers/kubeadmconfig_controller.go index fec09f45292d..a2f99eb47ac0 100644 --- a/bootstrap/kubeadm/controllers/kubeadmconfig_controller.go +++ b/bootstrap/kubeadm/controllers/kubeadmconfig_controller.go @@ -42,7 +42,9 @@ import ( expv1 "sigs.k8s.io/cluster-api/exp/api/v1alpha3" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" "sigs.k8s.io/cluster-api/util/secret" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -90,24 +92,19 @@ func (r *KubeadmConfigReconciler) SetupWithManager(mgr ctrl.Manager, option cont r.scheme = mgr.GetScheme() - builder := ctrl.NewControllerManagedBy(mgr). + b := ctrl.NewControllerManagedBy(mgr). For(&bootstrapv1.KubeadmConfig{}). WithOptions(option). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Watches( &source.Kind{Type: &clusterv1.Machine{}}, &handler.EnqueueRequestsFromMapFunc{ ToRequests: handler.ToRequestsFunc(r.MachineToBootstrapMapFunc), }, - ). 
- Watches( - &source.Kind{Type: &clusterv1.Cluster{}}, - &handler.EnqueueRequestsFromMapFunc{ - ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmConfigs), - }, ) if feature.Gates.Enabled(feature.MachinePool) { - builder = builder.Watches( + b = b.Watches( &source.Kind{Type: &expv1.MachinePool{}}, &handler.EnqueueRequestsFromMapFunc{ ToRequests: handler.ToRequestsFunc(r.MachinePoolToBootstrapMapFunc), @@ -115,10 +112,22 @@ func (r *KubeadmConfigReconciler) SetupWithManager(mgr ctrl.Manager, option cont ) } - if err := builder.Complete(r); err != nil { + c, err := b.Build(r) + if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + err = c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmConfigs), + }, + predicates.ClusterUnpausedAndInfrastructureReady(r.Log), + ) + if err != nil { + return errors.Wrap(err, "failed adding Watch for Clusters to controller manager") + } + return nil } @@ -168,6 +177,11 @@ func (r *KubeadmConfigReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, re return ctrl.Result{}, err } + if annotations.IsPaused(cluster, config) { + log.Info("Reconciliation is paused for this object") + return ctrl.Result{}, nil + } + scope := &Scope{ Logger: log, Config: config, diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go index 977bcc96647f..cd4541e4e7cb 100644 --- a/controllers/cluster_controller.go +++ b/controllers/cluster_controller.go @@ -35,7 +35,9 @@ import ( "sigs.k8s.io/cluster-api/controllers/metrics" capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" "sigs.k8s.io/cluster-api/util/secret" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -77,6 +79,7 @@ func (r *ClusterReconciler) 
SetupWithManager(mgr ctrl.Manager, options controlle &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.controlPlaneMachineToCluster)}, ). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) if err != nil { @@ -109,7 +112,7 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, reterr e } // Return early if the object or Cluster is paused. - if util.IsPaused(cluster, cluster) { + if annotations.IsPaused(cluster, cluster) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/controllers/cluster_controller_phases.go b/controllers/cluster_controller_phases.go index 7888f5ac76df..6f815fc52cd5 100644 --- a/controllers/cluster_controller_phases.go +++ b/controllers/cluster_controller_phases.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/cluster-api/controllers/external" capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" utilconversion "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/cluster-api/util/kubeconfig" "sigs.k8s.io/cluster-api/util/patch" @@ -78,7 +79,7 @@ func (r *ClusterReconciler) reconcileExternal(ctx context.Context, cluster *clus } // if external ref is paused, return error. 
- if util.IsPaused(cluster, obj) { + if annotations.IsPaused(cluster, obj) { logger.V(3).Info("External object referenced is paused") return external.ReconcileOutput{Paused: true}, nil } diff --git a/controllers/external/tracker.go b/controllers/external/tracker.go index bbc097c60af1..29088fdacb72 100644 --- a/controllers/external/tracker.go +++ b/controllers/external/tracker.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/cluster-api/util/predicates" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/source" @@ -55,6 +56,7 @@ func (o *ObjectTracker) Watch(log logr.Logger, obj runtime.Object, handler handl err := o.Controller.Watch( &source.Kind{Type: u}, handler, + predicates.ResourceNotPaused(log), ) if err != nil { o.m.Delete(obj) diff --git a/controllers/machine_controller.go b/controllers/machine_controller.go index 2298b70f0a86..ab7feca5d3c1 100644 --- a/controllers/machine_controller.go +++ b/controllers/machine_controller.go @@ -42,13 +42,16 @@ import ( capierrors "sigs.k8s.io/cluster-api/errors" kubedrain "sigs.k8s.io/cluster-api/third_party/kubernetes-drain" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) var ( @@ -77,22 +80,30 @@ type MachineReconciler struct { } func (r *MachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { + clusterToMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), 
&clusterv1.MachineList{}, mgr.GetScheme()) + if err != nil { + return err + } + controller, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.Machine{}). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) - if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } - // Add a watch on clusterv1.Cluster object for paused notifications. - clusterToMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineList{}, mgr.GetScheme()) + err = controller.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToMachines, + }, + // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? + predicates.ClusterUnpaused(r.Log), + ) if err != nil { - return err - } - if err := util.WatchOnClusterPaused(controller, clusterToMachines); err != nil { - return err + return errors.Wrap(err, "failed to add Watch for Clusters to controller manager") } r.recorder = mgr.GetEventRecorderFor("machine-controller") @@ -143,7 +154,7 @@ func (r *MachineReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, reterr e } // Return early if the object or Cluster is paused. 
- if util.IsPaused(cluster, m) { + if annotations.IsPaused(cluster, m) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/controllers/machine_controller_phases.go b/controllers/machine_controller_phases.go index 1f6df26a8774..73cf303a9804 100644 --- a/controllers/machine_controller_phases.go +++ b/controllers/machine_controller_phases.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/utils/pointer" + "sigs.k8s.io/cluster-api/util/annotations" utilconversion "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -102,7 +103,7 @@ func (r *MachineReconciler) reconcileExternal(ctx context.Context, cluster *clus } // if external ref is paused, return error. - if util.IsPaused(cluster, obj) { + if annotations.IsPaused(cluster, obj) { logger.V(3).Info("External object referenced is paused") return external.ReconcileOutput{Paused: true}, nil } diff --git a/controllers/machinedeployment_controller.go b/controllers/machinedeployment_controller.go index b0ced4ecb00e..6404c3e80e69 100644 --- a/controllers/machinedeployment_controller.go +++ b/controllers/machinedeployment_controller.go @@ -30,7 +30,9 @@ import ( "k8s.io/client-go/tools/record" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -58,7 +60,12 @@ type MachineDeploymentReconciler struct { } func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - controller, err := ctrl.NewControllerManagedBy(mgr). 
+ clusterToMachineDeployments, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineDeploymentList{}, mgr.GetScheme()) + if err != nil { + return err + } + + c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineDeployment{}). Owns(&clusterv1.MachineSet{}). Watches( @@ -66,19 +73,22 @@ func (r *MachineDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, options &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.MachineSetToDeployments)}, ). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) - if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } - // Add a watch on clusterv1.Cluster object for paused notifications. - clusterToMachineDeployments, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineDeploymentList{}, mgr.GetScheme()) + err = c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToMachineDeployments, + }, + // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? + predicates.ClusterUnpaused(r.Log), + ) if err != nil { - return err - } - if err := util.WatchOnClusterPaused(controller, clusterToMachineDeployments); err != nil { - return err + return errors.Wrap(err, "failed to add Watch for Clusters to controller manager") } r.recorder = mgr.GetEventRecorderFor("machinedeployment-controller") @@ -107,7 +117,7 @@ func (r *MachineDeploymentReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result } // Return early if the object or Cluster is paused. 
- if util.IsPaused(cluster, deployment) { + if annotations.IsPaused(cluster, deployment) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/controllers/machinehealthcheck_controller.go b/controllers/machinehealthcheck_controller.go index 5bd74f2a8b3d..f6de9b728abf 100644 --- a/controllers/machinehealthcheck_controller.go +++ b/controllers/machinehealthcheck_controller.go @@ -35,7 +35,9 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -76,20 +78,25 @@ type MachineHealthCheckReconciler struct { func (r *MachineHealthCheckReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { controller, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineHealthCheck{}). - Watches( - &source.Kind{Type: &clusterv1.Cluster{}}, - &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.clusterToMachineHealthCheck)}, - ). Watches( &source.Kind{Type: &clusterv1.Machine{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.machineToMachineHealthCheck)}, ). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) - if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + err = controller.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.clusterToMachineHealthCheck)}, + // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? 
+ predicates.ClusterUnpaused(r.Log), + ) + if err != nil { + return errors.Wrap(err, "failed to add Watch for Clusters to controller manager") + } // Add index to MachineHealthCheck for listing by Cluster Name if err := mgr.GetCache().IndexField(&clusterv1.MachineHealthCheck{}, @@ -140,7 +147,7 @@ func (r *MachineHealthCheckReconciler) Reconcile(req ctrl.Request) (_ ctrl.Resul } // Return early if the object or Cluster is paused. - if util.IsPaused(cluster, m) { + if annotations.IsPaused(cluster, m) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/controllers/machineset_controller.go b/controllers/machineset_controller.go index dedbe249a41a..2d3b160890ce 100644 --- a/controllers/machineset_controller.go +++ b/controllers/machineset_controller.go @@ -38,8 +38,10 @@ import ( "sigs.k8s.io/cluster-api/controllers/noderefutil" "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" utilconversion "sigs.k8s.io/cluster-api/util/conversion" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -75,7 +77,12 @@ type MachineSetReconciler struct { } func (r *MachineSetReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - controller, err := ctrl.NewControllerManagedBy(mgr). + clusterToMachineSets, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineSetList{}, mgr.GetScheme()) + if err != nil { + return err + } + + c, err := ctrl.NewControllerManagedBy(mgr). For(&clusterv1.MachineSet{}). Owns(&clusterv1.Machine{}). Watches( @@ -83,19 +90,20 @@ func (r *MachineSetReconciler) SetupWithManager(mgr ctrl.Manager, options contro &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.MachineToMachineSets)}, ). WithOptions(options). 
+ WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) - if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } - // Add a watch on clusterv1.Cluster object for paused notifications. - clusterToMachineSets, err := util.ClusterToObjectsMapper(mgr.GetClient(), &clusterv1.MachineSetList{}, mgr.GetScheme()) + err = c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: clusterToMachineSets}, + // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? + predicates.ClusterUnpaused(r.Log), + ) if err != nil { - return err - } - if err := util.WatchOnClusterPaused(controller, clusterToMachineSets); err != nil { - return err + return errors.Wrap(err, "failed to add Watch for Clusters to controller manager") } r.recorder = mgr.GetEventRecorderFor("machineset-controller") @@ -124,7 +132,7 @@ func (r *MachineSetReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) } // Return early if the object or Cluster is paused. 
- if util.IsPaused(cluster, machineSet) { + if annotations.IsPaused(cluster, machineSet) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index 7831c6dec7ad..b481a615cc1d 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -36,7 +36,9 @@ import ( "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters" capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" "sigs.k8s.io/cluster-api/util/secret" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -70,17 +72,24 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio c, err := ctrl.NewControllerManagedBy(mgr). For(&controlplanev1.KubeadmControlPlane{}). Owns(&clusterv1.Machine{}). - Watches( - &source.Kind{Type: &clusterv1.Cluster{}}, - &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmControlPlane)}, - ). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). 
Build(r) - if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + err = c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: handler.ToRequestsFunc(r.ClusterToKubeadmControlPlane), + }, + predicates.ClusterUnpausedAndInfrastructureReady(r.Log), + ) + if err != nil { + return errors.Wrap(err, "failed adding Watch for Clusters to controller manager") + } + r.scheme = mgr.GetScheme() r.controller = c r.recorder = mgr.GetEventRecorderFor("kubeadm-control-plane-controller") @@ -117,7 +126,7 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Re } logger = logger.WithValues("cluster", cluster.Name) - if util.IsPaused(cluster, kcp) { + if annotations.IsPaused(cluster, kcp) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/docs/book/src/developer/providers/v1alpha2-to-v1alpha3.md b/docs/book/src/developer/providers/v1alpha2-to-v1alpha3.md index d51bbc2ff885..6c3b287a562e 100644 --- a/docs/book/src/developer/providers/v1alpha2-to-v1alpha3.md +++ b/docs/book/src/developer/providers/v1alpha2-to-v1alpha3.md @@ -162,18 +162,45 @@ outside of the existing module. } ``` - Unless your controller is already watching Clusters, add a Watch to get notifications when Cluster.Spec.Paused field changes. - In most cases, `util.WatchOnClusterPaused` and `util.ClusterToObjectsMapper` can be used like in the example below: + In most cases, `predicates.ClusterUnpaused` and `util.ClusterToObjectsMapper` can be used like in the example below: ```go // Add a watch on clusterv1.Cluster object for paused notifications. 
clusterToObjectFunc, err := util.ClusterToObjectsMapper(mgr.GetClient(), , mgr.GetScheme()) if err != nil { return err } - if err := util.WatchOnClusterPaused(controller, clusterToObjectFunc); err != nil { + err = controller.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToObjectFunc, + }, + predicates.ClusterUnpaused(r.Log), + ) + if err != nil { return err } ``` NB: You need to have `cluster.x-k8s.io/cluster-name` applied to all your objects for the mapper to function. +- In some cases, you'll want to not just watch on Cluster.Spec.Paused changes, but also on + Cluster.Status.InfrastructureReady. For those cases `predicates.ClusterUnpausedAndInfrastructureReady` should be used + instead. + ```go + // Add a watch on clusterv1.Cluster object for paused and infrastructureReady notifications. + clusterToObjectFunc, err := util.ClusterToObjectsMapper(mgr.GetClient(), &lt;your object list&gt;, mgr.GetScheme()) + if err != nil { + return err + } + err = controller.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToObjectFunc, + }, + predicates.ClusterUnpausedAndInfrastructureReady(r.Log), + ) + if err != nil { + return err + } + ``` ## [OPTIONAL] Support failure domains. 
diff --git a/exp/controllers/machinepool_controller.go b/exp/controllers/machinepool_controller.go index 1d1d64ffcc6b..9f8030317b40 100644 --- a/exp/controllers/machinepool_controller.go +++ b/exp/controllers/machinepool_controller.go @@ -36,11 +36,15 @@ import ( capierrors "sigs.k8s.io/cluster-api/errors" expv1 "sigs.k8s.io/cluster-api/exp/api/v1alpha3" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" ) // +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch @@ -62,13 +66,30 @@ type MachinePoolReconciler struct { } func (r *MachinePoolReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { + clusterToMachinePools, err := util.ClusterToObjectsMapper(mgr.GetClient(), &expv1.MachinePoolList{}, mgr.GetScheme()) + if err != nil { + return err + } + c, err := ctrl.NewControllerManagedBy(mgr). For(&expv1.MachinePool{}). WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). Build(r) if err != nil { return errors.Wrap(err, "failed setting up with a controller manager") } + err = c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToMachinePools, + }, + // TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources? 
+ predicates.ClusterUnpaused(r.Log), + ) + if err != nil { + return errors.Wrap(err, "failed adding Watch for Cluster to controller manager") + } r.controller = c r.recorder = mgr.GetEventRecorderFor("machinepool-controller") @@ -100,7 +121,7 @@ func (r *MachinePoolReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rete } // Return early if the object or Cluster is paused. - if util.IsPaused(cluster, mp) { + if annotations.IsPaused(cluster, mp) { logger.Info("Reconciliation is paused for this object") return ctrl.Result{}, nil } diff --git a/exp/controllers/machinepool_controller_phases.go b/exp/controllers/machinepool_controller_phases.go index 7ed61f1f0b16..ec0af7ddf874 100644 --- a/exp/controllers/machinepool_controller_phases.go +++ b/exp/controllers/machinepool_controller_phases.go @@ -33,6 +33,7 @@ import ( capierrors "sigs.k8s.io/cluster-api/errors" expv1 "sigs.k8s.io/cluster-api/exp/api/v1alpha3" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -90,7 +91,7 @@ func (r *MachinePoolReconciler) reconcileExternal(ctx context.Context, cluster * } // if external ref is paused, return error. 
- if util.IsPaused(cluster, obj) { + if annotations.IsPaused(cluster, obj) { logger.V(3).Info("External object referenced is paused") return external.ReconcileOutput{Paused: true}, nil } diff --git a/go.mod b/go.mod index e58637016a23..48337f5015e2 100644 --- a/go.mod +++ b/go.mod @@ -11,12 +11,9 @@ require ( github.com/evanphx/json-patch v4.5.0+incompatible github.com/go-logr/logr v0.1.0 github.com/gogo/protobuf v1.3.1 - github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/google/go-github v17.0.0+incompatible github.com/google/go-querystring v1.0.0 // indirect github.com/google/gofuzz v1.1.0 - github.com/hashicorp/golang-lru v0.5.4 // indirect - github.com/imdario/mergo v0.3.8 // indirect github.com/onsi/ginkgo v1.12.0 github.com/onsi/gomega v1.9.0 github.com/opencontainers/go-digest v1.0.0-rc1 // indirect @@ -27,10 +24,8 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.2 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 - golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 // indirect golang.org/x/net v0.0.0-20200301022130-244492dfa37a golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d - golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect google.golang.org/appengine v1.6.1 // indirect google.golang.org/grpc v1.26.0 k8s.io/api v0.17.2 diff --git a/go.sum b/go.sum index e129e3537bf8..c468b50c2d7a 100644 --- a/go.sum +++ b/go.sum @@ -185,9 +185,8 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= 
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -240,17 +239,15 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod 
h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= -github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8= @@ -465,9 +462,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -542,9 +538,8 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/test/infrastructure/docker/controllers/dockercluster_controller.go b/test/infrastructure/docker/controllers/dockercluster_controller.go index 2871a095a6e4..4c0192aeb529 100644 --- a/test/infrastructure/docker/controllers/dockercluster_controller.go +++ b/test/infrastructure/docker/controllers/dockercluster_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/cluster-api/test/infrastructure/docker/docker" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -150,13 +151,18 @@ func reconcileDelete(ctx context.Context, dockerCluster *infrav1.DockerCluster, // SetupWithManager will add watches for this controller func (r *DockerClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). + c, err := ctrl.NewControllerManagedBy(mgr). For(&infrav1.DockerCluster{}). 
- Watches( - &source.Kind{Type: &clusterv1.Cluster{}}, - &handler.EnqueueRequestsFromMapFunc{ - ToRequests: util.ClusterToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerCluster")), - }, - ). - Complete(r) + WithEventFilter(predicates.ResourceNotPaused(r.Log)). + Build(r) + if err != nil { + return err + } + return c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: util.ClusterToInfrastructureMapFunc(infrav1.GroupVersion.WithKind("DockerCluster")), + }, + predicates.ClusterUnpaused(r.Log), + ) } diff --git a/test/infrastructure/docker/controllers/dockermachine_controller.go b/test/infrastructure/docker/controllers/dockermachine_controller.go index cae11523434a..ad8d5c590fff 100644 --- a/test/infrastructure/docker/controllers/dockermachine_controller.go +++ b/test/infrastructure/docker/controllers/dockermachine_controller.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/cluster-api/test/infrastructure/docker/docker" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/patch" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -251,8 +252,15 @@ func (r *DockerMachineReconciler) reconcileDelete(ctx context.Context, machine * // SetupWithManager will add watches for this controller func (r *DockerMachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { - return ctrl.NewControllerManagedBy(mgr). + clusterToDockerMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &infrav1.DockerMachineList{}, mgr.GetScheme()) + if err != nil { + return err + } + + c, err := ctrl.NewControllerManagedBy(mgr). For(&infrav1.DockerMachine{}). + WithOptions(options). + WithEventFilter(predicates.ResourceNotPaused(r.Log)). 
Watches( &source.Kind{Type: &clusterv1.Machine{}}, &handler.EnqueueRequestsFromMapFunc{ @@ -265,8 +273,17 @@ func (r *DockerMachineReconciler) SetupWithManager(mgr ctrl.Manager, options con ToRequests: handler.ToRequestsFunc(r.DockerClusterToDockerMachines), }, ). - WithOptions(options). - Complete(r) + Build(r) + if err != nil { + return err + } + return c.Watch( + &source.Kind{Type: &clusterv1.Cluster{}}, + &handler.EnqueueRequestsFromMapFunc{ + ToRequests: clusterToDockerMachines, + }, + predicates.ClusterUnpausedAndInfrastructureReady(r.Log), + ) } // DockerClusterToDockerMachines is a handler.ToRequestsFunc to be used to enqeue diff --git a/test/infrastructure/docker/go.sum b/test/infrastructure/docker/go.sum index 99a7cb227940..261f6c684d4f 100644 --- a/test/infrastructure/docker/go.sum +++ b/test/infrastructure/docker/go.sum @@ -161,8 +161,6 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -207,16 +205,12 @@ github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3 github.com/hashicorp/golang-lru v0.5.0/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= -github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= -github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -403,9 +397,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto 
v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190312203227-4b39c73a6495/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -481,8 +474,6 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/util/annotations/paused.go b/util/annotations/paused.go new file mode 100644 index 000000000000..acb817a05d12 --- /dev/null +++ b/util/annotations/paused.go @@ -0,0 +1,40 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package annotations + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" +) + +// IsPaused returns true if the Cluster is paused or the object has the `paused` annotation. +func IsPaused(cluster *clusterv1.Cluster, o metav1.Object) bool { + if cluster.Spec.Paused { + return true + } + return HasPausedAnnotation(o) +} + +// HasPausedAnnotation returns true if the object has the `paused` annotation. +func HasPausedAnnotation(o metav1.Object) bool { + annotations := o.GetAnnotations() + if annotations == nil { + return false + } + _, ok := annotations[clusterv1.PausedAnnotation] + return ok +} diff --git a/util/predicates/cluster_predicates.go b/util/predicates/cluster_predicates.go new file mode 100644 index 000000000000..06b76ee96f69 --- /dev/null +++ b/util/predicates/cluster_predicates.go @@ -0,0 +1,195 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package predicates + +import ( + "github.com/go-logr/logr" + clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// ClusterCreateInfraReady returns a predicate that returns true for a create event when a cluster has Status.InfrastructureReady set as true +// it also returns false if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy +func ClusterCreateInfraReady(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterCreateInfraReady") + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + log = log.WithValues("eventType", "create") + + c, ok := e.Object.(*clusterv1.Cluster) + if !ok { + + log.V(4).Info("Expected Cluster", "type", e.Object.GetObjectKind().GroupVersionKind().String()) + return false + } + log = log.WithValues("namespace", c.Namespace, "cluster", c.Name) + + // Only need to trigger a reconcile if the Cluster.Status.InfrastructureReady is true + if c.Status.InfrastructureReady { + log.V(4).Info("Cluster infrastructure is ready, allowing further processing") + return true + } + + log.V(4).Info("Cluster infrastructure is not ready, blocking further processing") + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +} + +// ClusterCreateNotPaused returns a predicate that returns true for a create event when a cluster has Spec.Paused set as false +// it also returns false if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy +func ClusterCreateNotPaused(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterCreateNotPaused") + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + log =
log.WithValues("eventType", "create") + + c, ok := e.Object.(*clusterv1.Cluster) + if !ok { + + log.V(4).Info("Expected Cluster", "type", e.Object.GetObjectKind().GroupVersionKind().String()) + return false + } + log = log.WithValues("namespace", c.Namespace, "cluster", c.Name) + + // Only need to trigger a reconcile if the Cluster.Spec.Paused is false + if !c.Spec.Paused { + log.V(4).Info("Cluster is not paused, allowing further processing") + return true + } + + log.V(4).Info("Cluster is paused, blocking further processing") + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +} + +// ClusterUpdateInfraReady returns a predicate that returns true for an update event when a cluster has Status.InfrastructureReady changed from false to true +// it also returns false if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy +func ClusterUpdateInfraReady(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterUpdateInfraReady") + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log = log.WithValues("eventType", "update") + + oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster) + if !ok { + + log.V(4).Info("Expected Cluster", "type", e.ObjectOld.GetObjectKind().GroupVersionKind().String()) + return false + } + log = log.WithValues("namespace", oldCluster.Namespace, "cluster", oldCluster.Name) + + newCluster := e.ObjectNew.(*clusterv1.Cluster) + + if !oldCluster.Status.InfrastructureReady && newCluster.Status.InfrastructureReady { + log.V(4).Info("Cluster infrastructure became ready, allowing further processing") + return true + } + + log.V(4).Info("Cluster infrastructure did not become ready, blocking further processing") + return false + }, + CreateFunc: func(e event.CreateEvent) bool { return false }, + 
DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +} + +// ClusterUpdateUnpaused returns a predicate that returns true for an update event when a cluster has Spec.Paused changed from true to false +// it also returns false if the resource provided is not a Cluster to allow for use with controller-runtime NewControllerManagedBy +func ClusterUpdateUnpaused(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterUpdateUnpaused") + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log = log.WithValues("eventType", "update") + + oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster) + if !ok { + + log.V(4).Info("Expected Cluster", "type", e.ObjectOld.GetObjectKind().GroupVersionKind().String()) + return false + } + log = log.WithValues("namespace", oldCluster.Namespace, "cluster", oldCluster.Name) + + newCluster := e.ObjectNew.(*clusterv1.Cluster) + + if oldCluster.Spec.Paused && !newCluster.Spec.Paused { + log.V(4).Info("Cluster was unpaused, allowing further processing") + return true + } + + log.V(4).Info("Cluster was not unpaused, blocking further processing") + return false + }, + CreateFunc: func(e event.CreateEvent) bool { return false }, + DeleteFunc: func(e event.DeleteEvent) bool { return false }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + } +} + +// ClusterUnpaused returns a Predicate that returns true on Cluster creation events where Cluster.Spec.Paused is false +// and Update events when Cluster.Spec.Paused transitions to false. +// This implements a common requirement for many cluster-api and provider controllers (such as Cluster Infrastructure +// controllers) to resume reconciliation when the Cluster is unpaused.
+// Example use: +// err := controller.Watch( +// &source.Kind{Type: &clusterv1.Cluster{}}, +// &handler.EnqueueRequestsFromMapFunc{ +// ToRequests: clusterToMachines, +// }, +// predicates.ClusterUnpaused(r.Log), +// ) +func ClusterUnpaused(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterUnpaused") + + // Use any to ensure we process either create or update events we care about + return Any(log, ClusterCreateNotPaused(log), ClusterUpdateUnpaused(log)) +} + +// ClusterUnpausedAndInfrastructureReady returns a Predicate that returns true on Cluster creation events where +// both Cluster.Spec.Paused is false and Cluster.Status.InfrastructureReady is true and Update events when +// either Cluster.Spec.Paused transitions to false or Cluster.Status.InfrastructureReady transitions to true. +// This implements a common requirement for some cluster-api and provider controllers (such as Machine Infrastructure +// controllers) to resume reconciliation when the Cluster is unpaused and when the infrastructure becomes ready. 
+// Example use: +// err := controller.Watch( +// &source.Kind{Type: &clusterv1.Cluster{}}, +// &handler.EnqueueRequestsFromMapFunc{ +// ToRequests: clusterToMachines, +// }, +// predicates.ClusterUnpausedAndInfrastructureReady(r.Log), +// ) +func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Funcs { + log := logger.WithValues("predicate", "ClusterUnpausedAndInfrastructureReady") + + // Only continue processing create events if both not paused and infrastructure is ready + createPredicates := All(log, ClusterCreateNotPaused(log), ClusterCreateInfraReady(log)) + + // Process update events if either Cluster is unpaused or infrastructure becomes ready + updatePredicates := Any(log, ClusterUpdateUnpaused(log), ClusterUpdateInfraReady(log)) + + // Use any to ensure we process either create or update events we care about + return Any(log, createPredicates, updatePredicates) +} diff --git a/util/predicates/generic_predicates.go b/util/predicates/generic_predicates.go new file mode 100644 index 000000000000..6fac482614a9 --- /dev/null +++ b/util/predicates/generic_predicates.go @@ -0,0 +1,164 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package predicates + +import ( + "strings" + + "github.com/go-logr/logr" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/cluster-api/util/annotations" + + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// All returns a predicate that returns true only if all given predicates return true +func All(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs { + log := logger.WithValues("predicateAggregation", "All") + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + for _, p := range predicates { + if !p.UpdateFunc(e) { + log.V(4).Info("One of the provided predicates returned false, blocking further processing") + return false + } + } + log.V(4).Info("All provided predicates returned true, allowing further processing") + return true + }, + CreateFunc: func(e event.CreateEvent) bool { + for _, p := range predicates { + if !p.CreateFunc(e) { + log.V(4).Info("One of the provided predicates returned false, blocking further processing") + return false + } + } + log.V(4).Info("All provided predicates returned true, allowing further processing") + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + for _, p := range predicates { + if !p.DeleteFunc(e) { + log.V(4).Info("One of the provided predicates returned false, blocking further processing") + return false + } + } + log.V(4).Info("All provided predicates returned true, allowing further processing") + return true + }, + GenericFunc: func(e event.GenericEvent) bool { + for _, p := range predicates { + if !p.GenericFunc(e) { + log.V(4).Info("One of the provided predicates returned false, blocking further processing") + return false + } + } + log.V(4).Info("All provided predicates returned true, allowing further processing") + return true + }, + } +} + +// Any returns a predicate that returns true only if any given predicate returns true +func Any(logger logr.Logger, predicates 
...predicate.Funcs) predicate.Funcs { + log := logger.WithValues("predicateAggregation", "Any") + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + for _, p := range predicates { + if p.UpdateFunc(e) { + log.V(4).Info("One of the provided predicates returned true, allowing further processing") + return true + } + } + log.V(4).Info("All of the provided predicates returned false, blocking further processing") + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + for _, p := range predicates { + if p.CreateFunc(e) { + log.V(4).Info("One of the provided predicates returned true, allowing further processing") + return true + } + } + log.V(4).Info("All of the provided predicates returned false, blocking further processing") + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + for _, p := range predicates { + if p.DeleteFunc(e) { + log.V(4).Info("One of the provided predicates returned true, allowing further processing") + return true + } + } + log.V(4).Info("All of the provided predicates returned false, blocking further processing") + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + for _, p := range predicates { + if p.GenericFunc(e) { + log.V(4).Info("One of the provided predicates returned true, allowing further processing") + return true + } + } + log.V(4).Info("All of the provided predicates returned false, blocking further processing") + return false + }, + } +} + +// ResourceNotPaused returns a Predicate that returns true only if the provided resource does not contain the +// paused annotation. +// This implements a common requirement for all cluster-api and provider controllers skip reconciliation when the paused +// annotation is present for a resource. +// Example use: +// func (r *MyReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { +// controller, err := ctrl.NewControllerManagedBy(mgr). +// For(&v1.MyType{}). +// WithOptions(options). 
+// WithEventFilter(util.ResourceNotPaused(r.Log)). +// Build(r) +// return err +// } +func ResourceNotPaused(logger logr.Logger) predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + return processIfNotPaused(logger.WithValues("predicate", "updateEvent"), e.ObjectNew, e.MetaNew) + }, + CreateFunc: func(e event.CreateEvent) bool { + return processIfNotPaused(logger.WithValues("predicate", "createEvent"), e.Object, e.Meta) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return processIfNotPaused(logger.WithValues("predicate", "deleteEvent"), e.Object, e.Meta) + }, + GenericFunc: func(e event.GenericEvent) bool { + return processIfNotPaused(logger.WithValues("predicate", "genericEvent"), e.Object, e.Meta) + }, + } +} + +func processIfNotPaused(logger logr.Logger, obj runtime.Object, meta v1.Object) bool { + kind := strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) + log := logger.WithValues("namespace", meta.GetNamespace(), kind, meta.GetName()) + if annotations.HasPausedAnnotation(meta) { + log.V(4).Info("Resource is paused, will not attempt to map resource") + return false + } + log.V(4).Info("Resource is not paused, will attempt to map resource") + return true +} diff --git a/util/util.go b/util/util.go index efce5c79015e..e455b8af3a59 100644 --- a/util/util.go +++ b/util/util.go @@ -37,15 +37,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/version" + "k8s.io/klog/klogr" clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/container" + "sigs.k8s.io/cluster-api/util/predicates" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - 
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -433,23 +434,15 @@ func HasOwner(refList []metav1.OwnerReference, apiVersion string, kinds []string return false } -// IsPaused returns true if the Cluster is paused or the object has the `paused` annotation. -func IsPaused(cluster *clusterv1.Cluster, o metav1.Object) bool { - if cluster.Spec.Paused { - return true - } - return HasPausedAnnotation(o) -} +var ( + // IsPaused returns true if the Cluster is paused or the object has the `paused` annotation. + // Deprecated: use util/annotations/IsPaused instead + IsPaused = annotations.IsPaused -// HasPausedAnnotation returns true if the object has the `paused` annotation. -func HasPausedAnnotation(o metav1.Object) bool { - annotations := o.GetAnnotations() - if annotations == nil { - return false - } - _, ok := annotations[clusterv1.PausedAnnotation] - return ok -} + // HasPausedAnnotation returns true if the object has the `paused` annotation. + // Deprecated: use util/annotations/HasPausedAnnotation instead + HasPausedAnnotation = annotations.HasPausedAnnotation +) // GetCRDWithContract retrieves a list of CustomResourceDefinitions from using controller-runtime Client, // filtering with the `contract` label passed in. @@ -505,19 +498,16 @@ func (o MachinesByCreationTimestamp) Less(i, j int) bool { // WatchOnClusterPaused adds a conditional watch to the controlled given as input // that sends watch notifications on any create or delete, and only updates // that toggle Cluster.Spec.Cluster. +// Deprecated: Instead add the Watch directly and use predicates.ClusterUnpaused or +// predicates.ClusterUnpausedAndInfrastructureReady depending on your use case. 
func WatchOnClusterPaused(c controller.Controller, mapFunc handler.Mapper) error { + log := klogr.New().WithName("WatchOnClusterPaused") return c.Watch( &source.Kind{Type: &clusterv1.Cluster{}}, &handler.EnqueueRequestsFromMapFunc{ ToRequests: mapFunc, }, - predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - oldCluster := e.ObjectOld.(*clusterv1.Cluster) - newCluster := e.ObjectNew.(*clusterv1.Cluster) - return oldCluster.Spec.Paused && !newCluster.Spec.Paused - }, - }, + predicates.ClusterUnpaused(log), ) }