Skip to content

Commit

Permalink
Merge pull request #336 from kvaps/reconcole-infra-eps
Browse files Browse the repository at this point in the history
Add support for reconciling and restoring infra EndpointSlices
  • Loading branch information
kubevirt-bot authored Jan 2, 2025
2 parents da9e0cf + 480cb31 commit 443a1fe
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 11 deletions.
84 changes: 73 additions & 11 deletions pkg/controller/kubevirteps/kubevirteps_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request {
}

func (c *Controller) Init() error {

// Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function.
// We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly.
// Existing Service event handlers...
_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// cast obj to Service
svc := obj.(*v1.Service)
// Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(AddReq, obj, nil))
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
// cast obj to Service
newSvc := newObj.(*v1.Service)
// Only act on Services of type LoadBalancer
if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
}
},
DeleteFunc: func(obj interface{}) {
// cast obj to Service
svc := obj.(*v1.Service)
// Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(DeleteReq, obj, nil))
Expand All @@ -144,7 +136,7 @@ func (c *Controller) Init() error {
return err
}

// Monitor endpoint slices that we are interested in based on known services in the infra cluster
// Existing EndpointSlice event handlers in tenant cluster...
_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
Expand Down Expand Up @@ -194,10 +186,80 @@ func (c *Controller) Init() error {
return err
}

//TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
// Add an informer for EndpointSlices in the infra cluster
_, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
if c.managedByController(eps) {
svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
if svcErr != nil {
klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
return
}
if svc != nil {
klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
c.queue.Add(newRequest(AddReq, svc, nil))
}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
eps := newObj.(*discovery.EndpointSlice)
if c.managedByController(eps) {
svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
if svcErr != nil {
klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
return
}
if svc != nil {
klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
c.queue.Add(newRequest(UpdateReq, svc, nil))
}
}
},
DeleteFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
if c.managedByController(eps) {
svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
if svcErr != nil {
klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr)
return
}
if svc != nil {
klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
c.queue.Add(newRequest(DeleteReq, svc, nil))
}
}
},
})
if err != nil {
return err
}

return nil
}

// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice.
// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond
// to the Service name. If not found or if the Service doesn't exist, it returns nil.
func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) {
svcName := eps.Labels[discovery.LabelServiceName]
if svcName == "" {
// No service name label found, can't determine infra service.
return nil, nil
}

svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
// Service doesn't exist
return nil, nil
}
return nil, err
}

return svc, nil
}

// Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster.
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
defer utilruntime.HandleCrash()
Expand Down
44 changes: 44 additions & 0 deletions pkg/controller/kubevirteps/kubevirteps_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,5 +642,49 @@ var _ = g.Describe("KubevirtEPSController", g.Ordered, func() {
}).Should(BeTrue(), "Expect call to the infraDynamic.Get to return the VMI")

})

g.It("Should reconcile after infra EndpointSlice deletion and restore it", func() {
// Create a VMI in the infra cluster
// This ensures that when tenant EndpointSlice is created, it can be reconciled properly
createAndAssertVMI("worker-0-test", "ip-10-32-5-13", "123.45.67.89")

// Create an EndpointSlice in the tenant cluster representing the desired state
createAndAssertTenantSlice("test-epslice-infra", "tenant-service-name", discoveryv1.AddressTypeIPv4,
*createPort("http", 80, v1.ProtocolTCP),
[]discoveryv1.Endpoint{*createEndpoint("123.45.67.89", "worker-0-test", true, true, false)})

// Create a Service in the infra cluster that should trigger the creation of an EndpointSlice in the infra cluster
createAndAssertInfraServiceLB("infra-service-restore", "tenant-service-name", "test-cluster",
v1.ServicePort{Name: "web", Port: 80, NodePort: 31900, Protocol: v1.ProtocolTCP, TargetPort: intstr.IntOrString{IntVal: 30390}},
v1.ServiceExternalTrafficPolicyLocal)

var epsList *discoveryv1.EndpointSliceList
var err error

// Wait until the infra EndpointSlice is created by the controller
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
if len(epsList.Items) == 1 {
return true, err
} else {
return false, err
}
}).Should(BeTrue(), "Infra EndpointSlice should be created by the controller")

// Now, simulate an external deletion of the EndpointSlice in the infra cluster
err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).Delete(context.TODO(), epsList.Items[0].Name, metav1.DeleteOptions{})
Expect(err).To(BeNil(), "Deleting infra EndpointSlice should succeed")

// The controller, now watching infra EndpointSlices, should detect the removal
// and trigger a reconcile to restore it.
Eventually(func() (bool, error) {
epsList, err = testVals.infraClient.DiscoveryV1().EndpointSlices(infraNamespace).List(context.TODO(), metav1.ListOptions{})
// After some time, we expect exactly one EndpointSlice to be recreated.
if err == nil && len(epsList.Items) == 1 {
return true, nil
}
return false, err
}).Should(BeTrue(), "EndpointSlice in infra cluster should be recreated by the controller after deletion")
})
})
})

0 comments on commit 443a1fe

Please sign in to comment.