1
1
use std:: { sync:: Arc , time:: Duration } ;
2
2
3
3
use anyhow:: { bail, ensure, Context , Result } ;
4
- use futures_util:: { SinkExt , StreamExt } ;
4
+ use futures_util:: { SinkExt , StreamExt , TryStreamExt } ;
5
5
use tokio:: {
6
6
net:: TcpStream ,
7
7
sync:: { mpsc, Mutex , MutexGuard } ,
@@ -11,13 +11,13 @@ use tokio_tungstenite::{
11
11
tungstenite:: { self , error:: ProtocolError , http:: Uri , protocol:: WebSocketConfig } ,
12
12
MaybeTlsStream ,
13
13
} ;
14
- use tracing:: { error, info, warn} ;
14
+ use tracing:: { error, info, trace , warn} ;
15
15
use twitch_api:: {
16
16
eventsub:: {
17
17
channel:: { ChannelChatMessageV1 , ChannelChatMessageV1Payload } ,
18
18
stream:: { StreamOfflineV1 , StreamOnlineV1 } ,
19
- Event , EventsubWebsocketData , Message , Payload , ReconnectPayload , SessionData , Transport ,
20
- WelcomePayload ,
19
+ Event , EventType , EventsubWebsocketData , Message , Payload , ReconnectPayload , SessionData ,
20
+ Transport , WelcomePayload ,
21
21
} ,
22
22
helix:: chat:: { SendChatMessageBody , SendChatMessageRequest } ,
23
23
twitch_oauth2:: { client:: Client as Oauth2Client , TwitchToken , UserToken } ,
@@ -239,32 +239,59 @@ impl EventSubClient {
239
239
self . connect_url = url. parse ( ) ?;
240
240
}
241
241
242
- let transport = Transport :: websocket ( data. id ) ;
242
+ let transport = Transport :: websocket ( & data. id ) ;
243
243
let token = self . token . get ( & self . client ) . await ?;
244
244
245
- self . client
246
- . create_eventsub_subscription (
247
- StreamOnlineV1 :: broadcaster_user_id ( self . streamer_id . clone ( ) ) ,
248
- transport. clone ( ) ,
249
- & * token,
250
- )
245
+ let subs = self
246
+ . client
247
+ . get_eventsub_subscriptions ( None , None , None , & * token)
248
+ . try_collect :: < Vec < _ > > ( )
251
249
. await ?;
252
250
253
- self . client
254
- . create_eventsub_subscription (
255
- StreamOfflineV1 :: broadcaster_user_id ( self . streamer_id . clone ( ) ) ,
256
- transport. clone ( ) ,
257
- & * token,
258
- )
259
- . await ?;
251
+ // Find any active subs for this specific session, so we don't fail on
252
+ // re-creating the event subs that are already in place.
253
+ let subs = subs
254
+ . into_iter ( )
255
+ . flat_map ( |subs| subs. subscriptions )
256
+ . filter_map ( |sub| {
257
+ sub. transport
258
+ . as_websocket ( )
259
+ . is_some_and ( |ws| ws. session_id == data. id )
260
+ . then_some ( sub. type_ )
261
+ } )
262
+ . collect :: < Vec < _ > > ( ) ;
263
+
264
+ trace ! ( ?subs, "loaded active subscriptions" ) ;
265
+
266
+ if !subs. contains ( & EventType :: StreamOnline ) {
267
+ self . client
268
+ . create_eventsub_subscription (
269
+ StreamOnlineV1 :: broadcaster_user_id ( self . streamer_id . clone ( ) ) ,
270
+ transport. clone ( ) ,
271
+ & * token,
272
+ )
273
+ . await ?;
274
+ }
260
275
261
- self . client
262
- . create_eventsub_subscription (
263
- ChannelChatMessageV1 :: new ( self . streamer_id . clone ( ) , self . user_id . clone ( ) ) ,
264
- transport,
265
- & * token,
266
- )
267
- . await ?;
276
+ if !subs. contains ( & EventType :: StreamOffline ) {
277
+ self . client
278
+ . create_eventsub_subscription (
279
+ StreamOfflineV1 :: broadcaster_user_id ( self . streamer_id . clone ( ) ) ,
280
+ transport. clone ( ) ,
281
+ & * token,
282
+ )
283
+ . await ?;
284
+ }
285
+
286
+ if !subs. contains ( & EventType :: ChannelChatMessage ) {
287
+ self . client
288
+ . create_eventsub_subscription (
289
+ ChannelChatMessageV1 :: new ( self . streamer_id . clone ( ) , self . user_id . clone ( ) ) ,
290
+ transport,
291
+ & * token,
292
+ )
293
+ . await ?;
294
+ }
268
295
269
296
Ok ( ( ) )
270
297
}
0 commit comments