GetMultiKueueAdapters() returns only registered and enabled adapters #3603

Open · wants to merge 2 commits into base: main · showing changes from all commits
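In short: jobframework.GetMultiKueueAdapters now takes the set of enabled integration frameworks and returns adapters only for those, and multikueue.SetupControllers receives the result through a new WithAdapters option instead of resolving every registered adapter itself. A condensed sketch of the new call pattern, mirroring the cmd/kueue/main.go change below (cfg, mgr, and setupLog are as in that file):

```go
// Only adapters for integrations listed in cfg.Integrations.Frameworks
// are returned; disabled integrations get no MultiKueue adapter.
adapters, err := jobframework.GetMultiKueueAdapters(sets.New(cfg.Integrations.Frameworks...))
if err != nil {
	setupLog.Error(err, "Could not get the enabled multikueue adapters")
	os.Exit(1)
}
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
	multikueue.WithAdapters(adapters), // new option introduced by this PR
); err != nil {
	setupLog.Error(err, "Could not setup MultiKueue controller")
	os.Exit(1)
}
```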
7 changes: 7 additions & 0 deletions cmd/kueue/main.go
@@ -29,6 +29,7 @@ import (
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
autoscaling "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/client-go/discovery"
@@ -264,10 +265,16 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
}

if features.Enabled(features.MultiKueue) {
adapters, err := jobframework.GetMultiKueueAdapters(sets.New(cfg.Integrations.Frameworks...))
if err != nil {
setupLog.Error(err, "Could not get the enabled multikueue adapters")
os.Exit(1)
}
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration),
multikueue.WithAdapters(adapters),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
18 changes: 11 additions & 7 deletions pkg/controller/admissionchecks/multikueue/controllers.go
@@ -36,6 +36,7 @@ type SetupOptions struct {
origin string
workerLostTimeout time.Duration
eventsBatchPeriod time.Duration
adapters map[string]jobframework.MultiKueueAdapter
}

type SetupOption func(o *SetupOptions)
@@ -72,12 +73,20 @@ func WithEventsBatchPeriod(d time.Duration) SetupOption {
}
}

// WithAdapters sets or updates the map of MultiKueue adapters.
func WithAdapters(adapters map[string]jobframework.MultiKueueAdapter) SetupOption {
return func(o *SetupOptions) {
o.adapters = adapters
}
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaultWorkerLostTimeout,
eventsBatchPeriod: constants.UpdatesBatchPeriod,
adapters: make(map[string]jobframework.MultiKueueAdapter),
}

for _, o := range opts {
@@ -95,12 +104,7 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
return err
}

-adapters, err := jobframework.GetMultiKueueAdapters()
-if err != nil {
-return err
-}
-
-cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, adapters)
+cRec := newClustersReconciler(mgr.GetClient(), namespace, options.gcInterval, options.origin, fsWatcher, options.adapters)
err = cRec.setupWithManager(mgr)
if err != nil {
return err
@@ -112,6 +116,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
return err
}

-wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, adapters)
+wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout, options.eventsBatchPeriod, options.adapters)
return wlRec.setupWithManager(mgr)
}
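Design note: SetupOptions.adapters now defaults to an empty map, so a caller that omits WithAdapters gets a MultiKueue setup with no adapters at all; before this PR, SetupControllers called GetMultiKueueAdapters itself and always wired up every registered adapter. Moving the lookup to the caller is what lets cmd/kueue/main.go restrict the adapters to the configured frameworks.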
pkg/controller/admissionchecks/multikueue/clusters_test.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
@@ -376,7 +377,7 @@ func TestUpdateConfig(t *testing.T) {
builder = builder.WithStatusSubresource(slices.Map(tc.clusters, func(c *kueue.MultiKueueCluster) client.Object { return c })...)
c := builder.Build()

-adapters, _ := jobframework.GetMultiKueueAdapters()
+adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
reconciler := newClustersReconciler(c, TestNamespace, 0, defaultOrigin, nil, adapters)
//nolint:fatcontext
reconciler.rootContext = ctx
@@ -537,7 +538,7 @@ func TestRemoteClientGC(t *testing.T) {
worker1Builder = worker1Builder.WithLists(&kueue.WorkloadList{Items: tc.workersWorkloads}, &batchv1.JobList{Items: tc.workersJobs})
worker1Client := worker1Builder.Build()

-adapters, _ := jobframework.GetMultiKueueAdapters()
+adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
w1remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "", adapters)
w1remoteClient.client = worker1Client
w1remoteClient.connecting.Store(false)
4 changes: 2 additions & 2 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
testingclock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
@@ -985,8 +986,7 @@ func TestWlReconcile(t *testing.T) {
)

managerClient := managerBuilder.Build()

-adapters, _ := jobframework.GetMultiKueueAdapters()
+adapters, _ := jobframework.GetMultiKueueAdapters(sets.New[string]("batch/job"))
cRec := newClustersReconciler(managerClient, TestNamespace, 0, defaultOrigin, nil, adapters)

worker1Builder, _ := getClientBuilder()
8 changes: 4 additions & 4 deletions pkg/controller/jobframework/integrationmanager.go
@@ -304,12 +304,12 @@ func GetEmptyOwnerObject(owner *metav1.OwnerReference) client.Object {
}

// GetMultiKueueAdapters returns the map containing the MultiKueue adapters for the
-// registered integrations.
+// registered and enabled integrations.
// An error is returned if more than one adapter is registered for one object type.
-func GetMultiKueueAdapters() (map[string]MultiKueueAdapter, error) {
+func GetMultiKueueAdapters(enabledIntegrations sets.Set[string]) (map[string]MultiKueueAdapter, error) {
ret := map[string]MultiKueueAdapter{}
-if err := manager.forEach(func(_ string, cb IntegrationCallbacks) error {
-if cb.MultiKueueAdapter != nil {
+if err := manager.forEach(func(intName string, cb IntegrationCallbacks) error {
+if cb.MultiKueueAdapter != nil && enabledIntegrations.Has(intName) {
gvk := cb.MultiKueueAdapter.GVK().String()
if _, found := ret[gvk]; found {
return fmt.Errorf("multiple adapters for GVK: %q", gvk)
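To make the contract concrete, here is a minimal sketch of calling the new API. It assumes the batch/Job integration package is imported (kueue registers integrations via package init), so treat it as illustrative rather than a verified program:

```go
package main

import (
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/util/sets"
	"sigs.k8s.io/kueue/pkg/controller/jobframework"
	_ "sigs.k8s.io/kueue/pkg/controller/jobs/job" // registers the batch/Job integration
)

func main() {
	// Only integrations that are both registered and enabled contribute
	// adapters; an integration such as "kubeflow.org/mpijob" would be
	// skipped here even if registered, because it is not in the enabled set.
	adapters, err := jobframework.GetMultiKueueAdapters(sets.New("batch/job"))
	if err != nil {
		log.Fatal(err) // returned only if two adapters share one GVK
	}
	for gvk := range adapters {
		fmt.Println("adapter for", gvk) // e.g. "batch/v1, Kind=Job"
	}
}
```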
161 changes: 159 additions & 2 deletions test/integration/multikueue/multikueue_test.go
@@ -33,6 +33,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
versionutil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -62,6 +63,11 @@ import (
"sigs.k8s.io/kueue/test/util"
)

var defaultEnabledIntegrations sets.Set[string] = sets.New(
"batch/job", "kubeflow.org/mpijob", "ray.io/rayjob", "ray.io/raycluster",
"jobset.x-k8s.io/jobset", "kubeflow.org/mxjob", "kubeflow.org/paddlejob",
"kubeflow.org/pytorchjob", "kubeflow.org/tfjob", "kubeflow.org/xgboostjob")

var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
@@ -86,7 +92,7 @@

ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-managerAndMultiKueueSetup(ctx, mgr, 2*time.Second)
+managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, defaultEnabledIntegrations)
})
})

@@ -1462,7 +1468,7 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {

ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
-managerAndMultiKueueSetup(ctx, mgr, 0)
+managerAndMultiKueueSetup(ctx, mgr, 0, defaultEnabledIntegrations)
})
})

@@ -1638,6 +1644,157 @@ var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
})
})

var _ = ginkgo.Describe("Multikueue with disabled integrations", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
Review comment (Contributor). Suggested change:
-var _ = ginkgo.Describe("Multikueue with disabled integrations", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
+var _ = ginkgo.Describe("Multikueue with some integration disabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {

var (
managerNs *corev1.Namespace
worker1Ns *corev1.Namespace

managerMultikueueSecret1 *corev1.Secret
workerCluster1 *kueue.MultiKueueCluster
managerMultiKueueConfig *kueue.MultiKueueConfig
multikueueAC *kueue.AdmissionCheck
managerCq *kueue.ClusterQueue
managerLq *kueue.LocalQueue

worker1Cq *kueue.ClusterQueue
worker1Lq *kueue.LocalQueue
)

ginkgo.BeforeAll(func() {
managerTestCluster.fwk.StartManager(managerTestCluster.ctx, managerTestCluster.cfg, func(ctx context.Context, mgr manager.Manager) {
managerAndMultiKueueSetup(ctx, mgr, 2*time.Second, sets.New("batch/job"))
})
})

ginkgo.AfterAll(func() {
managerTestCluster.fwk.StopManager(managerTestCluster.ctx)
})

ginkgo.BeforeEach(func() {
managerNs = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "multikueue-",
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed())

worker1Ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: managerNs.Name,
},
}
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed())

w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes()
gomega.Expect(err).NotTo(gomega.HaveOccurred())

managerMultikueueSecret1 = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "multikueue1",
Namespace: managersConfigNamespace.Name,
},
Data: map[string][]byte{
kueue.MultiKueueConfigSecretKey: w1Kubeconfig,
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())

workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueue.SecretLocationType, managerMultikueueSecret1.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())

managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig)).Should(gomega.Succeed())

multikueueAC = utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueue.MultiKueueControllerName).
Parameters(kueue.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC)).Should(gomega.Succeed())

ginkgo.By("wait for check active", func() {
updatedAc := kueue.AdmissionCheck{}
acKey := client.ObjectKeyFromObject(multikueueAC)
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey, &updatedAc)).To(gomega.Succeed())
cond := apimeta.FindStatusCondition(updatedAc.Status.Conditions, kueue.AdmissionCheckActive)
g.Expect(cond).NotTo(gomega.BeNil())
g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

managerCq = utiltesting.MakeClusterQueue("q1").
AdmissionChecks(multikueueAC.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq)).Should(gomega.Succeed())

managerLq = utiltesting.MakeLocalQueue(managerCq.Name, managerNs.Name).ClusterQueue(managerCq.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq)).Should(gomega.Succeed())

worker1Cq = utiltesting.MakeClusterQueue("q1").Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq, true)
util.ExpectObjectToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultiKueueConfig, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, workerCluster1, true)
util.ExpectObjectToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerMultikueueSecret1, true)
})

ginkgo.It("Should not create a MPIJob workload, when MPIJob adapter is not enabled", func() {
Review comment (Contributor): This test looks OK, but I think we also need a test case for the positive scenario (which was broken): that some integrations can still be used (e.g. batch/Job) while not all are enabled. Maybe "should create batch/Job workload when some frameworks are disabled". (A rough sketch of such a test follows this file's diff.)

admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "launcher",
}, kueue.PodSetAssignment{
Name: "worker",
},
)
mpijob := testingmpijob.MakeMPIJob("mpijob1", managerNs.Name).
Queue(managerLq.Name).
ManagedBy(kueue.MultiKueueControllerName).
MPIJobReplicaSpecs(
testingmpijob.MPIJobReplicaSpecRequirement{
ReplicaType: kfmpi.MPIReplicaTypeLauncher,
ReplicaCount: 1,
RestartPolicy: corev1.RestartPolicyOnFailure,
},
testingmpijob.MPIJobReplicaSpecRequirement{
ReplicaType: kfmpi.MPIReplicaTypeWorker,
ReplicaCount: 1,
RestartPolicy: corev1.RestartPolicyNever,
},
).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, mpijob)).Should(gomega.Succeed())
wlLookupKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijob.Name, mpijob.UID), Namespace: managerNs.Name}

ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission.Obj())).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload creation was rejected in the management cluster", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
acs := workload.FindAdmissionCheck(managerWl.Status.AdmissionChecks, multikueueAC.Name)
gomega.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multikueueAC.Name,
State: kueue.CheckStateRejected,
Message: `No multikueue adapter found for owner kind "kubeflow.org/v2beta1, Kind=MPIJob"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
})
})
})

func admitWorkloadAndCheckWorkerCopies(acName string, wlLookupKey types.NamespacedName, admission *utiltesting.AdmissionWrapper) {
ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
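Picking up the review comment above: a rough sketch of the suggested positive test, showing that a batch/Job workload is still replicated when only "batch/job" is enabled. It reuses the fixtures from the disabled-integrations Describe; testingjob and workloadjob stand for kueue's batch/Job test wrapper and workload-name helper, and the exact builder calls are assumptions, not verified against this branch:

```go
ginkgo.It("Should create a batch/Job workload, when only the batch/job adapter is enabled", func() {
	// Hypothetical sketch; the Job is managed by MultiKueue as in the MPIJob case above.
	job := testingjob.MakeJob("job1", managerNs.Name).
		Queue(managerLq.Name).
		ManagedBy(kueue.MultiKueueControllerName).
		Obj()
	gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed())
	wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}

	ginkgo.By("setting workload reservation in the management cluster", func() {
		admission := utiltesting.MakeAdmission(managerCq.Name).Obj()
		createdWorkload := &kueue.Workload{}
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
			g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("checking the workload was created in the worker cluster", func() {
		createdWorkload := &kueue.Workload{}
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
		}, util.Timeout, util.Interval).Should(gomega.Succeed())
	})
})
```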
7 changes: 6 additions & 1 deletion test/integration/multikueue/suite_test.go
@@ -28,6 +28,7 @@ import (
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
versionutil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
@@ -207,16 +208,20 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

-func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
+func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration, enabledIntegrations sets.Set[string]) {
managerSetup(ctx, mgr)

err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

adapters, err := jobframework.GetMultiKueueAdapters(enabledIntegrations)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
multikueue.WithGCInterval(gcInterval),
multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout),
multikueue.WithEventsBatchPeriod(100*time.Millisecond),
multikueue.WithAdapters(adapters),
)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}