Skip to content

Commit d666d1a

Browse files
committed
Fix for WsSession leak; Add extra Tomcat timeout properties
1 parent b1fb0aa commit d666d1a

File tree

12 files changed

+78
-46
lines changed

12 files changed

+78
-46
lines changed

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>2.0.15</version>
6+
<version>2.0.16</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-client</artifactId>

client/src/main/java/org/red5/client/Red5Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public final class Red5Client {
1818
/**
1919
* Current server version with revision
2020
*/
21-
public static final String VERSION = "Red5 Client 2.0.15";
21+
public static final String VERSION = "Red5 Client 2.0.16";
2222

2323
/**
2424
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope

common/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>2.0.15</version>
6+
<version>2.0.16</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-server-common</artifactId>
@@ -105,7 +105,7 @@
105105
<dependency>
106106
<groupId>net.engio</groupId>
107107
<artifactId>mbassador</artifactId>
108-
<version>2.0.15</version>
108+
<version>2.0.16</version>
109109
</dependency> -->
110110
<dependency>
111111
<groupId>junit</groupId>

common/src/main/java/org/red5/server/api/Red5.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ public final class Red5 {
5757
/**
5858
* Server version with revision
5959
*/
60-
public static final String VERSION = "Red5 Server 2.0.15";
60+
public static final String VERSION = "Red5 Server 2.0.16";
6161

6262
/**
6363
* Server version for fmsVer requests
6464
*/
65-
public static final String FMS_VERSION = "RED5/2,0,15,0";
65+
public static final String FMS_VERSION = "RED5/2,0,16,0";
6666

6767
/**
6868
* Server capabilities

io/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>2.0.15</version>
6+
<version>2.0.16</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-io</artifactId>

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
<name>Red5</name>
2525
<description>The Red5 server</description>
2626
<groupId>org.red5</groupId>
27-
<version>2.0.15</version>
27+
<version>2.0.16</version>
2828
<url>https://github.com/Red5/red5-server</url>
2929
<inceptionYear>2005</inceptionYear>
3030
<organization>
@@ -99,7 +99,7 @@
9999
<red5-io.version>${project.version}</red5-io.version>
100100
<red5-server-common.version>${project.version}</red5-server-common.version>
101101
<red5-service.version>${project.version}</red5-service.version>
102-
<slf4j.version>2.0.15</slf4j.version>
102+
<slf4j.version>2.0.16</slf4j.version>
103103
<logback.version>1.5.6</logback.version>
104104
<bc.version>1.79</bc.version>
105105
<mina.version>2.0.23</mina.version>

server/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>2.0.15</version>
6+
<version>2.0.16</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-server</artifactId>

server/src/main/java/org/red5/net/websocket/WebSocketConnection.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import java.io.IOException;
1111
import java.io.UnsupportedEncodingException;
12-
import java.lang.ref.WeakReference;
1312
import java.nio.ByteBuffer;
1413
import java.util.Arrays;
1514
import java.util.Collections;
@@ -28,6 +27,7 @@
2827
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.tomcat.websocket.Constants;
3029
import org.apache.tomcat.websocket.WsSession;
30+
import org.red5.net.websocket.model.WSMessage;
3131
import org.red5.server.AttributeStore;
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
@@ -50,9 +50,7 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
5050

5151
private static final Logger log = LoggerFactory.getLogger(WebSocketConnection.class);
5252

53-
private static final boolean isTrace = log.isTraceEnabled();
54-
55-
private static final boolean isDebug = log.isDebugEnabled();
53+
private static final boolean isTrace = log.isTraceEnabled(), isDebug = log.isDebugEnabled();
5654

5755
// Sending async on windows times out
5856
private static boolean useAsync;
@@ -69,7 +67,7 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
6967
private final WsSession wsSession;
7068

7169
// reference to the scope for manager access
72-
private WeakReference<WebSocketScope> scope;
70+
private final WebSocketScope scope;
7371

7472
// unique identifier for the session
7573
private final String wsSessionId;
@@ -111,7 +109,9 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
111109
public WebSocketConnection(WebSocketScope scope, Session session) {
112110
log.debug("New WebSocket - scope: {} session: {}", scope, session);
113111
// set the scope for ease of use later
114-
this.scope = new WeakReference<>(scope);
112+
this.scope = scope;
113+
// add the session to the user props
114+
session.getUserProperties().put(WSConstants.WS_SCOPE, scope);
115115
// set our path
116116
path = scope.getPath();
117117
if (isDebug) {
@@ -167,15 +167,20 @@ public WebSocketConnection(WebSocketScope scope, Session session) {
167167
// add the timeouts to the user props
168168
userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout);
169169
userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout);
170-
// set the close timeout to 5 seconds
171-
userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, TimeUnit.SECONDS.toMillis(5));
170+
// write timeout used when sending WebSocket messages in blocking mode
171+
userProps.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, Long.getLong(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, 8000L).longValue());
172+
// write timeout Tomcat uses when writing a session close message when the close is abnormal
173+
userProps.put(Constants.ABNORMAL_SESSION_CLOSE_SEND_TIMEOUT_PROPERTY, Long.getLong(Constants.ABNORMAL_SESSION_CLOSE_SEND_TIMEOUT_PROPERTY, 10000L).longValue());
174+
// time Tomcat waits for a peer to send a WebSocket session close message after Tomcat has sent a close message
175+
// to the peer
176+
userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, Long.getLong(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, 5000L).longValue());
172177
if (isDebug) {
173178
log.debug("userProps: {}", userProps);
174179
}
175180
// set maximum messages size to 10,000 bytes
176181
session.setMaxTextMessageBufferSize(10000);
177-
// set maximum idle timeout to 30 seconds (read timeout)
178-
session.setMaxIdleTimeout(readTimeout);
182+
// set maximum idle timeout to the largest of the read and send timeouts
183+
session.setMaxIdleTimeout(Math.max(readTimeout, sendTimeout));
179184
}
180185

181186
/**
@@ -320,6 +325,16 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
320325
}
321326
}
322327

328+
/**
329+
* Send a received message to the scope.
330+
*
331+
* @param wsMessage
332+
* WSMessage
333+
*/
334+
public void onReceive(WSMessage wsMessage) {
335+
scope.onMessage(wsMessage);
336+
}
337+
323338
/**
324339
* Close the connection.
325340
*/
@@ -386,7 +401,7 @@ public static void setUseAsync(boolean useAsync) {
386401
* @return WebSocketScope
387402
*/
388403
public WebSocketScope getScope() {
389-
return scope != null ? scope.get() : null;
404+
return scope;
390405
}
391406

392407
/**

server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.io.IOException;
1212
import java.io.UnsupportedEncodingException;
1313
import java.nio.ByteBuffer;
14+
import java.util.Map;
1415

1516
import org.apache.mina.core.buffer.IoBuffer;
1617
import org.red5.net.websocket.WSConstants;
@@ -36,12 +37,8 @@ public class DefaultWebSocketEndpoint extends Endpoint {
3637

3738
private final Logger log = LoggerFactory.getLogger(DefaultWebSocketEndpoint.class);
3839

39-
@SuppressWarnings("unused")
4040
private final boolean isDebug = log.isDebugEnabled(), isTrace = log.isTraceEnabled();
4141

42-
// websocket scope where connections connect
43-
private WebSocketScope scope;
44-
4542
/**
4643
* TODO: Currently, Tomcat uses an Endpoint instance once - however the java doc of endpoint says: "Each instance
4744
* of a websocket endpoint is guaranteed not to be called by more than one thread at a time per active connection."
@@ -56,10 +53,15 @@ public void onOpen(Session session, EndpointConfig config) {
5653
if (isDebug) {
5754
log.debug("Session opened: {}\n{}", session.getId(), session.getRequestParameterMap());
5855
}
56+
// user props from session
57+
Map<String, Object> userProps = session.getUserProperties();
5958
// get ws scope from user props
60-
scope = (WebSocketScope) config.getUserProperties().get(WSConstants.WS_SCOPE);
59+
WebSocketScope scope = (WebSocketScope) userProps.get(WSConstants.WS_SCOPE);
60+
if (isDebug) {
61+
log.debug("onOpen - session: {} props: {} scope: {}", session.getId(), userProps, scope);
62+
}
6163
// get ws connection from session user props
62-
WebSocketConnection conn = (WebSocketConnection) session.getUserProperties().get(WSConstants.WS_CONNECTION);
64+
WebSocketConnection conn = (WebSocketConnection) userProps.get(WSConstants.WS_CONNECTION);
6365
if (conn == null) {
6466
log.warn("WebSocketConnection null at onOpen for {}", session.getId());
6567
}
@@ -72,11 +74,18 @@ public void onOpen(Session session, EndpointConfig config) {
7274
public void onClose(Session session, CloseReason closeReason) {
7375
final String sessionId = session.getId();
7476
log.debug("Session closed: {}", sessionId);
77+
// user props from session
78+
Map<String, Object> userProps = session.getUserProperties();
79+
// get ws scope from user props
80+
WebSocketScope scope = (WebSocketScope) userProps.get(WSConstants.WS_SCOPE);
81+
if (isDebug) {
82+
log.debug("onClose - session: {} props: {} scope: {}", session.getId(), userProps, scope);
83+
}
7584
WebSocketConnection conn = null;
7685
// getting the sessions user properties on a closed connection will throw an exception when it checks state
7786
try {
7887
// ensure we grab the scope from the session if its null
79-
conn = (WebSocketConnection) session.getUserProperties().get(WSConstants.WS_CONNECTION);
88+
conn = (WebSocketConnection) userProps.get(WSConstants.WS_CONNECTION);
8089
// if we don't get it from the session, try the scope lookup
8190
if (conn == null) {
8291
log.warn("Connection for id: {} was not found in the session onClose", sessionId);
@@ -137,8 +146,7 @@ public void onMessage(String message) {
137146
try {
138147
// create a websocket message and add the current connection for listener access
139148
WSMessage wsMessage = new WSMessage(message, conn);
140-
// fire the message off to the scope for handling
141-
scope.onMessage(wsMessage);
149+
conn.onReceive(wsMessage);
142150
} catch (UnsupportedEncodingException e) {
143151
log.warn("Exception on message", e);
144152
}
@@ -167,8 +175,7 @@ public void onMessage(ByteBuffer message) {
167175
conn.updateReadBytes(message.limit());
168176
// create a websocket message and add the current connection for listener access
169177
WSMessage wsMessage = new WSMessage(IoBuffer.wrap(message), conn);
170-
// fire the message off to the scope for handling
171-
scope.onMessage(wsMessage);
178+
conn.onReceive(wsMessage);
172179
} else {
173180
log.debug("Connection null or not connected", conn);
174181
}

server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler {
3939

4040
private Logger log = LoggerFactory.getLogger(WsHttpUpgradeHandler.class); // must not be static
4141

42-
private final boolean isTrace = log.isTraceEnabled();
43-
44-
@SuppressWarnings("unused")
45-
private final boolean isDebug = log.isDebugEnabled();
42+
private final boolean isTrace = log.isTraceEnabled(), isDebug = log.isDebugEnabled();
4643

4744
private static final StringManager sm = StringManager.getManager(WsHttpUpgradeHandler.class);
4845

@@ -83,6 +80,10 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler {
8380

8481
private long lastReadBytes, lastWrittenBytes;
8582

83+
private transient WebSocketScopeManager manager;
84+
85+
private transient WebSocketScope scope;
86+
8687
public WsHttpUpgradeHandler() {
8788
applicationClassLoader = Thread.currentThread().getContextClassLoader();
8889
}
@@ -110,6 +111,12 @@ public void preInit(Endpoint ep, EndpointConfig endpointConfig, DefaultWsServerC
110111
} else {
111112
log.debug("pre-init without http session");
112113
}
114+
// user props
115+
Map<String, Object> userProps = endpointConfig.getUserProperties();
116+
// get the ws scope manager from user props
117+
manager = (WebSocketScopeManager) userProps.get(WSConstants.WS_MANAGER);
118+
// get ws scope from user props
119+
scope = (WebSocketScope) userProps.get(WSConstants.WS_SCOPE);
113120
}
114121

115122
@Override
@@ -141,10 +148,6 @@ public void init(WebConnection connection) {
141148
wsFrame = new WsFrameServer(socketWrapper, upgradeInfo, wsSession, transformation, applicationClassLoader);
142149
// WsFrame adds the necessary final transformations. Copy the completed transformation chain to the remote end point.
143150
wsRemoteEndpointServer.setTransformation(wsFrame.getTransformation());
144-
// get the ws scope manager from user props
145-
WebSocketScopeManager manager = (WebSocketScopeManager) endpointConfig.getUserProperties().get(WSConstants.WS_MANAGER);
146-
// get ws scope from user props
147-
WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE);
148151
// create a ws connection instance
149152
WebSocketConnection conn = new WebSocketConnection(scope, wsSession);
150153
// set ip and port
@@ -243,7 +246,7 @@ public void destroy() {
243246
}
244247

245248
private void onError(Throwable throwable) {
246-
if (log.isDebugEnabled()) {
249+
if (isDebug) {
247250
log.debug("onError for ws id: {}", wsSession.getId(), throwable);
248251
}
249252
// Need to call onError using the web application's class loader
@@ -258,7 +261,7 @@ private void onError(Throwable throwable) {
258261
}
259262

260263
private void close(CloseReason cr) {
261-
if (log.isDebugEnabled()) {
264+
if (isDebug) {
262265
log.debug("close for ws id: {} reason: {}", wsSession.getId(), cr);
263266
}
264267
/*
@@ -268,6 +271,15 @@ private void close(CloseReason cr) {
268271
* recover from whatever messed up state the client put the connection into.
269272
*/
270273
wsSession.onClose(cr);
274+
// null these so that we don't try to use them again
275+
wsSession = null;
276+
connection = null;
277+
upgradeInfo = null;
278+
// null the socket wrapper so that we don't try to use it again
279+
if (socketWrapper != null) {
280+
socketWrapper.close();
281+
socketWrapper = null;
282+
}
271283
}
272284

273285
@Override
@@ -290,8 +302,6 @@ public void timeoutAsync(long now) {
290302
if (wsSession != null) {
291303
try {
292304
final String wsSessionId = wsSession.getId();
293-
// get scope from endpoint config
294-
WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE);
295305
// do lookup by session id, skips need for session user props
296306
WebSocketConnection conn = scope.getConnectionBySessionId(wsSessionId);
297307
// if we don't get it from the scope, try the session lookup

0 commit comments

Comments
 (0)