diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 50480d7368..11b1704147 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -267,7 +267,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
 	if features.Enabled(features.MultiKueue) {
 		adapters, err := jobframework.GetMultiKueueAdapters(sets.New(cfg.Integrations.Frameworks...))
 		if err != nil {
-			setupLog.Error(err, "Could not setup MultiKueue controller")
+			setupLog.Error(err, "Could not get the enabled multikueue adapters")
 			os.Exit(1)
 		}
 		if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index f6a05ad52c..40a319bace 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -377,8 +377,7 @@ func TestUpdateConfig(t *testing.T) {
 			builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueue.MultiKueueCluster) client.Object { return c })...)
 			c := builder.Build()
 
-			enabledIntegrations := sets.New([]string{"batch/job"}...)
-			adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin, nil, adapters)
 			//nolint:fatcontext
 			reconciler.rootContext = ctx
@@ -539,8 +538,7 @@ func TestRemoteClientGC(t *testing.T) {
 			worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.workersWorkloads}, &batchv1.JobList{Items: tc.workersJobs})
 			worker1Client := worker1Builder.Build()
 
-			enabledIntegrations := sets.New([]string{"batch/job"}...)
-			adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "", adapters)
 			w1remoteClient.client = worker1Client
 			w1remoteClient.connecting.Store(false)
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index c963b42465..8f3417d70f 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -52,7 +52,6 @@ var (
 )
 
 func TestWlReconcile(t *testing.T) {
-	t.Cleanup(jobframework.EnableIntegrationsForTest(t, "batch/job"))
 	now := time.Now()
 	fakeClock := testingclock.NewFakeClock(now)
 
@@ -987,9 +986,7 @@ func TestWlReconcile(t *testing.T) {
 			)
 			managerClient := managerBuilder.Build()
-
-			enabledIntegrations := sets.New([]string{"batch/job"}...)
-			adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
+			adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
 			cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin, nil, adapters)
 
 			worker1Builder, _ := getClientBuilder()
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 011b78e925..7c9f36a3ce 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -33,6 +33,7 @@ import (
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	versionutil "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -62,6 +63,11 @@ import (
 	"sigs.k8s.io/kueue/test/util"
 )
 
+var defaultEnabledIntegrations sets.Set[string] = sets.New(
+	"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
+	"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
+	"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob")
+
 var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
 	var (
 		managerNs *corev1.Namespace
@@ -86,7 +92,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
 
 	ginkgo.BeforeAll(func() {
 		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second)
+			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, defaultEnabledIntegrations)
 		})
 	})
 
@@ -1462,7 +1468,7 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 
 	ginkgo.BeforeAll(func() {
 		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-			managerAndMultiKueueSetup(ctx, mgr, 0)
+			managerAndMultiKueueSetup(ctx, mgr, 0, defaultEnabledIntegrations)
 		})
 	})
 
@@ -1638,6 +1644,157 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
 	})
 })
 
+var _ = ginkgo.Describe("Multikueue with disabled integrations", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
+	var (
+		managerNs *corev1.Namespace
+		worker1Ns *corev1.Namespace
+
+		managerMultikueueSecret1 *corev1.Secret
+		workerCluster1           *kueue.MultiKueueCluster
+		managerMultiKueueConfig  *kueue.MultiKueueConfig
+		multikueueAC             *kueue.AdmissionCheck
+		managerCq                *kueue.ClusterQueue
+		managerLq                *kueue.LocalQueue
+
+		worker1Cq *kueue.ClusterQueue
+		worker1Lq *kueue.LocalQueue
+	)
+
+	ginkgo.BeforeAll(func() {
+		managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
+			managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, sets.New("batch/job"))
+		})
+	})
+
+	ginkgo.AfterAll(func() {
+		managerTestCluster.fwk.StopManager(managerTestCluster.ctx)
+	})
+
+	ginkgo.BeforeEach(func() {
+		managerNs = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "multikueue-",
+			},
+		}
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed())
+
+		worker1Ns = &corev1.Namespace{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: managerNs.Name,
+			},
+		}
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed())
+
+		w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes()
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		managerMultikueueSecret1 = &corev1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "multikueue1",
+				Namespace: managersConfigNamespace.Name,
+			},
+			Data: map[string][]byte{
+				kueue.MultiKueueConfigSecretKey: w1Kubeconfig,
+			},
+		}
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())
+
+		workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueue.SecretLocationType, managerMultikueueSecret1.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())
+
+		managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig)).Should(gomega.Succeed())
+
+		multikueueAC = utiltesting.MakeAdmissionCheck("ac1").
+			ControllerName(kueue.MultiKueueControllerName).
+			Parameters(kueue.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig.Name).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC)).Should(gomega.Succeed())
+
+		ginkgo.By("wait for check active", func() {
+			updatedAc := kueue.AdmissionCheck{}
+			acKey := client.ObjectKeyFromObject(multikueueAC)
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey, &updatedAc)).To(gomega.Succeed())
+				cond := apimeta.FindStatusCondition(updatedAc.Status.Conditions, kueue.AdmissionCheckActive)
+				g.Expect(cond).NotTo(gomega.BeNil())
+				g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		managerCq = utiltesting.MakeClusterQueue("q1").
+			AdmissionChecks(multikueueAC.Name).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq)).Should(gomega.Succeed())
+
+		managerLq = utiltesting.MakeLocalQueue(managerCq.Name, managerNs.Name).ClusterQueue(managerCq.Name).Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq)).Should(gomega.Succeed())
+
+		worker1Cq = utiltesting.MakeClusterQueue("q1").Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
+		worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
+		gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())
+	})
+
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq, true)
+		util.ExpectObjectToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultiKueueConfig, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, workerCluster1, true)
+		util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultikueueSecret1, true)
+	})
+
+	ginkgo.It("Should not create an MPIJob workload when the MPIJob adapter is not enabled", func() {
+		admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
+			kueue.PodSetAssignment{
+				Name: "launcher",
+			}, kueue.PodSetAssignment{
+				Name: "worker",
+			},
+		)
+		mpijob := testingmpijob.MakeMPIJob("mpijob1", managerNs.Name).
+			Queue(managerLq.Name).
+			ManagedBy(kueue.MultiKueueControllerName).
+			MPIJobReplicaSpecs(
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kfmpi.MPIReplicaTypeLauncher,
+					ReplicaCount:  1,
+					RestartPolicy: corev1.RestartPolicyOnFailure,
+				},
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kfmpi.MPIReplicaTypeWorker,
+					ReplicaCount:  1,
+					RestartPolicy: corev1.RestartPolicyNever,
+				},
+			).
+			Obj()
+		gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, mpijob)).Should(gomega.Succeed())
+		wlLookupKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijob.Name, mpijob.UID), Namespace: managerNs.Name}
+
+		ginkgo.By("setting workload reservation in the management cluster", func() {
+			createdWorkload := &kueue.Workload{}
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+				g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("checking the workload creation was rejected in the management cluster", func() {
+			managerWl := &kueue.Workload{}
+			gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
+			acs := workload.FindAdmissionCheck(managerWl.Status.AdmissionChecks, multikueueAC.Name)
+			gomega.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
+				Name:    multikueueAC.Name,
+				State:   kueue.CheckStateRejected,
+				Message: `No multikueue adapter found for owner kind "kubeflow.org/v2beta1, Kind=MPIJob"`,
+			}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
+		})
+	})
+})
+
 func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltesting.AdmissionWrapper) {
 	ginkgo.By("setting workload reservation in the management cluster", func() {
 		createdWorkload := &kueue.Workload{}
diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go
index 8a9bc88094..6425fab0dd 100644
--- a/test/integration/multikueue/suite_test.go
+++ b/test/integration/multikueue/suite_test.go
@@ -208,16 +208,13 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 }
 
-func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
+func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
 	managerSetup(ctx, mgr)
 
 	err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name)
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
-	adapters, err := jobframework.GetMultiKueueAdapters(sets.New([]string{
-		"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
-		"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
-		"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob"}...))
+	adapters, err := jobframework.GetMultiKueueAdapters(enabledIntegrations)
 	gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
 	err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
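
A minimal sketch of the call pattern this change standardizes on, assuming only what the diff itself shows: jobframework.GetMultiKueueAdapters takes a sets.Set[string] of enabled framework names and returns the adapters plus an error, which callers are expected to surface (the logger message and exit handling below mirror the cmd/kueue/main.go hunk above; the `enabled` variable is illustrative):

	// Resolve MultiKueue adapters from an explicit set of enabled integrations,
	// failing fast if adapter resolution returns an error.
	enabled := sets.New("batch/job", "kubeflow.org/mpijob")
	adapters, err := jobframework.GetMultiKueueAdapters(enabled)
	if err != nil {
		setupLog.Error(err, "Could not get the enabled multikueue adapters")
		os.Exit(1)
	}
	// adapters is then handed to multikueue.SetupControllers (or, in the unit
	// tests above, to newClustersReconciler / newRemoteClient).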