create candles pipeline injection, add gaussian noise for example #503

Open
wants to merge 9 commits into base: master
37 changes: 29 additions & 8 deletions jesse/modes/backtest_mode.py
@@ -1,6 +1,6 @@
import time
import re
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional
import numpy as np
import jesse.helpers as jh
import jesse.services.metrics as stats
@@ -10,6 +10,7 @@
from jesse.enums import timeframes, order_types
from jesse.models import Order, Position
from jesse.modes.utils import save_daily_portfolio_balance
from jesse.pipelines.candles import BaseCandlesPipeline
from jesse.routes import router
from jesse.services import charts
from jesse.services import report
@@ -387,6 +388,7 @@ def _step_simulator(
benchmark: bool = False,
generate_hyperparameters: bool = False,
generate_logs: bool = False,
with_candles_pipeline: bool = True,
) -> dict:
# In case generating logs is specifically demanded, the debug mode must be enabled.
if generate_logs:
@@ -399,7 +401,7 @@ def _step_simulator(

length = _simulation_minutes_length(candles)
_prepare_times_before_simulation(candles)
_prepare_routes(hyperparameters)
candles_pipelines = _prepare_routes(hyperparameters, with_candles_pipeline)

# add initial balance
save_daily_portfolio_balance(is_initial=True)
@@ -412,7 +414,8 @@

# add candles
for j in candles:
short_candle = candles[j]['candles'][i]
candles_pipeline = candles_pipelines[j]
short_candle = get_candles_from_pipeline(candles_pipeline, candles[j]['candles'], i)
if i != 0:
previous_short_candle = candles[j]['candles'][i - 1]
short_candle = _get_fixed_jumped_candle(previous_short_candle, short_candle)
@@ -522,8 +525,12 @@ def _prepare_times_before_simulation(candles: dict) -> None:
store.app.time = first_candles_set[0][0]


def _prepare_routes(hyperparameters: dict = None) -> None:
def _prepare_routes(hyperparameters: dict = None,
with_candles_pipeline: bool = True,
) -> Dict[str, Optional[BaseCandlesPipeline]]:
# initiate strategies
candles_pipeline = {}

for r in router.routes:
# if the r.strategy is str read it from file
if isinstance(r.strategy_name, str):
@@ -561,9 +568,21 @@ def _prepare_routes(hyperparameters: dict = None) -> None:
# init few objects that couldn't be initiated in Strategy __init__
# it also injects hyperparameters into self.hp in case the route does not use any DNAs
r.strategy._init_objects()
candles_pipeline[jh.key(r.exchange, r.symbol)] = r.strategy.get_candles_pipeline() if with_candles_pipeline else None

selectors.get_position(r.exchange, r.symbol).strategy = r.strategy

return candles_pipeline


def get_candles_from_pipeline(candles_pipeline: Optional[BaseCandlesPipeline], candles: np.ndarray, i: int, candles_step: int = -1) -> np.ndarray:
if candles_pipeline is None:
if candles_step == -1:
return candles[i]
else:
return candles[i: i+candles_step]
return candles_pipeline.get_candles(candles[i: i + candles_pipeline._batch_size], i, candles_step)


def _update_progress_bar(
progressbar: Progressbar, run_silently: bool, candle_index: int, candle_step: int, last_update_time: float
@@ -753,6 +772,7 @@ def _skip_simulator(
benchmark: bool = False,
generate_hyperparameters: bool = False,
generate_logs: bool = False,
with_candles_pipeline: bool = True,
) -> dict:
# In case generating logs is specifically demanded, the debug mode must be enabled.
if generate_logs:
@@ -762,7 +782,7 @@

length = _simulation_minutes_length(candles)
_prepare_times_before_simulation(candles)
_prepare_routes(hyperparameters)
candles_pipelines = _prepare_routes(hyperparameters, with_candles_pipeline)

# add initial balance
save_daily_portfolio_balance(is_initial=True)
@@ -773,7 +793,7 @@
for i in range(0, length, candles_step):
# update time moved to _simulate_price_change_effect__multiple_candles
# store.app.time = first_candles_set[i][0] + (60_000 * candles_step)
_simulate_new_candles(candles, i, candles_step)
_simulate_new_candles(candles, candles_pipelines, i, candles_step)

last_update_time = _update_progress_bar(progressbar, run_silently, i, candles_step,
last_update_time=last_update_time)
@@ -831,11 +851,12 @@ def _calculate_minimum_candle_step():
return np.gcd.reduce(consider_time_frames)


def _simulate_new_candles(candles: dict, candle_index: int, candles_step: int) -> None:
def _simulate_new_candles(candles: dict, candles_pipelines: Dict[str, Optional[BaseCandlesPipeline]], candle_index: int, candles_step: int) -> None:
i = candle_index
# add candles
for j in candles:
short_candles = candles[j]["candles"][i: i + candles_step]
candles_pipeline = candles_pipelines[j]
short_candles = get_candles_from_pipeline(candles_pipeline, candles[j]['candles'], i, candles_step)
if i != 0:
previous_short_candles = candles[j]["candles"][i - 1]
# work the same, the fix needs to be done only on the gap of 1m edge candles.
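
With this change, `_prepare_routes` asks each route's strategy for a pipeline via `strategy.get_candles_pipeline()` and both simulators feed every batch of 1m candles through it. A strategy-side hook might look like the sketch below; it assumes this PR adds a default `get_candles_pipeline()` (returning `None`) to the `Strategy` base class, which is not shown in this diff, and the batch size and sigma values are purely illustrative.

from jesse.strategies import Strategy
from jesse.pipelines.candles import GaussianNoiseCandlesPipeline


class NoisyStrategy(Strategy):
    # ... the usual should_long / go_long / etc. methods go here ...

    def get_candles_pipeline(self):
        # one day of 1m candles per batch; mu/sigma chosen only for illustration
        return GaussianNoiseCandlesPipeline(batch_size=1440, close_mu=0.0, close_sigma=0.1)
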
Empty file added jesse/pipelines/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions jesse/pipelines/candles/__init__.py
@@ -0,0 +1,5 @@
from .base_candles import BaseCandlesPipeline
from .rearrange_candles import RearrangeCandles
from .multiple_pipelines import MultipleCandlesPipeline
from .gaussian_noise import GaussianNoiseCandlesPipeline
from .gaussian_resampler import GaussianResamplerCandlesPipeline
36 changes: 36 additions & 0 deletions jesse/pipelines/candles/base_candles.py
@@ -0,0 +1,36 @@
import numpy as np


class BaseCandlesPipeline:
    def __init__(self, batch_size: int) -> None:
        self._batch_size = batch_size
        self._output: np.ndarray = np.zeros((batch_size, 6))
        self.last_price = 0.0

    def get_candles(self, candles: np.ndarray, index: int, candles_step: int = -1) -> np.ndarray:
        index = index % self._batch_size
        if index == 0:
            if self.last_price == 0.0:
                self.last_price = candles[0, 1]  # on the first batch use the open price instead of the last close
            else:
                self.last_price = self._output[-1, 2]  # afterwards use the last close of the previous batch
            inject_candle = self.process(candles, self._output[:len(candles)])
            if not inject_candle:
                self._output[:len(candles)] = candles
        if candles_step == -1:
            return self._output[index]
        if index + candles_step <= self._batch_size:
            return self._output[index:index + candles_step]
        raise ValueError("The candles pipeline batch size must be a multiple of the minimum timeframe "
                         "used in your routes.")

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        """
        :param original_1m_candles: the next `batch_size` original 1m candles; modify them here for
            research purposes to test various scenarios.
        :param out: the candles that will be injected into the simulation instead of the original
            1m candles. It still contains the previous batch when this method is called.
        :return: True if `out` was modified, False otherwise
        """
        return False

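As a quick illustration of the contract documented above, a minimal custom pipeline only has to write its modified candles into `out` in place and return True (returning False keeps the original candles). The class below is a hypothetical example, not part of this PR; column 5 is the volume column in Jesse's candle arrays.

import numpy as np

from jesse.pipelines.candles import BaseCandlesPipeline


class HalfVolumeCandlesPipeline(BaseCandlesPipeline):
    """Hypothetical example: scale traded volume to stress-test volume-based logic."""

    def __init__(self, batch_size: int, factor: float = 0.5) -> None:
        super().__init__(batch_size)
        self.factor = factor

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        out[:len(original_1m_candles)] = original_1m_candles[:]
        out[:len(original_1m_candles), 5] *= self.factor  # column 5 is volume
        return True
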
56 changes: 56 additions & 0 deletions jesse/pipelines/candles/gaussian_noise.py
@@ -0,0 +1,56 @@
import numpy as np

from jesse.pipelines.candles import BaseCandlesPipeline


class GaussianNoiseCandlesPipeline(BaseCandlesPipeline):

    def __init__(self, batch_size: int, *,
                 close_mu: float,
                 close_sigma: float,
                 high_mu: float = 0.0,
                 high_sigma: float = 0.0,
                 low_mu: float = 0.0,
                 low_sigma: float = 0.0,
                 ) -> None:
        """
        Add Gaussian noise to the candles
        """
        super().__init__(batch_size)
        self._first_time = True
        self.close_mu = close_mu
        self.close_sigma = close_sigma
        self.high_mu = high_mu
        self.high_sigma = high_sigma
        self.low_mu = low_mu
        self.low_sigma = low_sigma

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        if not self._first_time:
            last_price = out[-1, 2]  # last close price of the previous batch
        else:
            self._first_time = False
            # no history yet: use the first open price so the initial bias is 0
            last_price = original_1m_candles[0, 1]
        out[:] = original_1m_candles[:]

        # close price: add a cumulative Gaussian noise term
        noise = np.random.normal(self.close_mu, self.close_sigma, size=len(out)).cumsum()
        out[:, 2] = out[:, 2] + noise

        # open price: re-chain opens to the (noisy) previous closes
        out[1:, 1] = out[:-1, 2]
        out[0, 1] = last_price

        # high
        high_noise = 0.0 if self.high_sigma == 0.0 else np.random.normal(0, self.high_sigma, size=len(out))
        out[:, 3] = out[:, 3] + self.high_mu + high_noise

        # low
        low_noise = 0.0 if self.low_sigma == 0.0 else np.random.normal(0, self.low_sigma, size=len(out))
        out[:, 4] = out[:, 4] + self.low_mu + low_noise

        # repair OHLC consistency: high must cover open/close, low must stay below them
        out[:, 3] = np.maximum(np.maximum(out[:, 1], out[:, 2]), np.maximum(out[:, 3], out[:, 4]))
        out[:, 4] = np.minimum(np.minimum(out[:, 1], out[:, 2]), np.minimum(out[:, 3], out[:, 4]))

        return True
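
A rough, self-contained way to see what the noise pipeline does is to run it on synthetic candles outside of a backtest (all values below are arbitrary). The final max/min repair step guarantees that the noisy highs and lows still bracket the open and close.

import numpy as np

from jesse.pipelines.candles import GaussianNoiseCandlesPipeline

candles = np.zeros((256, 6))
candles[:, 0] = np.arange(256) * 60_000                          # timestamps, 1 minute apart
candles[:, 2] = 100 + np.cumsum(np.random.normal(0, 0.2, 256))   # close: a small random walk
candles[:, 1] = np.concatenate(([100.0], candles[:-1, 2]))       # open = previous close
candles[:, 3] = np.maximum(candles[:, 1], candles[:, 2]) + 0.05  # high
candles[:, 4] = np.minimum(candles[:, 1], candles[:, 2]) - 0.05  # low

pipeline = GaussianNoiseCandlesPipeline(batch_size=256, close_mu=0.0, close_sigma=0.5)
noisy = pipeline.get_candles(candles, 0, candles_step=256)

# after the repair step, highs/lows always bracket open and close
assert np.all(noisy[:, 3] >= np.maximum(noisy[:, 1], noisy[:, 2]))
assert np.all(noisy[:, 4] <= np.minimum(noisy[:, 1], noisy[:, 2]))
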
45 changes: 45 additions & 0 deletions jesse/pipelines/candles/gaussian_resampler.py
@@ -0,0 +1,45 @@
import numpy as np

from jesse.pipelines.candles import BaseCandlesPipeline


class GaussianResamplerCandlesPipeline(BaseCandlesPipeline):

    def __init__(self, batch_size: int, *,
                 mu: float = 0.0, sigma: float = 1.0,
                 ) -> None:
        """
        Resample candles from a Gaussian fitted to the original close-to-close deltas,
        optionally shifted by `mu` and scaled by `sigma`
        """
        super().__init__(batch_size)
        self.mu = mu
        self.sigma = sigma

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        out[:] = original_1m_candles[:]

        # close price: a random walk whose steps follow the fitted delta distribution
        delta_close = np.diff(original_1m_candles[:, 2], prepend=self.last_price)
        mu_delta = np.mean(delta_close[1:])
        sigma_delta = np.std(delta_close[1:])
        out[:, 2] = np.random.normal(mu_delta + self.mu, sigma_delta * self.sigma, size=len(out)).cumsum() + self.last_price

        # open price: re-chain opens to the resampled previous closes
        out[1:, 1] = out[:-1, 2]
        out[0, 1] = self.last_price

        # high: resampled from the original high-minus-close distribution
        delta_high_close = original_1m_candles[:, 3] - original_1m_candles[:, 2]
        mu_delta = np.mean(delta_high_close)
        sigma_delta = np.std(delta_high_close)
        out[:, 3] = out[:, 2] + np.random.normal(mu_delta + self.mu, sigma_delta * self.sigma, size=len(out))

        # low: resampled from the original close-minus-low distribution
        delta_close_low = original_1m_candles[:, 2] - original_1m_candles[:, 4]
        mu_delta = np.mean(delta_close_low)
        sigma_delta = np.std(delta_close_low)
        out[:, 4] = out[:, 2] - np.random.normal(mu_delta + self.mu, sigma_delta * self.sigma, size=len(out))

        # repair OHLC consistency
        out[:, 3] = np.maximum(np.maximum(out[:, 1], out[:, 2]), np.maximum(out[:, 3], out[:, 4]))
        out[:, 4] = np.minimum(np.minimum(out[:, 1], out[:, 2]), np.minimum(out[:, 3], out[:, 4]))

        return True
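
The resampler can be sanity-checked the same way: on synthetic candles it replaces the close series with a random walk that starts from the same last price and whose step spread is fitted to the original close-to-close deltas (all values below are arbitrary).

import numpy as np

from jesse.pipelines.candles import GaussianResamplerCandlesPipeline

candles = np.zeros((512, 6))
candles[:, 0] = np.arange(512) * 60_000
candles[:, 2] = 100 + np.cumsum(np.random.normal(0.01, 0.3, 512))  # close
candles[:, 1] = np.concatenate(([100.0], candles[:-1, 2]))         # open = previous close
candles[:, 3] = np.maximum(candles[:, 1], candles[:, 2]) + 0.05    # high
candles[:, 4] = np.minimum(candles[:, 1], candles[:, 2]) - 0.05    # low

pipeline = GaussianResamplerCandlesPipeline(batch_size=512)
resampled = pipeline.get_candles(candles, 0, candles_step=512)

# same starting price, similar step spread as the original series
print(resampled[0, 1], candles[0, 1])
print(np.std(np.diff(resampled[:, 2])), np.std(np.diff(candles[:, 2])))
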
26 changes: 26 additions & 0 deletions jesse/pipelines/candles/multiple_pipelines.py
@@ -0,0 +1,26 @@
import math

import numpy as np

from jesse.pipelines.candles import BaseCandlesPipeline


class MultipleCandlesPipeline(BaseCandlesPipeline):

    def __init__(self, *pipelines: BaseCandlesPipeline):
        # the combined batch must hold a whole number of batches for every child pipeline
        batch_size = math.lcm(*[pipeline._batch_size for pipeline in pipelines])
        super().__init__(batch_size)
        self.pipelines = pipelines

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        # chain the pipelines: the first one reads the original candles and every
        # following one reads the previous pipeline's output
        source = original_1m_candles
        for pipeline in self.pipelines:
            self.pipeline_process(pipeline, source, out)
            source = out
        return True

    @staticmethod
    def pipeline_process(pipeline: BaseCandlesPipeline, input_candles: np.ndarray, out: np.ndarray) -> None:
        batch_size = pipeline._batch_size
        for i in range(0, len(input_candles), batch_size):
            candles = input_candles[i:i + batch_size]
            output = pipeline.get_candles(candles, 0, len(candles))
            out[i:i + len(output)] = output
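
Composing pipelines is just a matter of passing them to `MultipleCandlesPipeline`; the combined batch size is the least common multiple of the children's batch sizes (note that `math.lcm` requires Python 3.9+). A minimal sketch with arbitrary parameters:

from jesse.pipelines.candles import (
    MultipleCandlesPipeline,
    GaussianNoiseCandlesPipeline,
    RearrangeCandles,
)

combined = MultipleCandlesPipeline(
    GaussianNoiseCandlesPipeline(batch_size=60, close_mu=0.0, close_sigma=0.2),
    RearrangeCandles(batch_size=30),
)
assert combined._batch_size == 60  # lcm(60, 30)
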
28 changes: 28 additions & 0 deletions jesse/pipelines/candles/rearrange_candles.py
@@ -0,0 +1,28 @@
import numpy as np

from jesse.pipelines.candles import BaseCandlesPipeline


class RearrangeCandles(BaseCandlesPipeline):

    def __init__(self, batch_size: int) -> None:
        super().__init__(batch_size)

    def process(self, original_1m_candles: np.ndarray, out: np.ndarray) -> bool:
        out[:len(original_1m_candles), :] = original_1m_candles[:, :]

        # express open/high/low relative to the close, and the close as deltas
        out[:, 1] = out[:, 1] - out[:, 2]
        out[:, 3] = out[:, 3] - out[:, 2]
        out[:, 4] = out[:, 4] - out[:, 2]
        out[:, 2] = np.diff(out[:, 2], prepend=self.last_price)

        # shuffle the candles but keep the original timestamps
        shuffled_indices = np.random.permutation(len(out))
        out[:] = out[shuffled_indices]
        out[:, 0] = original_1m_candles[:, 0]

        # rebuild absolute prices from the shuffled deltas
        out[:, 2] = out[:, 2].cumsum() + self.last_price
        out[:, 1] += out[:, 2]
        out[:, 3] += out[:, 2]
        out[:, 4] += out[:, 2]

        return True
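
Because the shuffle permutes close-to-close deltas and then re-accumulates them, the last close of each batch is preserved; only the path in between is reordered. A quick check on synthetic candles (arbitrary values):

import numpy as np

from jesse.pipelines.candles import RearrangeCandles

candles = np.zeros((64, 6))
candles[:, 0] = np.arange(64) * 60_000
candles[:, 2] = 100 + np.cumsum(np.random.normal(0, 1, 64))      # close
candles[:, 1] = np.concatenate(([100.0], candles[:-1, 2]))       # open = previous close
candles[:, 3] = np.maximum(candles[:, 1], candles[:, 2]) + 0.1   # high
candles[:, 4] = np.minimum(candles[:, 1], candles[:, 2]) - 0.1   # low

pipeline = RearrangeCandles(batch_size=64)
shuffled = pipeline.get_candles(candles, 0, candles_step=64)

# the permuted deltas sum to the same total, so the final close is unchanged
assert np.isclose(shuffled[-1, 2], candles[-1, 2])
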
1 change: 1 addition & 0 deletions jesse/research/__init__.py
@@ -1,3 +1,4 @@
from .candles import get_candles, store_candles, fake_candle, fake_range_candles, candles_from_close_prices
from .backtest import backtest
from .monte_carlo import monte_carlo
from .import_candles import import_candles
6 changes: 5 additions & 1 deletion jesse/research/backtest.py
@@ -16,7 +16,8 @@ def backtest(
generate_json: bool = False,
generate_logs: bool = False,
hyperparameters: dict = None,
fast_mode: bool = False
fast_mode: bool = False,
with_candles_pipeline: bool = True,
) -> dict:
"""
An isolated backtest() function which is perfect for using in research, and AI training
@@ -65,6 +66,7 @@
generate_hyperparameters=generate_hyperparameters,
generate_logs=generate_logs,
fast_mode=fast_mode,
with_candles_pipeline=with_candles_pipeline,
)


@@ -84,6 +86,7 @@ def _isolated_backtest(
generate_hyperparameters: bool = False,
generate_logs: bool = False,
fast_mode: bool = False,
with_candles_pipeline: bool = True,
) -> dict:
from jesse.services.validators import validate_routes
from jesse.modes.backtest_mode import simulator
@@ -149,6 +152,7 @@
generate_hyperparameters=generate_hyperparameters,
generate_logs=generate_logs,
fast_mode=fast_mode,
with_candles_pipeline=with_candles_pipeline
)

result = {