Merge pull request kubevirt#1840 from fromanirh/vm-metrics-collector
Vm metrics collector
rmohr authored Feb 15, 2019
2 parents 1fee7da + 5c4c53c commit 363cf5c
Showing 24 changed files with 1,678 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cmd/virt-handler/virt-handler.go
@@ -45,6 +45,7 @@ import (
	"kubevirt.io/kubevirt/pkg/log"
	_ "kubevirt.io/kubevirt/pkg/monitoring/client/prometheus" // import for prometheus metrics
	_ "kubevirt.io/kubevirt/pkg/monitoring/reflector/prometheus" // import for prometheus metrics
	promvm "kubevirt.io/kubevirt/pkg/monitoring/vms/prometheus" // import for prometheus metrics
	_ "kubevirt.io/kubevirt/pkg/monitoring/workqueue/prometheus" // import for prometheus metrics
	"kubevirt.io/kubevirt/pkg/service"
	"kubevirt.io/kubevirt/pkg/util"
@@ -182,6 +183,9 @@ func (app *virtHandlerApp) Run() {
	if err != nil {
		glog.Fatalf("unable to generate certificates: %v", err)
	}

	promvm.SetupCollector(app.VirtShareDir)

	// Bootstrapping. From here on the startup order matters
	stop := make(chan struct{})
	defer close(stop)
77 changes: 77 additions & 0 deletions docs/devel/vm-monitoring.md
@@ -0,0 +1,77 @@
Monitoring VMs
==============

Summary
-------

VM metrics are collected through the libvirt APIs and exposed through a Prometheus endpoint.
The collecting code implements the [Collector](https://godoc.org/github.com/prometheus/client_golang/prometheus#Collector) interface, so metrics are gathered on demand, each time the endpoint is queried. There is no caching.
The collecting code is robust with respect to slow or unresponsive VMs.
Each VM on a given node is queried by its own goroutine.
At any given time, no more than one goroutine can be querying a given VM for metrics.

Design
------

The requirements for the collecting code are (not in priority order):
1. be as fast as possible
2. be as lightweight as possible
3. deal gracefully with unresponsive sources (more on that below)

While the first two requirements are easy to understand, the third needs some more context.

Unresponsive metrics sources
----------------------------

When we use QEMU on shared storage, as we want to do with KubeVirt, any network issue could cause
one or more storage operations to be delayed, or to be lost entirely.

In that case, the userspace process that requested the operation can end up in the D state (uninterruptible sleep),
and become unresponsive and unkillable.

A robust monitoring application must deal with the fact that
the libvirt API can block for a long time, or forever. This is not an issue or a bug of one specific
API, but it is rather a byproduct of how libvirt and QEMU interact.

Whenever we query more than one VM, we must make sure that a blocked VM does not prevent other,
well-behaving VMs from being queried. In other words, we don't want one rogue VM to disrupt the monitoring of the others.
Unfortunately, however we enumerate VMs, either implicitly, using the libvirt bulk stats API,
or explicitly, listing all libvirt domains and querying each one in turn, we may unpredictably encounter
unresponsive VMs.
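
The isolation described above can be sketched as follows. This is only an illustration, not the code used by the collector: `queryAll`, `demoQuery`, `demoTimeout` and the `vm-*` keys are hypothetical names, and the blocked VM is simulated with a channel that is never closed. Each source is queried in its own goroutine, and the caller keeps whatever arrived before the deadline.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// result pairs a source key with the (hypothetical) stats read from it.
type result struct {
	key   string
	stats string
}

// queryAll queries every key in its own goroutine and returns only the
// results that arrived before the timeout expired.
func queryAll(keys []string, query func(string) string, timeout time.Duration) map[string]string {
	// Buffered, so a goroutine that finishes late can still send and exit.
	results := make(chan result, len(keys))
	for _, key := range keys {
		go func(k string) {
			results <- result{key: k, stats: query(k)}
		}(key)
	}

	collected := make(map[string]string)
	deadline := time.After(timeout)
	for range keys {
		select {
		case r := <-results:
			collected[r.key] = r.stats
		case <-deadline:
			return collected
		}
	}
	return collected
}

// stuck is never closed: it simulates a VM whose query never returns.
var stuck = make(chan struct{})

const demoTimeout = 200 * time.Millisecond

// demoQuery blocks forever on "vm-b" and answers immediately otherwise.
func demoQuery(key string) string {
	if key == "vm-b" {
		<-stuck
	}
	return "stats for " + key
}

// responsiveKeys returns, sorted, the keys that answered in time.
func responsiveKeys() []string {
	got := queryAll([]string{"vm-a", "vm-b", "vm-c"}, demoQuery, demoTimeout)
	keys := make([]string, 0, len(got))
	for k := range got {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(responsiveKeys()) // prints [vm-a vm-c]
}
```

Note that the goroutine stuck on `vm-b` is not cancelled: it simply stays parked, which is why the number of goroutines waiting on a single source has to be bounded.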


Dealing with unresponsive metrics source
----------------------------------------

From a monitoring perspective, _any_ monitoring-related libvirt call could be unresponsive at any given time.
To deal with that:
1. the monitoring of each VM is done in a separate goroutine. Goroutines are cheap, so we don't recycle them (e.g. there is nothing like a goroutine pool).
Each monitoring goroutine ends once it has collected the metrics.
2. we don't want monitoring goroutines to pile up on unresponsive VMs. To avoid that, we track the busyness of each metrics source. No more than one goroutine may
query a VM for metrics at any given time. If the VM is unresponsive, no more than one goroutine waits for it. This also acts as a simple throttling mechanism.
3. it is possible that a libvirt API call _eventually_ unblocks. Thus the monitoring goroutine must check that the data it is going to submit
is still fresh, and avoid overwriting fresh data with stale data.
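
The freshness check of the third point could look like the sketch below. It is a minimal illustration under stated assumptions, not the actual implementation: the `sample` and `freshStore` types, the `submit` method and the `ts` helper are all hypothetical, and "fresh" is taken to mean "coming from a query that started later".

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sample is a hypothetical container for the metrics of one VM.
type sample struct {
	cpuSeconds  float64
	requestedAt time.Time // when the (possibly blocking) query was started
}

// freshStore keeps the most recent sample for each VM key.
type freshStore struct {
	lock    sync.Mutex
	samples map[string]sample
}

func newFreshStore() *freshStore {
	return &freshStore{samples: make(map[string]sample)}
}

// submit stores s unless the store already holds a sample whose query
// started at the same time or later. It reports whether s was accepted.
func (fs *freshStore) submit(key string, s sample) bool {
	fs.lock.Lock()
	defer fs.lock.Unlock()
	if cur, ok := fs.samples[key]; ok && !s.requestedAt.After(cur.requestedAt) {
		return false // stale: a fresher sample is already stored
	}
	fs.samples[key] = s
	return true
}

// ts builds a timestamp from seconds, to keep the demo deterministic.
func ts(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	store := newFreshStore()
	// The query started at t=20 completes first...
	fmt.Println(store.submit("vm-a", sample{cpuSeconds: 2.0, requestedAt: ts(20)})) // prints true
	// ...then the query started at t=10 finally unblocks and is rejected.
	fmt.Println(store.submit("vm-a", sample{cpuSeconds: 1.0, requestedAt: ts(10)})) // prints false
}
```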


Appendix: high level recap: libvirt client, libvirt daemon, QEMU
-----------------------------------------------------------------

Let's review how the client application (anything using the `pkg/monitoring/vms/prometheus` package),
the libvirtd daemon and the QEMU processes interact with each other.

The libvirt daemon talks to QEMU using the JSON-based QMP protocol over UNIX domain sockets. This happens in the virt-launcher pod.
The details of the protocol are not important here; the key point is that the protocol
is a simple request/response one, meaning that libvirtd must serialize all the interactions
with the QEMU monitor, and must protect its endpoint with a lock.
No out-of-order requests or responses are possible (e.g. no pipelining or async replies).
This means that if, for any reason, a QMP request cannot be completed, any other caller
trying to access the QEMU monitor will block until the blocked caller returns.

To retrieve some key information, most notably about the block device state or the balloon
device state, the libvirtd daemon *must* use the QMP protocol.

The QEMU core, including the handling of the QMP protocol, is single-threaded.
All of the above combined makes it possible for a client to block forever waiting for a QMP
request, if QEMU itself is blocked. The most likely cause of blocking is I/O, especially
when QEMU runs on shared storage, as described above.
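
The effect of the serialized monitor access can be reproduced with a toy model. The sketch below does not use libvirt or QMP at all: `monitor`, `request`, `tryRequest` and `demo` are hypothetical names, and a plain mutex stands in for the lock protecting the monitor endpoint. While one exchange is stuck, every other caller waits on the lock, even if its own request would be answered instantly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// monitor is a hypothetical stand-in for the lock-protected monitor endpoint.
type monitor struct {
	lock sync.Mutex
}

// request performs one request/response exchange while holding the lock.
func (m *monitor) request(exchange func() string) string {
	m.lock.Lock()
	defer m.lock.Unlock()
	return exchange()
}

// tryRequest reports whether the exchange completed within the timeout.
func tryRequest(m *monitor, exchange func() string, timeout time.Duration) (string, bool) {
	done := make(chan string, 1) // buffered: a late reply does not block the sender
	go func() { done <- m.request(exchange) }()
	select {
	case reply := <-done:
		return reply, true
	case <-time.After(timeout):
		return "", false
	}
}

// demo returns whether a quick request succeeded while another request was
// stuck, and whether it succeeded after the stuck request was released.
func demo() (bool, bool) {
	m := &monitor{}
	started := make(chan struct{})
	release := make(chan struct{})

	// First caller: gets stuck inside the exchange (think of blocked I/O).
	go m.request(func() string {
		close(started)
		<-release
		return "block stats"
	})
	<-started

	quick := func() string { return "balloon stats" }
	_, okWhileStuck := tryRequest(m, quick, 100*time.Millisecond)

	close(release) // the stuck exchange finally completes
	_, okAfterRelease := tryRequest(m, quick, time.Second)
	return okWhileStuck, okAfterRelease
}

func main() {
	fmt.Println(demo()) // prints false true
}
```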

107 changes: 107 additions & 0 deletions pkg/monitoring/vms/prometheus/collector.go
@@ -0,0 +1,107 @@
/*
* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2018 Red Hat, Inc.
*
*/

package prometheus

import (
	"sync"
	"time"

	"kubevirt.io/kubevirt/pkg/log"
)

const collectionTimeout time.Duration = 10 * time.Second // "long enough", crude heuristic

type metricsScraper interface {
	Scrape(key string)
}

type concurrentCollector struct {
	lock     sync.Mutex
	busyKeys map[string]bool
}

func NewConcurrentCollector() *concurrentCollector {
	return &concurrentCollector{
		busyKeys: make(map[string]bool),
	}
}

// Collect scrapes every key in its own goroutine and waits up to timeout for
// them to finish. It returns the keys skipped because a previous scrape is
// still pending, and whether all the scrapes started here completed in time.
func (cc *concurrentCollector) Collect(keys []string, scraper metricsScraper, timeout time.Duration) ([]string, bool) {
	log.Log.V(3).Infof("Collecting VM metrics from %d sources", len(keys))
	var busyScrapers sync.WaitGroup

	skipped := []string{}
	for _, key := range keys {
		reserved := cc.reserveKey(key)
		if !reserved {
			log.Log.Warningf("Source %s busy from a previous collection, skipped", key)
			skipped = append(skipped, key)
			continue
		}

		log.Log.V(4).Infof("Source %s responsive, scraping", key)
		busyScrapers.Add(1)
		go cc.collectFromSource(key, scraper, &busyScrapers)
	}

	completed := true
	c := make(chan struct{})
	go func() {
		busyScrapers.Wait()
		// close instead of sending: after a timeout nobody receives from c,
		// and a send would block this goroutine forever.
		close(c)
	}()
	select {
	case <-c:
		log.Log.V(3).Infof("Collection successful")
	case <-time.After(timeout):
		log.Log.Warning("Collection timeout")
		completed = false
	}

	log.Log.V(2).Infof("Collection completed")

	return skipped, completed
}

func (cc *concurrentCollector) collectFromSource(key string, scraper metricsScraper, wg *sync.WaitGroup) {
	defer wg.Done()
	defer cc.releaseKey(key)

	log.Log.V(4).Infof("Getting stats from source %s", key)
	scraper.Scrape(key)
	log.Log.V(4).Infof("Updated stats from source %s", key)
}

func (cc *concurrentCollector) reserveKey(key string) bool {
	cc.lock.Lock()
	defer cc.lock.Unlock()
	busy := cc.busyKeys[key]
	if busy {
		return false
	}
	cc.busyKeys[key] = true
	return true
}

func (cc *concurrentCollector) releaseKey(key string) {
	cc.lock.Lock()
	defer cc.lock.Unlock()
	cc.busyKeys[key] = false
}
13 changes: 13 additions & 0 deletions pkg/monitoring/vms/prometheus/collector_suite_test.go
@@ -0,0 +1,13 @@
package prometheus_test

import (
	"testing"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

func TestCollector(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Collector Suite")
}
151 changes: 151 additions & 0 deletions pkg/monitoring/vms/prometheus/collector_test.go
@@ -0,0 +1,151 @@
/*
* This file is part of the KubeVirt project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2018 Red Hat, Inc.
*
*/

package prometheus

import (
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"

	"kubevirt.io/kubevirt/pkg/log"
)

var _ = Describe("Collector", func() {
	log.Log.SetIOWriter(GinkgoWriter)

	Context("on running source", func() {
		It("should scrape all the sources", func() {
			keys := []string{"a", "b", "c"} // keep sorted
			fs := newFakeScraper(len(keys))
			cc := NewConcurrentCollector()

			skipped, completed := cc.Collect(keys, fs, 1*time.Second)

			Expect(len(skipped)).To(Equal(0))
			Expect(completed).To(BeTrue())
		})
	})

	Context("on blocked source", func() {
		It("should gather the available data", func() {
			keys := []string{"a", "b", "c"} // keep sorted
			fs := newFakeScraper(len(keys))
			fs.Block("a")
			cc := NewConcurrentCollector()

			skipped, completed := cc.Collect(keys, fs, 1*time.Second)

			Expect(len(skipped)).To(Equal(0))
			Expect(completed).To(BeFalse())
		})

		It("should skip it on later collections", func() {
			keys := []string{"a", "b", "c"} // keep sorted
			fs := newFakeScraper(len(keys))
			fs.Block("a")
			cc := NewConcurrentCollector()

			By("Doing a first collection")
			skipped, completed := cc.Collect(keys, fs, 1*time.Second)
			// first collection is not aware of the blocked source
			Expect(len(skipped)).To(Equal(0))
			Expect(completed).To(BeFalse())

			By("Collecting again with a blocked source")
			skipped, completed = cc.Collect(keys, fs, 1*time.Second)
			// second collection is aware of the blocked source
			Expect(len(skipped)).To(Equal(1))
			Expect(skipped[0]).To(Equal("a"))
			Expect(completed).To(BeTrue())

		})

		It("should resume scraping when unblocks", func() {
			keys := []string{"a", "b", "c"} // keep sorted
			fs := newFakeScraper(len(keys))
			fs.Block("b")
			cc := NewConcurrentCollector()

			By("Doing a first collection")
			skipped, completed := cc.Collect(keys, fs, 1*time.Second)
			// first collection is not aware of the blocked source
			Expect(len(skipped)).To(Equal(0))
			Expect(completed).To(BeFalse())

			By("Collecting again with a blocked source")
			skipped, completed = cc.Collect(keys, fs, 1*time.Second)
			// second collection is aware of the blocked source
			Expect(len(skipped)).To(Equal(1))
			Expect(skipped[0]).To(Equal("b"))
			Expect(completed).To(BeTrue())

			By("Unblocking the stuck source")
			ready := fs.Wakeup("b")
			<-ready
			fs.Unblock("b")

			By("Restored a clean state")
			skipped, completed = cc.Collect(keys, fs, 1*time.Second)
			Expect(len(skipped)).To(Equal(0))
			Expect(completed).To(BeTrue())
		})
	})
})

type fakeScraper struct {
	ready   map[string]chan bool
	blocked map[string]chan bool
}

func newFakeScraper(maxKeys int) *fakeScraper {
	return &fakeScraper{
		blocked: make(map[string]chan bool),
		ready:   make(map[string]chan bool),
	}
}

// Block makes sure Scrape() WILL block. Call it when no goroutines are running (e.g. when concurrentCollector is known idle.)
func (fs *fakeScraper) Block(key string) {
	fs.blocked[key] = make(chan bool)
	fs.ready[key] = make(chan bool)
}

// Unblock makes sure Scrape() won't block. Call it when no goroutines are running (e.g. when concurrentCollector is known idle.)
func (fs *fakeScraper) Unblock(key string) {
	delete(fs.blocked, key)
	delete(fs.ready, key)
}

// Wakeup awakens a blocked Scrape(). Can be called when concurrentCollector is running.
func (fs *fakeScraper) Wakeup(key string) chan bool {
	if c, ok := fs.blocked[key]; ok {
		c <- true
		return fs.ready[key]
	}
	return nil
}

// Scrape blocks until Wakeup is called if the key was marked with Block, then signals readiness.
func (fs *fakeScraper) Scrape(key string) {
	if c, ok := fs.blocked[key]; ok {
		<-c
		fs.ready[key] <- true
	}
}