Skip to content

Commit

Permalink
Merge pull request kubevirt#461 from cynepco3hahue/add_lead_election_…
Browse files Browse the repository at this point in the history
…to_controller

Add leader election mechanism to virt-controller
  • Loading branch information
davidvossel authored Oct 18, 2017
2 parents 75b8634 + 4dcb03c commit 5a76441
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 23 deletions.
13 changes: 10 additions & 3 deletions automation/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,17 @@ while [ -n "$(kubectl get pods --no-headers | grep -v Running)" ]; do
sleep 10
done

# Make sure all containers are ready
while [ -n "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready' --no-headers | grep false)" ]; do
# Make sure all containers except virt-controller are ready
while [ -n "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '!/virt-controller/ && /false/')" ]; do
echo "Waiting for KubeVirt containers to become ready ..."
kubectl get pods -ocustom-columns='name:metadata.name,ready:status.containerStatuses[*].ready' | grep false || true
kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '!/virt-controller/ && /false/' || true
sleep 10
done

# Make sure that at least one virt-controller container is ready.
# With leader election enabled only the leading replica becomes ready, so we
# wait until the count of ready virt-controller pods is at least one.
while [ "$(kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '/virt-controller/ && /true/' | wc -l)" -lt "1" ]; do
    echo "Waiting for KubeVirt virt-controller container to become ready ..."
    # Log the current number of ready virt-controller pods.
    kubectl get pods -o'custom-columns=status:status.containerStatuses[*].ready,metadata:metadata.name' --no-headers | awk '/virt-controller/ && /true/' | wc -l
    sleep 10
done

Expand Down
14 changes: 14 additions & 0 deletions manifests/virt-controller.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kind: Deployment
metadata:
name: virt-controller
spec:
replicas: 2
template:
metadata:
labels:
Expand All @@ -38,6 +39,19 @@ spec:
- containerPort: 8182
name: "virt-controller"
protocol: "TCP"
livenessProbe:
failureThreshold: 8
httpGet:
port: 8182
path: /healthz
initialDelaySeconds: 15
timeoutSeconds: 10
readinessProbe:
httpGet:
port: 8182
path: /leader
initialDelaySeconds: 15
timeoutSeconds: 10
securityContext:
runAsNonRoot: true
nodeSelector:
Expand Down
65 changes: 65 additions & 0 deletions pkg/virt-controller/leaderelectionconfig/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2017 Red Hat, Inc.
*
*/

package leaderelectionconfig

import (
"flag"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
)

const (
	// DefaultLeaseDuration is the default time non-leader candidates wait
	// before trying to acquire an unrenewed leader slot.
	DefaultLeaseDuration = 15 * time.Second
	// DefaultRenewDeadline is the default interval the acting leader has to
	// renew its slot before it stops leading.
	DefaultRenewDeadline = 10 * time.Second
	// DefaultRetryPeriod is the default wait between lock acquisition and
	// renewal attempts.
	DefaultRetryPeriod = 2 * time.Second
	// DefaultNamespace is the namespace the leader-election lock object
	// lives in.
	DefaultNamespace = "default"
	// DefaultEndpointName is the name of the endpoints object used as the
	// leader-election lock.
	DefaultEndpointName = "virt-controller-service"
)

// DefaultLeaderElectionConfiguration returns a Configuration populated with
// the package defaults: endpoints-based resource locking and the default
// lease duration, renew deadline and retry period.
func DefaultLeaderElectionConfiguration() Configuration {
	config := Configuration{
		ResourceLock: rl.EndpointsResourceLock,
	}
	config.LeaseDuration = metav1.Duration{Duration: DefaultLeaseDuration}
	config.RenewDeadline = metav1.Duration{Duration: DefaultRenewDeadline}
	config.RetryPeriod = metav1.Duration{Duration: DefaultRetryPeriod}
	return config
}

// BindFlags registers the common leader-election command line flags on the
// default flag set, using the values currently held in l as flag defaults.
func BindFlags(l *Configuration) {
	// Help texts are built here once; the final strings are identical to the
	// upstream flag descriptions.
	leaseDurationHelp := "The duration that non-leader candidates will wait after observing a leadership " +
		"renewal until attempting to acquire leadership of a led but unrenewed leader " +
		"slot. This is effectively the maximum duration that a leader can be stopped " +
		"before it is replaced by another candidate. This is only applicable if leader " +
		"election is enabled."
	renewDeadlineHelp := "The interval between attempts by the acting master to renew a leadership slot " +
		"before it stops leading. This must be less than or equal to the lease duration. " +
		"This is only applicable if leader election is enabled."
	retryPeriodHelp := "The duration the clients should wait between attempting acquisition and renewal " +
		"of a leadership. This is only applicable if leader election is enabled."
	resourceLockHelp := "The type of resource object that is used for locking during " +
		"leader election. Supported options are `endpoints` (default) and `configmap`."

	flag.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, leaseDurationHelp)
	flag.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, renewDeadlineHelp)
	flag.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, retryPeriodHelp)
	flag.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, resourceLockHelp)
}
46 changes: 46 additions & 0 deletions pkg/virt-controller/leaderelectionconfig/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2017 Red Hat, Inc.
*
*/

