Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable scheduleSend when onMessage callback is running #144

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,39 @@ import (
"github.com/stretchr/testify/assert"
)

// TestDelaySchedule checks that ScheduleSend calls made while sending is
// disabled are parked on registerScheduleSend and only reach
// hasPendingMessage once sending is enabled again.
func TestDelaySchedule(t *testing.T) {
	sender := NewHTTPSender(&sharedinternal.NopLogger{})
	pending := sender.hasPendingMessage
	deferred := sender.registerScheduleSend

	// expectLens asserts how many signals are buffered in each channel.
	expectLens := func(wantPending, wantDeferred int) {
		assert.Equal(t, wantPending, len(pending))
		assert.Equal(t, wantDeferred, len(deferred))
	}

	sender.DisableScheduleSend()

	// While disabled, ScheduleSend must not touch the message channel.
	sender.ScheduleSend()
	expectLens(0, 1)

	// A second call must neither block nor change the channel lengths.
	sender.ScheduleSend()
	expectLens(0, 1)

	// Enabling moves the parked request over to the message channel.
	sender.EnableScheduleSend()
	expectLens(1, 0)

	// Enabling again must neither block nor change the channel lengths.
	sender.EnableScheduleSend()
	expectLens(1, 0)

	// Sanity check: ScheduleSend works as usual once enabled.
	sender.ScheduleSend()
	expectLens(1, 0)
}

func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {

var connectionAttempts int64
Expand Down
7 changes: 6 additions & 1 deletion client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func newReceivedProcessor(
// the received message and performs any processing necessary based on what fields are set.
// This function will call any relevant callbacks.
func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) {
r.sender.DisableScheduleSend()

// Make sure sending is re-enabled on every return path. EnableScheduleSend may be called more than once because it never blocks.
defer r.sender.EnableScheduleSend()

if r.callbacks != nil {
if msg.Command != nil {
r.rcvCommand(msg.Command)
Expand Down Expand Up @@ -127,9 +132,9 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
msgData.AgentIdentification = msg.AgentIdentification
}
}

r.callbacks.OnMessage(ctx, msgData)

r.sender.EnableScheduleSend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment to EnableScheduleSend definition that calling EnableScheduleSend when it is already enabled is allowed? (since we do it twice here).

r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings)

if scheduled {
Expand Down
55 changes: 52 additions & 3 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package internal

import (
"errors"

"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/protobufs"
"sync/atomic"
"time"
)

// Sender is an interface of the sending portion of OpAMP protocol that stores
Expand All @@ -20,6 +21,12 @@ type Sender interface {
// "pending" flag is reset) then no message will be sent.
ScheduleSend()

// DisableScheduleSend temporarily prevents ScheduleSend from writing to the channel.
DisableScheduleSend()

// EnableScheduleSend re-enables ScheduleSend and checks whether it was called during the onMessage callback.
// Calling EnableScheduleSend when sending is already enabled is allowed.
EnableScheduleSend()

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
SetInstanceUid(instanceUid string) error
}
Expand All @@ -31,6 +38,12 @@ type SenderCommon struct {
// Indicates that there is a pending message to send.
hasPendingMessage chan struct{}

// When set to non-zero indicates message sending is disabled
isSendingDisabled int32

// Indicates ScheduleSend() was called when message sending was disabled
registerScheduleSend chan struct{}

// The next message to send.
nextMessage NextMessage
}
Expand All @@ -39,15 +52,27 @@ type SenderCommon struct {
// the WebSocket and HTTP Sender implementations.
func NewSenderCommon() SenderCommon {
	return SenderCommon{
		// Each field may appear only once in a composite literal. Both
		// signalling channels are buffered with capacity 1 so that the
		// non-blocking sends in ScheduleSend can park one signal in them.
		hasPendingMessage:    make(chan struct{}, 1),
		registerScheduleSend: make(chan struct{}, 1),
		nextMessage:          NewNextMessage(),
		// Zero means sending is enabled (see IsSendingDisabled).
		isSendingDisabled: 0,
	}
}

// ScheduleSend signals to HTTPSender that the message in NextMessage struct
// is now ready to be sent. If there is no pending message (e.g. the NextMessage was
// already sent and "pending" flag is reset) then no message will be sent.
func (h *SenderCommon) ScheduleSend() {
if h.IsSendingDisabled() {
// Register message sending to when message sending is enabled, won't block on writing to channel.
select {
case h.registerScheduleSend <- struct{}{}:
default:
break
}
return
}

// Set pending flag. Don't block on writing to channel.
select {
case h.hasPendingMessage <- struct{}{}:
Expand All @@ -62,6 +87,30 @@ func (h *SenderCommon) NextMessage() *NextMessage {
return &h.nextMessage
}

// IsSendingDisabled reports whether ScheduleSend is currently disabled, i.e.
// whether the isSendingDisabled flag is non-zero. ProcessReceivedMessage sets
// the flag while it handles a received message.
func (h *SenderCommon) IsSendingDisabled() bool {
	flag := atomic.LoadInt32(&h.isSendingDisabled)
	return flag != 0
}

// DisableScheduleSend temporarily stops ScheduleSend from signalling
// hasPendingMessage. Requests made in the meantime are recorded on
// registerScheduleSend instead.
func (h *SenderCommon) DisableScheduleSend() {
	// Any non-zero value marks sending as disabled.
	const disabled int32 = 1
	atomic.StoreInt32(&h.isSendingDisabled, disabled)
}

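// EnableScheduleSend re-enables message sending and, if ScheduleSend was called while sending was
// disabled, schedules that send now. It won't block on reading from the channel, so calling it
// when sending is already enabled is allowed.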
func (h *SenderCommon) EnableScheduleSend() {
atomic.StoreInt32(&h.isSendingDisabled, 0)
select {
case <-h.registerScheduleSend:
h.ScheduleSend()
case <-time.Tick(100 * time.Millisecond):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear why this is necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixed my sync issue. Several tests failed when testing with -race -count 1000 flags. Honestly, I don't fully understand why but it seems that the default case was selected over reading from the registerScheduleSend channel even when it (should have) held an unread message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing tests may be indicating a real problem. The current code (before this PR) does not fail the tests.

The addition of the second case that waits a time without explaining why it fixes the problem is not an acceptable code change. We need precise explanation what the problem is and why this is the correct fix for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this bypass and tests are looking stable at the moment with the updated branch. Still, I don't have a smoking gun regarding the sync issue, but it could have been resolved due to recent commits. Would appreciate your opinion on whether or not this should stay blocked or more changes are needed

break
default:
break
}
}

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
// Can be called concurrently, normally is called when a message is received from the
// Server that instructs us to change our instance UID.
Expand Down
2 changes: 1 addition & 1 deletion client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
},
}
clientSyncedState := ClientSyncedState{}
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &WSSender{}, &clientSyncedState, nil, 0)
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.CommandType_CommandType_Restart,
Expand Down
7 changes: 4 additions & 3 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
}

// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
//
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
func (c *wsClient) runOneCycle(ctx context.Context) {
Expand Down