Skip to content

Commit 922554b

Browse files
authored
Merge pull request #81 from Red5/bug/event-racing
Bug/event racing
2 parents 0e9bbfa + 66e79ed commit 922554b

File tree

1 file changed

+80
-19
lines changed

1 file changed

+80
-19
lines changed

src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
import java.util.Set;
1313
import java.util.concurrent.ExecutorService;
1414
import java.util.concurrent.Executors;
15+
import java.util.concurrent.LinkedBlockingQueue;
16+
import java.util.concurrent.atomic.AtomicBoolean;
1517

1618
import org.apache.mina.core.session.IoSession;
1719
import org.red5.io.object.StreamAction;
20+
import org.red5.server.api.Red5;
1821
import org.red5.server.api.event.IEventDispatcher;
1922
import org.red5.server.api.service.IPendingServiceCall;
2023
import org.red5.server.api.service.IPendingServiceCallback;
@@ -37,13 +40,16 @@
3740
import org.red5.server.net.rtmp.message.Packet;
3841
import org.red5.server.net.rtmp.status.StatusCodes;
3942
import org.red5.server.so.SharedObjectMessage;
43+
import org.red5.server.stream.ClientBroadcastStream;
4044
import org.slf4j.Logger;
4145
import org.slf4j.LoggerFactory;
4246

4347
/**
4448
* Base class for all RTMP handlers.
4549
*
4650
* @author The Red5 Project
51+
* @author Andy Shaules
52+
* @author Paul Gregoire
4753
*/
4854
public abstract class BaseRTMPHandler implements IRTMPHandler, Constants, StatusCodes {
4955

@@ -99,17 +105,14 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
99105
// log.trace("Marking message as originating from a Live source");
100106
message.setSourceType(Constants.SOURCE_TYPE_LIVE);
101107
// NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
102-
// the client sends a few stream packets before stopping. We need to ignore them
108+
// the client sends a few stream packets before stopping; we need to ignore them.
103109
if (stream != null) {
104-
recvDispatchExecutor.submit(() -> {
105-
try {
106-
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId()));
107-
((IEventDispatcher) stream).dispatchEvent(message);
108-
message.release();
109-
} catch (Exception e) {
110-
log.warn("Exception on Media dispatch", e);
111-
}
112-
});
110+
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
111+
if (epeo == null && stream != null) {
112+
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
113+
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
114+
}
115+
epeo.addPacket(message);
113116
}
114117
break;
115118
case TYPE_FLEX_SHARED_OBJECT:
@@ -131,15 +134,12 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
131134
case TYPE_FLEX_STREAM_SEND:
132135
if (((Notify) message).getData() != null && stream != null) {
133136
// Stream metadata
134-
recvDispatchExecutor.submit(() -> {
135-
try {
136-
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s", conn.getSessionId()));
137-
((IEventDispatcher) stream).dispatchEvent(message);
138-
message.release();
139-
} catch (Exception e) {
140-
log.warn("Exception on Notify dispatch", e);
141-
}
142-
});
137+
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
138+
if (epeo == null) {
139+
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
140+
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
141+
}
142+
epeo.addPacket(message);
143143
} else {
144144
onCommand(conn, channel, header, (Notify) message);
145145
}
@@ -364,4 +364,65 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so
364364
*/
365365
protected abstract void onSharedObject(RTMPConnection conn, Channel channel, Header source, SharedObjectMessage message);
366366

367+
/**
368+
* Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
369+
* and keeps all incoming events in order.
370+
*/
371+
private static class EnsuresPacketExecutionOrder implements Runnable {
372+
373+
public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder";
374+
375+
private LinkedBlockingQueue<IRTMPEvent> events = new LinkedBlockingQueue<>();
376+
377+
private AtomicBoolean state = new AtomicBoolean();
378+
379+
private final ClientBroadcastStream stream;
380+
381+
private final RTMPConnection conn;
382+
383+
private int iter;
384+
385+
public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) {
386+
this.stream = stream;
387+
this.conn = conn;
388+
}
389+
390+
/**
391+
* Add packet to the stream's incoming queue.
392+
* @param packet
393+
*/
394+
public void addPacket(IRTMPEvent packet) {
395+
events.offer(packet);
396+
if (state.compareAndSet(false, true)) {
397+
recvDispatchExecutor.submit(this);
398+
}
399+
}
400+
401+
public void run() {
402+
// use int to identify different thread instance
403+
Thread.currentThread().setName(String.format("RTMPRecvDispatch@%s-%d", conn.getSessionId(), iter++));
404+
iter &= 7;
405+
// always set connection local on dispatch threads
406+
Red5.setConnectionLocal(conn);
407+
// we were created for a reason, grab the event
408+
IRTMPEvent message = events.poll();
409+
// null check just in case queue was drained before we woke
410+
if (message != null) {
411+
// dispatch to stream
412+
stream.dispatchEvent(message);
413+
// release / clean up
414+
message.release();
415+
}
416+
// set null before resubmit
417+
Red5.setConnectionLocal(null);
418+
// resubmit for another go if we have more
419+
if (!events.isEmpty()) {
420+
recvDispatchExecutor.submit(this);
421+
} else {
422+
state.set(false);
423+
}
424+
// resubmitting rather than looping until empty plays nice with other threads
425+
}
426+
}
427+
367428
}

0 commit comments

Comments
 (0)