Skip to content

Commit

Permalink
Merge pull request kubevirt#541 from davidvossel/console-tests
Browse files Browse the repository at this point in the history
Generic VM console test tooling
  • Loading branch information
rmohr authored Nov 3, 2017
2 parents f143606 + 9d2fb17 commit f4bcab2
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 148 deletions.
24 changes: 21 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,12 @@ ignored = ["code.google.com/p/go-charset","github.com/go-kit/kit/examples","gith

[[constraint]]
name = "k8s.io/client-go"

[[constraint]]
name = "github.com/google/goexpect"

[[constraint]]
name = "github.com/google/goterm"

[[constraint]]
name = "github.com/google/grpc"
8 changes: 8 additions & 0 deletions pkg/kubecli/kubecli.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ func GetKubevirtClientFromFlags(master string, kubeconfig string) (KubevirtClien
func GetKubevirtClient() (KubevirtClient, error) {
return GetKubevirtClientFromFlags(master, kubeconfig)
}

func GetKubevirtClientConfig() (*rest.Config, error) {
config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
if err != nil {
return nil, err
}
return config, nil
}
182 changes: 113 additions & 69 deletions pkg/virtctl/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"os/signal"
"time"

"sync"

"github.com/gorilla/websocket"
flag "github.com/spf13/pflag"
"golang.org/x/crypto/ssh/terminal"
Expand Down Expand Up @@ -79,106 +81,148 @@ func (c *Console) Run(flags *flag.FlagSet) int {
return 1
}

// Create a round tripper with all necessary kubernetes security details
wrappedRoundTripper, err := roundTripperFromConfig(config)
err = ConnectToConsole(config, namespace, vm, device, TerminalWebsocketCallback)
if err != nil {
log.Println(err)
return 1
}
return 0
}

func ConnectToConsole(config *rest.Config, namespace string, name string, console string, callback RoundTripCallback) error {

// Create a round tripper with all necessary kubernetes security details
wrappedRoundTripper, err := RoundTripperFromConfig(config, callback)
if err != nil {
return err
}

// Create the basic console request
req, err := requestFromConfig(config, vm, namespace, device)
req, err := RequestFromConfig(config, name, namespace, console)
if err != nil {
log.Println(err)
return 1
return err
}

// Do the call and process the websocket connection with the callback
_, err = wrappedRoundTripper.RoundTrip(req)

if err != nil {
log.Println(err)
return 1
return err
}
return 0
return nil
}

func WebsocketCallback(ws *websocket.Conn, resp *http.Response, err error) error {
func NewWebsocketCallback(in io.ReadCloser, out io.WriteCloser, stopChan chan struct{}) RoundTripCallback {

if err != nil {
if resp != nil && resp.StatusCode != http.StatusOK {
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
return fmt.Errorf("Can't connect to console (%d): %s\n", resp.StatusCode, buf.String())
}
return fmt.Errorf("Can't connect to console: %s\n", err.Error())
}
return func(ws *websocket.Conn, resp *http.Response, err error) error {

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
if err != nil {
if resp != nil && resp.StatusCode != http.StatusOK {
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
return fmt.Errorf("Can't connect to console (%d): %s\n", resp.StatusCode, buf.String())
}
return fmt.Errorf("Can't connect to console: %s\n", err.Error())
}

writeStop := make(chan struct{})
readStop := make(chan struct{})
writeStop := make(chan struct{})
readStop := make(chan struct{})

go func() {
defer close(readStop)
for {
_, message, err := ws.ReadMessage()
if err != nil {
out.Write(message)
return
}
_, err = out.Write(message)
if err == io.EOF {
return
}
}
}()

state, err := terminal.MakeRaw(int(os.Stdin.Fd()))
if err != nil {
return fmt.Errorf("Make raw terminal failed: %s", err)
}
defer terminal.Restore(int(os.Stdin.Fd()), state)
fmt.Fprint(os.Stderr, "Escape sequence is ^]")
buf := make([]byte, 1024, 1024)

go func() {
defer close(readStop)
for {
_, message, err := ws.ReadMessage()
if err != nil {
os.Stdout.Write(message)
return
}
os.Stdout.Write(message)
// Synchronize writes for final close announcements
var writeMux sync.Mutex
writeProtected := func(messageType int, data []byte) error {
writeMux.Lock()
defer writeMux.Unlock()
return ws.WriteMessage(websocket.TextMessage, data)
}
}()

buf := make([]byte, 1024, 1024)
go func() {
defer close(writeStop)
for {
n, err := os.Stdin.Read(buf)
if err != nil && err != io.EOF {
log.Println(err)
return
go func() {
defer close(writeStop)
for {
n, err := in.Read(buf)
if err != nil && err != io.EOF {
log.Println(err)
return
}

// TODO move this to the TerminalWebsocketCallback
if buf[0] == 29 {
return
}

// If there is nothing more to write and we have reached EOF, return
if n == 0 && err == io.EOF {
return
}

err = writeProtected(websocket.TextMessage, buf[0:n])
if err != nil && err != io.EOF {
log.Println(err)
return
}
}
}()

if buf[0] == 29 {
return
}
select {
case <-stopChan:
case <-readStop:
case <-writeStop:
}

err = ws.WriteMessage(websocket.TextMessage, buf[0:n])
if err != nil && err != io.EOF {
log.Println(err)
return
}
err = writeProtected(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
return fmt.Errorf("Error on close announcement: %s", err.Error())
}
select {
case <-readStop:
case <-time.After(time.Second):
}
return nil
}
}

func TerminalWebsocketCallback(ws *websocket.Conn, resp *http.Response, connErr error) error {

stopChan := make(chan struct{}, 1)

go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
close(stopChan)
}()

select {
case <-interrupt:
case <-readStop:
case <-writeStop:
// If there is no obvious connection error, set up the terminal
if connErr == nil {
state, err := terminal.MakeRaw(int(os.Stdin.Fd()))
if err != nil {
return fmt.Errorf("Make raw terminal failed: %s", err)
}
defer terminal.Restore(int(os.Stdin.Fd()), state)
fmt.Fprint(os.Stderr, "Escape sequence is ^]")
}

err = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
return fmt.Errorf("Error on close announcement: %s", err.Error())
}
select {
case <-readStop:
case <-time.After(time.Second):
}
return nil
return NewWebsocketCallback(os.Stdin, os.Stdout, stopChan)(ws, resp, connErr)
}

func requestFromConfig(config *rest.Config, vm string, namespace string, device string) (*http.Request, error) {
func RequestFromConfig(config *rest.Config, vm string, namespace string, device string) (*http.Request, error) {

u, err := url.Parse(config.Host)
if err != nil {
Expand Down Expand Up @@ -206,7 +250,7 @@ func requestFromConfig(config *rest.Config, vm string, namespace string, device
return req, nil
}

func roundTripperFromConfig(config *rest.Config) (http.RoundTripper, error) {
func RoundTripperFromConfig(config *rest.Config, callback RoundTripCallback) (http.RoundTripper, error) {

// Configure TLS
tlsConfig, err := rest.TLSConfigFor(config)
Expand All @@ -222,7 +266,7 @@ func roundTripperFromConfig(config *rest.Config) (http.RoundTripper, error) {

// Create a roundtripper which will pass in the final underlying websocket connection to a callback
rt := &WebsocketRoundTripper{
Do: WebsocketCallback,
Do: callback,
Dialer: dialer,
}

Expand Down
Loading

0 comments on commit f4bcab2

Please sign in to comment.