Skip to content

Commit 9e7ffce

Browse files
committed
Implemented internal ws conn pinger on a single thread
1 parent 4a389a4 commit 9e7ffce

File tree

5 files changed

+57
-28
lines changed

5 files changed

+57
-28
lines changed

server/src/main/java/org/red5/net/websocket/WebSocketScope.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.Optional;
1313
import java.util.Set;
1414
import java.util.concurrent.ConcurrentSkipListSet;
15+
import java.util.concurrent.CopyOnWriteArraySet;
1516

1617
import org.red5.net.websocket.listener.IWebSocketDataListener;
1718
import org.red5.net.websocket.model.WSMessage;
@@ -36,7 +37,8 @@ public class WebSocketScope implements InitializingBean, DisposableBean {
3637

3738
protected ConcurrentSkipListSet<WebSocketConnection> conns = new ConcurrentSkipListSet<>();
3839

39-
protected ConcurrentSkipListSet<IWebSocketDataListener> listeners = new ConcurrentSkipListSet<>();
40+
// this has very few entries, possibly only one, COWAS is fine here and won't incur Comparable requirements
41+
protected CopyOnWriteArraySet<IWebSocketDataListener> listeners = new CopyOnWriteArraySet<>();
4042

4143
protected IScope scope;
4244

@@ -180,9 +182,7 @@ public void addConnection(WebSocketConnection conn) {
180182
// prevent false failed logging when a connection is already registered
181183
if (conns.add(conn)) {
182184
log.debug("Added connection: {}", conn);
183-
for (IWebSocketDataListener listener : listeners) {
184-
listener.onWSConnect(conn);
185-
}
185+
listeners.forEach(listener -> listener.onWSConnect(conn));
186186
} else {
187187
log.debug("Add connection skipped, already registered: {}", conn);
188188
}
@@ -197,9 +197,7 @@ public void removeConnection(WebSocketConnection conn) {
197197
// prevent false failed logging when a connection isnt registered
198198
if (conns.remove(conn)) {
199199
log.debug("Removed connection: {}", conn);
200-
for (IWebSocketDataListener listener : listeners) {
201-
listener.onWSDisconnect(conn);
202-
}
200+
listeners.forEach(listener -> listener.onWSDisconnect(conn));
203201
} else {
204202
log.debug("Remove connection skipped, not registered: {}", conn);
205203
}

server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import java.util.concurrent.ConcurrentHashMap;
1212
import java.util.concurrent.ConcurrentMap;
1313
import java.util.concurrent.CopyOnWriteArraySet;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
1416
import java.util.concurrent.Future;
1517

1618
import org.red5.net.websocket.listener.DefaultWebSocketDataListener;
@@ -32,6 +34,15 @@ public class WebSocketScopeManager {
3234

3335
private static final Logger log = LoggerFactory.getLogger(WebSocketScopeManager.class);
3436

37+
// used to ping WS connections
38+
private static final byte[] PING_BYTES = "PING!".getBytes();
39+
40+
// one executor per scope manager
41+
private ExecutorService executor = Executors.newFixedThreadPool(1);
42+
43+
// future for the ws pinger
44+
private Future<?> pingFuture;
45+
3546
// reference to the owning application scope
3647
private IScope appScope;
3748

@@ -47,6 +58,9 @@ public class WebSocketScopeManager {
4758
// whether or not to copy listeners from parent to child on create
4859
protected boolean copyListeners = true;
4960

61+
// value for the websocket ping period/interval
62+
public static long websocketPingInterval = 5000L;
63+
5064
public void addListener(IWebSocketScopeListener listener) {
5165
scopeListeners.add(listener);
5266
}
@@ -136,6 +150,37 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) {
136150
if (scopes.putIfAbsent(path, webSocketScope) == null) {
137151
log.info("addWebSocketScope: {}", webSocketScope);
138152
notifyListeners(WebSocketEvent.SCOPE_ADDED, webSocketScope);
153+
// ensure the ping future exists, if not spawn it
154+
if (pingFuture == null || pingFuture.isDone()) {
155+
pingFuture = executor.submit(() -> {
156+
final String oldName = Thread.currentThread().getName();
157+
Thread.currentThread().setName("WebSocketPinger");
158+
do {
159+
scopes.forEach((sName, wsScope) -> {
160+
log.trace("start pinging scope: {}", sName);
161+
wsScope.getConns().forEach(wsConn -> {
162+
// ping connected websocket
163+
if (wsConn.isConnected()) {
164+
log.debug("pinging ws: {} on scope: {}", wsConn.getWsSessionId(), sName);
165+
try {
166+
wsConn.sendPing(PING_BYTES);
167+
} catch (Exception e) {
168+
}
169+
}
170+
});
171+
log.trace("finished pinging scope: {}", sName);
172+
});
173+
// sleep for interval
174+
try {
175+
Thread.sleep(websocketPingInterval);
176+
} catch (InterruptedException e1) {
177+
}
178+
} while (!scopes.isEmpty());
179+
// reset ping future
180+
pingFuture = null;
181+
Thread.currentThread().setName(oldName);
182+
});
183+
}
139184
return true;
140185
}
141186
return false;
@@ -165,7 +210,9 @@ public boolean removeWebSocketScope(WebSocketScope webSocketScope) {
165210
*/
166211
public void addConnection(WebSocketConnection conn) {
167212
WebSocketScope scope = getScope(conn);
168-
scope.addConnection(conn);
213+
if (scope != null) {
214+
scope.addConnection(conn);
215+
}
169216
}
170217

171218
/**
@@ -180,7 +227,7 @@ public void removeConnection(WebSocketConnection conn) {
180227
if (scope != null) {
181228
scope.removeConnection(conn);
182229
if (!scope.isValid()) {
183-
// scope is not valid. delete this.
230+
// scope is not valid, delete it
184231
removeWebSocketScope(scope);
185232
}
186233
}
@@ -346,6 +393,9 @@ private WebSocketScope getScope(WebSocketConnection conn) {
346393
* Stops this manager and the scopes contained within.
347394
*/
348395
public void stop() {
396+
if (pingFuture != null && !pingFuture.isCancelled()) {
397+
pingFuture.cancel(true);
398+
}
349399
for (WebSocketScope scope : scopes.values()) {
350400
scope.unregister();
351401
}

server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ public class DefaultWebSocketEndpoint extends Endpoint {
5252
* onMessage, onClose methods to ensure the room thread always gets the correct instance of the variable holder.
5353
*/
5454

55-
//private ThreadLocal<WebSocketConnection> connectionLocal = new ThreadLocal<>();
56-
5755
@Override
5856
public void onOpen(Session session, EndpointConfig config) {
5957
log.debug("Session opened: {}\n{}", session.getId(), session.getRequestParameterMap());

server/src/main/java/org/red5/net/websocket/server/UpgradeUtil.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,6 @@ public static void doUpgrade(DefaultWsServerContainer sc, HttpServletRequest req
178178
ep = new PojoEndpointServer(pathParams, clazz);
179179
// Need to make path params available to POJO
180180
perSessionServerEndpointConfig.getUserProperties().put("org.apache.tomcat.websocket.pojo.PojoEndpoint.pathParams", pathParams);
181-
// removed in 8.5.x post .61
182-
//perSessionServerEndpointConfig.getUserProperties().put(org.apache.tomcat.websocket.pojo.Constants.POJO_PATH_PARAM_KEY, pathParams);
183181
}
184182
} catch (InstantiationException e) {
185183
throw new ServletException(e);

server/src/main/java/org/red5/net/websocket/server/WsFrameServer.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,6 @@ private void onDataAvailable() throws IOException {
5050
if (log.isDebugEnabled()) {
5151
log.debug("wsFrameServer.onDataAvailable - session {}", wsSession.getId());
5252
}
53-
/*
54-
// set connection local to the message handler so WSMessage will contain the connection
55-
DefaultWebSocketEndpoint ep = (DefaultWebSocketEndpoint) wsSession.getLocal();
56-
if (ep.getConnectionLocal() == null) {
57-
if (log.isDebugEnabled()) {
58-
log.debug("Endpoint had no connection local for session: {}", wsSession.getId());
59-
}
60-
WebSocketConnection conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION);
61-
ep.setConnectionLocal(conn);
62-
}
63-
*/
6453
// handle input
6554
if (isOpen() && inputBuffer.hasRemaining() && !isSuspended()) {
6655
// There might be a data that was left in the buffer when the read has been suspended.
@@ -84,10 +73,6 @@ private void onDataAvailable() throws IOException {
8473
}
8574
processInputBuffer();
8675
}
87-
/*
88-
// clear thread local
89-
((DefaultWebSocketEndpoint) wsSession.getLocal()).setConnectionLocal(null);
90-
*/
9176
}
9277

9378
@Override

0 commit comments

Comments
 (0)