diff --git a/automation/test.sh b/automation/test.sh index 42951da97778..dbe0940eadc8 100644 --- a/automation/test.sh +++ b/automation/test.sh @@ -106,10 +106,17 @@ while [ -n "$(kubectl get pods --no-headers | grep -v Running)" ]; do sleep 10 done -# Make sure all containers are ready -while [ -n "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready' --no-headers | grep false)" ]; do +# Make sure all containers except virt-controller are ready +while [ -n "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '!/virt-controller/ && /false/')" ]; do echo "Waiting for KubeVirt containers to become ready ..." - kubectl get pods -ocustom-columns='name:metadata.name,ready:status.containerStatuses[*].ready' | grep false || true + kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '!/virt-controller/ && /false/' || true + sleep 10 +done + +# Make sure that at least one virt-controller container is ready +while [ "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '/virt-controller/ && /true/' | wc -l)" -lt "1" ]; do + echo "Waiting for KubeVirt virt-controller container to become ready ..." + kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '/virt-controller/ && /true/' | wc -l sleep 10 done diff --git a/manifests/virt-controller.yaml.in b/manifests/virt-controller.yaml.in index cb953f4d1807..a88dadb8352b 100644 --- a/manifests/virt-controller.yaml.in +++ b/manifests/virt-controller.yaml.in @@ -16,6 +16,7 @@ kind: Deployment metadata: name: virt-controller spec: + replicas: 2 template: metadata: labels: @@ -38,6 +39,19 @@ spec: - containerPort: 8182 name: "virt-controller" protocol: "TCP" + livenessProbe: + failureThreshold: 8 + httpGet: + port: 8182 + path: /healthz + initialDelaySeconds: 15 + timeoutSeconds: 10 + readinessProbe: + httpGet: + port: 8182 + path: /leader + initialDelaySeconds: 15 + timeoutSeconds: 10 securityContext: runAsNonRoot: true nodeSelector: diff --git a/pkg/virt-controller/leaderelectionconfig/config.go b/pkg/virt-controller/leaderelectionconfig/config.go new file mode 100644 index 000000000000..dfe287eacf9d --- /dev/null +++ b/pkg/virt-controller/leaderelectionconfig/config.go @@ -0,0 +1,65 @@ +/* + * This file is part of the KubeVirt project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Red Hat, Inc. + * + */ + +package leaderelectionconfig + +import ( + "flag" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + rl "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +const ( + DefaultLeaseDuration = 15 * time.Second + DefaultRenewDeadline = 10 * time.Second + DefaultRetryPeriod = 2 * time.Second + DefaultNamespace = "default" + DefaultEndpointName = "virt-controller-service" +) + +func DefaultLeaderElectionConfiguration() Configuration { + return Configuration{ + LeaseDuration: metav1.Duration{Duration: DefaultLeaseDuration}, + RenewDeadline: metav1.Duration{Duration: DefaultRenewDeadline}, + RetryPeriod: metav1.Duration{Duration: DefaultRetryPeriod}, + ResourceLock: rl.EndpointsResourceLock, + } +} + +// BindFlags binds the common LeaderElectionCLIConfig flags +func BindFlags(l *Configuration) { + flag.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+ + "The duration that non-leader candidates will wait after observing a leadership "+ + "renewal until attempting to acquire leadership of a led but unrenewed leader "+ + "slot. This is effectively the maximum duration that a leader can be stopped "+ + "before it is replaced by another candidate. This is only applicable if leader "+ + "election is enabled.") + flag.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+ + "The interval between attempts by the acting master to renew a leadership slot "+ + "before it stops leading. This must be less than or equal to the lease duration. "+ + "This is only applicable if leader election is enabled.") + flag.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+ + "The duration the clients should wait between attempting acquisition and renewal "+ + "of a leadership. This is only applicable if leader election is enabled.") + flag.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+ + "The type of resource object that is used for locking during "+ + "leader election. Supported options are `endpoints` (default) and `configmap`.") +} diff --git a/pkg/virt-controller/leaderelectionconfig/types.go b/pkg/virt-controller/leaderelectionconfig/types.go new file mode 100644 index 000000000000..33c19aea737b --- /dev/null +++ b/pkg/virt-controller/leaderelectionconfig/types.go @@ -0,0 +1,46 @@ +/* + * This file is part of the KubeVirt project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Red Hat, Inc. + * + */ + +package leaderelectionconfig + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type Configuration struct { + // leaseDuration is the duration that non-leader candidates will wait + // after observing a leadership renewal until attempting to acquire + // leadership of a led but unrenewed leader slot. This is effectively the + // maximum duration that a leader can be stopped before it is replaced + // by another candidate. This is only applicable if leader election is + // enabled. + LeaseDuration metav1.Duration + // renewDeadline is the interval between attempts by the acting master to + // renew a leadership slot before it stops leading. This must be less + // than or equal to the lease duration. This is only applicable if leader + // election is enabled. + RenewDeadline metav1.Duration + // retryPeriod is the duration the clients should wait between attempting + // acquisition and renewal of a leadership. This is only applicable if + // leader election is enabled. + RetryPeriod metav1.Duration + // resourceLock indicates the resource object type that will be used to lock + // during leader election cycles. + ResourceLock string +} diff --git a/pkg/virt-controller/rest/rest.go b/pkg/virt-controller/rest/rest.go index d28eda774d9d..b829fb0a35e5 100644 --- a/pkg/virt-controller/rest/rest.go +++ b/pkg/virt-controller/rest/rest.go @@ -22,7 +22,6 @@ package rest import ( "github.com/emicklei/go-restful" - "kubevirt.io/kubevirt/pkg/api/v1" "kubevirt.io/kubevirt/pkg/healthz" ) @@ -31,5 +30,5 @@ var WebService *restful.WebService func init() { WebService = new(restful.WebService) WebService.Path("/").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON) - WebService.Route(WebService.GET("/apis/" + v1.GroupVersion.String() + "/healthz").To(healthz.KubeConnectionHealthzFunc).Doc("Health endpoint")) + WebService.Route(WebService.GET("/healthz").To(healthz.KubeConnectionHealthzFunc).Doc("Health endpoint")) } diff --git a/pkg/virt-controller/watch/application.go b/pkg/virt-controller/watch/application.go index 252af145a9f1..0f327eddf27d 100644 --- a/pkg/virt-controller/watch/application.go +++ b/pkg/virt-controller/watch/application.go @@ -4,23 +4,25 @@ import ( "flag" golog "log" "net/http" + "os" "strconv" "github.com/emicklei/go-restful" - "k8s.io/api/core/v1" k8sv1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" k8coresv1 "k8s.io/client-go/kubernetes/typed/core/v1" - v12 "k8s.io/client-go/kubernetes/typed/core/v1" clientrest "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "kubevirt.io/kubevirt/pkg/controller" "kubevirt.io/kubevirt/pkg/kubecli" "kubevirt.io/kubevirt/pkg/logging" - registrydisk "kubevirt.io/kubevirt/pkg/registry-disk" + "kubevirt.io/kubevirt/pkg/registry-disk" + "kubevirt.io/kubevirt/pkg/virt-controller/leaderelectionconfig" "kubevirt.io/kubevirt/pkg/virt-controller/rest" "kubevirt.io/kubevirt/pkg/virt-controller/services" ) @@ -47,20 +49,27 @@ type VirtControllerApp struct { rsController *VMReplicaSet rsInformer cache.SharedIndexInformer + LeaderElection leaderelectionconfig.Configuration + host string port int launcherImage string migratorImage string socketDir string ephemeralDiskDir string + readyChan chan bool } func Execute() { var err error var app VirtControllerApp = VirtControllerApp{} + app.LeaderElection = leaderelectionconfig.DefaultLeaderElectionConfiguration() + app.DefineFlags() + app.readyChan = make(chan bool, 1) + logging.InitializeLogging("virt-controller") app.clientSet, err = kubecli.GetKubevirtClient() @@ -71,7 +80,9 @@ func Execute() { app.restClient = app.clientSet.RestClient() - restful.Add(rest.WebService) + webService := rest.WebService + webService.Route(webService.GET("/leader").To(app.readinessProbe).Doc("Leader endpoint")) + restful.Add(webService) // Bootstrapping. From here on the initialization order is important @@ -92,9 +103,7 @@ func Execute() { app.podInformer.AddEventHandler(controller.NewResourceEventHandlerFuncsForFunc(migrationPodLabelHandler(app.migrationQueue))) app.migrationCache = app.migrationInformer.GetStore() - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&k8coresv1.EventSinkImpl{Interface: app.clientSet.CoreV1().Events(k8sv1.NamespaceAll)}) - app.migrationRecorder = broadcaster.NewRecorder(scheme.Scheme, k8sv1.EventSource{Component: "virt-migration-controller"}) + app.migrationRecorder = app.getNewRecorder(k8sv1.NamespaceAll, "virt-migration-controller") app.rsInformer = app.informerFactory.VMReplicaSet() @@ -107,14 +116,63 @@ func (vca *VirtControllerApp) Run() { stop := make(chan struct{}) defer close(stop) vca.informerFactory.Start(stop) - go vca.vmController.Run(3, stop) - go vca.migrationController.Run(3, stop) - go vca.rsController.Run(3, stop) - httpLogger := logger.With("service", "http") - httpLogger.Info().Log("action", "listening", "interface", vca.host, "port", vca.port) - if err := http.ListenAndServe(vca.host+":"+strconv.Itoa(vca.port), nil); err != nil { + go func() { + httpLogger := logger.With("service", "http") + httpLogger.Info().Log("action", "listening", "interface", vca.host, "port", vca.port) + if err := http.ListenAndServe(vca.host+":"+strconv.Itoa(vca.port), nil); err != nil { + golog.Fatal(err) + } + }() + + recorder := vca.getNewRecorder(k8sv1.NamespaceAll, leaderelectionconfig.DefaultEndpointName) + + id, err := os.Hostname() + if err != nil { + golog.Fatalf("unable to get hostname: %v", err) + } + + rl, err := resourcelock.New(vca.LeaderElection.ResourceLock, + leaderelectionconfig.DefaultNamespace, + leaderelectionconfig.DefaultEndpointName, + vca.clientSet.CoreV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + golog.Fatal(err) + } + + leaderElector, err := leaderelection.NewLeaderElector( + leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: vca.LeaderElection.LeaseDuration.Duration, + RenewDeadline: vca.LeaderElection.RenewDeadline.Duration, + RetryPeriod: vca.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(stopCh <-chan struct{}) { + go vca.vmController.Run(3, stop) + go vca.migrationController.Run(3, stop) + go vca.rsController.Run(3, stop) + close(vca.readyChan) + }, + OnStoppedLeading: func() { + golog.Fatal("leaderelection lost") + }, + }, + }) + if err != nil { golog.Fatal(err) } + + leaderElector.Run() + panic("unreachable") +} + +func (vca *VirtControllerApp) getNewRecorder(namespace string, componentName string) record.EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&k8coresv1.EventSinkImpl{Interface: vca.clientSet.CoreV1().Events(namespace)}) + return eventBroadcaster.NewRecorder(scheme.Scheme, k8sv1.EventSource{Component: componentName}) } func (vca *VirtControllerApp) initCommon() { @@ -134,14 +192,26 @@ func (vca *VirtControllerApp) initCommon() { } func (vca *VirtControllerApp) initReplicaSet() { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&v12.EventSinkImpl{Interface: vca.clientSet.CoreV1().Events(v1.NamespaceAll)}) - // TODO what is scheme used for in Recorder? - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtualmachinereplicaset-controller"}) - + recorder := vca.getNewRecorder(k8sv1.NamespaceAll, "virtualmachinereplicaset-controller") vca.rsController = NewVMReplicaSet(vca.vmInformer, vca.rsInformer, recorder, vca.clientSet, controller.BurstReplicas) } +func (vca *VirtControllerApp) readinessProbe(_ *restful.Request, response *restful.Response) { + res := map[string]interface{}{} + + select { + case _, opened := <-vca.readyChan: + if !opened { + res["apiserver"] = map[string]interface{}{"leader": "true"} + response.WriteHeaderAndJson(http.StatusOK, res, restful.MIME_JSON) + return + } + default: + } + res["apiserver"] = map[string]interface{}{"leader": "false", "error": "current pod is not leader"} + response.WriteHeaderAndJson(http.StatusServiceUnavailable, res, restful.MIME_JSON) +} + func (vca *VirtControllerApp) DefineFlags() { flag.StringVar(&vca.host, "listen", "0.0.0.0", "Address and port where to listen on") flag.IntVar(&vca.port, "port", 8182, "Port to listen on") @@ -149,5 +219,6 @@ func (vca *VirtControllerApp) DefineFlags() { flag.StringVar(&vca.migratorImage, "migrator-image", "virt-handler", "Container which orchestrates a VM migration") flag.StringVar(&vca.socketDir, "socket-dir", "/var/run/kubevirt", "Directory where to look for sockets for cgroup detection") flag.StringVar(&vca.ephemeralDiskDir, "ephemeral-disk-dir", "/var/run/libvirt/kubevirt-ephemeral-disk", "Base direcetory for ephemeral disk data") + leaderelectionconfig.BindFlags(&vca.LeaderElection) flag.Parse() } diff --git a/pkg/virt-controller/watch/application_test.go b/pkg/virt-controller/watch/application_test.go new file mode 100644 index 000000000000..a2cf45c18b46 --- /dev/null +++ b/pkg/virt-controller/watch/application_test.go @@ -0,0 +1,77 @@ +/* + * This file is part of the KubeVirt project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Red Hat, Inc. + * + */ + +package watch + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "net/http" + + "net/http/httptest" + "net/url" + + "github.com/emicklei/go-restful" + + "kubevirt.io/kubevirt/pkg/rest" +) + +func newValidGetRequest() *http.Request { + request, _ := http.NewRequest("GET", "/leader", nil) + return request +} + +var _ = Describe("Application", func() { + var app VirtControllerApp = VirtControllerApp{} + + Describe("Readiness probe", func() { + var recorder *httptest.ResponseRecorder + var request *http.Request + var handler http.Handler + + BeforeEach(func() { + app.readyChan = make(chan bool, 1) + + ws := new(restful.WebService) + ws.Produces(restful.MIME_JSON) + handler = http.Handler(restful.NewContainer().Add(ws)) + ws.Route(ws.GET("/leader").Produces(rest.MIME_JSON).To(app.readinessProbe)) + + request = newValidGetRequest() + recorder = httptest.NewRecorder() + }) + + Context("with closed channel", func() { + It("should return 200", func() { + close(app.readyChan) + request.URL, _ = url.Parse("/leader") + handler.ServeHTTP(recorder, request) + Expect(recorder.Code).To(Equal(http.StatusOK)) + }) + }) + Context("with opened channel", func() { + It("should return 503", func() { + request.URL, _ = url.Parse("/leader") + handler.ServeHTTP(recorder, request) + Expect(recorder.Code).To(Equal(http.StatusServiceUnavailable)) + }) + }) + }) +}) diff --git a/tests/controller_leader_election_test.go b/tests/controller_leader_election_test.go new file mode 100644 index 000000000000..77bbfff233f5 --- /dev/null +++ b/tests/controller_leader_election_test.go @@ -0,0 +1,122 @@ +/* + * This file is part of the KubeVirt project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Red Hat, Inc. + * + */ + +package tests_test + +import ( + "encoding/json" + "flag" + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + k8sv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/leaderelection/resourcelock" + + "kubevirt.io/kubevirt/pkg/kubecli" + "kubevirt.io/kubevirt/pkg/virt-controller/leaderelectionconfig" + "kubevirt.io/kubevirt/tests" +) + +var _ = Describe("LeaderElection", func() { + flag.Parse() + + virtClient, err := kubecli.GetKubevirtClient() + tests.PanicOnError(err) + + BeforeEach(func() { + tests.BeforeTestCleanup() + }) + + Context("Controller pod destroyed", func() { + It("should success to start VM", func() { + newLeaderPod := getNewLeaderPod(virtClient) + Expect(newLeaderPod).NotTo(BeNil()) + + // TODO: It can be race condition when newly deployed pod receive leadership, in this case we will need + // to reduce Deployment replica before destroy the pod and restore it after the test + Eventually(func() string { + leaderPodName := getLeader() + + Expect(virtClient.CoreV1().Pods(leaderelectionconfig.DefaultNamespace).Delete(leaderPodName, &metav1.DeleteOptions{})).To(BeNil()) + + Eventually(getLeader, 30*time.Second, 5*time.Second).ShouldNot(Equal(leaderPodName)) + + leaderPod, err := virtClient.CoreV1().Pods(leaderelectionconfig.DefaultNamespace).Get(getLeader(), metav1.GetOptions{}) + Expect(err).To(BeNil()) + + return leaderPod.Name + }, 90*time.Second, 5*time.Second).Should(Equal(newLeaderPod.Name)) + + Eventually(func() k8sv1.ConditionStatus { + leaderPod, err := virtClient.CoreV1().Pods(leaderelectionconfig.DefaultNamespace).Get(newLeaderPod.Name, metav1.GetOptions{}) + Expect(err).To(BeNil()) + + for _, condition := range leaderPod.Status.Conditions { + if condition.Type == k8sv1.PodReady { + return condition.Status + } + } + return k8sv1.ConditionUnknown + }, + 30*time.Second, + 5*time.Second).Should(Equal(k8sv1.ConditionTrue)) + + vm := tests.NewRandomVM() + obj, err := virtClient.RestClient().Post().Resource("virtualmachines").Namespace(tests.NamespaceTestDefault).Body(vm).Do().Get() + Expect(err).To(BeNil()) + tests.WaitForSuccessfulVMStart(obj) + }, 150) + }) +}) + +func getLeader() string { + virtClient, err := kubecli.GetKubevirtClient() + tests.PanicOnError(err) + + controllerEndpoint, err := virtClient.CoreV1().Endpoints(leaderelectionconfig.DefaultNamespace).Get(leaderelectionconfig.DefaultEndpointName, metav1.GetOptions{}) + tests.PanicOnError(err) + + var record resourcelock.LeaderElectionRecord + if recordBytes, found := controllerEndpoint.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; found { + err := json.Unmarshal([]byte(recordBytes), &record) + tests.PanicOnError(err) + } + return record.HolderIdentity +} + +func getNewLeaderPod(virtClient kubecli.KubevirtClient) *k8sv1.Pod { + labelSelector, err := labels.Parse(fmt.Sprint("app=virt-controller")) + tests.PanicOnError(err) + fieldSelector := fields.ParseSelectorOrDie("status.phase=" + string(k8sv1.PodRunning)) + controllerPods, err := virtClient.CoreV1().Pods(leaderelectionconfig.DefaultNamespace).List( + metav1.ListOptions{LabelSelector: labelSelector.String(), FieldSelector: fieldSelector.String()}) + leaderPodName := getLeader() + for _, pod := range controllerPods.Items { + if pod.Name != leaderPodName { + return &pod + } + } + return nil +}