Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MM-56188: Add presence indicator to load test #678

Merged
merged 3 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions loadtest/control/simulcontroller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mattermost/mattermost-load-test-ng/loadtest/user"

"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)

type userAction struct {
Expand Down Expand Up @@ -293,6 +294,12 @@ func (c *SimulController) switchTeam(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveTeam(team.Id); err != nil {
mlog.Warn("Failed to update active team", mlog.String("team_id", team.Id))
}
}

// We should probably keep track of the last channel viewed in the team but
// for now we can simplify and randomly pick one each time.

Expand Down Expand Up @@ -530,6 +537,12 @@ func switchChannel(u user.User) control.UserActionResponse {
return control.UserActionResponse{Err: control.NewUserError(err)}
}

if u.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveChannel(channel.Id); err != nil {
mlog.Warn("Failed to update active channel", mlog.String("channel_id", channel.Id))
}
}

return control.UserActionResponse{Info: fmt.Sprintf("switched to channel %s", channel.Id)}
}

Expand Down Expand Up @@ -1410,6 +1423,15 @@ func (c *SimulController) initialJoinTeam(u user.User) control.UserActionRespons
return c.joinTeam(c.user)
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
// Setting the active thread to empty to allow the optimization to kick in early.
// We don't do this in the webapp to keep the code simple, but it's okay to do this in load-test
// to magnify the boost.
Comment on lines +1427 to +1429
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When in doubt I'd probably want the load-test simulation to perform worse than webapp to be on the safe side. I'd be okay with keeping this to prove the ideal improvements though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also a bit worried that we deviate from what the webapp does. Should we just remove this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is our simulation is not close enough to the webapp. We don't differentiate between loading a thread from RHS or from global thread view. It's all just a loadThread action. Until we differentiate that, we'll have to approximate somehow.

Copy link
Member

@agarciamontoro agarciamontoro Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But are we making things more performant here with this piece of code? Or am I misunderstanding its goal and side effects?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we are making it more performant here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what worries me (and what I think concerned Claudio as well). Is the performance improvement in here really needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked about this earlier in this comment: #678 (comment)

Until we do that, we have to approximate somehow. And if we choose not to set the active thread at all, then there will be no improvements at all, so it will be impossible to measure the impact of the change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's go with it then! Thank you for the patience :)

if err := u.UpdateActiveThread(""); err != nil {
mlog.Warn("Failed to update active thread", mlog.String("channel_id", ""))
}
}

return resp
}

Expand Down Expand Up @@ -1728,6 +1750,13 @@ func (c *SimulController) viewThread(u user.User) control.UserActionResponse {
case <-time.After(idleTime):
}
}

if c.user.Store().FeatureFlags()["WebSocketEventScope"] {
if err := u.UpdateActiveThread(thread.Post.ChannelId); err != nil {
mlog.Warn("Failed to update active thread", mlog.String("channel_id", thread.Post.ChannelId))
}
}

return control.UserActionResponse{Info: fmt.Sprintf("viewedthread %s", thread.PostId)}
}

