Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds self.run_forever_flag to streams #501

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion alpaca_trade_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from .stream import Stream # noqa
from .stream2 import StreamConn # noqa

__version__ = '1.4.0'
__version__ = '1.4.1'
Copy link
Contributor

@haxdds haxdds Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi 0xfMissingNo! Version changes are necessary in your PR. They are made when we release and deploy a new version.

26 changes: 24 additions & 2 deletions alpaca_trade_api/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self,
'dailyBars': {},
}
self._name = 'data'
self._run_forever_flag = False

async def _connect(self):
self._ws = await websockets.connect(
Expand Down Expand Up @@ -79,6 +80,7 @@ async def close(self):
await self._ws.close()
self._ws = None
self._running = False
self._run_forever_flag = False

async def stop_ws(self):
self._stop_stream_queue.put_nowait({"should_stop": True})
Expand Down Expand Up @@ -191,7 +193,8 @@ async def _run_forever(self):
await asyncio.sleep(0.1)
log.info(f'started {self._name} stream')
self._running = False
while True:
self._run_forever_flag = True
while self._run_forever_flag:
try:
if not self._running:
log.info("starting websocket connection")
Expand Down Expand Up @@ -376,6 +379,7 @@ def __init__(self,
self._ws = None
self._running = False
self._stop_stream_queue = queue.Queue()
self._run_forever_flag = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this flag? What functionality does it add over while True?


async def _connect(self):
self._ws = await websockets.connect(self._endpoint)
Expand Down Expand Up @@ -443,7 +447,8 @@ async def _run_forever(self):
await asyncio.sleep(0.1)
log.info('started trading stream')
self._running = False
while True:
self._run_forever_flag = True
while self._run_forever_flag:
try:
if not self._running:
log.info("starting websocket connection")
Expand All @@ -466,6 +471,7 @@ async def close(self):
await self._ws.close()
self._ws = None
self._running = False
self._run_forever_flag = False

async def stop_ws(self):
self._stop_stream_queue.put_nowait({"should_stop": True})
Expand Down Expand Up @@ -648,6 +654,22 @@ def run(self):
print('keyboard interrupt, bye')
pass

async def _close(self):
Copy link
Contributor

@haxdds haxdds Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking the time to contribute! To help us out, could you please elaborate a bit further on the goal of your PR?

await asyncio.gather(
self.stop_ws(),
self._trading_ws.close(),
self._data_ws.close(),
self._crypto_ws.close()
)

def close(self):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self._close())
except KeyboardInterrupt:
print('keyboard interrupt, bye')
pass

async def stop_ws(self):
"""
Signal the ws connections to stop listenning to api stream.
Expand Down