Skip to content

Commit

Permalink
adds self.run_forever_flag to streams
Browse files Browse the repository at this point in the history
  • Loading branch information
0xfMissingNo committed Sep 29, 2021
1 parent f9e25cd commit 0b4cae4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
2 changes: 1 addition & 1 deletion alpaca_trade_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from .stream import Stream # noqa
from .stream2 import StreamConn # noqa

__version__ = '1.4.0'
__version__ = '1.4.1'
26 changes: 24 additions & 2 deletions alpaca_trade_api/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self,
'dailyBars': {},
}
self._name = 'data'
self._run_forever_flag = False

async def _connect(self):
self._ws = await websockets.connect(
Expand Down Expand Up @@ -79,6 +80,7 @@ async def close(self):
await self._ws.close()
self._ws = None
self._running = False
self._run_forever_flag = False

async def stop_ws(self):
self._stop_stream_queue.put_nowait({"should_stop": True})
Expand Down Expand Up @@ -191,7 +193,8 @@ async def _run_forever(self):
await asyncio.sleep(0.1)
log.info(f'started {self._name} stream')
self._running = False
while True:
self._run_forever_flag = True
while self._run_forever_flag:
try:
if not self._running:
log.info("starting websocket connection")
Expand Down Expand Up @@ -376,6 +379,7 @@ def __init__(self,
self._ws = None
self._running = False
self._stop_stream_queue = queue.Queue()
self._run_forever_flag = False

async def _connect(self):
self._ws = await websockets.connect(self._endpoint)
Expand Down Expand Up @@ -443,7 +447,8 @@ async def _run_forever(self):
await asyncio.sleep(0.1)
log.info('started trading stream')
self._running = False
while True:
self._run_forever_flag = True
while self._run_forever_flag:
try:
if not self._running:
log.info("starting websocket connection")
Expand All @@ -466,6 +471,7 @@ async def close(self):
await self._ws.close()
self._ws = None
self._running = False
self._run_forever_flag = False

async def stop_ws(self):
self._stop_stream_queue.put_nowait({"should_stop": True})
Expand Down Expand Up @@ -648,6 +654,22 @@ def run(self):
print('keyboard interrupt, bye')
pass

async def _close(self):
await asyncio.gather(
self.stop_ws(),
self._trading_ws.close(),
self._data_ws.close(),
self._crypto_ws.close()
)

def close(self):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self._close())
except KeyboardInterrupt:
print('keyboard interrupt, bye')
pass

async def stop_ws(self):
"""
Signal the ws connections to stop listenning to api stream.
Expand Down

0 comments on commit 0b4cae4

Please sign in to comment.