Skip to content

Commit

Permalink
MM-56188: Add presence indicator to load test
Browse files Browse the repository at this point in the history
We send the message via the websocket whenever
the client state changes.

https://mattermost.atlassian.net/browse/MM-56188
  • Loading branch information
agnivade committed Dec 13, 2023
1 parent 8bc4f81 commit 5a09a72
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 6 deletions.
14 changes: 14 additions & 0 deletions loadtest/control/simulcontroller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mattermost/mattermost-load-test-ng/loadtest/user"

"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)

type userAction struct {
Expand Down Expand Up @@ -293,6 +294,10 @@ func (c *SimulController) switchTeam(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if err := u.UpdateActiveTeam(team.Id); err != nil {
mlog.Warn("Failed to update active team", mlog.String("team_id", team.Id))
}

// We should probably keep track of the last channel viewed in the team but
// for now we can simplify and randomly pick one each time.

Expand Down Expand Up @@ -530,6 +535,10 @@ func switchChannel(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if err := u.UpdateActiveChannel(channel.Id); err != nil {
mlog.Warn("Failed to update active channel", mlog.String("channel_id", channel.Id))
}

return control.UserActionResponse{Info: fmt.Sprintf("switched to channel %s", channel.Id)}
}

Expand Down Expand Up @@ -1717,6 +1726,11 @@ func (c *SimulController) viewThread(u user.User) control.UserActionResponse {
case <-time.After(idleTime):
}
}

if err := u.UpdateActiveThread(thread.Post.ChannelId); err != nil {
mlog.Warn("Failed to update active thread", mlog.String("channel_id", thread.Post.ChannelId))
}

return control.UserActionResponse{Info: fmt.Sprintf("viewedthread %s", thread.PostId)}
}

Expand Down
5 changes: 5 additions & 0 deletions loadtest/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type User interface {
// SendTypingEvent will push a user_typing event out to all connected users
// who are in the specified channel.
SendTypingEvent(channelId, parentId string) error
// These methods are to send info to the server on
// the state of the client.
UpdateActiveChannel(channelId string) error
UpdateActiveThread(channelId string) error
UpdateActiveTeam(teamId string) error

//server
// GetConfig fetches and stores the server's configuration.
Expand Down
2 changes: 2 additions & 0 deletions loadtest/user/userentity/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/mattermost/mattermost-load-test-ng/loadtest/store"
"github.com/mattermost/mattermost-load-test-ng/loadtest/user/websocket"
"github.com/mattermost/mattermost-load-test-ng/performance"

"github.com/gocolly/colly/v2"
Expand All @@ -21,6 +22,7 @@ import (
type UserEntity struct {
store store.MutableUserStore
client *model.Client4
wsClient *websocket.Client
wsClosing chan struct{}
wsClosed chan struct{}
wsErrorChan chan error
Expand Down
34 changes: 28 additions & 6 deletions loadtest/user/userentity/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (ue *UserEntity) wsEventHandler(ev *model.WebSocketEvent) error {
// Only on calling Disconnect explicitly, it will return.
func (ue *UserEntity) listen(errChan chan error) {
connectionFailCount := 0
var err error
start:
for {
client, err := websocket.NewClient4(&websocket.ClientParams{
ue.wsClient, err = websocket.NewClient4(&websocket.ClientParams{
WsURL: ue.config.WebSocketURL,
AuthToken: ue.client.AuthToken,
ConnID: ue.wsConnID,
Expand All @@ -168,23 +169,23 @@ start:
var chanClosed bool
for {
select {
case ev, ok := <-client.EventChannel:
case ev, ok := <-ue.wsClient.EventChannel:
if !ok {
chanClosed = true
break
}
if err := ue.wsEventHandler(ev); err != nil {
if err == errSeqMismatch {
// Disconnect and reconnect.
client.Close()
ue.wsClient.Close()
ue.decWebSocketConnections()
continue start
}
errChan <- fmt.Errorf("userentity: error in wsEventHandler: %w", err)
}
ue.wsEventChan <- ev
case <-ue.wsClosing:
client.Close()
ue.wsClient.Close()
ue.decWebSocketConnections()
// Explicit disconnect. Return.
close(ue.wsClosed)
Expand All @@ -194,12 +195,12 @@ start:
chanClosed = true
break
}
if err := client.UserTyping(msg.channelId, msg.parentId); err != nil {
if err := ue.wsClient.UserTyping(msg.channelId, msg.parentId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UserTyping: %w", err)
}
}
if chanClosed {
client.Close()
ue.wsClient.Close()
break
}
}
Expand Down Expand Up @@ -245,3 +246,24 @@ func (ue *UserEntity) SendTypingEvent(channelId, parentId string) error {
}
return nil
}

func (ue *UserEntity) UpdateActiveChannel(channelId string) error {
if ue.wsClient != nil {
return ue.wsClient.UpdateActiveChannel(channelId)
}
return nil
}

func (ue *UserEntity) UpdateActiveThread(channelId string) error {
if ue.wsClient != nil {
return ue.wsClient.UpdateActiveThread(channelId)
}
return nil
}

func (ue *UserEntity) UpdateActiveTeam(teamId string) error {
if ue.wsClient != nil {
return ue.wsClient.UpdateActiveTeam(teamId)
}
return nil
}
24 changes: 24 additions & 0 deletions loadtest/user/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ func (c *Client) SendBinaryMessage(action string, data map[string]interface{}) e

// Helper utilities that call SendMessage.

func (c *Client) UpdateActiveChannel(channelId string) error {
data := map[string]interface{}{
"channel_id": channelId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveThread(channelId string) error {
data := map[string]interface{}{
"thread_channel_id": channelId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveTeam(teamId string) error {
data := map[string]interface{}{
"team_id": teamId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UserTyping(channelId, parentId string) error {
data := map[string]interface{}{
"channel_id": channelId,
Expand Down

0 comments on commit 5a09a72

Please sign in to comment.