Skip to content

Commit e2a4cd5

Browse files
committed
Fix for event dispatching and update for connection property exposure to extension
1 parent 85d2998 commit e2a4cd5

File tree

2 files changed

+12
-13
lines changed

2 files changed

+12
-13
lines changed

src/main/java/org/red5/server/net/rtmp/BaseRTMPHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.red5.server.net.rtmp.message.Packet;
4141
import org.red5.server.net.rtmp.status.StatusCodes;
4242
import org.red5.server.so.SharedObjectMessage;
43-
import org.red5.server.stream.ClientBroadcastStream;
4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
4645

@@ -109,7 +108,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
109108
if (stream != null) {
110109
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
111110
if (epeo == null && stream != null) {
112-
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
111+
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
113112
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
114113
}
115114
epeo.addPacket(message);
@@ -136,7 +135,7 @@ public void messageReceived(RTMPConnection conn, Packet packet) throws Exception
136135
// Stream metadata
137136
EnsuresPacketExecutionOrder epeo = (EnsuresPacketExecutionOrder) conn.getAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME);
138137
if (epeo == null) {
139-
epeo = new EnsuresPacketExecutionOrder((ClientBroadcastStream) stream, conn);
138+
epeo = new EnsuresPacketExecutionOrder((IEventDispatcher) stream, conn);
140139
conn.setAttribute(EnsuresPacketExecutionOrder.ATTRIBUTE_NAME, epeo);
141140
}
142141
epeo.addPacket(message);
@@ -376,13 +375,13 @@ private static class EnsuresPacketExecutionOrder implements Runnable {
376375

377376
private AtomicBoolean state = new AtomicBoolean();
378377

379-
private final ClientBroadcastStream stream;
378+
private final IEventDispatcher stream;
380379

381380
private final RTMPConnection conn;
382381

383382
private int iter;
384383

385-
public EnsuresPacketExecutionOrder(ClientBroadcastStream stream, RTMPConnection conn) {
384+
public EnsuresPacketExecutionOrder(IEventDispatcher stream, RTMPConnection conn) {
386385
this.stream = stream;
387386
this.conn = conn;
388387
}

src/main/java/org/red5/server/net/rtmp/RTMPConnection.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,31 +185,31 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
185185
*
186186
* @see org.red5.server.net.rtmp.Channel
187187
*/
188-
private transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel);
188+
protected transient ConcurrentMap<Integer, Channel> channels = new ConcurrentHashMap<>(channelsInitalCapacity, 0.9f, channelsConcurrencyLevel);
189189

190190
/**
191191
* Queues of tasks for every channel
192192
*
193193
* @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue
194194
*/
195-
private final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
195+
protected final transient ConcurrentMap<Integer, ReceivedMessageTaskQueue> tasksByStreams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
196196

197197
/**
198198
* Client streams
199199
*
200200
* @see org.red5.server.api.stream.IClientStream
201201
*/
202-
private transient ConcurrentMap<Number, IClientStream> streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
202+
protected transient ConcurrentMap<Number, IClientStream> streams = new ConcurrentHashMap<>(streamsInitalCapacity, 0.9f, streamsConcurrencyLevel);
203203

204204
/**
205205
* Reserved stream ids. Stream id's directly relate to individual NetStream instances.
206206
*/
207-
private transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel));
207+
protected transient Set<Number> reservedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Number, Boolean>(reservedStreamsInitalCapacity, 0.9f, reservedStreamsConcurrencyLevel));
208208

209209
/**
210210
* Transaction identifier for remote commands.
211211
*/
212-
private AtomicInteger transactionId = new AtomicInteger(1);
212+
protected AtomicInteger transactionId = new AtomicInteger(1);
213213

214214
/**
215215
* Hash map that stores pending calls and ids as pairs.
@@ -335,7 +335,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
335335
/**
336336
* Timestamp generator
337337
*/
338-
private final AtomicInteger timer = new AtomicInteger(0);
338+
protected final AtomicInteger timer = new AtomicInteger(0);
339339

340340
/**
341341
* Closing flag
@@ -559,8 +559,8 @@ private void startRoundTripMeasurement() {
559559
}
560560
}
561561
} else {
562-
// reducing from error to debug as its not all that important of a message these days to have such promotion
563-
log.debug("startRoundTripMeasurement cannot be executed due to missing scheduler. This can happen if a connection drops before handshake is complete");
562+
// reducing from error to trace as its not all that important of a message these days to have such promotion
563+
log.trace("startRoundTripMeasurement not enabled. If RTMP, can occur when lost before handshake is complete");
564564
}
565565
}
566566

0 commit comments

Comments
 (0)