Skip to content

Commit

Permalink
Add integration test
Browse files Browse the repository at this point in the history
for MultiKueue behavior in the case of a disabled integration
  • Loading branch information
mszadkow committed Nov 22, 2024
1 parent 1f35e35 commit 8d915b9
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
if features.Enabled(features.MultiKueue) {
adapters, err := jobframework.GetMultiKueueAdapters(sets.New(cfg.Integrations.Frameworks...))
if err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
setupLog.Error(err, "Could not get the enabled multikueue adapters")
os.Exit(1)
}
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,7 @@ func TestUpdateConfig(t *testing.T) {
builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueue.MultiKueueCluster) client.Object { return c })...)
c := builder.Build()

enabledIntegrations := sets.New([]string{"batch/job"}...)
adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin, nil, adapters)
//nolint:fatcontext
reconciler.rootContext = ctx
Expand Down Expand Up @@ -539,8 +538,7 @@ func TestRemoteClientGC(t *testing.T) {
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.workersWorkloads}, &batchv1.JobList{Items: tc.workersJobs})
worker1Client := worker1Builder.Build()

enabledIntegrations := sets.New([]string{"batch/job"}...)
adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "", adapters)
w1remoteClient.client = worker1Client
w1remoteClient.connecting.Store(false)
Expand Down
5 changes: 1 addition & 4 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ var (
)

func TestWlReconcile(t *testing.T) {
t.Cleanup(jobframework.EnableIntegrationsForTest(t, "batch/job"))
now := time.Now()
fakeClock := testingclock.NewFakeClock(now)

Expand Down Expand Up @@ -987,9 +986,7 @@ func TestWlReconcile(t *testing.T) {
)

managerClient := managerBuilder.Build()

enabledIntegrations := sets.New([]string{"batch/job"}...)
adapters, _ := jobframework.GetMultiKueueAdapters(enabledIntegrations)
adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin, nil, adapters)

worker1Builder, _ := getClientBuilder()
Expand Down
161 changes: 159 additions & 2 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
versionutil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -62,6 +63,11 @@ import (
"sigs.k8s.io/kueue/test/util"
)

// defaultEnabledIntegrations is the set of job-framework integrations that
// tests pass to managerAndMultiKueueSetup when every MultiKueue adapter
// should be available.
var defaultEnabledIntegrations = sets.New[string](
	"batch/job",
	"kubeflow.org/mpijob",
	"ray.io/rayjob",
	"ray.io/raycluster",
	"jobset.x-k8s.io/jobset",
	"kubeflow.org/mxjob",
	"kubeflow.org/paddlejob",
	"kubeflow.org/pytorchjob",
	"kubeflow.org/tfjob",
	"kubeflow.org/xgboostjob",
)

var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
Expand All @@ -86,7 +92,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,

ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
managerAndMultiKueueSetup(ctx, mgr, 2*time.Second)
managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, defaultEnabledIntegrations)
})
})

Expand Down Expand Up @@ -1462,7 +1468,7 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai

ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
managerAndMultiKueueSetup(ctx, mgr, 0)
managerAndMultiKueueSetup(ctx, mgr, 0, defaultEnabledIntegrations)
})
})

Expand Down Expand Up @@ -1638,6 +1644,157 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFai
})
})

// Integration suite verifying MultiKueue behavior when only a subset of
// integrations is enabled: the manager is started with just "batch/job",
// so the MPIJob adapter is disabled and MPIJob workloads must be rejected
// by the MultiKueue admission check.
var _ = ginkgo.Describe("Multikueue with disabled integrations", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
worker1Ns *corev1.Namespace

managerMultikueueSecret1 *corev1.Secret
workerCluster1 *kueue.MultiKueueCluster
managerMultiKueueConfig *kueue.MultiKueueConfig
multikueueAC *kueue.AdmissionCheck
managerCq *kueue.ClusterQueue
managerLq *kueue.LocalQueue

worker1Cq *kueue.ClusterQueue
worker1Lq *kueue.LocalQueue
)

// Start the manager with ONLY the batch/job integration enabled — this is
// the key difference from the default MultiKueue suites, which pass
// defaultEnabledIntegrations.
ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, sets.New("batch/job"))
})
})

ginkgo.AfterAll(func() {
managerTestCluster.fwk.StopManager(managerTestCluster.ctx)
})

