@@ -17,23 +17,37 @@ limitations under the License.
1717package cert
1818
1919import (
20+ "bytes"
21+ "context"
22+ "errors"
2023 "fmt"
24+ "sort"
2125 "strings"
26+ "time"
2227
2328 "github.com/go-logr/logr"
2429 cert "github.com/open-policy-agent/cert-controller/pkg/rotator"
30+ apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
31+ apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
32+ apierrors "k8s.io/apimachinery/pkg/api/errors"
33+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2534 "k8s.io/apimachinery/pkg/types"
35+ "k8s.io/client-go/kubernetes"
36+ "k8s.io/client-go/util/retry"
2637 ctrl "sigs.k8s.io/controller-runtime"
2738
2839 config "sigs.k8s.io/kueue/apis/config/v1beta2"
2940)
3041
3142const (
32- caName = "kueue-ca"
33- caOrganization = "kueue"
34- webhookServiceSuffix = "-webhook-service"
43+ caName = "kueue-ca"
44+ caOrganization = "kueue"
45+ webhookServiceSuffix = "-webhook-service"
46+ conversionGroup = "kueue.x-k8s.io"
47+ conversionQueryTimeout = 10 * time .Second
3548)
3649
50+ // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
3751// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=mutatingwebhookconfigurations,verbs=get;list;watch;update
3852// +kubebuilder:rbac:groups="admissionregistration.k8s.io",resources=validatingwebhookconfigurations,verbs=get;list;watch;update
3953// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch;update
@@ -48,6 +62,28 @@ func ManageCerts(mgr ctrl.Manager, cfg config.Configuration, setupFinished chan
4862 mutatingWebhookName := buildWebhookConfigurationName (webhookBaseName , "mutating" )
4963 validatingWebhookName := buildWebhookConfigurationName (webhookBaseName , "validating" )
5064
65+ webhooks := []cert.WebhookInfo {{
66+ Type : cert .Validating ,
67+ Name : validatingWebhookName ,
68+ }, {
69+ Type : cert .Mutating ,
70+ Name : mutatingWebhookName ,
71+ }}
72+
73+ ctx , cancel := context .WithTimeout (context .Background (), conversionQueryTimeout )
74+ defer cancel ()
75+ conversionCRDs := discoverConversionCRDs (ctx , mgr )
76+ conversionReady := ensureConversionCRDCABundles (mgr , cfg , conversionCRDs )
77+
78+ internalReady := make (chan struct {})
79+ go func () {
80+ <- internalReady
81+ if conversionReady != nil {
82+ <- conversionReady
83+ }
84+ close (setupFinished )
85+ }()
86+
5187 return cert .AddRotator (mgr , & cert.CertRotator {
5288 SecretKey : types.NamespacedName {
5389 Namespace : * cfg .Namespace ,
@@ -57,23 +93,8 @@ func ManageCerts(mgr ctrl.Manager, cfg config.Configuration, setupFinished chan
5793 CAName : caName ,
5894 CAOrganization : caOrganization ,
5995 DNSName : dnsName ,
60- IsReady : setupFinished ,
61- Webhooks : []cert.WebhookInfo {{
62- Type : cert .Validating ,
63- Name : validatingWebhookName ,
64- }, {
65- Type : cert .Mutating ,
66- Name : mutatingWebhookName ,
67- }, {
68- Type : cert .CRDConversion ,
69- Name : "localqueues.kueue.x-k8s.io" ,
70- }, {
71- Type : cert .CRDConversion ,
72- Name : "clusterqueues.kueue.x-k8s.io" ,
73- }, {
74- Type : cert .CRDConversion ,
75- Name : "workloads.kueue.x-k8s.io" ,
76- }},
96+ IsReady : internalReady ,
97+ Webhooks : webhooks ,
7798 // When kueue is running in the leader election mode,
7899 // we expect webhook server will run in primary and secondary instance
79100 RequireLeaderElection : false ,
@@ -96,3 +117,119 @@ func deriveWebhookBaseName(webhookServiceName string) string {
96117func buildWebhookConfigurationName (baseName , webhookType string ) string {
97118 return fmt .Sprintf ("%s-%s-webhook-configuration" , baseName , webhookType )
98119}
120+
121+ func discoverConversionCRDs (ctx context.Context , mgr ctrl.Manager ) []string {
122+ log := ctrl .Log .WithName ("cert" )
123+ clientset , err := apiextensionsclient .NewForConfig (mgr .GetConfig ())
124+ if err != nil {
125+ log .Error (err , "failed to create clientset" )
126+ return nil
127+ }
128+
129+ crds , err := clientset .ApiextensionsV1 ().CustomResourceDefinitions ().List (ctx , metav1.ListOptions {})
130+ if err != nil {
131+ log .Error (err , "failed to list CRDs" )
132+ return nil
133+ }
134+
135+ var names []string
136+ for _ , crd := range crds .Items {
137+ if crd .Spec .Group == conversionGroup &&
138+ crd .Spec .Conversion != nil &&
139+ crd .Spec .Conversion .Strategy == apiextensionsv1 .WebhookConverter &&
140+ crd .Spec .Conversion .Webhook != nil &&
141+ crd .Spec .Conversion .Webhook .ClientConfig != nil {
142+ names = append (names , crd .Name )
143+ }
144+ }
145+ sort .Strings (names )
146+ log .Info ("Discovered conversion CRDs" , "count" , len (names ), "names" , names )
147+ return names
148+ }
149+
150+ func ensureConversionCRDCABundles (mgr ctrl.Manager , cfg config.Configuration , crdNames []string ) <- chan struct {} {
151+ if len (crdNames ) == 0 {
152+ return nil
153+ }
154+
155+ done := make (chan struct {})
156+ go func () {
157+ defer close (done )
158+ log := ctrl .Log .WithName ("cert" )
159+
160+ caData , err := waitForCAData (mgr , cfg , log )
161+ if err != nil {
162+ return
163+ }
164+
165+ if err := injectCABundlesIntoCRDs (mgr , crdNames , caData , log ); err != nil {
166+ log .Error (err , "failed to inject CA bundles" )
167+ }
168+ }()
169+ return done
170+ }
171+
172+ func waitForCAData (mgr ctrl.Manager , cfg config.Configuration , log logr.Logger ) ([]byte , error ) {
173+ if cfg .InternalCertManagement == nil || cfg .InternalCertManagement .WebhookSecretName == nil || cfg .Namespace == nil {
174+ return nil , errors .New ("cert management disabled" )
175+ }
176+
177+ coreClient , err := kubernetes .NewForConfig (mgr .GetConfig ())
178+ if err != nil {
179+ log .Error (err , "failed to create clientset" )
180+ return nil , err
181+ }
182+
183+ secretKey := types.NamespacedName {
184+ Namespace : * cfg .Namespace ,
185+ Name : * cfg .InternalCertManagement .WebhookSecretName ,
186+ }
187+
188+ for range [30 ]int {} {
189+ secret , err := coreClient .CoreV1 ().Secrets (secretKey .Namespace ).Get (context .Background (), secretKey .Name , metav1.GetOptions {})
190+ if err == nil {
191+ if data , ok := secret .Data ["ca.crt" ]; ok && len (data ) > 0 {
192+ return append ([]byte (nil ), data ... ), nil
193+ }
194+ } else if ! apierrors .IsNotFound (err ) {
195+ log .Error (err , "failed to fetch secret" )
196+ return nil , err
197+ }
198+ time .Sleep (2 * time .Second )
199+ }
200+ err = errors .New ("timeout waiting for CA certificate" )
201+ log .Error (err , "CA bundle injection failed" )
202+ return nil , err
203+ }
204+
205+ func injectCABundlesIntoCRDs (mgr ctrl.Manager , crdNames []string , caData []byte , log logr.Logger ) error {
206+ clientset , err := apiextensionsclient .NewForConfig (mgr .GetConfig ())
207+ if err != nil {
208+ log .Error (err , "failed to create clientset" )
209+ return err
210+ }
211+
212+ for _ , name := range crdNames {
213+ err := retry .RetryOnConflict (retry .DefaultBackoff , func () error {
214+ crd , err := clientset .ApiextensionsV1 ().CustomResourceDefinitions ().Get (context .Background (), name , metav1.GetOptions {})
215+ if err != nil {
216+ return err
217+ }
218+ if crd .Spec .Conversion == nil || crd .Spec .Conversion .Webhook == nil || crd .Spec .Conversion .Webhook .ClientConfig == nil {
219+ return nil
220+ }
221+ if bytes .Equal (crd .Spec .Conversion .Webhook .ClientConfig .CABundle , caData ) {
222+ return nil
223+ }
224+ crd .Spec .Conversion .Webhook .ClientConfig .CABundle = caData
225+ _ , err = clientset .ApiextensionsV1 ().CustomResourceDefinitions ().Update (context .Background (), crd , metav1.UpdateOptions {})
226+ return err
227+ })
228+ if err != nil {
229+ log .Error (err , "failed to inject CA bundle" , "crd" , name )
230+ } else {
231+ log .Info ("Injected conversion CRD CA bundle" , "crd" , name )
232+ }
233+ }
234+ return nil
235+ }
0 commit comments