package leaderelectionconfig

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Configuration holds the tunables of the virt-controller leader-election
// process.
type Configuration struct {
	// LeaseDuration is the duration that non-leader candidates will wait
	// after observing a leadership renewal until attempting to acquire
	// leadership of a led but unrenewed leader slot. This is effectively the
	// maximum duration that a leader can be stopped before it is replaced
	// by another candidate. This is only applicable if leader election is
	// enabled.
	LeaseDuration metav1.Duration
	// RenewDeadline is the interval between attempts by the acting master to
	// renew a leadership slot before it stops leading. This must be less
	// than or equal to the lease duration. This is only applicable if leader
	// election is enabled.
	RenewDeadline metav1.Duration
	// RetryPeriod is the duration the clients should wait between attempting
	// acquisition and renewal of a leadership. This is only applicable if
	// leader election is enabled.
	RetryPeriod metav1.Duration
	// ResourceLock indicates the resource object type that will be used to lock
	// during leader election cycles.
	ResourceLock string
}
3 changes: 1 addition & 2 deletions pkg/virt-controller/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package rest
import (
"github.com/emicklei/go-restful"

"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/healthz"
)

Expand All @@ -31,5 +30,5 @@ var WebService *restful.WebService
func init() {
WebService = new(restful.WebService)
WebService.Path("/").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)
WebService.Route(WebService.GET("/apis/" + v1.GroupVersion.String() + "/healthz").To(healthz.KubeConnectionHealthzFunc).Doc("Health endpoint"))
WebService.Route(WebService.GET("/healthz").To(healthz.KubeConnectionHealthzFunc).Doc("Health endpoint"))
}
107 changes: 89 additions & 18 deletions pkg/virt-controller/watch/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@ import (
"flag"
golog "log"
"net/http"
"os"
"strconv"

"github.com/emicklei/go-restful"
"k8s.io/api/core/v1"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
k8coresv1 "k8s.io/client-go/kubernetes/typed/core/v1"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
clientrest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"kubevirt.io/kubevirt/pkg/controller"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
registrydisk "kubevirt.io/kubevirt/pkg/registry-disk"
"kubevirt.io/kubevirt/pkg/registry-disk"
"kubevirt.io/kubevirt/pkg/virt-controller/leaderelectionconfig"
"kubevirt.io/kubevirt/pkg/virt-controller/rest"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
)
Expand All @@ -47,20 +49,27 @@ type VirtControllerApp struct {
rsController *VMReplicaSet
rsInformer cache.SharedIndexInformer

LeaderElection leaderelectionconfig.Configuration

host string
port int
launcherImage string
migratorImage string
socketDir string
ephemeralDiskDir string
readyChan chan bool
}

func Execute() {
var err error
var app VirtControllerApp = VirtControllerApp{}

app.LeaderElection = leaderelectionconfig.DefaultLeaderElectionConfiguration()

app.DefineFlags()

app.readyChan = make(chan bool, 1)

logging.InitializeLogging("virt-controller")

app.clientSet, err = kubecli.GetKubevirtClient()
Expand All @@ -71,7 +80,9 @@ func Execute() {

app.restClient = app.clientSet.RestClient()

restful.Add(rest.WebService)
webService := rest.WebService
webService.Route(webService.GET("/leader").To(app.readinessProbe).Doc("Leader endpoint"))
restful.Add(webService)

// Bootstrapping. From here on the initialization order is important

Expand All @@ -92,9 +103,7 @@ func Execute() {
app.podInformer.AddEventHandler(controller.NewResourceEventHandlerFuncsForFunc(migrationPodLabelHandler(app.migrationQueue)))
app.migrationCache = app.migrationInformer.GetStore()

broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&k8coresv1.EventSinkImpl{Interface: app.clientSet.CoreV1().Events(k8sv1.NamespaceAll)})
app.migrationRecorder = broadcaster.NewRecorder(scheme.Scheme, k8sv1.EventSource{Component: "virt-migration-controller"})
app.migrationRecorder = app.getNewRecorder(k8sv1.NamespaceAll, "virt-migration-controller")

app.rsInformer = app.informerFactory.VMReplicaSet()

Expand All @@ -107,14 +116,63 @@ func (vca *VirtControllerApp) Run() {
stop := make(chan struct{})
defer close(stop)
vca.informerFactory.Start(stop)
go vca.vmController.Run(3, stop)
go vca.migrationController.Run(3, stop)
go vca.rsController.Run(3, stop)
httpLogger := logger.With("service", "http")
httpLogger.Info().Log("action", "listening", "interface", vca.host, "port", vca.port)
if err := http.ListenAndServe(vca.host+":"+strconv.Itoa(vca.port), nil); err != nil {
go func() {
httpLogger := logger.With("service", "http")
httpLogger.Info().Log("action", "listening", "interface", vca.host, "port", vca.port)
if err := http.ListenAndServe(vca.host+":"+strconv.Itoa(vca.port), nil); err != nil {
golog.Fatal(err)
}
}()

recorder := vca.getNewRecorder(k8sv1.NamespaceAll, leaderelectionconfig.DefaultEndpointName)

id, err := os.Hostname()
if err != nil {
golog.Fatalf("unable to get hostname: %v", err)
}

rl, err := resourcelock.New(vca.LeaderElection.ResourceLock,
leaderelectionconfig.DefaultNamespace,
leaderelectionconfig.DefaultEndpointName,
vca.clientSet.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
golog.Fatal(err)
}

leaderElector, err := leaderelection.NewLeaderElector(
leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: vca.LeaderElection.LeaseDuration.Duration,
RenewDeadline: vca.LeaderElection.RenewDeadline.Duration,
RetryPeriod: vca.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(stopCh <-chan struct{}) {
go vca.vmController.Run(3, stop)
go vca.migrationController.Run(3, stop)
go vca.rsController.Run(3, stop)
close(vca.readyChan)
},
OnStoppedLeading: func() {
golog.Fatal("leaderelection lost")
},
},
})
if err != nil {
golog.Fatal(err)
}

leaderElector.Run()
panic("unreachable")
}

// getNewRecorder returns an EventRecorder that attributes events to the given
// component and delivers them to the core/v1 events sink of namespace.
func (vca *VirtControllerApp) getNewRecorder(namespace string, componentName string) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	sink := &k8coresv1.EventSinkImpl{Interface: vca.clientSet.CoreV1().Events(namespace)}
	broadcaster.StartRecordingToSink(sink)
	source := k8sv1.EventSource{Component: componentName}
	return broadcaster.NewRecorder(scheme.Scheme, source)
}

