diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 483dfca644..11b1704147 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -29,6 +29,7 @@ import (
 	schedulingv1 "k8s.io/api/scheduling/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
 	"k8s.io/client-go/discovery"
@@ -264,10 +265,16 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
 	}
 
 	if features.Enabled(features.MultiKueue) {
+		adapters, err := jobframework.GetMultiKueueAdapters(sets.New(cfg.Integrations.Frameworks...))
+		if err != nil {
+			setupLog.Error(err, "Could not get the enabled MultiKueue adapters")
+			os.Exit(1)
+		}
 		if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
 			multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
 			multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
 			multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration),
+			multikueue.WithAdapters(adapters),
 		); err != nil {
 			setupLog.Error(err, "Could not setup MultiKueue controller")
 			os.Exit(1)
diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go
index 265ddd4e95..8dd0177094 100644
--- a/pkg/controller/admissionchecks/multikueue/controllers.go
+++ b/pkg/controller/admissionchecks/multikueue/controllers.go
@@ -36,6 +36,7 @@ type SetupOptions struct {
 	origin            string
 	workerLostTimeout time.Duration
 	eventsBatchPeriod time.Duration
+	adapters          map[string]jobframework.MultiKueueAdapter
 }
 
 type SetupOption func(o *SetupOptions)
@@ -72,12 +73,20 @@ func WithEventsBatchPeriod(d time.Duration) SetupOption {
 	}
 }
 
+// WithAdapters sets the MultiKueue adapters to be used by the controllers.
+func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption {
+	return func(o *SetupOptions) {
+		o.adapters = adapters
+	}
+}
+
 func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
 	options := &SetupOptions{
 		gcInterval:        defaultGCInterval,
 		origin:            defaultOrigin,
 		workerLostTimeout: defaultWorkerLostTimeout,
 		eventsBatchPeriod: constants.UpdatesBatchPeriod,
+		adapters:          make(map[string]jobframework.MultiKueueAdapter),
 	}
 
 	for _, o := range opts {
@@ -95,12 +104,7 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
 		return err
 	}
 
-	adapters, err := jobframework.GetMultiKueueAdapters()
-	if err != nil {
-		return err
-	}
-
-	cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, adapters)
+	cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, options.adapters)
 	err = cRec.setupWithManager(mgr)
 	if err != nil {
 		return err
@@ -112,6 +116,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
 		return err
 	}
 
-	wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, adapters)
+	wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, options.adapters)
 	return wlRec.setupWithManager(mgr)
 }
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 92f625abb5..40a319bace 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/watch"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
@@ -376,7 +377,7 @@ func TestUpdateConfig(t *testing.T) {
 			builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueue.MultiKueueCluster) client.Object { return c })...)
 			c := builder.Build()
 
-			adapters, _ := jobframework.GetMultiKueueAdapters()
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin, nil, adapters)
 			//nolint:fatcontext
 			reconciler.rootContext = ctx
@@ -537,7 +538,7 @@ func TestRemoteClientGC(t *testing.T) {
 			worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.workersWorkloads}, &batchv1.JobList{Items: tc.workersJobs})
 			worker1Client := worker1Builder.Build()
 
-			adapters, _ := jobframework.GetMultiKueueAdapters()
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "", adapters)
 			w1remoteClient.client = worker1Client
 			w1remoteClient.connecting.Store(false)
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index 9b69aafd11..8f3417d70f 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	testingclock "k8s.io/utils/clock/testing"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
@@ -985,8 +986,7 @@ func TestWlReconcile(t *testing.T) {
 
 			)
 			managerClient := managerBuilder.Build()
-
-			adapters, _ := jobframework.GetMultiKueueAdapters()
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin, nil, adapters)
 
 			worker1Builder, _ := getClientBuilder()
diff --git a/pkg/controller/jobframework/integrationmanager.go b/pkg/controller/jobframework/integrationmanager.go
index 8d703f95ea..7418eb02fe 100644
--- a/pkg/controller/jobframework/integrationmanager.go
+++ b/pkg/controller/jobframework/integrationmanager.go
@@ -304,12 +304,12 @@ func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object {
 }
 
 // GetMultiKueueAdapters returns the map containing the MultiKueue adapters for the
-// registered integrations.
+// registered and enabled integrations.
-// An error is returned if more then one adapter is registers for one object type.
+// An error is returned if more than one adapter is registered for one object type.
-func GetMultiKueueAdapters() (map[string]MultiKueueAdapter, error) {
+func GetMultiKueueAdapters(enabledIntegrations sets.Set[string]) (map[string]MultiKueueAdapter, error) {
 	ret := map[string]MultiKueueAdapter{}
-	if err := manager.forEach(func(_ string, cb IntegrationCallbacks) error {
-		if cb.MultiKueueAdapter != nil {
+	if err := manager.forEach(func(intName string, cb IntegrationCallbacks) error {
+		if cb.MultiKueueAdapter != nil && enabledIntegrations.Has(intName) {
 			gvk := cb.MultiKueueAdapter.GVK().String()
 			if _, found := ret[gvk]; found {
 				return fmt.Errorf("multiple adapters for GVK: %q", gvk)
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 011b78e925..7c9f36a3ce 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -33,6 +33,7 @@ import (
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	versionutil "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -62,6 +63,11 @@ import (
 	"sigs.k8s.io/kueue/test/util"
 )
 
+var defaultEnabledIntegrations sets.Set[string] = sets.New(
+	"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
+	"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
+	"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob")
+
 var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
 	var (
 		managerNs *corev1.Namespace
@@ -86,7 +92,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
 
 	ginkgo.BeforeAll(func() {
 		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second)
+			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, defaultEnabledIntegrations)
 		})
 	})
 
@@ -1462,7 +1468,7 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 
 	ginkgo.BeforeAll(func() {
 		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-			managerAndMultiKueueSetup(ctx, mgr, 0)
+			managerAndMultiKueueSetup(ctx, mgr, 0, defaultEnabledIntegrations)
 		})
 	})
 
@@ -1638,6 +1644,157 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 	})
 })
 
+var _ = ginkgo.Describe("Multikueue with disabled integrations", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
+	var (
+		managerNs *corev1.Namespace
+		worker1Ns *corev1.Namespace
+
+		managerMultikueueSecret1 *corev1.Secret
+		workerCluster1           *kueue.MultiKueueCluster
+		managerMultiKueueConfig  *kueue.MultiKueueConfig
+		multikueueAC             *kueue.AdmissionCheck
+		managerCq                *kueue.ClusterQueue
+		managerLq                *kueue.LocalQueue
+
+		worker1Cq *kueue.ClusterQueue
+		worker1Lq *kueue.LocalQueue
+	)
+
+	ginkgo.BeforeAll(func() {
+		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
+			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, sets.New("batch/job"))
+		})
+	})
+
+	ginkgo.AfterAll(func() {
+		managerTestCluster.fwk.StopManager(managerTestCluster.ctx)
+	})
+
+	ginkgo.BeforeEach(func() {
+		managerNs = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "multikueue-",
+			},
+		}
+
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed())
+
+		worker1Ns = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: managerNs.Name,
+			},
+		}
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed())
+
+		w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes()
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		managerMultikueueSecret1 = &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "multikueue1",
+				Namespace: managersConfigNamespace.Name,
+			},
+			Data: map[string][]byte{
+				kueue.MultiKueueConfigSecretKey: w1Kubeconfig,
+			},
+		}
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())
+
+		workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueue.SecretLocationType, managerMultikueueSecret1.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())
+
+		managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig)).Should(gomega.Succeed())
+
+		multikueueAC = utiltesting.MakeAdmissionCheck("ac1").
+			ControllerName(kueue.MultiKueueControllerName).
+			Parameters(kueue.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig.Name).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC)).Should(gomega.Succeed())
+
+		ginkgo.By("wait for check active", func() {
+			updatedAc := kueue.AdmissionCheck{}
+			acKey := client.ObjectKeyFromObject(multikueueAC)
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey, &updatedAc)).To(gomega.Succeed())
+				cond := apimeta.FindStatusCondition(updatedAc.Status.Conditions, kueue.AdmissionCheckActive)
+				g.Expect(cond).NotTo(gomega.BeNil())
+				g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		managerCq = utiltesting.MakeClusterQueue("q1").
+			AdmissionChecks(multikueueAC.Name).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq)).Should(gomega.Succeed())
+
+		managerLq = utiltesting.MakeLocalQueue(managerCq.Name, managerNs.Name).ClusterQueue(managerCq.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq)).Should(gomega.Succeed())
+
+		worker1Cq = utiltesting.MakeClusterQueue("q1").Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
+		worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())
+	})
+
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq, true)
+		util.ExpectObjectToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultiKueueConfig, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, workerCluster1, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultikueueSecret1, true)
+	})
+
+	ginkgo.It("Should not create an MPIJob workload when the MPIJob adapter is not enabled", func() {
+		admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
+			kueue.PodSetAssignment{
+				Name: "launcher",
+			}, kueue.PodSetAssignment{
+				Name: "worker",
+			},
+		)
+		mpijob := testingmpijob.MakeMPIJob("mpijob1", managerNs.Name).
+			Queue(managerLq.Name).
+			ManagedBy(kueue.MultiKueueControllerName).
+			MPIJobReplicaSpecs(
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kfmpi.MPIReplicaTypeLauncher,
+					ReplicaCount:  1,
+					RestartPolicy: corev1.RestartPolicyOnFailure,
+				},
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kfmpi.MPIReplicaTypeWorker,
+					ReplicaCount:  1,
+					RestartPolicy: corev1.RestartPolicyNever,
+				},
+			).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, mpijob)).Should(gomega.Succeed())
+		wlLookupKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijob.Name, mpijob.UID), Namespace: managerNs.Name}
+
+		ginkgo.By("setting workload reservation in the management cluster", func() {
+			createdWorkload := &kueue.Workload{}
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+				g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("checking that the workload's admission check is rejected in the management cluster", func() {
+			managerWl := &kueue.Workload{}
+			gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
+			acs := workload.FindAdmissionCheck(managerWl.Status.AdmissionChecks, multikueueAC.Name)
+			gomega.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
+				Name:    multikueueAC.Name,
+				State:   kueue.CheckStateRejected,
+				Message: `No multikueue adapter found for owner kind "kubeflow.org/v2beta1, Kind=MPIJob"`,
+			}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
+		})
+	})
+})
+
 func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltesting.AdmissionWrapper) {
 	ginkgo.By("setting workload reservation in the management cluster", func() {
 		createdWorkload := &kueue.Workload{}
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go
index c1babb4be8..6425fab0dd 100644
--- a/test/integration/multikueue/suite_test.go
+++ b/test/integration/multikueue/suite_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	versionutil "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/rest"
@@ -207,16 +208,20 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 }
 
-func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
+func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
 	managerSetup(ctx, mgr)
 
 	err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name)
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
+	adapters, err := jobframework.GetMultiKueueAdapters(enabledIntegrations)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
 	err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
 		multikueue.WithGCInterval(gcInterval),
 		multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout),
 		multikueue.WithEventsBatchPeriod(100*time.Millisecond),
+		multikueue.WithAdapters(adapters),
 	)
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
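
Reviewer note: below is a minimal sketch of how the pieces introduced by this patch fit together, mirroring the new wiring in cmd/kueue/main.go. The setupMultiKueue helper and its frameworks argument are illustrative assumptions, not part of the patch; GetMultiKueueAdapters, SetupControllers, and WithAdapters are the functions changed above.

package main

import (
	"k8s.io/apimachinery/pkg/util/sets"
	ctrl "sigs.k8s.io/controller-runtime"

	"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
	"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

// setupMultiKueue (hypothetical helper) resolves the MultiKueue adapters for
// the integrations enabled in the configuration and hands them to the
// controllers, instead of SetupControllers discovering every registered
// adapter on its own.
func setupMultiKueue(mgr ctrl.Manager, namespace string, frameworks []string) error {
	// Only integrations named in frameworks contribute an adapter; a
	// registered integration that is not enabled is skipped. The new
	// integration test relies on this to get the MPIJob check rejected.
	adapters, err := jobframework.GetMultiKueueAdapters(sets.New(frameworks...))
	if err != nil {
		return err
	}
	return multikueue.SetupControllers(mgr, namespace,
		multikueue.WithAdapters(adapters),
	)
}

Note that after this patch SetupControllers defaults to an empty adapter map, so a caller that omits WithAdapters gets no MultiKueue adapters at all; passing the option explicitly is now required.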