Expand Down
5 changes: 5 additions & 0 deletions loadtest/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type User interface {
// SendTypingEvent will push a user_typing event out to all connected users
// who are in the specified channel.
SendTypingEvent(channelId, parentId string) error
// These methods are to send info to the server on
// the state of the client.
UpdateActiveChannel(channelId string) error
UpdateActiveThread(channelId string) error
UpdateActiveTeam(teamId string) error

//server
// GetConfig fetches and stores the server's configuration.
Expand Down
19 changes: 16 additions & 3 deletions loadtest/user/userentity/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type UserEntity struct {
wsClosed chan struct{}
wsErrorChan chan error
wsEventChan chan *model.WebSocketEvent
wsTyping chan userTypingMsg
dataChan chan any
connected bool
config Config
metrics *performance.UserEntityMetrics
Expand Down Expand Up @@ -64,6 +64,19 @@ type userTypingMsg struct {
parentId string
}

type channelPresenceMsg struct {
channelId string
}

type threadPresenceMsg struct {
channelId string
threadView bool
}

type teamPresenceMsg struct {
teamId string
}

type ueTransport struct {
transport http.RoundTripper
ue *UserEntity
Expand Down Expand Up @@ -140,7 +153,7 @@ func (ue *UserEntity) Connect() (<-chan error, error) {
}

ue.wsEventChan = make(chan *model.WebSocketEvent)
ue.wsTyping = make(chan userTypingMsg)
ue.dataChan = make(chan any)
go ue.listen(ue.wsErrorChan)
ue.connected = true
return ue.wsErrorChan, nil
Expand Down Expand Up @@ -175,7 +188,7 @@ func (ue *UserEntity) Disconnect() error {
<-ue.wsClosed

close(ue.wsEventChan)
close(ue.wsTyping)
close(ue.dataChan)
close(ue.wsErrorChan)
ue.connected = false
return nil
Expand Down
69 changes: 61 additions & 8 deletions loadtest/user/userentity/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ start:
connectionFailCount++
select {
// Draining the channel to avoid blocking the sender.
case <-ue.wsTyping:
case <-ue.dataChan:
case <-ue.wsClosing:
// Explicit disconnect. Return.
close(ue.wsClosed)
Expand Down Expand Up @@ -189,13 +189,28 @@ start:
// Explicit disconnect. Return.
close(ue.wsClosed)
return
case msg, ok := <-ue.wsTyping:
case msg, ok := <-ue.dataChan:
if !ok {
chanClosed = true
break
}
if err := client.UserTyping(msg.channelId, msg.parentId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UserTyping: %w", err)
switch v := msg.(type) {
case userTypingMsg:
if err := client.UserTyping(v.channelId, v.parentId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UserTyping: %w", err)
}
case threadPresenceMsg:
if err := client.UpdateActiveThread(v.channelId, v.threadView); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveThread: %w", err)
}
case channelPresenceMsg:
if err := client.UpdateActiveChannel(v.channelId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveChannel: %w", err)
}
case teamPresenceMsg:
if err := client.UpdateActiveTeam(v.teamId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveTeam: %w", err)
}
}
}
if chanClosed {
Expand All @@ -209,7 +224,7 @@ start:
connectionFailCount++
select {
// Draining the channel to avoid blocking the sender.
case <-ue.wsTyping:
case <-ue.dataChan:
case <-ue.wsClosing:
// Explicit disconnect. Return.
close(ue.wsClosed)
Expand Down Expand Up @@ -239,9 +254,47 @@ func (ue *UserEntity) SendTypingEvent(channelId, parentId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.wsTyping <- userTypingMsg{
channelId,
parentId,
ue.dataChan <- userTypingMsg{
channelId: channelId,
parentId: parentId,
}
return nil
}

func (ue *UserEntity) UpdateActiveChannel(channelId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.dataChan <- channelPresenceMsg{
channelId: channelId,
}
return nil
}

func (ue *UserEntity) UpdateActiveThread(channelId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
// We don't really have a notion of RHS thread vs global thread in the load test.
// We either load a channel or load a thread. For now, we just set `is_thread_view`
// as both true and false to set the scope.
ue.dataChan <- threadPresenceMsg{
channelId: channelId,
threadView: true,
}
ue.dataChan <- threadPresenceMsg{
channelId: channelId,
threadView: false,
}
Comment on lines +278 to +288
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this. What does the webapp do with these two messages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The webapp has 2 different ways of loading a thread. One from RHS and one from global thread viewer. That means one can have 2 threads open at the same time. Since we don't simulate that, we are approximating it here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I get it now, we're simply opening both threads, right? That makes sense, thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically. Actually we are making the API call to open just a single thread. But we are setting the scope such that both threads are open.

return nil
}

func (ue *UserEntity) UpdateActiveTeam(teamId string) error {
if !ue.connected {
return errors.New("user is not connected")
}
ue.dataChan <- teamPresenceMsg{
teamId: teamId,
}
return nil
}
25 changes: 25 additions & 0 deletions loadtest/user/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ func (c *Client) SendBinaryMessage(action string, data map[string]interface{}) e

// Helper utilities that call SendMessage.

func (c *Client) UpdateActiveChannel(channelId string) error {
data := map[string]interface{}{
"channel_id": channelId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveThread(channelId string, threadView bool) error {
data := map[string]interface{}{
"thread_channel_id": channelId,
"is_thread_view": threadView,
}

return c.SendMessage("presence", data)
}

func (c *Client) UpdateActiveTeam(teamId string) error {
data := map[string]interface{}{
"team_id": teamId,
}

return c.SendMessage("presence", data)
}

func (c *Client) UserTyping(channelId, parentId string) error {
data := map[string]interface{}{
"channel_id": channelId,
Expand Down