diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index d708ec0799..7b94ff9d9e 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -290,6 +290,14 @@ rules: verbs: - get - update +- apiGroups: + - resource.k8s.io + resources: + - resourceclaimtemplates + verbs: + - get + - list + - watch - apiGroups: - scheduling.k8s.io resources: diff --git a/keps/2941-DRA-Structured-Parameters/README.md b/keps/2941-DRA-Structured-Parameters/README.md new file mode 100644 index 0000000000..953777fc07 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/README.md @@ -0,0 +1,394 @@ +# KEP-2941: Structured Parameters + + + + + + +- [Summary](#summary) +- [Motivation](#motivation) + - [Background](#background) + - [DRA Example](#dra-example) + - [Workload Example](#workload-example) + - [Example Driver Cluster Resources](#example-driver-cluster-resources) + - [Resource slices](#resource-slices) + - [Device classes](#device-classes) + - [Goals](#goals) + - [Non-Goals](#non-goals) +- [Proposal](#proposal) + - [User Stories (Optional)](#user-stories-optional) + - [Story 1](#story-1) + - [Notes/Constraints/Caveats (Optional)](#notesconstraintscaveats-optional) + - [Risks and Mitigations](#risks-and-mitigations) +- [Design Details](#design-details) + - [Resource Quota API](#resource-quota-api) + - [Workloads](#workloads) + - [Test Plan](#test-plan) + - [Prerequisite testing updates](#prerequisite-testing-updates) + - [Unit Tests](#unit-tests) + - [Integration tests](#integration-tests) + - [E2E Test](#e2e-test) + - [Graduation Criteria](#graduation-criteria) + - [Feature Gate](#feature-gate) +- [Implementation History](#implementation-history) +- [Drawbacks](#drawbacks) +- [Alternatives](#alternatives) + - [Resource Claim By Count](#resource-claim-by-count) + + +## Summary + +Dynamic Resource Allocation (DRA) is a major effort to improve device support in Kubernetes. +It changes how workloads can request devices and other specialized resources. + +## Motivation + +Dynamic Resource Allocation (DRA) provides the groundwork for more sophisticated device allocations to Pods. +Quota management is about enforcing rules around the use of resources. +For example, GPUs are a constrained resource, and a popular request is the ability to enforce fair sharing of GPU resources. +Many users want access to these devices, and some also want the ability to preempt other users if their workloads have a higher priority. Kueue provides support for this. + +DRA points to a future where users can schedule partitionable GPU devices (MIG) or time-sliced GPUs. As devices gain more sophisticated scheduling capabilities, it is important to walk through how Kueue will support DRA. + +### Background + +DRA has three APIs that are relevant for Kueue: + +- ResourceClaims +- DeviceClasses +- ResourceSlices + +#### DRA Example + +The easiest way to test DRA is with the [dra-example-driver repository](https://github.com/kubernetes-sigs/dra-example-driver). + +Clone that repo and run `make setup-e2e`; this creates a Kind cluster with the DRA feature gate enabled and installs a mock DRA driver. + +Because it does not use actual GPUs, it is a convenient test environment for exploring the Kueue and DRA integration.
+ +#### Workload Example + +An example workload that uses DRA: + +```yaml +--- + +apiVersion: resource.k8s.io/v1alpha3 +kind: ResourceClaimTemplate +metadata: + namespace: gpu-test1 + name: single-gpu +spec: + spec: + devices: + requests: + - name: gpu + deviceClassName: gpu.example.com + +--- + +apiVersion: batch/v1 +kind: Job +metadata: + namespace: gpu-test1 + name: job0 + labels: + app: job + kueue.x-k8s.io/queue-name: user-queue +spec: + template: + spec: + restartPolicy: Never + containers: + - name: ctr0 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: gpu + requests: + cpu: 1 + memory: "200Mi" + resourceClaims: + - name: gpu + resourceClaimTemplateName: single-gpu +``` + +#### Example Driver Cluster Resources + +The dra-example-driver creates a resource slice and a device class for the entire cluster. + +##### Resource slices + +Resource slices are meant for communication between drivers and the control plane. They are not expected to be used by workloads. + +Kueue does not need to be aware of these resources. + +##### Device classes + +Each driver creates a device class, and every resource claim references a device class. + +The dra-example-driver has a simple device class named `gpu.example.com`. + +Device classes can therefore serve as the unit for enforcing quota limits. + +### Goals + +- Users can submit workloads using resource claims and Kueue can monitor the usage. +- Admins can enforce limits on the number of requests for a given device class. + + + +### Non-Goals + +- We are limiting the scope of DRA support to structured parameters (beta in Kubernetes 1.32) + + + +## Proposal + + + + +### User Stories (Optional) + + + +#### Story 1 + +As a user, I want to use resource claims to gain more control over the scheduling of devices. +I have a DRA driver installed on my cluster and I am interested in using DRA for scheduling. + +I want to enforce quota usage for a ClusterQueue and forbid admitting workloads once they exceed the cluster queue limit. + + +### Notes/Constraints/Caveats (Optional) + + + +### Risks and Mitigations + + + +## Design Details + +### Resource Quota API + +```golang +type ResourceQuota struct { + // ... + // Kind indicates whether this entry refers to a core resource or a DeviceClass. + // +kubebuilder:validation:Enum={Core,DeviceClass} + // +kubebuilder:default=Core + Kind ResourceKind `json:"kind"` +} +``` + +Kind allows one to distinguish between a core resource and a DeviceClass. + +With this, a cluster queue could be defined as follows: + +```yaml +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ClusterQueue +metadata: + name: "cluster-queue" +spec: + namespaceSelector: {} # match all. + resourceGroups: + - coveredResources: ["cpu", "memory", "gpu.example.com"] + flavors: + - name: "default-flavor" + resources: + - name: "cpu" + nominalQuota: 9 + - name: "memory" + nominalQuota: "200Mi" + - name: "gpu.example.com" + nominalQuota: 2 + kind: "DeviceClass" +``` + +### Workloads + +When a user submits a workload and the KueueDynamicResourceAllocation feature gate is enabled, Kueue will do the following: + +a. Claims will be read from resources.claims in the PodTemplateSpec. +b. The name of the claim will be used to look up the corresponding `ResourceClaimTemplateName` in the PodTemplateSpec's `resourceClaims`. +c. The ResourceClaimTemplate will be read using the name from (b) and the same namespace as the workload. +d. From the ResourceClaimTemplate, the deviceClassName will be read. +e. Every claim that requests the same deviceClassName will be tallied and reported in the ResourceUsage.
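Below is a minimal Go sketch of steps a–e, simplified from the flow this proposal implements in `AddDeviceClassesToContainerRequests`. The `tallyDeviceClasses` helper and its `lookupDeviceClass` callback are illustrative assumptions (the callback stands in for reading the ResourceClaimTemplate from the API server); init containers and pod-set counts are omitted for brevity.

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// tallyDeviceClasses counts, per device class, how many container claims in a
// pod spec resolve to a ResourceClaimTemplate of that device class.
// lookupDeviceClass maps a ResourceClaimTemplate name to its deviceClassName
// (steps c and d); in Kueue this would be a read against the API server.
func tallyDeviceClasses(ps *corev1.PodSpec, lookupDeviceClass func(templateName string) (string, error)) (map[string]int64, error) {
	// (b) index the pod-level resourceClaims by name so container claims can be resolved.
	templateByClaim := map[string]string{}
	for _, rc := range ps.ResourceClaims {
		if rc.ResourceClaimTemplateName != nil {
			templateByClaim[rc.Name] = *rc.ResourceClaimTemplateName
		}
	}
	usage := map[string]int64{}
	for _, c := range ps.Containers {
		// (a) read the claims from resources.claims.
		for _, claim := range c.Resources.Claims {
			templateName, ok := templateByClaim[claim.Name]
			if !ok {
				// The claim references a pre-created ResourceClaim; not covered in this sketch.
				continue
			}
			// (c)+(d) resolve the ResourceClaimTemplate to its deviceClassName.
			deviceClass, err := lookupDeviceClass(templateName)
			if err != nil {
				return nil, err
			}
			// (e) tally one request per claim reference for that device class.
			usage[deviceClass]++
		}
	}
	return usage, nil
}
```

For the workload example above, this tally would be `{"gpu.example.com": 1}`, which is what gets compared against the `DeviceClass` quota in the ClusterQueue. The annotated manifest below shows where each of these values comes from: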
+ +```yaml +--- + +apiVersion: batch/v1 +kind: Job +metadata: + namespace: gpu-test1 + name: job0 + labels: + app: job + kueue.x-k8s.io/queue-name: user-queue +spec: + template: + spec: + restartPolicy: Never + containers: + - name: ctr0 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: gpu # a) read the claim from resources.claims + requests: + cpu: 1 + memory: "200Mi" + resourceClaims: + - name: gpu # b) match the name from resources.claims + resourceClaimTemplateName: single-gpu # c) the name of the referenced ResourceClaimTemplate +--- +apiVersion: resource.k8s.io/v1alpha3 +kind: ResourceClaimTemplate +metadata: + namespace: gpu-test1 + name: single-gpu +spec: + spec: + devices: + requests: + - name: gpu + deviceClassName: gpu.example.com # d) the name of the device class + +``` + + +### Test Plan + +[x] I/we understand the owners of the involved components may require updates to +existing tests to make this code solid enough prior to committing the changes necessary +to implement this enhancement. + +##### Prerequisite testing updates + + + +#### Unit Tests + + + + + +TBD + +#### Integration tests + +I am not sure if we can test DRA functionality (which requires alpha features to be enabled) at the integration level. + +DRA requires a kubelet plugin, so this may not be a good candidate for an integration test. + +#### E2E Test + +It may be worth installing the dra-example-driver and testing this end to end. + + + +### Graduation Criteria + +#### Feature Gate + +We will introduce a KueueDynamicResourceAllocation feature gate. + +This feature gate will go beta once DRA is beta. + +The goal is to limit behavior changes to the case where this feature gate is enabled in combination with the upstream DRA feature. + +## Implementation History + +- Draft on September 16th, 2024. + +## Drawbacks + +NA. Kueue should be able to schedule devices following what upstream is proposing. +The only drawback is that Kueue has to fetch the ResourceClaimTemplate for workloads that specify resource claims. + +## Alternatives + +### Resource Claim By Count + +Originally, the idea was to keep a tally of the resource claims for a given workload. +The issue with this is that resource claims are namespace scoped. +To enforce quota usage across namespaces, we need to use cluster-scoped resources.
\ No newline at end of file diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/gpu-test1.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/gpu-test1.yaml new file mode 100644 index 0000000000..6e40ebe8c7 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/gpu-test1.yaml @@ -0,0 +1,48 @@ +# Two pods, one container each +# Each container asking for 1 distinct GPU + +--- + +apiVersion: resource.k8s.io/v1alpha3 +kind: ResourceClaimTemplate +metadata: + namespace: gpu-test1 + name: single-gpu +spec: + spec: + devices: + requests: + - name: gpu + deviceClassName: gpu.example.com + +--- + +apiVersion: batch/v1 +kind: Job +metadata: + namespace: gpu-test1 + name: job0 + labels: + app: job + kueue.x-k8s.io/queue-name: user-queue +spec: + parallelism: 2 + completions: 2 + template: + spec: + restartPolicy: Never + containers: + - name: ctr0 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: gpu + requests: + cpu: 1 + memory: "100Mi" + resourceClaims: + - name: gpu + resourceClaimTemplateName: single-gpu + diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/single-clusterqueue-setup.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/single-clusterqueue-setup.yaml new file mode 100644 index 0000000000..7ea9df25a4 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test1/single-clusterqueue-setup.yaml @@ -0,0 +1,32 @@ + +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ResourceFlavor +metadata: + name: "default-flavor" +--- +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ClusterQueue +metadata: + name: "cluster-queue" +spec: + namespaceSelector: {} # match all. + resourceGroups: + - coveredResources: ["cpu", "memory", "gpu.example.com"] + flavors: + - name: "default-flavor" + resources: + - name: "cpu" + nominalQuota: 9 + - name: "memory" + nominalQuota: "200Mi" + - name: "gpu.example.com" + nominalQuota: 2 +--- +apiVersion: kueue.x-k8s.io/v1beta1 +kind: LocalQueue +metadata: + namespace: "gpu-test1" + name: "user-queue" +spec: + clusterQueue: "cluster-queue" + diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/gpu-test2.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/gpu-test2.yaml new file mode 100644 index 0000000000..35f334a326 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/gpu-test2.yaml @@ -0,0 +1,42 @@ +# One pod, one container +# Asking for 2 distinct GPUs + +--- +apiVersion: resource.k8s.io/v1alpha3 +kind: ResourceClaimTemplate +metadata: + namespace: gpu-test2 + name: multiple-gpus +spec: + spec: + devices: + requests: + - name: gpus + deviceClassName: gpu.example.com + allocationMode: ExactCount + count: 2 + +--- +apiVersion: batch/v1 +kind: Job +metadata: + namespace: gpu-test2 + name: job0 + labels: + app: job + kueue.x-k8s.io/queue-name: user-queue-gpu-test2 +spec: + template: + spec: + restartPolicy: Never + containers: + - name: ctr0 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: gpus + resourceClaims: + - name: gpus + resourceClaimTemplateName: multiple-gpus diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/single-clusterqueue-setup.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/single-clusterqueue-setup.yaml new file mode 100644 index 0000000000..e6f3f59ef5 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test2/single-clusterqueue-setup.yaml @@ -0,0 +1,9 @@ + 
+apiVersion: kueue.x-k8s.io/v1beta1 +kind: LocalQueue +metadata: + namespace: "gpu-test2" + name: "user-queue-gpu-test2" +spec: + clusterQueue: "cluster-queue" + diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/gpu-test3.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/gpu-test3.yaml new file mode 100644 index 0000000000..4a47a71657 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/gpu-test3.yaml @@ -0,0 +1,56 @@ +# One pod, two containers +# Each asking for shared access to a single GPU + +--- +apiVersion: v1 +kind: Namespace +metadata: + name: gpu-test3 + +--- +apiVersion: resource.k8s.io/v1alpha3 +kind: ResourceClaimTemplate +metadata: + namespace: gpu-test3 + name: single-gpu +spec: + spec: + devices: + requests: + - name: gpu + deviceClassName: gpu.example.com + +--- + +apiVersion: batch/v1 +kind: Job +metadata: + namespace: gpu-test3 + name: job0 + labels: + app: job + kueue.x-k8s.io/queue-name: "user-queue-gpu-test3" +spec: + parallelism: 1 + completions: 1 + template: + spec: + restartPolicy: Never + containers: + - name: ctr0 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: shared-gpu + - name: ctr1 + image: ubuntu:22.04 + command: ["bash", "-c"] + args: ["export; sleep 9999"] + resources: + claims: + - name: shared-gpu + resourceClaims: + - name: shared-gpu + resourceClaimTemplateName: single-gpu diff --git a/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/single-clusterqueue-setup.yaml b/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/single-clusterqueue-setup.yaml new file mode 100644 index 0000000000..1c26666caf --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/examples/gpu-test3/single-clusterqueue-setup.yaml @@ -0,0 +1,9 @@ + +apiVersion: kueue.x-k8s.io/v1beta1 +kind: LocalQueue +metadata: + namespace: "gpu-test3" + name: "user-queue-gpu-test3" +spec: + clusterQueue: "cluster-queue" + diff --git a/keps/2941-DRA-Structured-Parameters/kep.yaml b/keps/2941-DRA-Structured-Parameters/kep.yaml new file mode 100644 index 0000000000..2a2b40d737 --- /dev/null +++ b/keps/2941-DRA-Structured-Parameters/kep.yaml @@ -0,0 +1,42 @@ +title: DRA (Structured Parameters) Support in Kueue +kep-number: 2941 +authors: + - "@kannon92" +status: provisional +creation-date: 2024-08-30 +reviewers: + - tbd + - "tbd" +approvers: + - TBD + - "tbd" + +see-also: + - "dra kep in upstream" + - "/keps/2345-everyone-gets-a-kep" +replaces: + - "/keps/3456-replaced-kep" + +# The target maturity stage in the current dev cycle for this KEP. +stage: alpha + +# The most recent milestone for which work toward delivery of this KEP has been +# done. This can be the current (upcoming) milestone, if it is being actively +# worked on. +latest-milestone: "v0.9" + +# The milestone at which this feature was, or is targeted to be, at each stage. 
+milestone: + alpha: "v0.9" + beta: "v0.10" + stable: "v0.11" + +# The following PRR answers are required at alpha release +# List the feature gate name and the components for which it must be enabled +feature-gates: + - name: KueueDynamicResourceAllocation +disable-supported: true + +# The following PRR answers are required at beta release +metrics: + - my_feature_metric diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 719f4caced..785cca556f 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -152,6 +152,7 @@ func NewClusterQueueReconciler( // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups=resource.k8s.io,resources=resourceclaimtemplates,verbs=get;list;watch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/finalizers,verbs=update diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 642312555d..23db631a62 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -569,6 +569,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { ctx := ctrl.LoggerInto(context.Background(), log) wlCopy := wl.DeepCopy() workload.AdjustResources(ctx, r.client, wlCopy) + workload.AddDeviceClassesToContainerRequests(ctx, r.client, wlCopy) if !workload.HasQuotaReservation(wl) { if !r.queues.AddOrUpdateWorkload(wlCopy) { @@ -655,6 +656,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { wlCopy := wl.DeepCopy() // We do not handle old workload here as it will be deleted or replaced by new one anyway. workload.AdjustResources(ctrl.LoggerInto(ctx, log), r.client, wlCopy) + workload.AddDeviceClassesToContainerRequests(ctx, r.client, wlCopy) switch { case status == workload.StatusFinished || !active: @@ -858,6 +860,8 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ log := log.WithValues("workload", klog.KObj(wlCopy)) log.V(5).Info("Queue reconcile for") workload.AdjustResources(ctrl.LoggerInto(ctx, log), h.r.client, wlCopy) + workload.AddDeviceClassesToContainerRequests(ctx, h.r.client, wlCopy) + if !h.r.queues.AddOrUpdateWorkload(wlCopy) { log.V(2).Info("Queue for workload didn't exist") } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 1fea5f250a..6c57ffd864 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -101,6 +101,13 @@ const ( // Enable more than one workload sharing flavors to preempt within a Cohort, // as long as the preemption targets don't overlap. 
MultiplePreemptions featuregate.Feature = "MultiplePreemptions" + + // owner: @kannon92 + // kep: 2941 + // This should follow the graduation process for structured parameters. + // alpha: v0.9 + // Enable the usage of DRA for Kueue + DynamicResourceStructuredParameters featuregate.Feature = "DynamicResourceStructuredParameters" ) func init() { @@ -124,6 +131,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ LendingLimit: {Default: true, PreRelease: featuregate.Beta}, MultiKueueBatchJobWithManagedBy: {Default: false, PreRelease: featuregate.Alpha}, MultiplePreemptions: {Default: true, PreRelease: featuregate.Beta}, + DynamicResourceStructuredParameters: {Default: false, PreRelease: featuregate.Alpha}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) { diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index a36ee73f52..5a43b3efde 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -220,6 +220,7 @@ func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error continue } workload.AdjustResources(ctx, m.client, &w) + workload.AddDeviceClassesToContainerRequests(ctx, m.client, &w) qImpl.AddOrUpdate(workload.NewInfo(&w, m.workloadInfoOptions...)) } cq := m.hm.ClusterQueues[qImpl.ClusterQueue] diff --git a/pkg/util/limitrange/limitrange.go b/pkg/util/limitrange/limitrange.go index 98fbde5faf..3bd3bd7b82 100644 --- a/pkg/util/limitrange/limitrange.go +++ b/pkg/util/limitrange/limitrange.go @@ -22,6 +22,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/utils/field" + "k8s.io/utils/ptr" + + k8sresource "k8s.io/apimachinery/pkg/api/resource" "sigs.k8s.io/kueue/pkg/util/resource" ) @@ -133,6 +136,60 @@ func calculateSidecarContainersResources(initContainers []corev1.Container) core return total } +// TotalResourceClaimsFromPodSpec calculates the number of requests +// for ResourceClaimTemplates from a single pod spec. +// All requests for PodResourceClaims are counted. +func TotalResourceClaimsFromPodSpec(ps *corev1.PodSpec) corev1.ResourceList { + return calculatePodClaims(ps) +} + +func calculatePodClaims(ps *corev1.PodSpec) corev1.ResourceList { + totalClaims := make(map[string]int64) + totalResourceClaimTemplate := corev1.ResourceList{} + containers := ps.Containers + initContainers := ps.InitContainers + // We want to track the number of claims for the pod.
+ for i := range ps.Containers { + for _, val := range containers[i].Resources.Claims { + totalClaims[val.Name]++ + } + } + for i := range initContainers { + for _, val := range initContainers[i].Resources.Claims { + totalClaims[val.Name]++ + } + } + for i := range initContainers { + if isSidecarContainer(initContainers[i]) { + for _, val := range initContainers[i].Resources.Claims { + totalClaims[val.Name]++ + } + } + } + for _, val := range ps.ResourceClaims { + _, ok := totalClaims[val.Name] + if ok { + keyName := "" + switch { + case ptr.Deref(val.ResourceClaimName, "") != "": + keyName = *val.ResourceClaimName + case ptr.Deref(val.ResourceClaimTemplateName, "") != "": + keyName = *val.ResourceClaimTemplateName + default: + return totalResourceClaimTemplate + } + countOfClaims, ok := totalResourceClaimTemplate[corev1.ResourceName(keyName)] + if ok { + count := countOfClaims.Value() + totalClaims[val.Name] + totalResourceClaimTemplate[corev1.ResourceName(keyName)] = *k8sresource.NewQuantity(count, k8sresource.DecimalSI) + } else { + totalResourceClaimTemplate[corev1.ResourceName(keyName)] = *k8sresource.NewQuantity(totalClaims[val.Name], k8sresource.DecimalSI) + } + } + } + return totalResourceClaimTemplate +} + func isSidecarContainer(container corev1.Container) bool { return container.RestartPolicy != nil && *container.RestartPolicy == corev1.ContainerRestartPolicyAlways } diff --git a/pkg/util/limitrange/limitrange_test.go b/pkg/util/limitrange/limitrange_test.go index 536f5b0f68..dfe2d703d9 100644 --- a/pkg/util/limitrange/limitrange_test.go +++ b/pkg/util/limitrange/limitrange_test.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/field" + "k8s.io/utils/ptr" testingutil "sigs.k8s.io/kueue/pkg/util/testing" ) @@ -501,3 +502,45 @@ func TestValidatePodSpec(t *testing.T) { }) } } + +func TestTotalResourceClaimsFromPodSpec(t *testing.T) { + cases := map[string]struct { + podSpec *corev1.PodSpec + want corev1.ResourceList + }{ + "pod without init containers. resource claims shared": { + podSpec: &corev1.PodSpec{ + Containers: []corev1.Container{ + *testingutil.MakeContainer(). + WithClaimReq([]corev1.ResourceClaim{{Name: "test"}}). + Obj(), + *testingutil.MakeContainer(). + WithClaimReq([]corev1.ResourceClaim{{Name: "test1"}}). 
+ Obj(), + }, + ResourceClaims: []corev1.PodResourceClaim{ + { + Name: "test", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }, + { + Name: "test1", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }, + }, + }, + want: corev1.ResourceList{ + "single-gpu": resource.MustParse("2"), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + result := TotalResourceClaimsFromPodSpec(tc.podSpec) + if diff := cmp.Diff(tc.want, result); diff != "" { + t.Errorf("Unexpected result (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index f7c3555739..b82895cb75 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -401,6 +401,22 @@ func (p *PodSetWrapper) Limit(r corev1.ResourceName, q string) *PodSetWrapper { return p } +func (p *PodSetWrapper) Claim(claim corev1.ResourceClaim) *PodSetWrapper { + if p.Template.Spec.Containers[0].Resources.Claims == nil { + p.Template.Spec.Containers[0].Resources.Claims = []corev1.ResourceClaim{} + } + p.Template.Spec.Containers[0].Resources.Claims = append(p.Template.Spec.Containers[0].Resources.Claims, claim) + return p +} + +func (p *PodSetWrapper) ResourceClaim(resourceClaim corev1.PodResourceClaim) *PodSetWrapper { + if p.Template.Spec.ResourceClaims == nil { + p.Template.Spec.ResourceClaims = []corev1.PodResourceClaim{} + } + p.Template.Spec.ResourceClaims = append(p.Template.Spec.ResourceClaims, resourceClaim) + return p +} + func (p *PodSetWrapper) Image(image string) *PodSetWrapper { p.Template.Spec.Containers[0].Image = image return p @@ -1203,6 +1219,12 @@ func (c *ContainerWrapper) WithResourceReq(resourceName corev1.ResourceName, qua return c } +// WithResourceClaim appends a claim request to the container +func (c *ContainerWrapper) WithClaimReq(claims []corev1.ResourceClaim) *ContainerWrapper { + c.Container.Resources.Claims = claims + return c +} + // AsSidecar makes the container a sidecar when used as an Init Container. func (c *ContainerWrapper) AsSidecar() *ContainerWrapper { c.Container.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways) diff --git a/pkg/workload/resources.go b/pkg/workload/resources.go index b8cf31c182..5854a5e4ce 100644 --- a/pkg/workload/resources.go +++ b/pkg/workload/resources.go @@ -22,12 +22,16 @@ import ( corev1 "k8s.io/api/core/v1" nodev1 "k8s.io/api/node/v1" + dra "k8s.io/api/resource/v1alpha3" + k8sresource "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" + "sigs.k8s.io/kueue/pkg/resources" "sigs.k8s.io/kueue/pkg/util/limitrange" "sigs.k8s.io/kueue/pkg/util/resource" ) @@ -119,3 +123,79 @@ func AdjustResources(ctx context.Context, cl client.Client, wl *kueue.Workload) } handleLimitsToRequests(wl) } + +// GetResourceClaimTemplates will retrieve the ResourceClaimTemplate from the api server. 
+func GetResourceClaimTemplates(ctx context.Context, c client.Client, name, namespace string) (dra.ResourceClaimTemplate, error) { + resourceClaimTemplate := dra.ResourceClaimTemplate{} + err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, &resourceClaimTemplate, &client.GetOptions{}) + return resourceClaimTemplate, err +} + +func AddDeviceClassesToContainerRequests(ctx context.Context, cl client.Client, wl *kueue.Workload) { + // If DRA is not enabled then this becomes a no op and workloads won't be modified. + // There is a potential issue + // If Kueue has this feature enabled but the k8s cluster does not have this enabled, + // then Kueue may not be able to actually find these resources + if !features.Enabled(features.DynamicResourceStructuredParameters) { + return + } + + log := ctrl.LoggerFrom(ctx) + resourceList, errors := handleResourceClaimTemplate(ctx, cl, AddResourceClaimsToResourceList(wl), wl.Namespace) + for key, val := range resourceList { + log.Info("ResourceList", "key", key, "val", val) + } + for _, err := range errors { + log.Error(err, "Failures adjusting requests for dynamic resources") + } + for pi := range wl.Spec.PodSets { + resourceClaimsToContainerRequests(&wl.Spec.PodSets[pi].Template.Spec, resourceList) + } +} + +func resourceClaimsToContainerRequests(podSpec *corev1.PodSpec, resourceList corev1.ResourceList) { + for i := range podSpec.InitContainers { + res := &podSpec.InitContainers[i].Resources + res.Requests = resource.MergeResourceListKeepFirst(res.Requests, resourceList) + } + for i := range podSpec.Containers { + res := &podSpec.Containers[i].Resources + res.Requests = resource.MergeResourceListKeepFirst(res.Requests, resourceList) + } +} +func handleResourceClaimTemplate(ctx context.Context, cl client.Client, psr []PodSetResources, namespace string) (corev1.ResourceList, []error) { + var errors []error + updateResourceList := corev1.ResourceList{} + for _, singlePsr := range psr { + for key, request := range singlePsr.Requests { + draDeviceClass, err := GetResourceClaimTemplates(ctx, cl, key.String(), namespace) + if err != nil { + errors = append(errors, fmt.Errorf("unable to get %s/%s resource claim %v", namespace, key, err)) + } + for _, val := range draDeviceClass.Spec.Spec.Devices.Requests { + updateResourceList[corev1.ResourceName(val.DeviceClassName)] = *k8sresource.NewQuantity(request, k8sresource.DecimalSI) + } + } + } + return updateResourceList, errors +} + +func AddResourceClaimsToResourceList(wl *kueue.Workload) []PodSetResources { + if len(wl.Spec.PodSets) == 0 { + return nil + } + res := make([]PodSetResources, 0, len(wl.Spec.PodSets)) + podSets := &wl.Spec.PodSets + currentCounts := podSetsCountsAfterReclaim(wl) + for _, ps := range *podSets { + count := currentCounts[ps.Name] + setRes := PodSetResources{ + Name: ps.Name, + Count: count, + } + setRes.Requests = resources.NewRequests(limitrange.TotalResourceClaimsFromPodSpec(&ps.Template.Spec)) + scaleUp(setRes.Requests, int64(count)) + res = append(res, setRes) + } + return res +} diff --git a/pkg/workload/resources_test.go b/pkg/workload/resources_test.go index 0b5623401e..fc22ea21dd 100644 --- a/pkg/workload/resources_test.go +++ b/pkg/workload/resources_test.go @@ -19,10 +19,14 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" nodev1 "k8s.io/api/node/v1" + dra "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" 
"sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/resources" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) @@ -481,3 +485,130 @@ func TestAdjustResources(t *testing.T) { }) } } + +func TestAddDeviceClassesToContainerRequests(t *testing.T) { + cases := map[string]struct { + wl *kueue.Workload + enableDRAGate bool + resourceClaimTemplate []dra.ResourceClaimTemplate + wantWl *kueue.Workload + }{ + "dra feature gate off; ignore devices": { + enableDRAGate: false, + resourceClaimTemplate: []dra.ResourceClaimTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "single-gpu", + Namespace: "", + }, + Spec: dra.ResourceClaimTemplateSpec{ + Spec: dra.ResourceClaimSpec{ + Devices: dra.DeviceClaim{ + Requests: []dra.DeviceRequest{{ + Name: "single-gpu", + DeviceClassName: "gpu.example.com", + }}, + }, + }, + }, + }, + }, + wl: utiltesting.MakeWorkload("foo", ""). + PodSets( + *utiltesting.MakePodSet("a", 1). + Limit(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "1"). + Claim(corev1.ResourceClaim{ + Name: "gpu", + }). + ResourceClaim(corev1.PodResourceClaim{ + Name: "gpu", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }). + Obj(), + ). + Obj(), + wantWl: utiltesting.MakeWorkload("foo", ""). + PodSets( + *utiltesting.MakePodSet("a", 1). + Limit(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "1"). + Claim(corev1.ResourceClaim{ + Name: "gpu", + }). + ResourceClaim(corev1.PodResourceClaim{ + Name: "gpu", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }). + Obj(), + ). + Obj(), + }, + "single device class request in a container": { + enableDRAGate: true, + resourceClaimTemplate: []dra.ResourceClaimTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "single-gpu", + Namespace: "", + }, + Spec: dra.ResourceClaimTemplateSpec{ + Spec: dra.ResourceClaimSpec{ + Devices: dra.DeviceClaim{ + Requests: []dra.DeviceRequest{{ + Name: "single-gpu", + DeviceClassName: "gpu.example.com", + }}, + }, + }, + }, + }, + }, + wl: utiltesting.MakeWorkload("foo", ""). + PodSets( + *utiltesting.MakePodSet("a", 1). + Limit(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "1"). + Claim(corev1.ResourceClaim{ + Name: "gpu", + }). + ResourceClaim(corev1.PodResourceClaim{ + Name: "gpu", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }). + Obj(), + ). + Obj(), + wantWl: utiltesting.MakeWorkload("foo", ""). + PodSets( + *utiltesting.MakePodSet("a", 1). + Limit(corev1.ResourceCPU, "2"). + Request(corev1.ResourceCPU, "1"). + Request("gpu.example.com", "1"). + Claim(corev1.ResourceClaim{ + Name: "gpu", + }). + ResourceClaim(corev1.PodResourceClaim{ + Name: "gpu", + ResourceClaimTemplateName: ptr.To("single-gpu"), + }). + Obj(), + ). + Obj(), + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + cl := utiltesting.NewClientBuilder().WithLists( + &dra.ResourceClaimTemplateList{Items: tc.resourceClaimTemplate}, + ). 
+ Build() + ctx, _ := utiltesting.ContextWithLog(t) + features.SetFeatureGateDuringTest(t, features.DynamicResourceStructuredParameters, tc.enableDRAGate) + AddDeviceClassesToContainerRequests(ctx, cl, tc.wl) + if diff := cmp.Diff(tc.wl, tc.wantWl); diff != "" { + t.Errorf("Unexpected resources after adjusting (-want,+got): %s", diff) + } + }) + } +} diff --git a/test/e2e/config/common/manager_e2e_patch.yaml b/test/e2e/config/common/manager_e2e_patch.yaml index 2a5c3b0486..fbb0ffbe0b 100644 --- a/test/e2e/config/common/manager_e2e_patch.yaml +++ b/test/e2e/config/common/manager_e2e_patch.yaml @@ -3,4 +3,4 @@ value: IfNotPresent - op: add path: /spec/template/spec/containers/0/args/- - value: --feature-gates=MultiKueue=true,MultiKueueBatchJobWithManagedBy=true + value: --feature-gates=VisibilityOnDemand=true,MultiKueue=true,MultiKueueBatchJobWithManagedBy=true,DynamicResourceStructuredParameters=true