Skip to content
This repository has been archived by the owner on Jul 16, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1151 from markdryan/fix-qmp-cancel
Browse files Browse the repository at this point in the history
Fix Cancelling of QMP commands and some spelling mistakes
  • Loading branch information
Tim Pepper committed Feb 16, 2017
2 parents 45c4899 + 0d829ab commit 87911c2
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 21 deletions.
67 changes: 66 additions & 1 deletion qemu/qmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ func (q *QMP) processQMPInput(line []byte, cmdQueue *list.List) {
}
}

func currentCommandDoneCh(cmdQueue *list.List) <-chan struct{} {
cmdEl := cmdQueue.Front()
if cmdEl == nil {
return nil
}
cmd := cmdEl.Value.(*qmpCommand)
return cmd.ctx.Done()
}

func (q *QMP) writeNextQMPCommand(cmdQueue *list.List) {
cmdEl := cmdQueue.Front()
cmd := cmdEl.Value.(*qmpCommand)
Expand Down Expand Up @@ -293,6 +302,16 @@ func failOutstandingCommands(cmdQueue *list.List) {
}
}

func (q *QMP) cancelCurrentCommand(cmdQueue *list.List) {
cmdEl := cmdQueue.Front()
cmd := cmdEl.Value.(*qmpCommand)
if cmd.resultReceived {
q.finaliseCommand(cmdEl, cmdQueue, false)
} else {
cmd.filter = nil
}
}

func (q *QMP) parseVersion(version []byte) *QMPVersion {
var qmp map[string]interface{}
err := json.Unmarshal(version, &qmp)
Expand Down Expand Up @@ -327,6 +346,39 @@ func (q *QMP) parseVersion(version []byte) *QMPVersion {
}
}

// The qemu package allows multiple QMP commands to be submitted concurrently
// from different Go routines. Unfortunately, QMP doesn't really support parallel
// commands as there is no way reliable way to associate a command response
// with a request. For this reason we need to submit our commands to
// QMP serially. The qemu package performs this serialisation using a
// queue (cmdQueue owned by mainLoop). We use a queue rather than a simple
// mutex so we can support cancelling of commands (see below) and ordered
// execution of commands, i.e., if command B is issued before command C,
// it should be executed before command C even if both commands are initially
// blocked waiting for command A to finish. This would be hard to achieve with
// a simple mutex.
//
// Cancelling is a little tricky. Commands such as ExecuteQMPCapabilities
// can be cancelled by cancelling or timing out their contexts. When a
// command is cancelled the calling function, e.g., ExecuteQMPCapabilities,
// will return but we may not be able to remove the command's entry from
// the command queue or issue the next command. There are two scenarios
// here.
//
// 1. The command has been processed by QMP, i.e., we have received a
// return or an error, but is still blocking as it is waiting for
// an event. For example, the ExecuteDeviceDel blocks until a DEVICE_DELETED
// event is received. When such a command is cancelled we can remove it
// from the queue and start issuing the next command. When the DEVICE_DELETED
// event eventually arrives it will just be ignored.
//
// 2. The command has not been processed by QMP. In this case the command
// needs to remain on the cmdQueue until the response to this command is
// received from QMP. During this time no new commands can be issued. When the
// response is received, it is discarded (as no one is interested in the result
// any more), the entry is removed from the cmdQueue and we can proceed to
// execute the next command.

func (q *QMP) mainLoop() {
cmdQueue := list.New().Init()
fromVMCh := make(chan []byte)
Expand All @@ -343,6 +395,7 @@ func (q *QMP) mainLoop() {
}()

version := []byte{}
var cmdDoneCh <-chan struct{}

DONE:
for {
Expand All @@ -359,6 +412,7 @@ DONE:
}
if cmdQueue.Len() >= 1 {
q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
break DONE
}
Expand All @@ -373,14 +427,25 @@ DONE:
return
}
_ = cmdQueue.PushBack(&cmd)
if cmdQueue.Len() >= 1 {

// We only want to execute the new cmd if there
// are no other commands pending. If there are
// commands pending our new command will get
// run when the pending commands complete.

if cmdQueue.Len() == 1 {
q.writeNextQMPCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
case line, ok := <-fromVMCh:
if !ok {
return
}
q.processQMPInput(line, cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
case <-cmdDoneCh:
q.cancelCurrentCommand(cmdQueue)
cmdDoneCh = currentCommandDoneCh(cmdQueue)
}
}
}
Expand Down
137 changes: 117 additions & 20 deletions qemu/qmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (b *qmpTestCommandBuffer) startEventLoop(wg *sync.WaitGroup) {
}()
}

