Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LSMA with Numpy #149

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
numpy
100 changes: 100 additions & 0 deletions talipp/indicators/LSMA.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from dataclasses import dataclass
from typing import List, Any
import numpy as np

from talipp.indicator_util import has_valid_values
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.input import SamplingPeriodType, TimedValue, TimedValueExtractor


@dataclass
class LSMAVal:
"""`LSMA` output type.

Args:
slope: Slope of the least squares moving average regression.
intercep: Intercept of the least squares moving average regression.
pred: Predicted value.
"""

slope: float = None
intercept: float = None
pred: float = None


class LSMA(Indicator):
"""Least Squares Moving Average.

Input type: `float`

Output type: [LSMAVal][talipp.indicators.LSMA.LSMAVal]

Args:
period: Period.
input_values: List of input values.
input_indicator: Input indicator.
input_modifier: Input modifier.
input_sampling: Input sampling type.
"""

def __init__(
self,
period: int,
input_values: List[TimedValue] = None,
input_value_extractor=TimedValueExtractor,
input_indicator: Indicator = None,
input_modifier: InputModifierType = None,
input_sampling: SamplingPeriodType = None,
):
super().__init__(
input_modifier=input_modifier,
output_value_type=LSMAVal,
input_sampling=input_sampling,
)

self.period = period
self.v_get_timestamp = np.vectorize(input_value_extractor.get_timestamp)
self.v_get_value = np.vectorize(input_value_extractor.get_value)

#self.times = np.arange(
# start=0.0, stop=self.period, step=1.0, dtype=np.float64
#)

self.initialize(input_values, input_indicator)

def _calculate_new_value(self) -> Any:
if not has_valid_values(self.input_values, self.period):
return None

a_input_values = np.array(
list(
filter(
lambda v: isinstance(v, TimedValue),
self.input_values[-self.period :],
)
)
)
if len(a_input_values) == 0:
return None

times = self.v_get_timestamp(a_input_values)
delta_t = (times[-1] - times[0]) / (self.period - 1)
times = times - times[0]
times = 1.0 + times / delta_t

if np.count_nonzero(np.isnan(times)) == len(a_input_values):
return None

values = self.v_get_value(a_input_values)

A = np.vstack([times, np.ones(len(times))]).T
y = values[:, np.newaxis]
pinv = np.linalg.pinv(A)
alpha = pinv.dot(y)

slope = alpha[0][0]
intercept = alpha[1][0]

pred = slope * self.period + intercept

return LSMAVal(slope, intercept, pred)
2 changes: 2 additions & 0 deletions talipp/indicators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .KeltnerChannels import KeltnerChannels as KeltnerChannels
from .KST import KST as KST
from .KVO import KVO as KVO
from .LSMA import LSMA as LSMA
from .MACD import MACD as MACD
from .MassIndex import MassIndex as MassIndex
from .McGinleyDynamic import McGinleyDynamic as McGinleyDynamic
Expand Down Expand Up @@ -85,6 +86,7 @@
"KeltnerChannels",
"KST",
"KVO",
"LSMA",
"MACD",
"MassIndex",
"McGinleyDynamic",
Expand Down
29 changes: 25 additions & 4 deletions talipp/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from datetime import datetime, timedelta
from enum import Enum, auto
from dataclasses import dataclass

from talipp.ohlcv import OHLCV

Expand Down Expand Up @@ -83,8 +84,8 @@ class SamplingPeriodType(Enum):

DAY_1 = (TimeUnitType.DAY, 1)
"""1 day"""


class Sampler:
"""Implementation of timeframe auto-sampling.

Expand Down Expand Up @@ -156,8 +157,28 @@ def _normalize(self, dt: datetime):
period_start = period_start.replace(tzinfo=dt.tzinfo)

delta = dt - period_start
num_periods = delta.total_seconds() // (period_length * Sampler.CONVERSION_TO_SEC[period_type])
num_periods = delta.total_seconds() // (
period_length * Sampler.CONVERSION_TO_SEC[period_type]
)

normalized_dt = period_start + timedelta(seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type])
normalized_dt = period_start + timedelta(
seconds=num_periods * period_length * Sampler.CONVERSION_TO_SEC[period_type]
)

return normalized_dt


@dataclass
class TimedValue:
time: datetime
value: float


class TimedValueExtractor:
@staticmethod
def get_timestamp(tv: TimedValue):
return tv.time.timestamp()

@staticmethod
def get_value(tv: TimedValue):
return tv.value
4 changes: 4 additions & 0 deletions test/TalippTest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import unittest
from typing import List
from datetime import datetime, timedelta

from talipp.indicators.Indicator import Indicator
from talipp.ohlcv import OHLCV, OHLCVFactory
from talipp.input import TimedValue


class TalippTest(unittest.TestCase):
Expand All @@ -18,6 +20,8 @@ class TalippTest(unittest.TestCase):

CLOSE_EQUAL_VALUES_TMPL: List[float

TIMED_CLOSE_TMPL: List[TimedValue] = [TimedValue(datetime(2024, 7, 7) + timedelta(days=i), close) for (i, close) in enumerate(CLOSE_TMPL)]

def assertIndicatorUpdate(self, indicator: Indicator, iterations_no: int = 20):
last_indicator_value = indicator[-1]
last_input_value = indicator.input_values[-1]
Expand Down
53 changes: 53 additions & 0 deletions test/test_LSMA.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest

from talipp.indicators import LSMA

from TalippTest import TalippTest


class TestLSMA(TalippTest):
def setUp(self) -> None:
self.input_values = TalippTest.TIMED_CLOSE_TMPL

def test_init_with_period_2(self):
ind = LSMA(2, self.input_values)

self.assertAlmostEqual(ind[-3].slope, 0.29, places=5)
self.assertAlmostEqual(ind[-3].intercept, 10.01, places=5)
self.assertAlmostEqual(ind[-3].pred, 10.59, places=5)

self.assertAlmostEqual(ind[-2].slope, -0.36, places=5)
self.assertAlmostEqual(ind[-2].intercept, 10.95, places=5)
self.assertAlmostEqual(ind[-2].pred, 10.23, places=5)

self.assertAlmostEqual(ind[-1].slope, -0.23, places=5)
self.assertAlmostEqual(ind[-1].intercept, 10.46, places=5)
self.assertAlmostEqual(ind[-1].pred, 10.0, places=5)

def test_init_with_period_5(self):
ind = LSMA(5, self.input_values)

self.assertAlmostEqual(ind[-3].slope, 0.529, places=5)
self.assertAlmostEqual(ind[-3].intercept, 8.161, places=5)
self.assertAlmostEqual(ind[-3].pred, 10.806, places=5)

self.assertAlmostEqual(ind[-2].slope, 0.248, places=5)
self.assertAlmostEqual(ind[-2].intercept, 9.352, places=5)
self.assertAlmostEqual(ind[-2].pred, 10.592, places=5)

self.assertAlmostEqual(ind[-1].slope, -0.037, places=5)
self.assertAlmostEqual(ind[-1].intercept, 10.365, places=5)
self.assertAlmostEqual(ind[-1].pred, 10.180, places=5)

def test_update(self):
self.assertIndicatorUpdate(LSMA(5, self.input_values))

def test_delete(self):
self.assertIndicatorDelete(LSMA(5, self.input_values))

def test_purge_oldest(self):
self.assertIndicatorPurgeOldest(LSMA(5, self.input_values))


if __name__ == "__main__":
unittest.main()
Loading