Skip to content

Commit 8d278bb

Browse files
committed
Refactor RTMPProtocolDecoder to favor fill packet vs tiny chunk size if available bytes
1 parent e6b3459 commit 8d278bb

File tree

4 files changed

+73
-78
lines changed

4 files changed

+73
-78
lines changed

common/src/main/java/org/red5/server/net/rtmp/codec/RTMPMinaProtocolDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) th
7272
log.trace("Buffers info before: position {}, limit {}, remaining {}", new Object[] { buf.position(), buf.limit(), buf.remaining() });
7373
}
7474
try {
75-
// construct any objects from the decoded bugger
75+
// construct any objects from the decoded buffer
7676
List<?> objects = decoder.decodeBuffer(conn, buf);
7777
log.trace("Decoded: {}", objects);
7878
if (objects != null) {

common/src/main/java/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class RTMPProtocolDecoder implements Constants, IEventDecoder {
6969

7070
protected static final Logger log = LoggerFactory.getLogger(RTMPProtocolDecoder.class);
7171

72+
protected static final boolean isTrace = log.isTraceEnabled(), isDebug = log.isDebugEnabled();
73+
7274
// close when header errors occur
7375
protected boolean closeOnHeaderError;
7476

@@ -90,8 +92,8 @@ public RTMPProtocolDecoder() {
9092
*/
9193
public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
9294
final int position = buffer.position();
93-
//if (log.isTraceEnabled()) {
94-
//log.trace("decodeBuffer: {}", Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit())));
95+
//if (isTrace) {
96+
// log.trace("decodeBuffer: {}", Hex.encodeHexString(Arrays.copyOfRange(buffer.array(), position, buffer.limit())));
9597
//}
9698
// decoded results
9799
List<Object> result = null;
@@ -102,9 +104,9 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
102104
result = new LinkedList<>();
103105
// get the local decode state
104106
RTMPDecodeState state = conn.getDecoderState();
105-
//if (log.isTraceEnabled()) {
106-
//log.trace("RTMP decode state {}", state);
107-
//}
107+
if (isTrace) {
108+
log.trace("RTMP decode state {}", state);
109+
}
108110
if (!conn.getSessionId().equals(state.getSessionId())) {
109111
log.warn("Session decode overlap: {} != {}", conn.getSessionId(), state.getSessionId());
110112
}
@@ -140,8 +142,8 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
140142
// close connection because we can't parse data from it
141143
conn.close();
142144
} finally {
143-
//if (log.isTraceEnabled()) {
144-
//log.trace("decodeBuffer - post decode input buffer position: {} remaining: {}", buffer.position(), buffer.remaining());
145+
//if (isTrace) {
146+
// log.trace("decodeBuffer - post decode input buffer position: {} remaining: {}", buffer.position(), buffer.remaining());
145147
//}
146148
buffer.compact();
147149
}
@@ -171,7 +173,7 @@ public List<Object> decodeBuffer(RTMPConnection conn, IoBuffer buffer) {
171173
* on error
172174
*/
173175
public Object decode(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) throws ProtocolException {
174-
//if (log.isTraceEnabled()) {
176+
//if (isTrace) {
175177
//log.trace("Decoding for {}", conn.getSessionId());
176178
//}
177179
try {
@@ -194,7 +196,7 @@ public Object decode(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) th
194196
} catch (RuntimeException e) {
195197
throw new ProtocolException("Error during decoding", e);
196198
} finally {
197-
//if (log.isTraceEnabled()) {
199+
//if (isTrace) {
198200
//log.trace("Decoding finished for {}", conn.getSessionId());
199201
//}
200202
}
@@ -213,7 +215,7 @@ public Object decode(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) th
213215
*/
214216
public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer in) {
215217
final int position = in.position();
216-
//if (log.isTraceEnabled()) {
218+
//if (isTrace) {
217219
//log.trace("decodePacket - state: {} buffer: {}", state, in);
218220
//log.trace("decodePacket: position {}, limit {}, {}", position, in.limit(), Hex.encodeHexString(Arrays.copyOfRange(in.array(), position, in.limit())));
219221
//log.trace("decodePacket: position {}, limit {}", position, in.limit());
@@ -236,7 +238,7 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
236238
final int channelId = header != null ? header.getChannelId() : chunkHeader.getChannelId();
237239
// header empty vs header null will return the NS_FAILED message
238240
if (header.isEmpty()) {
239-
if (log.isTraceEnabled()) {
241+
if (isTrace) {
240242
log.trace("Header was null or empty - chh: {}", chunkHeader);
241243
}
242244
// send a NetStream.Failed message
@@ -250,34 +252,26 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
250252
// ensure that we dont exceed maximum packet size
251253
int size = header.getSize();
252254
log.debug("Packet size: {}", size);
253-
/* XXX(paul): This is a hack to prevent OOM when decoding has failed in some way
254-
if (size > MAX_PACKET_SIZE) {
255-
// Reject packets that are too big, to protect against OOM when decoding has failed in some way
256-
log.warn("Packet size exceeded. size={}, max={}, connId={}", size, MAX_PACKET_SIZE, conn.getSessionId());
257-
// send a NetStream.Failed message
258-
StreamService.sendNetStreamStatus(conn, StatusCodes.NS_FAILED, "Data exceeded maximum allowed by " + (size - MAX_PACKET_SIZE) + " bytes", "no-name", Status.ERROR, conn.getStreamIdForChannelId(channelId));
259-
throw new ProtocolException(String.format("Packet size exceeded. size: %s", header.getSize()));
260-
}
261-
*/
262255
// get the size of our chunks
263256
int readChunkSize = rtmp.getReadChunkSize();
264257
// check to see if this is a new packet or continue decoding an existing one
265258
Packet packet = rtmp.getLastReadPacket(channelId);
266259
if (packet == null) {
260+
log.trace("Creating new packet");
267261
// create a new packet
268262
packet = new Packet(header.clone());
269263
// store the packet based on its channel id
270264
rtmp.setLastReadPacket(channelId, packet);
271265
}
272266
// get the packet data
273267
IoBuffer buf = packet.getData();
274-
//if (log.isTraceEnabled()) {
275-
//log.trace("Source buffer position: {}, limit: {}, packet-buf.position {}, packet size: {}", new Object[] { in.position(), in.limit(), buf.position(), header.getSize() });
276-
//}
268+
if (isTrace) {
269+
log.trace("Source buffer position: {}, limit: {}, packet-buf.position {}, packet size: {}", in.position(), in.limit(), buf.position(), header.getSize());
270+
}
277271
// read chunk
278-
int length = Math.min(buf.remaining(), readChunkSize);
272+
int length = Math.max(buf.remaining(), readChunkSize);
279273
if (in.remaining() < length) {
280-
//log.debug("Chunk too small, buffering ({},{})", in.remaining(), length);
274+
log.debug("In buffer is too small, buffering ({},{})", in.remaining(), length);
281275
// how much more data we need to continue?
282276
state.bufferDecoding(in.position() - position + length);
283277
// we need to move back position so header will be available during another decode round
@@ -286,17 +280,17 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
286280
}
287281
// get the chunk from our input
288282
byte[] chunk = Arrays.copyOfRange(in.array(), in.position(), in.position() + length);
289-
//if (log.isTraceEnabled()) {
290-
//log.trace("Read chunkSize: {}, length: {}, chunk: {}", readChunkSize, length, Hex.encodeHexString(chunk));
291-
//}
283+
if (isTrace) {
284+
log.trace("Read chunkSize: {}, length: {}, chunk: {}", readChunkSize, length, Hex.encodeHexString(chunk));
285+
}
292286
// move the position
293287
in.skip(length);
294288
// put the chunk into the packet
295289
buf.put(chunk);
296290
if (buf.hasRemaining()) {
297-
//if (log.isTraceEnabled()) {
298-
//log.trace("Packet is incomplete ({},{})", buf.remaining(), buf.limit());
299-
//}
291+
if (isTrace) {
292+
log.trace("Packet is incomplete ({},{})", buf.remaining(), buf.limit());
293+
}
300294
return null;
301295
}
302296
// flip so we can read / decode the packet data into a message
@@ -310,7 +304,7 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
310304
// flash will send an earlier time stamp when resetting a video stream with a new key frame. To avoid dropping it, we give it the
311305
// minimal increment since the last message. To avoid relative time stamps being mis-computed, we don't reset the header we stored.
312306
message.setTimestamp(timestamp);
313-
if (log.isTraceEnabled()) {
307+
if (isTrace) {
314308
log.trace("Decoded message: {}", message);
315309
}
316310
packet.setMessage(message);
@@ -329,7 +323,7 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
329323
lastHeader.setTimerBase(timestamp);
330324
// clear the delta
331325
//lastHeader.setTimerDelta(0);
332-
if (log.isTraceEnabled()) {
326+
if (isTrace) {
333327
log.trace("Last read header after decode: {}", lastHeader);
334328
}
335329
} finally {
@@ -354,7 +348,7 @@ public Packet decodePacket(RTMPConnection conn, RTMPDecodeState state, IoBuffer
354348
* @return Decoded header
355349
*/
356350
public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in, RTMP rtmp, int startPostion) {
357-
//if (log.isTraceEnabled()) {
351+
//if (isTrace) {
358352
//log.trace("decodeHeader - chh: {} input: {}", chh, Hex.encodeHexString(Arrays.copyOfRange(in.array(), in.position(), in.limit())));
359353
//log.trace("decodeHeader - chh: {}", chh);
360354
//}
@@ -373,7 +367,7 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
373367
}
374368

375369
Header lastHeader = rtmp.getLastReadHeader(channelId);
376-
if (log.isTraceEnabled()) {
370+
if (isTrace) {
377371
log.trace("{} lastHeader: {}", Header.HeaderType.values()[headerSize], lastHeader);
378372
}
379373
// got a non-new header for a channel which has no last-read header
@@ -390,7 +384,7 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
390384
return null;
391385
}
392386
}
393-
// if (log.isTraceEnabled()) {
387+
// if (isTrace) {
394388
// log.trace("headerLength: {}", headerLength);
395389
// }
396390

@@ -414,7 +408,7 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
414408
}
415409
long ext = in.getUnsignedInt();
416410
timeBase = (int) (ext ^ (ext >>> 32));
417-
if (log.isTraceEnabled()) {
411+
if (isTrace) {
418412
log.trace("Extended time read: {}", timeBase);
419413
}
420414
header.setExtended(true);
@@ -487,7 +481,7 @@ public Header decodeHeader(ChunkHeader chh, RTMPDecodeState state, IoBuffer in,
487481
}
488482
long ext = in.getUnsignedInt();
489483
int timeExt = (int) (ext ^ (ext >>> 32));
490-
if (log.isTraceEnabled()) {
484+
if (isTrace) {
491485
log.trace("Extended time read: {} {}", ext, timeExt);
492486
}
493487
timeBase = timeExt;
@@ -542,7 +536,7 @@ public IRTMPEvent decodeMessage(RTMPConnection conn, Header header, IoBuffer in)
542536
message = decodeAction(conn.getEncoding(), in, header);
543537
break;
544538
case TYPE_FLEX_STREAM_SEND:
545-
if (log.isTraceEnabled()) {
539+
if (isTrace) {
546540
log.trace("Decoding flex stream send on stream id: {}", header.getStreamId());
547541
}
548542
// skip first byte
@@ -551,7 +545,7 @@ public IRTMPEvent decodeMessage(RTMPConnection conn, Header header, IoBuffer in)
551545
message = decodeStreamData(in.slice());
552546
break;
553547
case TYPE_NOTIFY:
554-
if (log.isTraceEnabled()) {
548+
if (isTrace) {
555549
log.trace("Decoding notify on stream id: {}", header.getStreamId());
556550
}
557551
if (header.getStreamId().doubleValue() != 0.0d) {
@@ -616,7 +610,7 @@ private IRTMPEvent decodeClientBW(IoBuffer in) {
616610

617611
/** {@inheritDoc} */
618612
public Unknown decodeUnknown(byte dataType, IoBuffer in) {
619-
if (log.isDebugEnabled()) {
613+
if (isDebug) {
620614
log.debug("decodeUnknown: {}", dataType);
621615
}
622616
return new Unknown(dataType, in);
@@ -784,7 +778,7 @@ private Invoke decodeAction(Encoding encoding, IoBuffer in, Header header) {
784778
if (action == null) {
785779
throw new RuntimeException("Action was null");
786780
}
787-
if (log.isTraceEnabled()) {
781+
if (isTrace) {
788782
log.trace("Action: {}", action);
789783
}
790784
// instance the invoke
@@ -827,7 +821,7 @@ private int readTransactionId(Input input) {
827821
*/
828822
public Ping decodePing(IoBuffer in) {
829823
Ping ping = null;
830-
if (log.isTraceEnabled()) {
824+
if (isTrace) {
831825
// gets the raw data as hex without changing the data or pointer
832826
String hexDump = in.getHexDump();
833827
log.trace("Ping dump: {}", hexDump);
@@ -881,7 +875,7 @@ public VideoData decodeVideoData(IoBuffer in) {
881875
*/
882876
@SuppressWarnings("unchecked")
883877
public Notify decodeStreamData(IoBuffer in) {
884-
if (log.isDebugEnabled()) {
878+
if (isDebug) {
885879
log.debug("decodeStreamData");
886880
}
887881
// our result is a notify
@@ -939,7 +933,7 @@ public Notify decodeStreamData(IoBuffer in) {
939933
params = Collections.EMPTY_MAP;
940934
}
941935
}
942-
if (log.isDebugEnabled()) {
936+
if (isDebug) {
943937
log.debug("Dataframe: {} params: {}", onCueOrOnMeta, params.toString());
944938
}
945939
IoBuffer buf = IoBuffer.allocate(64);
@@ -967,27 +961,27 @@ public Notify decodeStreamData(IoBuffer in) {
967961
log.debug("Params type: {}", object);
968962
if (object == DataTypes.CORE_MAP) {
969963
params = (Map<Object, Object>) input.readMap();
970-
if (log.isDebugEnabled()) {
964+
if (isDebug) {
971965
log.debug("Map params: {}", params.toString());
972966
}
973967
} else if (object == DataTypes.CORE_ARRAY) {
974968
params = (Map<Object, Object>) input.readArray(Object[].class);
975-
if (log.isDebugEnabled()) {
969+
if (isDebug) {
976970
log.debug("Array params: {}", params);
977971
}
978972
} else if (object == DataTypes.CORE_STRING) {
979973
String str = input.readString();
980-
if (log.isDebugEnabled()) {
974+
if (isDebug) {
981975
log.debug("String params: {}", str);
982976
}
983977
params = new HashMap<>();
984978
params.put("0", str);
985979
} else if (object == DataTypes.CORE_OBJECT) {
986980
params = (Map<Object, Object>) input.readObject();
987-
if (log.isDebugEnabled()) {
981+
if (isDebug) {
988982
log.debug("Object params: {}", params);
989983
}
990-
} else if (log.isDebugEnabled()) {
984+
} else if (isDebug) {
991985
log.debug("Stream send did not provide a parameter map");
992986
}
993987
// need to debug this further
@@ -1017,7 +1011,7 @@ public Notify decodeStreamData(IoBuffer in) {
10171011
* @return FlexMessage event
10181012
*/
10191013
public FlexMessage decodeFlexMessage(IoBuffer in) {
1020-
if (log.isDebugEnabled()) {
1014+
if (isDebug) {
10211015
log.debug("decodeFlexMessage");
10221016
}
10231017
// TODO: Unknown byte, probably encoding as with Flex SOs?
@@ -1063,7 +1057,7 @@ public FlexMessage decodeFlexMessage(IoBuffer in) {
10631057
paramList.add(Deserializer.deserialize(input, Object.class));
10641058
}
10651059
params = paramList.toArray();
1066-
if (log.isTraceEnabled()) {
1060+
if (isTrace) {
10671061
log.trace("Parameter count: {}", paramList.size());
10681062
for (int i = 0; i < params.length; i++) {
10691063
log.trace(" > {}: {}", i, params[i]);
@@ -1143,7 +1137,7 @@ private Object[] handleParameters(IoBuffer in, Notify notify, Input input) {
11431137
paramList.add(Deserializer.deserialize(input, Object.class));
11441138
}
11451139
params = paramList.toArray();
1146-
if (log.isDebugEnabled()) {
1140+
if (isDebug) {
11471141
log.debug("Num params: {}", paramList.size());
11481142
for (int i = 0; i < params.length; i++) {
11491143
log.debug(" > {}: {}", i, params[i]);
@@ -1159,7 +1153,7 @@ private Object[] handleParameters(IoBuffer in, Notify notify, Input input) {
11591153
*/
11601154
public static void setMaxPacketSize(int maxPacketSize) {
11611155
MAX_PACKET_SIZE = maxPacketSize;
1162-
if (log.isDebugEnabled()) {
1156+
if (isDebug) {
11631157
log.debug("Max packet size: {}", MAX_PACKET_SIZE);
11641158
}
11651159
}

0 commit comments

Comments
 (0)