Skip to content

Commit

Permalink
Merge pull request kubevirt#351 from rmohr/flaky
Browse files Browse the repository at this point in the history
Simplify and improve Libvirt event notification
  • Loading branch information
davidvossel authored Nov 3, 2017
2 parents 52b6cc2 + b3d0aba commit cb49b67
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 48 deletions.
2 changes: 2 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,12 @@ func (l FilteredLogger) log(skipFrames int, params ...interface{}) error {
func (l FilteredLogger) Object(obj LoggableObject) *FilteredLogger {

name := obj.GetObjectMeta().GetName()
namespace := obj.GetObjectMeta().GetNamespace()
uid := obj.GetObjectMeta().GetUID()
kind := obj.GetObjectKind().GroupVersionKind().Kind

logParams := make([]interface{}, 0)
logParams = append(logParams, "namespace", namespace)
logParams = append(logParams, "name", name)
logParams = append(logParams, "kind", kind)
logParams = append(logParams, "uid", uid)
Expand Down
7 changes: 4 additions & 3 deletions pkg/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,10 @@ func TestObject(t *testing.T) {
assert(t, logEntry[4].(string) == "pos", "Logged line was not pos")
assert(t, logEntry[6].(string) == "component", "Logged line is not expected format")
assert(t, logEntry[7].(string) == "test", "Component was not logged")
assert(t, logEntry[8].(string) == "name", "Logged line did not contain object name")
assert(t, logEntry[10].(string) == "kind", "Logged line did not contain object kind")
assert(t, logEntry[12].(string) == "uid", "Logged line did not contain UUID")
assert(t, logEntry[8].(string) == "namespace", "Logged line did not contain object namespace")
assert(t, logEntry[10].(string) == "name", "Logged line did not contain object name")
assert(t, logEntry[12].(string) == "kind", "Logged line did not contain object kind")
assert(t, logEntry[14].(string) == "uid", "Logged line did not contain UUID")
tearDown()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/virt-handler/virtwrap/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ const (
ReasonSaved StateChangeReason = "Saved"
ReasonFailed StateChangeReason = "Failed"
ReasonFromSnapshot StateChangeReason = "FromSnapshot"

// NoState reasons
ReasonNonExistent StateChangeReason = "NonExistent"
)

type Domain struct {
Expand Down
79 changes: 35 additions & 44 deletions pkg/virt-handler/virtwrap/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"kubevirt.io/kubevirt/pkg/log"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/api"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/cli"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/errors"
)

var LifeCycleTranslationMap = map[libvirt.DomainState]api.LifeCycle{
Expand Down Expand Up @@ -81,18 +82,28 @@ func newListWatchFromClient(c cli.Connection, events ...int) *cache.ListWatch {
list := api.DomainList{
Items: []api.Domain{},
}
// Whenever we gat a IsNotFound error, we just go on to the next Domain
for _, dom := range doms {
domain, err := NewDomain(dom)
if err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
spec, err := NewDomainSpec(dom)
if err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
domain.Spec = *spec
status, reason, err := dom.GetState()
if err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
domain.SetState(convState(status), convReason(status, reason))
Expand Down Expand Up @@ -125,7 +136,7 @@ func newDomainWatcher(c cli.Connection, events ...int) (watch.Interface, error)

// check for reconnects, and emit an error to force a resync
if event == nil {
watcher.C <- watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: "Libvirt reconnected"}}
watcher.C <- newWatchEventError(fmt.Errorf("Libvirt reconnect"))
return
}
log.Log.V(3).Infof("Libvirt event %d with reason %d received", event.Event, event.Detail)
Expand Down Expand Up @@ -196,70 +207,46 @@ func callback(d cli.VirDomain, event *libvirt.DomainEventLifecycle, watcher chan
domain, err := NewDomain(d)
if err != nil {
log.Log.Reason(err).Error("Could not create the Domain.")
watcher <- watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: err.Error()}}
watcher <- newWatchEventError(err)
return
}
log.Log.Infof("event received: %v:%v", event.Event, event.Detail)
// TODO In case of other events, it might not be enough to just send state and domainxml, maybe we have to embed the event and the details too
// Think about device removal: First event is a DEFINED/UPDATED event and then we get the REMOVED event when it is done (is it that way?)
switch event.Event {

case libvirt.DOMAIN_EVENT_STOPPED,
libvirt.DOMAIN_EVENT_SHUTDOWN,
libvirt.DOMAIN_EVENT_CRASHED,
libvirt.DOMAIN_EVENT_UNDEFINED:
// We can't count on a domain xml in these cases, but let's try it
if event.Event != libvirt.DOMAIN_EVENT_UNDEFINED {
spec, err := NewDomainSpec(d)
if err != nil {

if err.(libvirt.Error).Code != libvirt.ERR_NO_DOMAIN {
log.Log.Reason(err).Error("Could not fetch the Domain specification.")
watcher <- watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: err.Error()}}
return
}
} else {
domain.Spec = *spec
}
}
status, reason, err := d.GetState()
if err != nil {
// No matter which event, try to fetch the domain xml and the state. If we get a IsNotFound error, that means that the VM was removed.
spec, err := NewDomainSpec(d)
if err != nil {

if err.(libvirt.Error).Code != libvirt.ERR_NO_DOMAIN {
log.Log.Reason(err).Error("Could not fetch the Domain state.")
watcher <- watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: err.Error()}}
return
}
domain.SetState(api.NoState, api.ReasonUnknown)
} else {
domain.SetState(convState(status), convReason(status, reason))
}
default:
spec, err := NewDomainSpec(d)
if err != nil {
if !errors.IsNotFound(err) {
log.Log.Reason(err).Error("Could not fetch the Domain specification.")
watcher <- newWatchEventError(err)
return
}
} else {
domain.Spec = *spec
status, reason, err := d.GetState()
if err != nil {
}
status, reason, err := d.GetState()
if err != nil {
if !errors.IsNotFound(err) {
log.Log.Reason(err).Error("Could not fetch the Domain state.")
watcher <- newWatchEventError(err)
return
}
domain.SetState(api.NoState, api.ReasonNonExistent)
} else {
domain.SetState(convState(status), convReason(status, reason))
}

switch event.Event {
case libvirt.DOMAIN_EVENT_DEFINED:
if libvirt.DomainEventDefinedDetailType(event.Detail) == libvirt.DOMAIN_EVENT_DEFINED_ADDED {
switch domain.Status.Reason {
case api.ReasonNonExistent:
watcher <- watch.Event{Type: watch.Deleted, Object: domain}
default:
if event.Event == libvirt.DOMAIN_EVENT_DEFINED && libvirt.DomainEventDefinedDetailType(event.Detail) == libvirt.DOMAIN_EVENT_DEFINED_ADDED {
watcher <- watch.Event{Type: watch.Added, Object: domain}
} else {
watcher <- watch.Event{Type: watch.Modified, Object: domain}
}
case libvirt.DOMAIN_EVENT_UNDEFINED:
watcher <- watch.Event{Type: watch.Deleted, Object: domain}
default:
watcher <- watch.Event{Type: watch.Modified, Object: domain}
}

}
Expand All @@ -280,3 +267,7 @@ func convReason(status libvirt.DomainState, reason int) api.StateChangeReason {
return api.ReasonUnknown
}
}

func newWatchEventError(err error) watch.Event {
return watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: err.Error()}}
}
3 changes: 2 additions & 1 deletion pkg/virt-handler/virtwrap/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Cache", func() {
table.Entry("unknown", libvirt.DOMAIN_NOSTATE, api.NoState),
table.Entry("running", libvirt.DOMAIN_RUNNING, api.Running),
)
table.DescribeTable("should receive non delete evens of type",
table.DescribeTable("should receive non-delete events of type",
func(state libvirt.DomainState, event libvirt.DomainEventType, kubevirtState api.LifeCycle, kubeEventType watch.EventType) {
mockDomain.EXPECT().GetState().Return(state, -1, nil)
mockDomain.EXPECT().GetName().Return("test", nil)
Expand Down Expand Up @@ -112,6 +112,7 @@ var _ = Describe("Cache", func() {
)
It("should receive a delete event when a VM is undefined",
func() {
mockDomain.EXPECT().GetXMLDesc(gomock.Eq(libvirt.DOMAIN_XML_MIGRATABLE)).Return("", libvirt.Error{Code: libvirt.ERR_NO_DOMAIN})
mockDomain.EXPECT().GetState().Return(libvirt.DOMAIN_NOSTATE, -1, libvirt.Error{Code: libvirt.ERR_NO_DOMAIN})
mockDomain.EXPECT().GetName().Return("test", nil)
mockDomain.EXPECT().GetUUIDString().Return("1235", nil)
Expand Down

0 comments on commit cb49b67

Please sign in to comment.