Skip to content

Commit

Permalink
Merge pull request #95 from bariqhibat/bariqhibat/subscription-active…
Browse files Browse the repository at this point in the history
…-exec-sessions

add resolver for subscribing active exec sessions through nats
  • Loading branch information
tangledbytes authored Nov 16, 2021
2 parents 10327cd + ea85795 commit 43881a0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
41 changes: 41 additions & 0 deletions meshsync/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"strings"
"time"

"github.com/google/uuid"
"github.com/layer5io/meshkit/broker"
Expand Down Expand Up @@ -43,18 +44,58 @@ func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig)
if !bool(req.Stop) {
h.channelPool[id] = channels.NewStructChannel()
h.Log.Info("Starting session")

err := h.Broker.Publish("active_sessions.exec", &broker.Message{
ObjectType: broker.ActiveExecObject,
Object: h.getActiveChannels(),
})
if err != nil {
h.Log.Error(ErrGetObject(err))
}
go h.streamSession(id, req, cfg)
}
} else {
// Already running subscription
if bool(req.Stop) {
// TODO: once we have a unsubscribe functionality, need to publish message to active sessions subject
execCleanup(h, id)
}
}
}

return nil
}
func (h *Handler) processActiveExecRequest() error {
go h.streamChannelPool()

return nil
}
func (h *Handler) getActiveChannels() []*string {
activeChannels := make([]*string, 0, len(h.channelPool))
for k := range h.channelPool {
activeChannels = append(activeChannels, &k)
}

return activeChannels
}

func (h *Handler) streamChannelPool() error {
go func() {
for {
err := h.Broker.Publish("active_sessions.exec", &broker.Message{
ObjectType: broker.ActiveExecObject,
Object: h.getActiveChannels(),
})
if err != nil {
h.Log.Error(ErrGetObject(err))
}

time.Sleep(10 * time.Second)
}
}()

return nil
}

func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.ListenerConfig) {
subCh := make(chan *broker.Message)
Expand Down
7 changes: 7 additions & 0 deletions meshsync/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ func (h *Handler) ListenToRequests() {
h.Log.Error(err)
continue
}
case broker.ActiveExecEntity:
h.Log.Info("Connecting to channel pool")
err := h.processActiveExecRequest()
if err != nil {
h.Log.Error(err)
continue
}
}
}
}

0 comments on commit 43881a0

Please sign in to comment.