diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa338..6f6e3d322 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -108,32 +108,25 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request {
 }
 
 func (c *Controller) Init() error {
-
 	// Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function.
 	// We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly.
 	_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			// cast obj to Service
 			svc := obj.(*v1.Service)
-			// Only act on Services of type LoadBalancer
 			if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
 				klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
 				c.queue.Add(newRequest(AddReq, obj, nil))
 			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			// cast obj to Service
 			newSvc := newObj.(*v1.Service)
-			// Only act on Services of type LoadBalancer
 			if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
 				klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
 				c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
-			// cast obj to Service
 			svc := obj.(*v1.Service)
-			// Only act on Services of type LoadBalancer
 			if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
 				klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
 				c.queue.Add(newRequest(DeleteReq, obj, nil))
@@ -194,10 +187,96 @@ func (c *Controller) Init() error {
 		return err
 	}
 
-	//TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
+	// Watch EndpointSlices in the infra cluster. When a slice accepted by managedByController is added,
+	// updated or deleted, the Service named by its service-name label is put back on the work queue.
+	_, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			eps := obj.(*discovery.EndpointSlice)
+			if c.managedByController(eps) {
+				svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+				if svcErr != nil {
+					klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+					return
+				}
+				if svc != nil {
+					klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+					c.queue.Add(newRequest(AddReq, svc, nil))
+				}
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			eps := newObj.(*discovery.EndpointSlice)
+			if c.managedByController(eps) {
+				svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+				if svcErr != nil {
+					klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+					return
+				}
+				if svc != nil {
+					klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+					c.queue.Add(newRequest(UpdateReq, svc, nil))
+				}
+			}
+		},
+		DeleteFunc: func(obj interface{}) {
+			// After a missed delete event the informer delivers a cache.DeletedFinalStateUnknown
+			// tombstone instead of the object itself, so unwrap it rather than asserting blindly.
+			eps, ok := obj.(*discovery.EndpointSlice)
+			if !ok {
+				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
+				if !isTombstone {
+					klog.Errorf("Unexpected object type %T in infra EndpointSlice delete handler", obj)
+					return
+				}
+				eps, ok = tombstone.Obj.(*discovery.EndpointSlice)
+				if !ok {
+					klog.Errorf("Unexpected tombstone object type %T in infra EndpointSlice delete handler", tombstone.Obj)
+					return
+				}
+			}
+			if c.managedByController(eps) {
+				svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+				if svcErr != nil {
+					klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr)
+					return
+				}
+				if svc != nil {
+					klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+					c.queue.Add(newRequest(DeleteReq, svc, nil))
+				}
+			}
+		},
+	})
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
+// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice.
+// The Service name is read from the "kubernetes.io/service-name" label (discovery.LabelServiceName) and
+// looked up in c.infraNamespace. It returns (nil, nil) when the label is missing or empty, or when no
+// such Service exists; any other lookup failure is returned as the error.
+func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) {
+	svcName := eps.Labels[discovery.LabelServiceName]
+	if svcName == "" {
+		// No service name label found, can't determine infra service.
+		return nil, nil
+	}
+
+	svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{})
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			// Service doesn't exist
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	return svc, nil
+}
+
 // Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster.
 func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 	defer utilruntime.HandleCrash()
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller_test.go b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
index 99664b903..1fb86e25f 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller_test.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller_test.go
@@ -642,5 +642,49 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
 			}).Should(BeTrue(), "Expect call to the infraDynamic.Get to return the VMI")
 
 		})
+
+		g.It("Should reconcile after infra EndpointSlice deletion and restore it", func() {
+			// Create a VMI in the infra cluster
+			// This ensures that when tenant EndpointSlice is created, it can be reconciled properly
+			createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")
+
+			// Create an EndpointSlice in the tenant cluster representing the desired state
+			createAndAssertTenantSlice("test-epslice-infra", "tenant-service-name", discoveryv1.AddressTypeIPv4,
+				*createPort("http", 80, v1.ProtocolTCP),
+				[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})
+
+			// Create a Service in the infra cluster that should trigger the creation of an EndpointSlice in the infra cluster
+			createAndAssertInfraServiceLB("infra-service-restore", "tenant-service-name", "test-cluster",
+				v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
+				v1.ServiceExternalTrafficPolicyLocal)
+
+			var epsList *discoveryv1.EndpointSliceList
+			var err error
+
+			// Wait until the infra EndpointSlice is created by the controller
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				if err != nil {
+					// Report the failure without touching epsList, which may be nil here.
+					return false, err
+				}
+				return len(epsList.Items) == 1, nil
+			}).Should(BeTrue(), "Infra EndpointSlice should be created by the controller")
+
+			// Now, simulate an external deletion of the EndpointSlice in the infra cluster
+			err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).Delete(context.TODO(), epsList.Items[0].Name, metav1.DeleteOptions{})
+			Expect(err).To(BeNil(), "Deleting infra EndpointSlice should succeed")
+
+			// The controller, now watching infra EndpointSlices, should detect the removal
+			// and trigger a reconcile to restore it.
+			Eventually(func() (bool, error) {
+				epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
+				// After some time, we expect exactly one EndpointSlice to be recreated.
+				if err == nil && len(epsList.Items) == 1 {
+					return true, nil
+				}
+				return false, err
+			}).Should(BeTrue(), "EndpointSlice in infra cluster should be recreated by the controller after deletion")
+		})
 	})
 })