1212import java .util .Set ;
1313import java .util .concurrent .ExecutorService ;
1414import java .util .concurrent .Executors ;
15+ import java .util .concurrent .LinkedBlockingQueue ;
16+ import java .util .concurrent .atomic .AtomicBoolean ;
1517
1618import org .apache .mina .core .session .IoSession ;
1719import org .red5 .io .object .StreamAction ;
20+ import org .red5 .server .api .Red5 ;
1821import org .red5 .server .api .event .IEventDispatcher ;
1922import org .red5 .server .api .service .IPendingServiceCall ;
2023import org .red5 .server .api .service .IPendingServiceCallback ;
3740import org .red5 .server .net .rtmp .message .Packet ;
3841import org .red5 .server .net .rtmp .status .StatusCodes ;
3942import org .red5 .server .so .SharedObjectMessage ;
43+ import org .red5 .server .stream .ClientBroadcastStream ;
4044import org .slf4j .Logger ;
4145import org .slf4j .LoggerFactory ;
4246
4347/**
4448 * Base class for all RTMP handlers.
4549 *
4650 * @author The Red5 Project
51+ * @author Andy Shaules
52+ * @author Paul Gregoire
4753 */
4854public abstract class BaseRTMPHandler implements IRTMPHandler , Constants , StatusCodes {
4955
@@ -99,17 +105,14 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
99105 // log.trace("Marking message as originating from a Live source");
100106 message .setSourceType (Constants .SOURCE_TYPE_LIVE );
101107 // NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
102- // the client sends a few stream packets before stopping. We need to ignore them
108+ // the client sends a few stream packets before stopping; we need to ignore them.
103109 if (stream != null ) {
104- recvDispatchExecutor .submit (() -> {
105- try {
106- Thread .currentThread ().setName (String .format ("RTMPRecvDispatch@%s" , conn .getSessionId ()));
107- ((IEventDispatcher ) stream ).dispatchEvent (message );
108- message .release ();
109- } catch (Exception e ) {
110- log .warn ("Exception on Media dispatch" , e );
111- }
112- });
110+ EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder ) conn .getAttribute (EnsuresPacketExecutionOrder .ATTRIBUTE_NAME );
111+ if (epeo == null && stream != null ) {
112+ epeo = new EnsuresPacketExecutionOrder ((ClientBroadcastStream ) stream , conn );
113+ conn .setAttribute (EnsuresPacketExecutionOrder .ATTRIBUTE_NAME , epeo );
114+ }
115+ epeo .addPacket (message );
113116 }
114117 break ;
115118 case TYPE_FLEX_SHARED_OBJECT :
@@ -131,15 +134,12 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
131134 case TYPE_FLEX_STREAM_SEND :
132135 if (((Notify ) message ).getData () != null && stream != null ) {
133136 // Stream metadata
134- recvDispatchExecutor .submit (() -> {
135- try {
136- Thread .currentThread ().setName (String .format ("RTMPRecvDispatch@%s" , conn .getSessionId ()));
137- ((IEventDispatcher ) stream ).dispatchEvent (message );
138- message .release ();
139- } catch (Exception e ) {
140- log .warn ("Exception on Notify dispatch" , e );
141- }
142- });
137+ EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder ) conn .getAttribute (EnsuresPacketExecutionOrder .ATTRIBUTE_NAME );
138+ if (epeo == null ) {
139+ epeo = new EnsuresPacketExecutionOrder ((ClientBroadcastStream ) stream , conn );
140+ conn .setAttribute (EnsuresPacketExecutionOrder .ATTRIBUTE_NAME , epeo );
141+ }
142+ epeo .addPacket (message );
143143 } else {
144144 onCommand (conn , channel , header , (Notify ) message );
145145 }
@@ -364,4 +364,65 @@ protected void onStreamBytesRead(RTMPConnection conn, Channel channel, Header so
364364 */
365365 protected abstract void onSharedObject (RTMPConnection conn , Channel channel , Header source , SharedObjectMessage message );
366366
367+ /**
368+ * Class ensures a stream's event dispatching occurs on only one core at any one time. Eliminates thread racing internal to ClientBroadcastStream
369+ * and keeps all incoming events in order.
370+ */
371+ private static class EnsuresPacketExecutionOrder implements Runnable {
372+
373+ public final static String ATTRIBUTE_NAME = "EnsuresPacketExecutionOrder" ;
374+
375+ private LinkedBlockingQueue <IRTMPEvent > events = new LinkedBlockingQueue <>();
376+
377+ private AtomicBoolean state = new AtomicBoolean ();
378+
379+ private final ClientBroadcastStream stream ;
380+
381+ private final RTMPConnection conn ;
382+
383+ private int iter ;
384+
385+ public EnsuresPacketExecutionOrder (ClientBroadcastStream stream , RTMPConnection conn ) {
386+ this .stream = stream ;
387+ this .conn = conn ;
388+ }
389+
390+ /**
391+ * Add packet to the stream's incoming queue.
392+ * @param packet
393+ */
394+ public void addPacket (IRTMPEvent packet ) {
395+ events .offer (packet );
396+ if (state .compareAndSet (false , true )) {
397+ recvDispatchExecutor .submit (this );
398+ }
399+ }
400+
401+ public void run () {
402+ // use int to identify different thread instance
403+ Thread .currentThread ().setName (String .format ("RTMPRecvDispatch@%s-%d" , conn .getSessionId (), iter ++));
404+ iter &= 7 ;
405+ // always set connection local on dispatch threads
406+ Red5 .setConnectionLocal (conn );
407+ // we were created for a reason, grab the event
408+ IRTMPEvent message = events .poll ();
409+ // null check just in case queue was drained before we woke
410+ if (message != null ) {
411+ // dispatch to stream
412+ stream .dispatchEvent (message );
413+ // release / clean up
414+ message .release ();
415+ }
416+ // set null before resubmit
417+ Red5 .setConnectionLocal (null );
418+ // resubmit for another go if we have more
419+ if (!events .isEmpty ()) {
420+ recvDispatchExecutor .submit (this );
421+ } else {
422+ state .set (false );
423+ }
424+ // resubmitting rather than looping until empty plays nice with other threads
425+ }
426+ }
427+
367428}
0 commit comments