func (vca *VirtControllerApp) initCommon() {
Expand All @@ -134,20 +192,33 @@ func (vca *VirtControllerApp) initCommon() {
}

func (vca *VirtControllerApp) initReplicaSet() {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&v12.EventSinkImpl{Interface: vca.clientSet.CoreV1().Events(v1.NamespaceAll)})
// TODO what is scheme used for in Recorder?
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtualmachinereplicaset-controller"})

recorder := vca.getNewRecorder(k8sv1.NamespaceAll, "virtualmachinereplicaset-controller")
vca.rsController = NewVMReplicaSet(vca.vmInformer, vca.rsInformer, recorder, vca.clientSet, controller.BurstReplicas)
}

// readinessProbe serves the /leader readiness endpoint. It answers 200 once
// this replica has won leader election and 503 otherwise, so only the leading
// virt-controller pod is reported Ready by its readinessProbe.
func (vca *VirtControllerApp) readinessProbe(_ *restful.Request, response *restful.Response) {
	res := map[string]interface{}{}

	// Non-blocking check of readyChan: a receive that reports the channel as
	// not opened means it was closed, which only happens in the
	// OnStartedLeading callback. The default case keeps the handler from
	// blocking while the channel is still open and empty.
	select {
	case _, opened := <-vca.readyChan:
		if !opened {
			// NOTE(review): the top-level JSON key "apiserver" looks copied
			// from another component; confirm consumers before renaming.
			res["apiserver"] = map[string]interface{}{"leader": "true"}
			response.WriteHeaderAndJson(http.StatusOK, res, restful.MIME_JSON)
			return
		}
	default:
	}
	res["apiserver"] = map[string]interface{}{"leader": "false", "error": "current pod is not leader"}
	response.WriteHeaderAndJson(http.StatusServiceUnavailable, res, restful.MIME_JSON)
}

// DefineFlags registers all virt-controller command line flags (including the
// shared leader-election flags) on the default flag set and parses the
// command line.
func (vca *VirtControllerApp) DefineFlags() {
	flag.StringVar(&vca.host, "listen", "0.0.0.0", "Address and port where to listen on")
	flag.IntVar(&vca.port, "port", 8182, "Port to listen on")
	flag.StringVar(&vca.launcherImage, "launcher-image", "virt-launcher", "Shim container for containerized VMs")
	flag.StringVar(&vca.migratorImage, "migrator-image", "virt-handler", "Container which orchestrates a VM migration")
	flag.StringVar(&vca.socketDir, "socket-dir", "/var/run/kubevirt", "Directory where to look for sockets for cgroup detection")
	// Fixed typo in user-facing help text: "direcetory" -> "directory".
	flag.StringVar(&vca.ephemeralDiskDir, "ephemeral-disk-dir", "/var/run/libvirt/kubevirt-ephemeral-disk", "Base directory for ephemeral disk data")
	leaderelectionconfig.BindFlags(&vca.LeaderElection)
	flag.Parse()
}
Loading

0 comments on commit 5a76441

Please sign in to comment.