func (b *qmpTestCommandBuffer) AddCommmand(name string, args map[string]interface{},
func (b *qmpTestCommandBuffer) AddCommand(name string, args map[string]interface{},
result string, data map[string]interface{}) {
b.cmds = append(b.cmds, qmpTestCommand{name, args})
if data == nil {
Expand Down Expand Up @@ -186,7 +186,9 @@ func (b *qmpTestCommandBuffer) Read(p []byte) (n int, err error) {

func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) {
var cmdJSON map[string]interface{}
if b.currentCmd >= len(b.cmds) {
currentCmd := b.currentCmd
b.currentCmd++
if currentCmd >= len(b.cmds) {
b.t.Fatalf("Unexpected command")
}
err := json.Unmarshal(p, &cmdJSON)
Expand All @@ -195,14 +197,14 @@ func (b *qmpTestCommandBuffer) Write(p []byte) (int, error) {
}
cmdName := cmdJSON["execute"]
gotCmdName := cmdName.(string)
result := b.results[b.currentCmd].result
if gotCmdName != b.cmds[b.currentCmd].name {
result := b.results[currentCmd].result
if gotCmdName != b.cmds[currentCmd].name {
b.t.Errorf("Unexpected command. Expected %s found %s",
b.cmds[b.currentCmd].name, gotCmdName)
b.cmds[currentCmd].name, gotCmdName)
result = "error"
}
resultMap := make(map[string]interface{})
resultMap[result] = b.results[b.currentCmd].data
resultMap[result] = b.results[currentCmd].data
encodedRes, err := json.Marshal(&resultMap)
if err != nil {
b.t.Errorf("Unable to encode result: %v", err)
Expand Down Expand Up @@ -263,7 +265,7 @@ func TestQMPCapabilities(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -286,7 +288,7 @@ func TestQMPStop(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("stop", nil, "return", nil)
buf.AddCommand("stop", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -309,7 +311,7 @@ func TestQMPCont(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("cont", nil, "return", nil)
buf.AddCommand("cont", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -331,7 +333,7 @@ func TestQMPQuit(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("quit", nil, "return", nil)
buf.AddCommand("quit", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -353,7 +355,7 @@ func TestQMPBlockdevAdd(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("blockdev-add", nil, "return", nil)
buf.AddCommand("blockdev-add", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -376,7 +378,7 @@ func TestQMPDeviceAdd(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("device_add", nil, "return", nil)
buf.AddCommand("device_add", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -401,7 +403,7 @@ func TestQMPXBlockdevDel(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("x-blockdev-del", nil, "return", nil)
buf.AddCommand("x-blockdev-del", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand Down Expand Up @@ -435,7 +437,7 @@ func TestQMPDeviceDel(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("device_del", nil, "return", nil)
buf.AddCommand("device_del", nil, "return", nil)
buf.AddEvent("DEVICE_DELETED", time.Millisecond*200,
map[string]interface{}{
"path": path,
Expand Down Expand Up @@ -500,7 +502,7 @@ func TestQMPDeviceDelTimeout(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("device_del", nil, "return", nil)
buf.AddCommand("device_del", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand All @@ -527,8 +529,8 @@ func TestQMPCancel(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
Expand Down Expand Up @@ -562,7 +564,7 @@ func TestQMPSystemPowerdown(t *testing.T) {
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommmand("system_powerdown", nil, "return", nil)
buf.AddCommand("system_powerdown", nil, "return", nil)
buf.AddEvent("SHUTDOWN", time.Millisecond*100,
nil,
map[string]interface{}{
Expand All @@ -582,6 +584,101 @@ func TestQMPSystemPowerdown(t *testing.T) {
wg.Wait()
}

// Checks that event commands can be cancelled.
//
// We start a QMPLoop, send the system_powerdown command. This command
// will time out after 1 second as the SHUTDOWN event never arrives.
// We then send a quit command to terminate the session.
//
// The system_powerdown command should be correctly sent but should block
// waiting for the SHUTDOWN event and should be successfully cancelled.
// The quit command should be successfully received and the QMP loop should
// exit gracefully.
func TestQMPEventedCommandCancel(t *testing.T) {
var wg sync.WaitGroup
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)
buf.AddCommand("system_powerdown", nil, "return", nil)
buf.AddCommand("quit", nil, "return", nil)
cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
buf.startEventLoop(&wg)
ctx, cancelFN := context.WithTimeout(context.Background(), time.Second)
err := q.ExecuteSystemPowerdown(ctx)
cancelFN()
if err == nil {
t.Fatalf("Expected SystemPowerdown to fail")
}
err = q.ExecuteQuit(context.Background())
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
q.Shutdown()
<-disconnectedCh
wg.Wait()
}

// Checks that queued commands execute after an evented command is cancelled.
//
// This test is similar to the previous test with the exception that it
// tries to ensure that a second command is placed on the QMP structure's
// command queue before the evented command is cancelled. This allows us
// to test a slightly different use case. We start a QMPLoop, send the
// system_powerdown command. We do this by sending the command directly
// down the QMP.cmdCh rather than calling a higher level function as this
// allows us to ensure that we have another command queued before we
// timeout the first command. We then send a qmp_capabilities command and
// then we shutdown.
//
// The system_powerdown command should be correctly sent but should block
// waiting for the SHUTDOWN event and should be successfully cancelled.
// The query_capabilities command should be successfully received and the
// QMP loop should exit gracefully.
func TestQMPEventedCommandCancelConcurrent(t *testing.T) {
var wg sync.WaitGroup
connectedCh := make(chan *QMPVersion)
disconnectedCh := make(chan struct{})
buf := newQMPTestCommandBuffer(t)

buf.AddCommand("system_powerdown", nil, "error", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)

cfg := QMPConfig{Logger: qmpTestLogger{}}
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
buf.startEventLoop(&wg)

resCh := make(chan qmpResult)
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
q.cmdCh <- qmpCommand{
ctx: ctx,
res: resCh,
name: "system_powerdown",
filter: &qmpEventFilter{
eventName: "SHUTDOWN",
},
}

var cmdWg sync.WaitGroup
cmdWg.Add(1)
go func() {
err := q.ExecuteQMPCapabilities(context.Background())
if err != nil {
t.Errorf("Unexpected error %v", err)
}
cmdWg.Done()
}()

<-resCh
cancelFn()
cmdWg.Wait()
q.Shutdown()
<-disconnectedCh
wg.Wait()
}

// Checks that events can be received and parsed.
//
// Two events are provisioned and the QMPLoop is started with an valid eventCh.
Expand Down Expand Up @@ -683,13 +780,13 @@ func TestQMPLostLoop(t *testing.T) {
q := startQMPLoop(buf, cfg, connectedCh, disconnectedCh)
checkVersion(t, connectedCh)
close(buf.forceFail)
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)
err := q.ExecuteQMPCapabilities(context.Background())
if err == nil {
t.Error("Expected executeQMPCapabilities to fail")
}
<-disconnectedCh
buf.AddCommmand("qmp_capabilities", nil, "return", nil)
buf.AddCommand("qmp_capabilities", nil, "return", nil)
err = q.ExecuteQMPCapabilities(context.Background())
if err == nil {
t.Error("Expected executeQMPCapabilities to fail")
Expand Down

0 comments on commit 87911c2

Please sign in to comment.