Skip to content

Commit

Permalink
Merge pull request #93 from utkarsh-pro/utkarsh-pro/fix/interactive-t…
Browse files Browse the repository at this point in the history
…erminal

Fix exec sessions
  • Loading branch information
leecalcote authored Oct 21, 2021
2 parents f2855b5 + f851fcc commit 10327cd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/google/uuid v1.1.1
github.com/layer5io/meshkit v0.2.14
github.com/myntra/pipeline v0.0.0-20180618182531-2babf4864ce8
github.com/spf13/viper v1.7.1
gorm.io/gorm v1.20.10
k8s.io/api v0.18.12
k8s.io/apimachinery v0.18.12
Expand Down
2 changes: 1 addition & 1 deletion internal/pipeline/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *ResourceWatcher) startWatching(s cache.SharedIndexInformer) {

c.publishItem(objCasted, broker.Update)
} else {
c.log.Info(fmt.Sprintf(
c.log.Debug(fmt.Sprintf(
"skipping update event for: %s => [No changes detected]: %d %d",
objCasted.GetName(),
oldRV,
Expand Down
60 changes: 48 additions & 12 deletions meshsync/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"strings"

"github.com/google/uuid"
"github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshsync/internal/channels"
Expand All @@ -20,6 +21,9 @@ import (
"k8s.io/kubectl/pkg/util/term"
)

// KB stands for KiloByte
const KB = 1024

func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) error {
reqs := make(model.ExecRequests)
d, err := utils.Marshal(obj)
Expand All @@ -44,7 +48,7 @@ func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig)
} else {
// Already running subscription
if bool(req.Stop) {
h.channelPool[id].(channels.StructChannel) <- struct{}{}
execCleanup(h, id)
}
}
}
Expand All @@ -57,7 +61,8 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis
tstdin, putStdin := io.Pipe()
stdin := ioutil.NopCloser(tstdin)
getStdout, stdout := io.Pipe()
err := h.Broker.SubscribeWithChannel(id, id, subCh)

err := h.Broker.SubscribeWithChannel(fmt.Sprintf("input.%s", id), generateID(), subCh)
if err != nil {
h.Log.Error(ErrExecTerminal(err))
}
Expand All @@ -70,6 +75,8 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis
Raw: true,
}
sizeQueue := t.MonitorSize(t.GetSize())

// TTY request GoRoutine
go func() {
fn := func() error {
request := h.staticClient.CoreV1().RESTClient().Post().
Expand All @@ -91,40 +98,50 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis
return err
}

err = exec.Stream(remotecommand.StreamOptions{
if err := exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stdout,
Tty: true,
TerminalSizeQueue: sizeQueue,
})
if err != nil {
}); err != nil {
return err
}

// Cleanup the resources when the streaming process terminates
execCleanup(h, id)
return nil
}

if err := t.Safe(fn); err != nil {
h.Log.Error(ErrExecTerminal(err))
delete(h.channelPool, id)
execCleanup(h, id)

// If the TTY fails then send the error message to the client
if err := h.Broker.Publish(id, &broker.Message{
ObjectType: broker.ErrorObject,
Object: err.Error(),
}); err != nil {
h.Log.Error(ErrExecTerminal(err))
}

return
}
}()

// TTY stdout streaming Goroutine
go func() {
rdr := bufio.NewReader(getStdout)
for {
message, err := rdr.ReadString('#')
data := make([]byte, 1*KB)
_, err := rdr.Read(data)
if err == io.EOF {
break
}
if err != nil {
h.Log.Error(ErrCopyBuffer(err))
break // No clean up here as this can generate a false positive
}

err = h.Broker.Publish(id, &broker.Message{
ObjectType: broker.ExecOutputObject,
Object: message,
Object: string(data),
})
if err != nil {
h.Log.Error(ErrExecTerminal(err))
Expand All @@ -137,6 +154,7 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis
h.Log.Info("Session closed for: ", id)
return
}

select {
case msg := <-subCh:
if msg.ObjectType == broker.ExecInputObject {
Expand All @@ -151,3 +169,21 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis
}
}
}

func execCleanup(h *Handler, id string) {
ch, ok := h.channelPool[id]
if !ok {
return
}

structChan, ok := ch.(channels.StructChannel)
if !ok {
return
}

structChan <- struct{}{}
}

func generateID() string {
return uuid.New().String()
}

0 comments on commit 10327cd

Please sign in to comment.