99
1010import java .util .ArrayList ;
1111import java .util .Collection ;
12+ import java .util .List ;
1213import java .util .concurrent .ConcurrentHashMap ;
1314import java .util .concurrent .ConcurrentMap ;
1415import java .util .concurrent .atomic .AtomicInteger ;
1516
1617import org .red5 .client .net .rtmpt .RTMPTClientConnection ;
18+ import org .red5 .server .BaseConnection ;
1719import org .red5 .server .api .Red5 ;
1820import org .red5 .server .net .IConnectionManager ;
1921import org .red5 .server .net .rtmp .RTMPConnection ;
2729 *
2830 * @author The Red5 Project
2931 */
30- public class RTMPConnManager implements IConnectionManager <RTMPConnection > {
32+ public class RTMPClientConnManager implements IConnectionManager <BaseConnection > {
3133
32- private static final Logger log = LoggerFactory .getLogger (RTMPConnManager .class );
34+ private static final Logger log = LoggerFactory .getLogger (RTMPClientConnManager .class );
3335
3436 private static int maxHandshakeTimeout = 7000 ;
3537
@@ -42,21 +44,24 @@ public class RTMPConnManager implements IConnectionManager<RTMPConnection> {
4244 // whether or not to use the ThreadPoolTaskExecutor for incoming messages
4345 protected static boolean enableTaskExecutor ;
4446
45- protected static IConnectionManager <RTMPConnection > instance = new RTMPConnManager () ;
47+ protected static IConnectionManager <BaseConnection > instance ;
4648
47- protected ConcurrentMap <String , RTMPConnection > connMap = new ConcurrentHashMap <>();
49+ protected ConcurrentMap <String , BaseConnection > connMap = new ConcurrentHashMap <>();
4850
4951 protected AtomicInteger conns = new AtomicInteger ();
5052
51- public static IConnectionManager <RTMPConnection > getInstance () {
53+ public static IConnectionManager <BaseConnection > getInstance () {
54+ if (instance == null ) {
55+ instance = new RTMPClientConnManager ();
56+ }
5257 return instance ;
5358 }
5459
5560 /** {@inheritDoc} */
5661 @ Override
57- public RTMPConnection createConnection (Class <?> connCls ) {
58- RTMPConnection conn = null ;
59- if (RTMPConnection .class .isAssignableFrom (connCls )) {
62+ public BaseConnection createConnection (Class <?> connCls ) {
63+ BaseConnection conn = null ;
64+ if (BaseConnection .class .isAssignableFrom (connCls )) {
6065 try {
6166 // create connection
6267 conn = createConnectionInstance (connCls );
@@ -73,9 +78,9 @@ public RTMPConnection createConnection(Class<?> connCls) {
7378
7479 /** {@inheritDoc} */
7580 @ Override
76- public RTMPConnection createConnection (Class <?> connCls , String sessionId ) {
77- RTMPConnection conn = null ;
78- if (RTMPConnection .class .isAssignableFrom (connCls )) {
81+ public BaseConnection createConnection (Class <?> connCls , String sessionId ) {
82+ BaseConnection conn = null ;
83+ if (BaseConnection .class .isAssignableFrom (connCls )) {
7984 try {
8085 // create connection
8186 conn = createConnectionInstance (connCls );
@@ -93,47 +98,14 @@ public RTMPConnection createConnection(Class<?> connCls, String sessionId) {
9398 return conn ;
9499 }
95100
96- /**
97- * Adds a connection.
98- *
99- * @param conn connection
100- */
101- @ Override
102- public void setConnection (RTMPConnection conn ) {
103- log .trace ("Adding connection: {}" , conn );
104- int id = conn .getId ();
105- if (id == -1 ) {
106- log .debug ("Connection has unsupported id, using session id hash" );
107- id = conn .getSessionId ().hashCode ();
108- }
109- log .debug ("Connection id: {} session id hash: {}" , conn .getId (), conn .getSessionId ().hashCode ());
110- }
111-
112- /**
113- * Returns a connection for a given client id.
114- *
115- * @param clientId client id
116- * @return connection if found and null otherwise
117- */
118- @ Override
119- public RTMPConnection getConnection (int clientId ) {
120- log .trace ("Getting connection by client id: {}" , clientId );
121- for (RTMPConnection conn : connMap .values ()) {
122- if (conn .getId () == clientId ) {
123- return connMap .get (conn .getSessionId ());
124- }
125- }
126- return null ;
127- }
128-
129101 /**
130102 * Returns a connection for a given session id.
131103 *
132104 * @param sessionId session id
133105 * @return connection if found and null otherwise
134106 */
135107 @ Override
136- public RTMPConnection getConnectionBySessionId (String sessionId ) {
108+ public BaseConnection getConnectionBySessionId (String sessionId ) {
137109 log .debug ("Getting connection by session id: {}" , sessionId );
138110 if (connMap .containsKey (sessionId )) {
139111 return connMap .get (sessionId );
@@ -148,28 +120,20 @@ public RTMPConnection getConnectionBySessionId(String sessionId) {
148120
149121 /** {@inheritDoc} */
150122 @ Override
151- public RTMPConnection removeConnection (int clientId ) {
152- log .trace ("Removing connection with id: {}" , clientId );
153- // remove from map
154- for (RTMPConnection conn : connMap .values ()) {
155- if (conn .getId () == clientId ) {
156- // remove the conn
157- return removeConnection (conn .getSessionId ());
158- }
159- }
160- log .warn ("Connection was not removed by id: {}" , clientId );
161- return null ;
123+ public BaseConnection removeConnection (BaseConnection conn ) {
124+ log .trace ("Removing connection: {}" , conn );
125+ return removeConnection (conn .getSessionId ());
162126 }
163127
164128 /** {@inheritDoc} */
165129 @ Override
166- public RTMPConnection removeConnection (String sessionId ) {
130+ public BaseConnection removeConnection (String sessionId ) {
167131 log .debug ("Removing connection with session id: {}" , sessionId );
168132 if (log .isTraceEnabled ()) {
169133 log .trace ("Connections ({}) at pre-remove: {}" , connMap .size (), connMap .values ());
170134 }
171135 // remove from map
172- RTMPConnection conn = connMap .remove (sessionId );
136+ BaseConnection conn = connMap .remove (sessionId );
173137 if (conn != null ) {
174138 log .trace ("Connections: {}" , conns .decrementAndGet ());
175139 Red5 .setConnectionLocal (null );
@@ -179,19 +143,20 @@ public RTMPConnection removeConnection(String sessionId) {
179143
180144 /** {@inheritDoc} */
181145 @ Override
182- public Collection <RTMPConnection > getAllConnections () {
183- ArrayList <RTMPConnection > list = new ArrayList <RTMPConnection >(connMap .size ());
146+ public Collection <BaseConnection > getAllConnections () {
147+ ArrayList <BaseConnection > list = new ArrayList <>(connMap .size ());
184148 list .addAll (connMap .values ());
185149 return list ;
186150 }
187151
188152 /** {@inheritDoc} */
189153 @ Override
190- public Collection <RTMPConnection > removeConnections () {
191- ArrayList <RTMPConnection > list = new ArrayList <RTMPConnection >(connMap .size ());
192- list .addAll (connMap .values ());
193- connMap .clear ();
194- conns .set (0 );
154+ public Collection <BaseConnection > removeConnections () {
155+ final List <BaseConnection > list = new ArrayList <>(connMap .size ());
156+ connMap .values ().forEach (conn -> {
157+ removeConnection (conn .getSessionId ());
158+ list .add (conn );
159+ });
195160 return list ;
196161 }
197162
@@ -228,23 +193,23 @@ public RTMPConnection createConnectionInstance(Class<?> cls) throws Exception {
228193 }
229194
230195 public static void setMaxHandshakeTimeout (int maxHandshakeTimeout ) {
231- RTMPConnManager .maxHandshakeTimeout = maxHandshakeTimeout ;
196+ RTMPClientConnManager .maxHandshakeTimeout = maxHandshakeTimeout ;
232197 }
233198
234199 public static void setMaxInactivity (int maxInactivity ) {
235- RTMPConnManager .maxInactivity = maxInactivity ;
200+ RTMPClientConnManager .maxInactivity = maxInactivity ;
236201 }
237202
238203 public static void setPingInterval (int pingInterval ) {
239- RTMPConnManager .pingInterval = pingInterval ;
204+ RTMPClientConnManager .pingInterval = pingInterval ;
240205 }
241206
242207 public static void setExecutorQueueCapacity (int executorQueueCapacity ) {
243- RTMPConnManager .executorQueueCapacity = executorQueueCapacity ;
208+ RTMPClientConnManager .executorQueueCapacity = executorQueueCapacity ;
244209 }
245210
246211 public static void setEnableTaskExecutor (boolean enableTaskExecutor ) {
247- RTMPConnManager .enableTaskExecutor = enableTaskExecutor ;
212+ RTMPClientConnManager .enableTaskExecutor = enableTaskExecutor ;
248213 }
249214
250215}
0 commit comments