Skip to content

Commit

Permalink
MM-56188: Add presence indicator to load test (#678)
Browse files Browse the repository at this point in the history
* MM-56188: Add presence indicator to load test

We send the message via the websocket whenever
the client state changes.

https://mattermost.atlassian.net/browse/MM-56188

* using FF and also handling both RHS and global thread

* use channel to pass messages
  • Loading branch information
agnivade authored Feb 23, 2024
1 parent 89471d6 commit 51e020f
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 11 deletions.
29 changes: 29 additions & 0 deletions loadtest/control/simulcontroller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mattermost/mattermost-load-test-ng/loadtest/user"

"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)

type userAction struct {
Expand Down Expand Up @@ -293,6 +294,12 @@ func (c *SimulController) switchTeam(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveTeam(team.Id); err != nil {
mlog.Warn("Failed to update active team", mlog.String("team_id", team.Id))
}
}

// We should probably keep track of the last channel viewed in the team but
// for now we can simplify and randomly pick one each time.

Expand Down Expand Up @@ -530,6 +537,12 @@ func switchChannel(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if u.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveChannel(channel.Id); err != nil {
mlog.Warn("Failed to update active channel", mlog.String("channel_id", channel.Id))
}
}

return control.UserActionResponse{Info: fmt.Sprintf("switched to channel %s", channel.Id)}
}

Expand Down Expand Up @@ -1414,6 +1427,15 @@ func (c *SimulController) initialJoinTeam(u user.User) control.UserActionRespons
return c.joinTeam(c.user)
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
// Setting the active thread to empty to allow the optimization to kick in early.
// We don't do this in the webapp to keep the code simple, but it's okay to do this in load-test
// to magnify the boost.
if err := u.UpdateActiveThread(""); err != nil {
mlog.Warn("Failed to update active thread", mlog.String("channel_id", ""))
}
}

return resp
}

Expand Down Expand Up @@ -1732,6 +1754,13 @@ func (c *SimulController) viewThread(u user.User) control.UserActionResponse {
case <-time.After(idleTime):
}
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveThread(thread.Post.ChannelId); err != nil {
mlog.Warn("Failed to update active thread", mlog.String("channel_id", thread.Post.ChannelId))
}
}

return control.UserActionResponse{Info: fmt.Sprintf("viewedthread %s", thread.PostId)}
}

Expand Down
5 changes: 5 additions & 0 deletions loadtest/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type User interface {
// SendTypingEvent will push a user_typing event out to all connected users
// who are in the specified channel.
SendTypingEvent(channelId, parentId string) error
// These methods are to send info to the server on
// the state of the client.
UpdateActiveChannel(channelId string) error
UpdateActiveThread(channelId string) error
UpdateActiveTeam(teamId string) error

//server
// GetConfig fetches and stores the server's configuration.
Expand Down
19 changes: 16 additions & 3 deletions loadtest/user/userentity/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type UserEntity struct {
wsClosed chan struct{}
wsErrorChan chan error
wsEventChan chan *model.WebSocketEvent
wsTyping chan userTypingMsg
dataChan chan any
connected bool
config Config
metrics *performance.UserEntityMetrics
Expand Down Expand Up @@ -64,6 +64,19 @@ type userTypingMsg struct {
parentId string
}

type channelPresenceMsg struct {
channelId string
}

type threadPresenceMsg struct {
channelId string
threadView bool
}

type teamPresenceMsg struct {
teamId string
}

type ueTransport struct {
transport http.RoundTripper
ue *UserEntity
Expand Down Expand Up @@ -140,7 +153,7 @@ func (ue *UserEntity) Connect() (<-chan error, error) {
}

ue.wsEventChan = make(chan *model.WebSocketEvent)
ue.wsTyping = make(chan userTypingMsg)
ue.dataChan = make(chan any)
go ue.listen(ue.wsErrorChan)
ue.connected = true
return ue.wsErrorChan, nil
Expand Down Expand Up @@ -175,7 +188,7 @@ func (ue *UserEntity) Disconnect() error {
<-ue.wsClosed

close(ue.wsEventChan)
close(ue.wsTyping)
close(ue.dataChan)
close(ue.wsErrorChan)
ue.connected = false
return nil
Expand Down
69 changes: 61 additions & 8 deletions loadtest/user/userentity/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ start:
connectionFailCount++
select {
// Draining the channel to avoid blocking the sender.
case <-ue.wsTyping:
case <-ue.dataChan:
case <-ue.wsClosing:
// Explicit disconnect. Return.
close(ue.wsClosed)
Expand Down Expand Up @@ -189,13 +189,28 @@ start:
// Explicit disconnect. Return.
close(ue.wsClosed)
return
case msg, ok := <-ue.wsTyping:
case msg, ok := <-ue.dataChan:
if !ok {
chanClosed = true
break
}
if err := client.UserTyping(msg.channelId, msg.parentId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UserTyping: %w", err)
switch v := msg.(type) {
case userTypingMsg:
if err := client.UserTyping(v.channelId, v.parentId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UserTyping: %w", err)
}
case threadPresenceMsg:
if err := client.UpdateActiveThread(v.channelId, v.threadView); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveThread: %w", err)
}
case channelPresenceMsg:
if err := client.UpdateActiveChannel(v.channelId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveChannel: %w", err)
}
case teamPresenceMsg:
if err := client.UpdateActiveTeam(v.teamId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveTeam: %w", err)
}
}
}
if chanClosed {
Expand All @@ -209,7 +224,7 @@ start:
connectionFailCount++
select {
// Draining the channel to avoid blocking the sender.
case <-ue.wsTyping:
case <-ue.dataChan:
case <-ue.wsClosing:
// Explicit disconnect. Return.
close(ue.wsClosed)
Expand Down Expand Up @@ -239,9 +254,47 @@ func (ue *UserEntity) SendTypingEvent(channelId, parentId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.wsTyping <- userTypingMsg{
channelId,
parentId,
ue.dataChan <- userTypingMsg{
channelId: channelId,
parentId: parentId,
}
return nil
}

func (ue *UserEntity) UpdateActiveChannel(channelId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.dataChan <- channelPresenceMsg{
channelId: channelId,
}
return nil
}

func (ue *UserEntity) UpdateActiveThread(channelId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
// We don't really have a notion of RHS thread vs global thread in the load test.
// We either load a channel or load a thread. For now, we just set `is_thread_view`
// as both true and false to set the scope.
ue.dataChan <- threadPresenceMsg{
channelId: channelId,
threadView: true,
}
ue.dataChan <- threadPresenceMsg{
channelId: channelId,
threadView: false,
}
return nil
}

func (ue *UserEntity) UpdateActiveTeam(teamId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.dataChan <- teamPresenceMsg{
teamId: teamId,
}
return nil
}
25 changes: 25 additions & 0 deletions loadtest/user/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ func (c *Client) SendBinaryMessage(action string, data map[string]interface{}) e

// Helper utilities that call SendMessage.

func (c *Client) UpdateActiveChannel(channelId string) error {
data := map[string]interface{}{
"channel_id": channelId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveThread(channelId string, threadView bool) error {
data := map[string]interface{}{
"thread_channel_id": channelId,
"is_thread_view": threadView,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveTeam(teamId string) error {
data := map[string]interface{}{
"team_id": teamId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UserTyping(channelId, parentId string) error {
data := map[string]interface{}{
"channel_id": channelId,
Expand Down

0 comments on commit 51e020f

Please sign in to comment.