Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WebsocketConnector] fix candle feeds #712

Merged
merged 3 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion octobot_trading/exchange_data/ohlcv/channel/ohlcv_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def _get_historical_candles_count(self):
self.logger.warning(f"Can't initialize the required "
f"{self.channel.exchange_manager.exchange_config.required_historical_candles_count}"
f" historical candles: {self.channel.exchange_manager.exchange_name} is not "
f"supporting candles history.")
f"supporting large candles history. Using the {self.OHLCV_OLD_LIMIT} "
f"latest candles instead.")
return self.OHLCV_OLD_LIMIT

async def _get_init_candles(self, time_frame, pair):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ cdef class CryptofeedWebsocketConnector(abstract_websocket.AbstractWebsocketExch

cdef public object local_loop
cdef public bint is_websocket_restarting
cdef dict _previous_open_candles

cpdef void start(self)
cpdef void _set_async_callbacks(self)
Expand Down Expand Up @@ -68,3 +69,5 @@ cdef class CryptofeedWebsocketConnector(abstract_websocket.AbstractWebsocketExch
cdef str _parse_order_type(self, str raw_order_type)
cdef str _parse_order_status(self, str raw_order_status)
cdef str _parse_order_side(self, str raw_order_side)
cdef void _register_previous_open_candle(self, object time_frame, str symbol, list candle)
cdef list _get_previous_open_candle(self, object time_frame, str symbol)
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,13 @@ def __init__(self, config: object, exchange_manager: object):

# Creates cryptofeed exchange instance
self.cryptofeed_exchange = cryptofeed_exchanges.EXCHANGE_MAP[self.get_feed_name()](
config=self.client_config, sandbox=self.exchange_manager.is_sandboxed, **self.EXCHANGE_CONSTRUCTOR_KWARGS)
config=self.client_config,
sandbox=self.exchange_manager.is_sandboxed,
candle_closed_only=False,
**self.EXCHANGE_CONSTRUCTOR_KWARGS
)

self._previous_open_candles = {}

"""
Abstract methods
Expand Down Expand Up @@ -459,7 +465,11 @@ def _subscribe_feed(self, channels, callbacks, symbols=None, candle_interval=Non
:param channels: the feed channels
:param callbacks: the feed callbacks
"""
feed_kwargs = {}
feed_kwargs = {
"candle_closed_only": False
}
# feeds are creating an exchange, apply exchange kwargs
feed_kwargs.update(self.EXCHANGE_CONSTRUCTOR_KWARGS)
if symbols:
feed_kwargs["symbols"] = symbols
if candle_interval:
Expand Down Expand Up @@ -688,16 +698,26 @@ async def candle(self, candle_data: cryptofeed_types.Candle, receipt_timestamp:
}

if candle_data.symbol not in self.watched_pairs:
previous_candle = self._get_previous_open_candle(time_frame, symbol)
push_previous_candle = previous_candle is not None and \
previous_candle[commons_enums.PriceIndexes.IND_PRICE_TIME.value] < candle_data.start
if candle_data.closed or push_previous_candle:
if candle_data.closed and previous_candle is not None and \
previous_candle[commons_enums.PriceIndexes.IND_PRICE_TIME.value] > candle_data.start:
self.logger.warning(f"Duplicate closed candle: pushing already pushed closed "
f"candle: [{candle_data}]")
await self.push_to_channel(trading_constants.OHLCV_CHANNEL,
time_frame=time_frame,
symbol=symbol,
candle=previous_candle if push_previous_candle else candle)
# closed candle has been fetched from exchange, use it and reset previous open candle
self._register_previous_open_candle(time_frame, symbol, None)
if not candle_data.closed:
await self.push_to_channel(trading_constants.KLINE_CHANNEL,
time_frame=time_frame,
symbol=symbol,
kline=candle)
else:
await self.push_to_channel(trading_constants.OHLCV_CHANNEL,
time_frame=time_frame,
symbol=symbol,
candle=candle)
self._register_previous_open_candle(time_frame, symbol, candle)

# Push a new ticker if necessary : only push on the min timeframe
if time_frame is self.min_timeframe:
Expand Down Expand Up @@ -865,3 +885,17 @@ def _parse_order_side(self, raw_order_side):
"""
return trading_enums.TradeOrderSide.BUY.value \
if raw_order_side == cryptofeed_constants.BUY else trading_enums.TradeOrderSide.SELL.value

def _register_previous_open_candle(self, time_frame, symbol, candle):
try:
self._previous_open_candles[time_frame][symbol] = candle
except KeyError:
if time_frame not in self._previous_open_candles:
self._previous_open_candles[time_frame] = {}
self._previous_open_candles[time_frame][symbol] = candle

def _get_previous_open_candle(self, time_frame, symbol):
try:
return self._previous_open_candles[time_frame][symbol]
except KeyError:
return None
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cryptography # Never specify a version (managed by https://github.com/Drakkar-So

# Websocket requirements
websockets==10.3
cryptofeed==2.2.2
cryptofeed==2.2.3

# OrderBook requirement
sortedcontainers==2.4.0
Expand Down