// Per-test setup: namespaces on both clusters, the worker kubeconfig
// secret, the MultiKueueCluster/Config/AdmissionCheck chain, and the
// manager/worker queue pair. Creation order matters: the admission check
// references the config, which references the cluster, which references
// the secret.
ginkgo.BeforeEach(func() {
managerNs = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "multikueue-",
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed())

// The worker namespace mirrors the manager namespace name so that
// workloads created remotely land in the matching namespace.
worker1Ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: managerNs.Name,
},
}
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed())

w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes()
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Secret holding the worker cluster's kubeconfig; referenced by
// workerCluster1 via SecretLocationType below.
managerMultikueueSecret1 = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "multikueue1",
Namespace: managersConfigNamespace.Name,
},
Data: map[string][]byte{
kueue.MultiKueueConfigSecretKey: w1Kubeconfig,
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())

workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueue.SecretLocationType, managerMultikueueSecret1.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())

managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig)).Should(gomega.Succeed())

multikueueAC = utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueue.MultiKueueControllerName).
Parameters(kueue.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC)).Should(gomega.Succeed())

// Block until the admission check controller reports the check Active,
// i.e. the worker connection is established, before any spec runs.
ginkgo.By("wait for check active", func() {
updatedAc := kueue.AdmissionCheck{}
acKey := client.ObjectKeyFromObject(multikueueAC)
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey, &updatedAc)).To(gomega.Succeed())
cond := apimeta.FindStatusCondition(updatedAc.Status.Conditions, kueue.AdmissionCheckActive)
g.Expect(cond).NotTo(gomega.BeNil())
g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

managerCq = utiltesting.MakeClusterQueue("q1").
AdmissionChecks(multikueueAC.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq)).Should(gomega.Succeed())

managerLq = utiltesting.MakeLocalQueue(managerCq.Name, managerNs.Name).ClusterQueue(managerCq.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq)).Should(gomega.Succeed())

worker1Cq = utiltesting.MakeClusterQueue("q1").Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())
})

// Teardown in reverse dependency order: namespaces first, then queues,
// check, config, cluster, and finally the kubeconfig secret.
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq, true)
util.ExpectObjectToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultiKueueConfig, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, workerCluster1, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultikueueSecret1, true)
})

ginkgo.It("Should not create a MPIJob workload, when MPIJob adapter is not enabled", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "launcher",
}, kueue.PodSetAssignment{
Name: "worker",
},
)
// MPIJob managed by MultiKueue while its adapter is disabled — the
// admission check is expected to reject the resulting workload.
mpijob := testingmpijob.MakeMPIJob("mpijob1", managerNs.Name).
Queue(managerLq.Name).
ManagedBy(kueue.MultiKueueControllerName).
MPIJobReplicaSpecs(
testingmpijob.MPIJobReplicaSpecRequirement{
ReplicaType: kfmpi.MPIReplicaTypeLauncher,
ReplicaCount: 1,
RestartPolicy: corev1.RestartPolicyOnFailure,
},
testingmpijob.MPIJobReplicaSpecRequirement{
ReplicaType: kfmpi.MPIReplicaTypeWorker,
ReplicaCount: 1,
RestartPolicy: corev1.RestartPolicyNever,
},
).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, mpijob)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijob.Name, mpijob.UID), Namespace: managerNs.Name}

ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

// The admission check state on the manager workload must be Rejected
// with the "no adapter" message produced by the MultiKueue controller.
ginkgo.By("checking the workload creation was rejected in the management cluster", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
acs := workload.FindAdmissionCheck(managerWl.Status.AdmissionChecks, multikueueAC.Name)
gomega.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multikueueAC.Name,
State: kueue.CheckStateRejected,
Message: `No multikueue adapter found for owner kind "kubeflow.org/v2beta1, Kind=MPIJob"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
})
})
})

func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltesting.AdmissionWrapper) {
ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
Expand Down
7 changes: 2 additions & 5 deletions test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,13 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
managerSetup(ctx, mgr)

err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

adapters, err := jobframework.GetMultiKueueAdapters(sets.New([]string{
"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob"}...))
adapters, err := jobframework.GetMultiKueueAdapters(enabledIntegrations)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
Expand Down

0 comments on commit 8d915b9

Please sign in to comment.