@@ -17,6 +17,10 @@ pub struct MockStream {
1717 rtcp_writer : Mutex < Option < Arc < dyn RTCPWriter + Send + Sync > > > ,
1818 rtp_writer : Mutex < Option < Arc < dyn RTPWriter + Send + Sync > > > ,
1919
20+ internal : Arc < MockStreamInternal > ,
21+ }
22+
23+ struct MockStreamInternal {
2024 rtcp_out_modified_tx : mpsc:: Sender < RTCPPackets > ,
2125 rtp_out_modified_tx : mpsc:: Sender < rtp:: packet:: Packet > ,
2226 rtcp_in_rx : Mutex < mpsc:: Receiver < RTCPPackets > > ,
@@ -46,44 +50,44 @@ impl MockStream {
4650
4751 let stream = Arc :: new ( MockStream {
4852 interceptor : Arc :: clone ( & interceptor) ,
49-
5053 rtcp_writer : Mutex :: new ( None ) ,
5154 rtp_writer : Mutex :: new ( None ) ,
52-
53- rtcp_in_tx : Mutex :: new ( Some ( rtcp_in_tx) ) ,
54- rtp_in_tx : Mutex :: new ( Some ( rtp_in_tx) ) ,
55- rtcp_in_rx : Mutex :: new ( rtcp_in_rx) ,
56- rtp_in_rx : Mutex :: new ( rtp_in_rx) ,
57-
58- rtcp_out_modified_tx,
59- rtp_out_modified_tx,
60- rtcp_out_modified_rx : Mutex :: new ( rtcp_out_modified_rx) ,
61- rtp_out_modified_rx : Mutex :: new ( rtp_out_modified_rx) ,
62-
63- rtcp_in_modified_rx : Mutex :: new ( rtcp_in_modified_rx) ,
64- rtp_in_modified_rx : Mutex :: new ( rtp_in_modified_rx) ,
55+ internal : Arc :: new ( MockStreamInternal {
56+ rtcp_in_tx : Mutex :: new ( Some ( rtcp_in_tx) ) ,
57+ rtp_in_tx : Mutex :: new ( Some ( rtp_in_tx) ) ,
58+ rtcp_in_rx : Mutex :: new ( rtcp_in_rx) ,
59+ rtp_in_rx : Mutex :: new ( rtp_in_rx) ,
60+
61+ rtcp_out_modified_tx,
62+ rtp_out_modified_tx,
63+ rtcp_out_modified_rx : Mutex :: new ( rtcp_out_modified_rx) ,
64+ rtp_out_modified_rx : Mutex :: new ( rtp_out_modified_rx) ,
65+
66+ rtcp_in_modified_rx : Mutex :: new ( rtcp_in_modified_rx) ,
67+ rtp_in_modified_rx : Mutex :: new ( rtp_in_modified_rx) ,
68+ } ) ,
6569 } ) ;
6670
6771 let rtcp_writer = interceptor
68- . bind_rtcp_writer ( Arc :: clone ( & stream) as Arc < dyn RTCPWriter + Send + Sync > )
72+ . bind_rtcp_writer ( Arc :: clone ( & stream. internal ) as Arc < dyn RTCPWriter + Send + Sync > )
6973 . await ;
7074 {
7175 let mut rw = stream. rtcp_writer . lock ( ) . await ;
72- * rw = Some ( rtcp_writer) ;
76+ * rw = Some ( Arc :: clone ( & rtcp_writer) ) ;
7377 }
7478 let rtp_writer = interceptor
7579 . bind_local_stream (
7680 info,
77- Arc :: clone ( & stream) as Arc < dyn RTPWriter + Send + Sync > ,
81+ Arc :: clone ( & stream. internal ) as Arc < dyn RTPWriter + Send + Sync > ,
7882 )
7983 . await ;
8084 {
8185 let mut rw = stream. rtp_writer . lock ( ) . await ;
82- * rw = Some ( rtp_writer) ;
86+ * rw = Some ( Arc :: clone ( & rtp_writer) ) ;
8387 }
8488
8589 let rtcp_reader = interceptor
86- . bind_rtcp_reader ( Arc :: clone ( & stream) as Arc < dyn RTCPReader + Send + Sync > )
90+ . bind_rtcp_reader ( Arc :: clone ( & stream. internal ) as Arc < dyn RTCPReader + Send + Sync > )
8791 . await ;
8892 tokio:: spawn ( async move {
8993 let mut buf = vec ! [ 0u8 ; 1500 ] ;
@@ -104,7 +108,7 @@ impl MockStream {
104108 let rtp_reader = interceptor
105109 . bind_remote_stream (
106110 info,
107- Arc :: clone ( & stream) as Arc < dyn RTPReader + Send + Sync > ,
111+ Arc :: clone ( & stream. internal ) as Arc < dyn RTPReader + Send + Sync > ,
108112 )
109113 . await ;
110114 tokio:: spawn ( async move {
@@ -153,23 +157,23 @@ impl MockStream {
153157
154158 /// receive_rtcp schedules a new rtcp batch, so it can be read be the stream
155159 pub async fn receive_rtcp ( & self , pkts : Vec < Box < dyn rtcp:: packet:: Packet + Send + Sync > > ) {
156- let rtcp_in_tx = self . rtcp_in_tx . lock ( ) . await ;
160+ let rtcp_in_tx = self . internal . rtcp_in_tx . lock ( ) . await ;
157161 if let Some ( tx) = & * rtcp_in_tx {
158162 let _ = tx. send ( pkts) . await ;
159163 }
160164 }
161165
162166 /// receive_rtp schedules a rtp packet, so it can be read be the stream
163167 pub async fn receive_rtp ( & self , pkt : rtp:: packet:: Packet ) {
164- let rtp_in_tx = self . rtp_in_tx . lock ( ) . await ;
168+ let rtp_in_tx = self . internal . rtp_in_tx . lock ( ) . await ;
165169 if let Some ( tx) = & * rtp_in_tx {
166170 let _ = tx. send ( pkt) . await ;
167171 }
168172 }
169173
170174 /// written_rtcp returns a channel containing the rtcp batches written, modified by the interceptor
171175 pub async fn written_rtcp ( & self ) -> Option < Vec < Box < dyn rtcp:: packet:: Packet + Send + Sync > > > {
172- let mut rtcp_out_modified_rx = self . rtcp_out_modified_rx . lock ( ) . await ;
176+ let mut rtcp_out_modified_rx = self . internal . rtcp_out_modified_rx . lock ( ) . await ;
173177 rtcp_out_modified_rx. recv ( ) . await
174178 }
175179
@@ -180,7 +184,7 @@ impl MockStream {
180184 & self ,
181185 ) -> Option < Vec < Box < dyn rtcp:: packet:: Packet + Send + Sync > > > {
182186 let mut last = None ;
183- let mut rtcp_out_modified_rx = self . rtcp_out_modified_rx . lock ( ) . await ;
187+ let mut rtcp_out_modified_rx = self . internal . rtcp_out_modified_rx . lock ( ) . await ;
184188
185189 while let Ok ( v) = rtcp_out_modified_rx. try_recv ( ) {
186190 last = Some ( v) ;
@@ -191,40 +195,40 @@ impl MockStream {
191195
192196 /// written_rtp returns a channel containing rtp packets written, modified by the interceptor
193197 pub async fn written_rtp ( & self ) -> Option < rtp:: packet:: Packet > {
194- let mut rtp_out_modified_rx = self . rtp_out_modified_rx . lock ( ) . await ;
198+ let mut rtp_out_modified_rx = self . internal . rtp_out_modified_rx . lock ( ) . await ;
195199 rtp_out_modified_rx. recv ( ) . await
196200 }
197201
198202 /// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
199203 pub async fn read_rtcp (
200204 & self ,
201205 ) -> Option < Result < Vec < Box < dyn rtcp:: packet:: Packet + Send + Sync > > > > {
202- let mut rtcp_in_modified_rx = self . rtcp_in_modified_rx . lock ( ) . await ;
206+ let mut rtcp_in_modified_rx = self . internal . rtcp_in_modified_rx . lock ( ) . await ;
203207 rtcp_in_modified_rx. recv ( ) . await
204208 }
205209
206210 /// read_rtp returns a channel containing the rtp packets read, modified by the interceptor
207211 pub async fn read_rtp ( & self ) -> Option < Result < rtp:: packet:: Packet > > {
208- let mut rtp_in_modified_rx = self . rtp_in_modified_rx . lock ( ) . await ;
212+ let mut rtp_in_modified_rx = self . internal . rtp_in_modified_rx . lock ( ) . await ;
209213 rtp_in_modified_rx. recv ( ) . await
210214 }
211215
212- /// close closes the stream and the underlying interceptor
216+ /// close closes the stream
213217 pub async fn close ( & self ) -> Result < ( ) > {
214218 {
215- let mut rtcp_in_tx = self . rtcp_in_tx . lock ( ) . await ;
219+ let mut rtcp_in_tx = self . internal . rtcp_in_tx . lock ( ) . await ;
216220 rtcp_in_tx. take ( ) ;
217221 }
218222 {
219- let mut rtp_in_tx = self . rtp_in_tx . lock ( ) . await ;
223+ let mut rtp_in_tx = self . internal . rtp_in_tx . lock ( ) . await ;
220224 rtp_in_tx. take ( ) ;
221225 }
222226 self . interceptor . close ( ) . await
223227 }
224228}
225229
226230#[ async_trait]
227- impl RTCPWriter for MockStream {
231+ impl RTCPWriter for MockStreamInternal {
228232 async fn write (
229233 & self ,
230234 pkts : & [ Box < dyn rtcp:: packet:: Packet + Send + Sync > ] ,
@@ -237,7 +241,7 @@ impl RTCPWriter for MockStream {
237241}
238242
239243#[ async_trait]
240- impl RTCPReader for MockStream {
244+ impl RTCPReader for MockStreamInternal {
241245 async fn read (
242246 & self ,
243247 buf : & mut [ u8 ] ,
@@ -260,15 +264,15 @@ impl RTCPReader for MockStream {
260264}
261265
262266#[ async_trait]
263- impl RTPWriter for MockStream {
267+ impl RTPWriter for MockStreamInternal {
264268 async fn write ( & self , pkt : & rtp:: packet:: Packet , _a : & Attributes ) -> Result < usize > {
265269 let _ = self . rtp_out_modified_tx . send ( pkt. clone ( ) ) . await ;
266270 Ok ( 0 )
267271 }
268272}
269273
270274#[ async_trait]
271- impl RTPReader for MockStream {
275+ impl RTPReader for MockStreamInternal {
272276 async fn read (
273277 & self ,
274278 buf : & mut [ u8 ] ,
0 commit comments