Skip to content

Commit c15cf0b

Browse files
authored
Add Disconnect() method to client connections (open-telemetry#129)
Resolves open-telemetry#91 This enables OpAMP to close the network connection to an agent from the server side
1 parent 481b3de commit c15cf0b

File tree

4 files changed

+60
-2
lines changed

4 files changed

+60
-2
lines changed

server/httpconnection.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import (
99
"github.com/open-telemetry/opamp-go/server/types"
1010
)
1111

12-
var errInvalidHTTPConnection = errors.New("cannot Send() over HTTP connection")
12+
// ErrInvalidHTTPConnection represents an event of misuse function for plain HTTP
13+
// connection, such as httpConnection.Send() or httpConnection.Disconnect().
14+
// Usage will not result with change but return this error to indicate current state
15+
// might not be as expected.
16+
var ErrInvalidHTTPConnection = errors.New("cannot operate over HTTP connection")
1317

1418
// httpConnection represents an OpAMP connection over a plain HTTP connection.
1519
// Only one response is possible to send when using plain HTTP connection
@@ -28,5 +32,10 @@ var _ types.Connection = (*httpConnection)(nil)
2832
func (c httpConnection) Send(_ context.Context, _ *protobufs.ServerToAgent) error {
2933
// Send() should not be called for plain HTTP connection. Instead, the response will
3034
// be sent after the onMessage callback returns.
31-
return errInvalidHTTPConnection
35+
return ErrInvalidHTTPConnection
36+
}
37+
38+
func (c httpConnection) Disconnect() error {
39+
// Disconnect() should not be called for plain HTTP connection.
40+
return ErrInvalidHTTPConnection
3241
}

server/serverimpl_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,47 @@ func TestServerStartAcceptConnection(t *testing.T) {
126126
eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
127127
}
128128

129+
func TestDisconnectHttpConnection(t *testing.T) {
130+
// Verify Disconnect() results with Invalid HTTP Connection error
131+
err := httpConnection{}.Disconnect()
132+
assert.Error(t, err)
133+
assert.Equal(t, ErrInvalidHTTPConnection, err)
134+
}
135+
136+
func TestDisconnectWSConnection(t *testing.T) {
137+
connectionCloseCalled := int32(0)
138+
callback := CallbacksStruct{
139+
OnConnectionCloseFunc: func(conn types.Connection) {
140+
atomic.StoreInt32(&connectionCloseCalled, 1)
141+
},
142+
}
143+
144+
// Start a Server.
145+
settings := &StartSettings{Settings: Settings{Callbacks: callback}}
146+
srv := startServer(t, settings)
147+
defer srv.Stop(context.Background())
148+
149+
// Connect to the Server.
150+
conn, _, err := dialClient(settings)
151+
152+
// Verify that the connection is successful.
153+
assert.NoError(t, err)
154+
assert.True(t, atomic.LoadInt32(&connectionCloseCalled) == 0)
155+
156+
// Close connection from server side
157+
srvConn := wsConnection{wsConn: conn}
158+
err = srvConn.Disconnect()
159+
assert.NoError(t, err)
160+
161+
// Verify connection disconnected from server side
162+
eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
163+
// Waiting for wsConnection to fail ReadMessage() over a Disconnected communication
164+
eventually(t, func() bool {
165+
_, _, err := conn.ReadMessage()
166+
return err != nil
167+
})
168+
}
169+
129170
func TestServerReceiveSendMessage(t *testing.T) {
130171
var rcvMsg atomic.Value
131172
callbacks := CallbacksStruct{

server/types/connection.go

+4
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,8 @@ type Connection interface {
1919
// Blocks until the message is sent.
2020
// Should return as soon as possible if the ctx is cancelled.
2121
Send(ctx context.Context, message *protobufs.ServerToAgent) error
22+
23+
// Disconnect closes the network connection.
24+
// Any blocked Read or Write operations will be unblocked and return errors.
25+
Disconnect() error
2226
}

server/wsconnection.go

+4
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,7 @@ func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent)
2929
}
3030
return c.wsConn.WriteMessage(websocket.BinaryMessage, bytes)
3131
}
32+
33+
func (c wsConnection) Disconnect() error {
34+
return c.wsConn.Close()
35+
}

0 commit comments

Comments
 (0)