Skip to content

Commit d8ede0f

Browse files
committed
Updated version to 1.2.23. Fixes for scope lookup and rtmpe playback
1 parent f67d2a1 commit d8ede0f

File tree

16 files changed

+87
-56
lines changed

16 files changed

+87
-56
lines changed

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>1.2.22</version>
6+
<version>1.2.23</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-client</artifactId>

client/src/main/java/org/red5/client/Red5Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public final class Red5Client {
1818
/**
1919
* Current server version with revision
2020
*/
21-
public static final String VERSION = "Red5 Client 1.2.22";
21+
public static final String VERSION = "Red5 Client 1.2.23";
2222

2323
/**
2424
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope

common/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>1.2.22</version>
6+
<version>1.2.23</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-server-common</artifactId>

common/src/main/java/org/red5/server/api/Red5.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@ public final class Red5 {
5555
/**
5656
* Server version with revision
5757
*/
58-
public static final String VERSION = "Red5 Server 1.2.22";
58+
public static final String VERSION = "Red5 Server 1.2.23";
5959

6060
/**
6161
* Server version for fmsVer requests
6262
*/
63-
public static final String FMS_VERSION = "RED5/1,2,22,0";
63+
public static final String FMS_VERSION = "RED5/1,2,23,0";
6464

6565
/**
6666
* Server capabilities

common/src/main/java/org/red5/server/net/rtmp/RTMPConnection.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@
4949
import org.red5.server.exception.ClientRejectedException;
5050
import org.red5.server.net.protocol.RTMPDecodeState;
5151
import org.red5.server.net.rtmp.codec.RTMP;
52+
import org.red5.server.net.rtmp.event.AudioData;
5253
import org.red5.server.net.rtmp.event.BytesRead;
5354
import org.red5.server.net.rtmp.event.ChunkSize;
5455
import org.red5.server.net.rtmp.event.ClientBW;
5556
import org.red5.server.net.rtmp.event.ClientInvokeEvent;
5657
import org.red5.server.net.rtmp.event.ClientNotifyEvent;
58+
import org.red5.server.net.rtmp.event.IRTMPEvent;
5759
import org.red5.server.net.rtmp.event.Invoke;
5860
import org.red5.server.net.rtmp.event.Notify;
5961
import org.red5.server.net.rtmp.event.Ping;
@@ -1366,6 +1368,10 @@ protected void writingMessage(Packet message) {
13661368
}
13671369
old.incrementAndGet();
13681370
}
1371+
// XXX(paul) work-around for RTMPE issue with Mina messageSent callback
1372+
if (isEncrypted()) {
1373+
writtenMessages.incrementAndGet();
1374+
}
13691375
}
13701376

13711377
/**
@@ -1602,7 +1608,10 @@ public void onSuccess(Packet packet) {
16021608
* Message to mark
16031609
*/
16041610
public void messageSent(Packet message) {
1605-
if (message.getMessage() instanceof VideoData) {
1611+
//log.info("messageSent: {}", message);
1612+
IRTMPEvent event = message.getMessage();
1613+
if (event instanceof VideoData) {
1614+
log.info("Video message sent");
16061615
Number streamId = message.getHeader().getStreamId();
16071616
AtomicInteger pending = pendingVideos.get(streamId.doubleValue());
16081617
if (isTrace) {
@@ -1611,6 +1620,12 @@ public void messageSent(Packet message) {
16111620
if (pending != null) {
16121621
pending.decrementAndGet();
16131622
}
1623+
} else if (event instanceof AudioData) {
1624+
log.info("Audio message sent");
1625+
} else if (event instanceof Notify) {
1626+
log.info("Notify message sent");
1627+
} else {
1628+
log.warn("Message sent: {} data type: {}", event.getType(), event.getDataType());
16141629
}
16151630
writtenMessages.incrementAndGet();
16161631
}

common/src/main/java/org/red5/server/net/rtmp/RTMPMinaConnection.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,15 +327,14 @@ public void setIoSession(IoSession protocolSession) {
327327
public void write(Packet out) {
328328
if (ioSession != null) {
329329
final Semaphore lock = getLock();
330-
if (log.isTraceEnabled()) {
331-
log.trace("Write lock wait count: {} closed: {}", lock.getQueueLength(), isClosed());
332-
}
330+
//if (log.isTraceEnabled()) {
331+
// log.trace("Write lock wait count: {} closed: {}", lock.getQueueLength(), isClosed());
332+
//}
333333
while (!isClosed()) {
334334
boolean acquired = false;
335335
try {
336336
acquired = lock.tryAcquire(10, TimeUnit.MILLISECONDS);
337-
if (acquired) {
338-
// attempt write if not closing
337+
if (acquired) { // attempt write if not closing
339338
if (!ioSession.isClosing()) {
340339
if (log.isTraceEnabled()) {
341340
log.trace("Writing message");
@@ -348,12 +347,11 @@ public void write(Packet out) {
348347
} catch (InterruptedException e) {
349348
log.warn("Interrupted while waiting for write lock. State: {}", RTMP.states[state.getState()], e);
350349
if (log.isInfoEnabled()) {
351-
// further debugging to assist with possible connection problems
350+
// further debugging to assist with possible connection problems
352351
log.info("Session id: {} in queue size: {} pending msgs: {} last ping/pong: {}", getSessionId(), currentQueueSize(), getPendingMessages(), getLastPingSentAndLastPongReceivedInterval());
353352
log.info("Available permits - decoder: {} encoder: {}", decoderLock.availablePermits(), encoderLock.availablePermits());
354353
}
355-
String exMsg = e.getMessage();
356-
// if the exception cause is null break out of here to prevent looping until closed
354+
String exMsg = e.getMessage(); // if the exception cause is null break out of here to prevent looping until closed
357355
if (exMsg == null || exMsg.indexOf("null") >= 0) {
358356
log.debug("Exception writing to connection: {}", this);
359357
break;

common/src/main/java/org/red5/server/scope/Scope.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1446,7 +1446,7 @@ public IBasicScope getBasicScope(ScopeType type, String name) {
14461446
// if its broadcast type then allow an alias match in addition to the name match
14471447
if (type == ScopeType.BROADCAST) {
14481448
// checks publish and subscribe aliases
1449-
scope = stream().filter(child -> child.getType().equals(type) && (name.equals(child.getName()) || name.equals(((IBroadcastScope) child).getClientBroadcastStream().getAlias()) || ((IBroadcastScope) child).getClientBroadcastStream().containsAlias(name))).findFirst();
1449+
scope = stream().filter(child -> child.getType().equals(type) && (name.equals(child.getName()) || ((IBroadcastScope) child).getClientBroadcastStream().containsAlias(name))).findFirst();
14501450
} else {
14511451
scope = stream().filter(child -> child.getType().equals(type) && name.equals(child.getName())).findFirst();
14521452
}

common/src/main/java/org/red5/server/stream/PlayEngine.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public final class PlayEngine implements IFilter, IPushableConsumer, IPipeConnec
8383

8484
private static final Logger log = Red5LoggerFactory.getLogger(PlayEngine.class);
8585

86+
private static boolean isDebug = log.isDebugEnabled();
87+
88+
private static boolean isTrace = log.isTraceEnabled();
89+
8690
private final AtomicReference<IMessageInput> msgInReference = new AtomicReference<>();
8791

8892
private final AtomicReference<IMessageOutput> msgOutReference = new AtomicReference<>();
@@ -280,7 +284,7 @@ void setMessageOut(IMessageOutput msgOut) {
280284
* Start stream
281285
*/
282286
public void start() {
283-
if (log.isDebugEnabled()) {
287+
if (isDebug) {
284288
log.debug("start - subscriber stream state: {}", (subscriberStream != null ? subscriberStream.getState() : null));
285289
}
286290
switch (subscriberStream.getState()) {
@@ -290,7 +294,7 @@ public void start() {
290294
IMessageOutput out = consumerService.getConsumerOutput(subscriberStream);
291295
if (msgOutReference.compareAndSet(null, out)) {
292296
out.subscribe(this, null);
293-
} else if (log.isDebugEnabled()) {
297+
} else if (isDebug) {
294298
log.debug("Message output was already set for stream: {}", subscriberStream);
295299
}
296300
break;
@@ -389,7 +393,7 @@ public void play(IPlayItem item, boolean withReset) throws StreamNotFoundExcepti
389393
IMessage msg = null;
390394
currentItem.set(item);
391395
long itemLength = item.getLength();
392-
if (log.isDebugEnabled()) {
396+
if (isDebug) {
393397
log.debug("Play decision is {} (0=Live, 1=File, 2=Wait, 3=N/A) item length: {}", playDecision, itemLength);
394398
}
395399
switch (playDecision) {
@@ -431,7 +435,7 @@ public void play(IPlayItem item, boolean withReset) throws StreamNotFoundExcepti
431435
in = providerService.getLiveProviderInput(thisScope, itemName, true);
432436
if (msgInReference.compareAndSet(null, in)) {
433437
if (type == -1 && itemLength >= 0) {
434-
if (log.isDebugEnabled()) {
438+
if (isDebug) {
435439
log.debug("Creating wait job for {}", itemLength);
436440
}
437441
// Wait given timeout for stream to be published
@@ -443,7 +447,7 @@ public void execute(ISchedulingService service) {
443447
}
444448
});
445449
} else if (type == -2) {
446-
if (log.isDebugEnabled()) {
450+
if (isDebug) {
447451
log.debug("Creating wait job");
448452
}
449453
// Wait x seconds for the stream to be published
@@ -456,7 +460,7 @@ public void execute(ISchedulingService service) {
456460
} else {
457461
connectToProvider(itemName);
458462
}
459-
} else if (log.isDebugEnabled()) {
463+
} else if (isDebug) {
460464
log.debug("Message input already set for {}", itemName);
461465
}
462466
break;
@@ -756,7 +760,7 @@ public void seek(int position) throws IllegalStateException, OperationNotSupport
756760
* If stream is in stopped state
757761
*/
758762
public void stop() throws IllegalStateException {
759-
if (log.isDebugEnabled()) {
763+
if (isDebug) {
760764
log.debug("stop - subscriber stream state: {}", (subscriberStream != null ? subscriberStream.getState() : null));
761765
}
762766
// allow stop if playing or paused
@@ -805,7 +809,7 @@ public void stop() throws IllegalStateException {
805809
* Close stream
806810
*/
807811
public void close() {
808-
if (log.isDebugEnabled()) {
812+
if (isDebug) {
809813
log.debug("close");
810814
}
811815
if (!subscriberStream.getState().equals(StreamState.CLOSED)) {
@@ -824,7 +828,7 @@ public void close() {
824828
if (out != null) {
825829
List<IConsumer> consumers = out.getConsumers();
826830
// assume a list of 1 in most cases
827-
if (log.isDebugEnabled()) {
831+
if (isDebug) {
828832
log.debug("Message out consumers: {}", consumers.size());
829833
}
830834
if (!consumers.isEmpty()) {
@@ -965,7 +969,7 @@ private void doPushMessage(Status status) {
965969
* The message to send.
966970
*/
967971
private void doPushMessage(AbstractMessage message) {
968-
if (log.isTraceEnabled()) {
972+
if (isTrace) {
969973
String msgType = message.getMessageType();
970974
log.trace("doPushMessage: {}", msgType);
971975
}
@@ -1019,7 +1023,7 @@ private void sendMessage(RTMPMessage messageIn) {
10191023
event.setSourceType(eventIn.getSourceType());
10201024
// instance the outgoing message
10211025
RTMPMessage messageOut = RTMPMessage.build(event, eventTime);
1022-
if (log.isTraceEnabled()) {
1026+
if (isTrace) {
10231027
log.trace("Source type - in: {} out: {}", eventIn.getSourceType(), messageOut.getBody().getSourceType());
10241028
long delta = System.currentTimeMillis() - playbackStart;
10251029
log.trace("sendMessage: streamStartTS {}, length {}, streamOffset {}, timestamp {} last timestamp {} delta {} buffered {}", new Object[] { streamStartTS.get(), currentItem.get().getLength(), streamOffset, eventTime, lastMessageTs, delta, lastMessageTs - delta });
@@ -1032,7 +1036,7 @@ private void sendMessage(RTMPMessage messageIn) {
10321036
long length = currentItem.get().getLength();
10331037
if (length >= 0) {
10341038
int duration = eventTime - streamStartTS.get();
1035-
if (log.isTraceEnabled()) {
1039+
if (isTrace) {
10361040
log.trace("sendMessage duration={} length={}", duration, length);
10371041
}
10381042
if (duration - streamOffset >= length) {
@@ -1052,7 +1056,7 @@ private void sendMessage(RTMPMessage messageIn) {
10521056
// subtract the offset time of when the stream started playing for the client
10531057
eventTime -= startTs;
10541058
messageOut.getBody().setTimestamp(eventTime);
1055-
if (log.isTraceEnabled()) {
1059+
if (isTrace) {
10561060
log.trace("sendMessage (updated): streamStartTS={}, length={}, streamOffset={}, timestamp={}", new Object[] { startTs, currentItem.get().getLength(), streamOffset, eventTime });
10571061
}
10581062
}
@@ -1150,7 +1154,7 @@ private void sendStopStatus(IPlayItem item) {
11501154
* @param bytes
11511155
*/
11521156
private void sendOnPlayStatus(String code, int duration, long bytes) {
1153-
if (log.isDebugEnabled()) {
1157+
if (isDebug) {
11541158
log.debug("Sending onPlayStatus - code: {} duration: {} bytes: {}", code, duration, bytes);
11551159
}
11561160
// create the buffer
@@ -1204,7 +1208,7 @@ private void sendTransitionStatus() {
12041208
private void sendCompleteStatus() {
12051209
// may be the correct duration
12061210
int duration = (lastMessageTs > 0) ? Math.max(0, lastMessageTs - streamStartTS.get()) : 0;
1207-
if (log.isDebugEnabled()) {
1211+
if (isDebug) {
12081212
log.debug("sendCompleteStatus - duration: {} bytes sent: {}", duration, bytesSent.get());
12091213
}
12101214
sendOnPlayStatus(StatusCodes.NS_PLAY_COMPLETE, duration, bytesSent.get());
@@ -1417,7 +1421,7 @@ public void onPipeConnectionEvent(PipeConnectionEvent event) {
14171421
}
14181422
break;
14191423
default:
1420-
if (log.isDebugEnabled()) {
1424+
if (isDebug) {
14211425
log.debug("Unhandled pipe event: {}", event);
14221426
}
14231427
}
@@ -1447,10 +1451,11 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
14471451
RTMPMessage rtmpMessage = (RTMPMessage) message;
14481452
IRTMPEvent body = rtmpMessage.getBody();
14491453
if (body instanceof IStreamData) {
1454+
final String subscribedStreamName = subscriberStream.getBroadcastStreamPublishName();
14501455
// the subscriber paused
14511456
if (subscriberStream.getState() == StreamState.PAUSED) {
14521457
if (log.isInfoEnabled() && shouldLogPacketDrop()) {
1453-
log.info("Dropping packet because we are paused. sessionId={} stream={} count={}", sessionId, subscriberStream.getBroadcastStreamPublishName(), droppedPacketsCount);
1458+
log.info("Dropping packet because we are paused. sessionId={} stream={} count={}", sessionId, subscribedStreamName, droppedPacketsCount);
14541459
}
14551460
videoFrameDropper.dropPacket(rtmpMessage);
14561461
return;
@@ -1469,7 +1474,7 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
14691474
droppedPacketsCount++;
14701475
if (log.isInfoEnabled() && shouldLogPacketDrop()) {
14711476
// client disabled video or the app doesn't have enough bandwidth allowed for this stream
1472-
log.info("Drop packet. Failed to acquire token or no video. sessionId={} stream={} count={}", sessionId, subscriberStream.getBroadcastStreamPublishName(), droppedPacketsCount);
1477+
log.info("Drop packet. Failed to acquire token or no video. sessionId={} stream={} count={}", sessionId, subscribedStreamName, droppedPacketsCount);
14731478
}
14741479
return;
14751480
}
@@ -1480,14 +1485,14 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
14801485
// pending video messages and drop video packets until the queue is below the threshold.
14811486
// only check for frame dropping if the codec supports it
14821487
long pendingVideos = pendingVideoMessages();
1483-
if (log.isTraceEnabled()) {
1484-
log.trace("Pending messages sessionId={} pending={} threshold={} sequential={} stream={}, count={}", new Object[] { sessionId, pendingVideos, maxPendingVideoFrames, numSequentialPendingVideoFrames, subscriberStream.getBroadcastStreamPublishName(), droppedPacketsCount });
1488+
if (isTrace) {
1489+
log.trace("Pending messages sessionId={} stream={} pending={} threshold={} sequential={} dropped={}", new Object[] { sessionId, subscribedStreamName, pendingVideos, maxPendingVideoFrames, numSequentialPendingVideoFrames, droppedPacketsCount });
14851490
}
14861491
if (!videoFrameDropper.canSendPacket(rtmpMessage, pendingVideos)) {
14871492
// drop frame as it depends on other frames that were dropped before
14881493
droppedPacketsCount++;
14891494
if (log.isInfoEnabled() && shouldLogPacketDrop()) {
1490-
log.info("Frame dropper says to drop packet. sessionId={} stream={} count={}", sessionId, subscriberStream.getBroadcastStreamPublishName(), droppedPacketsCount);
1495+
log.info("Frame dropper says to drop packet. sessionId={} stream={} dropped={}", sessionId, subscribedStreamName, droppedPacketsCount);
14911496
}
14921497
return;
14931498
}
@@ -1501,7 +1506,7 @@ public void pushMessage(IPipe pipe, IMessage message) throws IOException {
15011506
if (pendingVideos > maxPendingVideoFrames || numSequentialPendingVideoFrames > maxSequentialPendingVideoFrames) {
15021507
droppedPacketsCount++;
15031508
if (log.isInfoEnabled() && shouldLogPacketDrop()) {
1504-
log.info("Drop packet. Pending above threshold. sessionId={} pending={} threshold={} sequential={} stream={} count={}", new Object[] { sessionId, pendingVideos, maxPendingVideoFrames, numSequentialPendingVideoFrames, subscriberStream.getBroadcastStreamPublishName(), droppedPacketsCount });
1509+
log.info("Drop packet. Pending above threshold. sessionId={} stream={} pending={} threshold={} sequential={} dropped={}", new Object[] { sessionId, subscribedStreamName, pendingVideos, maxPendingVideoFrames, numSequentialPendingVideoFrames, droppedPacketsCount });
15051510
}
15061511
// drop because the client has insufficient bandwidth
15071512
long now = System.currentTimeMillis();
@@ -1570,7 +1575,7 @@ private long pendingVideoMessages() {
15701575
return (Long) pendingRequest.getResult();
15711576
}
15721577
}
1573-
return 0;
1578+
return 0L;
15741579
}
15751580

15761581
/**

0 commit comments

Comments
 (0)