From 55d7ef6af579bbe88edb5c965209e12ec0eaa591 Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Thu, 21 Oct 2021 19:21:21 +0530 Subject: [PATCH 1/4] reduce logging to minimise resource hogging Signed-off-by: Utkarsh Srivastava --- internal/pipeline/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pipeline/handlers.go b/internal/pipeline/handlers.go index 280a2e61..bc9b005b 100644 --- a/internal/pipeline/handlers.go +++ b/internal/pipeline/handlers.go @@ -29,7 +29,7 @@ func (c *ResourceWatcher) startWatching(s cache.SharedIndexInformer) { c.publishItem(objCasted, broker.Update) } else { - c.log.Info(fmt.Sprintf( + c.log.Debug(fmt.Sprintf( "skipping update event for: %s => [No changes detected]: %d %d", objCasted.GetName(), oldRV, From f350b62b956596b8f57eb8946fdc909717a547ec Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Thu, 21 Oct 2021 19:21:50 +0530 Subject: [PATCH 2/4] handle edge cases for resource cleanups Signed-off-by: Utkarsh Srivastava --- meshsync/exec.go | 66 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 12 deletions(-) diff --git a/meshsync/exec.go b/meshsync/exec.go index 14552b52..9ed8aa6a 100644 --- a/meshsync/exec.go +++ b/meshsync/exec.go @@ -2,6 +2,7 @@ package meshsync import ( "bufio" + "crypto/rand" "fmt" "io" "io/ioutil" @@ -20,6 +21,9 @@ import ( "k8s.io/kubectl/pkg/util/term" ) +// KB stands for KiloByte +const KB = 1024 + func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) error { reqs := make(model.ExecRequests) d, err := utils.Marshal(obj) @@ -44,7 +48,7 @@ func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) } else { // Already running subscription if bool(req.Stop) { - h.channelPool[id].(channels.StructChannel) <- struct{}{} + execCleanup(h, id) } } } @@ -57,7 +61,8 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis tstdin, putStdin := io.Pipe() stdin := ioutil.NopCloser(tstdin) getStdout, stdout := io.Pipe() - err := h.Broker.SubscribeWithChannel(id, id, subCh) + + err := h.Broker.SubscribeWithChannel(fmt.Sprintf("input.%s", id), generateID(), subCh) if err != nil { h.Log.Error(ErrExecTerminal(err)) } @@ -70,6 +75,8 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis Raw: true, } sizeQueue := t.MonitorSize(t.GetSize()) + + // TTY request GoRoutine go func() { fn := func() error { request := h.staticClient.CoreV1().RESTClient().Post(). @@ -91,40 +98,50 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis return err } - err = exec.Stream(remotecommand.StreamOptions{ + if err := exec.Stream(remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stdout, Tty: true, TerminalSizeQueue: sizeQueue, - }) - if err != nil { + }); err != nil { return err } + + // Cleanup the resources when the streaming process terminates + execCleanup(h, id) return nil } if err := t.Safe(fn); err != nil { h.Log.Error(ErrExecTerminal(err)) - delete(h.channelPool, id) + execCleanup(h, id) + + // If the TTY fails then send the error message to the client + if err := h.Broker.Publish(id, &broker.Message{ + ObjectType: broker.ErrorObject, + Object: err.Error(), + }); err != nil { + h.Log.Error(ErrExecTerminal(err)) + } + return } }() + // TTY stdout streaming Goroutine go func() { rdr := bufio.NewReader(getStdout) for { - message, err := rdr.ReadString('#') + data := make([]byte, 1*KB) + _, err := rdr.Read(data) if err == io.EOF { - break - } - if err != nil { - h.Log.Error(ErrCopyBuffer(err)) + break // No clean up here as this can generate a false positive } err = h.Broker.Publish(id, &broker.Message{ ObjectType: broker.ExecOutputObject, - Object: message, + Object: string(data), }) if err != nil { h.Log.Error(ErrExecTerminal(err)) @@ -137,6 +154,7 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis h.Log.Info("Session closed for: ", id) return } + select { case msg := <-subCh: if msg.ObjectType == broker.ExecInputObject { @@ -151,3 +169,27 @@ func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.Lis } } } + +func execCleanup(h *Handler, id string) { + ch, ok := h.channelPool[id] + if !ok { + return + } + + structChan, ok := ch.(channels.StructChannel) + if !ok { + return + } + + structChan <- struct{}{} +} + +func generateID() string { + b := make([]byte, 8) + _, err := rand.Read(b) + if err != nil { + return "" + } + + return fmt.Sprintf("%x", b) +} From 927e5cc22e4406316ac86c6d75888887c2b20467 Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Thu, 21 Oct 2021 19:23:18 +0530 Subject: [PATCH 3/4] update go.mod Signed-off-by: Utkarsh Srivastava --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index a6ff75ad..52238bd4 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/google/uuid v1.1.1 github.com/layer5io/meshkit v0.2.14 github.com/myntra/pipeline v0.0.0-20180618182531-2babf4864ce8 + github.com/spf13/viper v1.7.1 gorm.io/gorm v1.20.10 k8s.io/api v0.18.12 k8s.io/apimachinery v0.18.12 From f851fccedb782f4e0099e96b09d31c91845f302f Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Fri, 22 Oct 2021 01:31:34 +0530 Subject: [PATCH 4/4] replace ids with UUID Signed-off-by: Utkarsh Srivastava --- meshsync/exec.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/meshsync/exec.go b/meshsync/exec.go index 9ed8aa6a..802cc916 100644 --- a/meshsync/exec.go +++ b/meshsync/exec.go @@ -2,13 +2,13 @@ package meshsync import ( "bufio" - "crypto/rand" "fmt" "io" "io/ioutil" "os" "strings" + "github.com/google/uuid" "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/utils" "github.com/layer5io/meshsync/internal/channels" @@ -185,11 +185,5 @@ func execCleanup(h *Handler, id string) { } func generateID() string { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - return "" - } - - return fmt.Sprintf("%x", b) + return uuid.New().String() }