11import asyncio
22import logging
33import os
4+ import threading
45import time
6+ from typing import Any , Dict
7+ from urllib .error import URLError
8+ from urllib .request import urlopen
9+ from unittest import TestCase
510
611from aiohttp import WSMsgType , web
712
2833socket_mode_disconnect_message = """{"type":"disconnect","reason":"too_many_websockets","num_connections":2,"debug_info":{"host":"applink-111-xxx"},"connection_info":{"app_id":"A111"}}"""
2934
3035
31- def start_socket_mode_server (self , port : int ):
36+ def start_thread_socket_mode_server (self , port : int ):
3237 logger = logging .getLogger (__name__ )
33- state = {}
38+ state : Dict [ str , Any ] = {}
3439
3540 def reset_server_state ():
3641 state .update (
3742 hello_sent = False ,
43+ disconnect = False ,
3844 envelopes_to_consume = list (socket_mode_envelopes ),
3945 )
4046
4147 self .reset_server_state = reset_server_state
4248
43- async def link (request ):
44- ws = web .WebSocketResponse ()
45- await ws .prepare (request )
46-
47- async for msg in ws :
48- if msg .type != WSMsgType .TEXT :
49- continue
50-
51- message = msg .data
52- logger .debug (f"Server received a message: { message } " )
53-
54- if not state ["hello_sent" ]:
55- state ["hello_sent" ] = True
56- await ws .send_str (socket_mode_hello_message )
57-
58- if state ["envelopes_to_consume" ]:
59- e = state ["envelopes_to_consume" ].pop (0 )
60- logger .debug (f"Send an envelope: { e } " )
61- await ws .send_str (e )
62-
63- await ws .send_str (message )
64-
65- return ws
66-
67- app = web .Application ()
68- app .add_routes ([web .get ("/link" , link )])
69- runner = web .AppRunner (app )
70-
71- def run_server ():
72- reset_server_state ()
73-
74- self .loop = loop = asyncio .new_event_loop ()
75- asyncio .set_event_loop (loop )
76- loop .run_until_complete (runner .setup ())
77- site = web .TCPSite (runner , "127.0.0.1" , port , reuse_port = True )
78- loop .run_until_complete (site .start ())
49+ async def health (request : web .Request ):
50+ wr = web .Response ()
51+ await wr .prepare (request )
52+ wr .set_status (200 )
53+ return wr
7954
80- # run until it's stopped from the main thread
81- loop .run_forever ()
82-
83- loop .run_until_complete (runner .cleanup ())
84- loop .run_until_complete (asyncio .sleep (1 ))
85- loop .close ()
86-
87- return run_server
88-
89-
90- def start_socket_mode_server_with_disconnection (self , port : int ):
91- logger = logging .getLogger (__name__ )
92- state = {}
93-
94- def reset_server_state ():
95- state .update (
96- hello_sent = False ,
97- disconnect_sent = False ,
98- envelopes_to_consume = list (socket_mode_envelopes ),
99- )
100-
101- self .reset_server_state = reset_server_state
55+ async def disconnect (request : web .Request ):
56+ state ["disconnect" ] = True
57+ wr = web .Response ()
58+ await wr .prepare (request )
59+ wr .set_status (200 )
60+ return wr
10261
10362 async def link (request ):
104- disconnected = False
63+ connected = True
10564 ws = web .WebSocketResponse ()
10665 await ws .prepare (request )
10766
10867 async for msg in ws :
109- # To ensure disconnect message is received and handled,
110- # need to keep this ws alive to bypass client ping-pong check.
11168 if msg .type == WSMsgType .PING :
112- t = time .time ()
113- await ws .pong (f"sdk-ping-pong:{ t } " )
69+ await ws .pong (f"sdk-ping-pong:{ time .time ()} " )
11470 continue
11571 if msg .type != WSMsgType .TEXT :
11672 continue
@@ -122,14 +78,14 @@ async def link(request):
12278 state ["hello_sent" ] = True
12379 await ws .send_str (socket_mode_hello_message )
12480
125- if not state ["disconnect_sent " ]:
81+ if state ["disconnect " ]:
12682 state ["hello_sent" ] = False
127- state ["disconnect_sent " ] = True
128- disconnected = True
83+ state ["disconnect " ] = False
84+ connected = False
12985 await ws .send_str (socket_mode_disconnect_message )
130- logger .debug (f "Disconnect message sent" )
86+ logger .debug ("Disconnect message sent" )
13187
132- if state ["envelopes_to_consume" ] and not disconnected :
88+ if state ["envelopes_to_consume" ] and connected :
13389 e = state ["envelopes_to_consume" ].pop (0 )
13490 logger .debug (f"Send an envelope: { e } " )
13591 await ws .send_str (e )
@@ -139,7 +95,13 @@ async def link(request):
13995 return ws
14096
14197 app = web .Application ()
142- app .add_routes ([web .get ("/link" , link )])
98+ app .add_routes (
99+ [
100+ web .get ("/link" , link ),
101+ web .get ("/health" , health ),
102+ web .get ("/disconnect" , disconnect ),
103+ ]
104+ )
143105 runner = web .AppRunner (app )
144106
145107 def run_server ():
@@ -155,7 +117,44 @@ def run_server():
155117 loop .run_forever ()
156118
157119 loop .run_until_complete (runner .cleanup ())
158- loop .run_until_complete (asyncio .sleep (1 ))
159120 loop .close ()
160121
161122 return run_server
123+
124+
125+ def start_socket_mode_server (test , port : int ):
126+ test .sm_thread = threading .Thread (target = start_thread_socket_mode_server (test , port ))
127+ test .sm_thread .daemon = True
128+ test .sm_thread .start ()
129+ wait_for_socket_mode_server (port , 4 )
130+
131+
132+ def stop_socket_mode_server (test : TestCase ):
133+ # An event loop runs in a thread and executes all callbacks and Tasks in
134+ # its thread. While a Task is running in the event loop, no other Tasks
135+ # can run in the same thread. When a Task executes an await expression, the
136+ # running Task gets suspended, and the event loop executes the next Task.
137+ # To schedule a callback from another OS thread, the loop.call_soon_threadsafe() method should be used.
138+ # https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading
139+ test .loop .call_soon_threadsafe (test .loop .stop )
140+ test .sm_thread .join (timeout = 5 )
141+
142+
143+ def wait_for_socket_mode_server (port : int , timeout : int ):
144+ start_time = time .time ()
145+ while (time .time () - start_time ) < timeout :
146+ try :
147+ urlopen (f"http://127.0.0.1:{ port } /health" )
148+ return
149+ except URLError :
150+ time .sleep (0.01 )
151+
152+
153+ def request_socket_mode_server_disconnect (port : int , timeout : int ):
154+ start_time = time .time ()
155+ while (time .time () - start_time ) < timeout :
156+ try :
157+ urlopen (f"http://127.0.0.1:{ port } /disconnect" )
158+ return
159+ except URLError :
160+ time .sleep (0.01 )
0 commit comments