Skip to content

Commit

Permalink
Acknowledge websocket POSTED events when they should be (#722)
Browse files Browse the repository at this point in the history
* WIP

* Use a goroutine for the ACK

* Only ACK other users posts

* Only ack if the prop is true

* Change the ack name

* Use non-blocking call to buffered channel
  • Loading branch information
devinbinnie authored Apr 11, 2024
1 parent 4a0b1c7 commit 2ac55d1
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 1 deletion.
7 changes: 7 additions & 0 deletions loadtest/control/simulcontroller/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func (c *SimulController) wsEventHandler(wg *sync.WaitGroup) {
break
}

if ack, ok := ev.GetData()["should_ack"]; ok && ack.(bool) {
if err := c.user.PostedAck(post.Id, "success", "", ""); err != nil {
c.status <- c.newErrorStatus(err)
break
}
}

cm, _ := c.user.Store().ChannelMember(post.ChannelId, c.user.Store().Id())
if cm.UserId != "" {
break
Expand Down
1 change: 1 addition & 0 deletions loadtest/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type User interface {
UpdateActiveChannel(channelId string) error
UpdateActiveThread(channelId string) error
UpdateActiveTeam(teamId string) error
PostedAck(postId string, result string, reason string, postedData string) error

//server
// GetConfig fetches and stores the server's configuration.
Expand Down
9 changes: 8 additions & 1 deletion loadtest/user/userentity/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ type teamPresenceMsg struct {
teamId string
}

type postedAckMsg struct {
postId string
result string
reason string
postedData string
}

type ueTransport struct {
transport http.RoundTripper
ue *UserEntity
Expand Down Expand Up @@ -153,7 +160,7 @@ func (ue *UserEntity) Connect() (<-chan error, error) {
}

ue.wsEventChan = make(chan *model.WebSocketEvent)
ue.dataChan = make(chan any)
ue.dataChan = make(chan any, 10)
go ue.listen(ue.wsErrorChan)
ue.connected = true
return ue.wsErrorChan, nil
Expand Down
23 changes: 23 additions & 0 deletions loadtest/user/userentity/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@ start:
if err := client.UpdateActiveTeam(v.teamId); err != nil {
errChan <- fmt.Errorf("userentity: error in client.UpdateActiveTeam: %w", err)
}
case postedAckMsg:
if err := client.PostedAck(v.postId, v.result, v.reason, v.postedData); err != nil {
errChan <- fmt.Errorf("userentity: error in client.PostedAck: %w", err)
}
}

}
if chanClosed {
client.Close()
Expand Down Expand Up @@ -298,3 +303,21 @@ func (ue *UserEntity) UpdateActiveTeam(teamId string) error {
}
return nil
}

func (ue *UserEntity) PostedAck(postId string, result string, reason string, postedData string) error {
if !ue.connected {
return errors.New("user is not connected")
}
select {
case ue.dataChan <- postedAckMsg{
postId,
result,
reason,
postedData,
}:
default:
return errors.New("failed to send posted ACK")
}

return nil
}
12 changes: 12 additions & 0 deletions loadtest/user/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,15 @@ func (c *Client) GetStatusesByIds(userIds []string) error {
}
return c.SendMessage("get_statuses_by_ids", data)
}

func (c *Client) PostedAck(postId string, result string, reason string, postedData string) error {
data := map[string]interface{}{
"post_id": postId,
"user_agent": "LoadTest",
"result": result,
"reason": reason,
"data": postedData,
}

return c.SendMessage("posted_notify_ack", data)
}

0 comments on commit 2ac55d1

Please sign in to comment.