Skip to content

Commit 855c331

Browse files
committed
Added sync around ws session blocks to prevent full-text errors
1 parent a4b97a7 commit 855c331

File tree

1 file changed

+36
-26
lines changed

1 file changed

+36
-26
lines changed

server/src/main/java/org/red5/net/websocket/WebSocketConnection.java

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,17 @@ public void send(String data) throws UnsupportedEncodingException, IOException {
167167
if (wsSession.isOpen()) {
168168
if (isConnected()) {
169169
try {
170-
int lengthToWrite = data.getBytes().length;
171-
if (useAsync) {
172-
// a new future is returned on each call
173-
Future<Void> sendFuture = wsSession.getAsyncRemote().sendText(data);
174-
sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
175-
} else {
176-
wsSession.getBasicRemote().sendText(data);
170+
synchronized (wsSession) {
171+
int lengthToWrite = data.getBytes().length;
172+
if (useAsync) {
173+
// a new future is returned on each call
174+
Future<Void> sendFuture = wsSession.getAsyncRemote().sendText(data);
175+
sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
176+
} else {
177+
wsSession.getBasicRemote().sendText(data);
178+
}
179+
writeUpdater.addAndGet(this, lengthToWrite);
177180
}
178-
writeUpdater.addAndGet(this, lengthToWrite);
179181
} catch (TimeoutException e) {
180182
log.warn("Send timed out");
181183
} catch (Exception e) {
@@ -206,16 +208,18 @@ public void send(byte[] buf) throws IOException {
206208
}
207209
if (wsSession.isOpen()) {
208210
try {
209-
// send the bytes
210-
if (useAsync) {
211-
Future<Void> sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
212-
// wait up-to ws timeout
213-
sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
214-
} else {
215-
wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
211+
synchronized (wsSession) {
212+
// send the bytes
213+
if (useAsync) {
214+
Future<Void> sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
215+
// wait up-to ws timeout
216+
sendFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
217+
} else {
218+
wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
219+
}
220+
// update counter
221+
writeUpdater.addAndGet(this, buf.length);
216222
}
217-
// update counter
218-
writeUpdater.addAndGet(this, buf.length);
219223
} catch (Exception e) {
220224
log.warn("Send bytes exception", e);
221225
}
@@ -236,10 +240,12 @@ public void sendPing(byte[] buf) throws IllegalArgumentException, IOException {
236240
log.trace("send ping: {}", buf);
237241
}
238242
if (wsSession.isOpen()) {
239-
// send the bytes
240-
wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
241-
// update counter
242-
writeUpdater.addAndGet(this, buf.length);
243+
synchronized (wsSession) {
244+
// send the bytes
245+
wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
246+
// update counter
247+
writeUpdater.addAndGet(this, buf.length);
248+
}
243249
} else {
244250
throw new IOException("WS session closed");
245251
}
@@ -257,10 +263,12 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
257263
log.trace("send pong: {}", buf);
258264
}
259265
if (wsSession.isOpen()) {
260-
// send the bytes
261-
wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
262-
// update counter
263-
writeUpdater.addAndGet(this, buf.length);
266+
synchronized (wsSession) {
267+
// send the bytes
268+
wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
269+
// update counter
270+
writeUpdater.addAndGet(this, buf.length);
271+
}
264272
} else {
265273
throw new IOException("WS session closed");
266274
}
@@ -276,7 +284,9 @@ public void close() {
276284
// normal close
277285
if (wsSession.isOpen()) {
278286
try {
279-
wsSession.close();
287+
synchronized (wsSession) {
288+
wsSession.close();
289+
}
280290
} catch (Throwable e) {
281291
log.debug("Exception during close", e);
282292
}

0 commit comments

Comments
 (0)