From ec9b5f8280b4197f40ab0a867ead01dbc37a0fd8 Mon Sep 17 00:00:00 2001 From: Daniele Carli <47296443+Moonzyyy@users.noreply.github.com> Date: Mon, 1 Jul 2024 10:24:17 +0100 Subject: [PATCH 1/2] [SEG] Piecewise Linear Approximation (PLA)- unfinished implementations (#250) * Initial Commit * Changed SW to np.array * Changed TD to np.array * Fixed TD error, changed BU to np.array * Revert some changes due to errors caused from numpy array --- tsml_eval/segmentation/__init__.py | 12 +++++++ tsml_eval/segmentation/_bu.py | 41 +++++++++++++++++++++++ tsml_eval/segmentation/_sw.py | 40 ++++++++++++++++++++++ tsml_eval/segmentation/_swab.py | 38 +++++++++++++++++++++ tsml_eval/segmentation/_td.py | 48 +++++++++++++++++++++++++++ tsml_eval/segmentation/base.py | 36 ++++++++++++++++++++ tsml_eval/segmentation/manual_test.py | 17 ++++++++++ 7 files changed, 232 insertions(+) create mode 100644 tsml_eval/segmentation/__init__.py create mode 100644 tsml_eval/segmentation/_bu.py create mode 100644 tsml_eval/segmentation/_sw.py create mode 100644 tsml_eval/segmentation/_swab.py create mode 100644 tsml_eval/segmentation/_td.py create mode 100644 tsml_eval/segmentation/base.py create mode 100644 tsml_eval/segmentation/manual_test.py diff --git a/tsml_eval/segmentation/__init__.py b/tsml_eval/segmentation/__init__.py new file mode 100644 index 00000000..1602ecbe --- /dev/null +++ b/tsml_eval/segmentation/__init__.py @@ -0,0 +1,12 @@ +"""Piecewise Linear Approximation.""" + +__all__ = [ + "BasePLA", + "SlidingWindow", + "TopDown", + "BottomUp" +] +from base import BasePLA +from _sw import SlidingWindow +from _td import TopDown +from _bu import BottomUp \ No newline at end of file diff --git a/tsml_eval/segmentation/_bu.py b/tsml_eval/segmentation/_bu.py new file mode 100644 index 00000000..fca63ee8 --- /dev/null +++ b/tsml_eval/segmentation/_bu.py @@ -0,0 +1,41 @@ +from base import BasePLA +import numpy as np +import math +__maintainer__ = [] +__all__ = ["BottomUp"] + +class BottomUp(BasePLA): + + def __init__(self, max_error): + super().__init__(max_error) + + #clean the code + def bottomUp(self, time_series): + seg_ts = [] + merge_cost = [] + for i in range(0, len(time_series), 2): + seg_ts.append(self.create_segment(time_series[i: i + 2])) + for i in range(len(seg_ts) - 1): + merge_cost.append(self.calculate_error(seg_ts[i] + seg_ts[i + 1])) + + merge_cost = np.array(merge_cost) + + while len(merge_cost != 0) and min(merge_cost) < self.max_error: + if(len(merge_cost) == len(seg_ts)): + print("error") + pos = np.argmin(merge_cost) + seg_ts[pos] = np.concatenate((seg_ts[pos], seg_ts[pos + 1])) + seg_ts.pop(pos + 1) + if (pos + 1) < len(merge_cost): + merge_cost = np.delete(merge_cost, pos + 1) + else: + merge_cost= np.delete(merge_cost, pos) + + if pos != 0: + merge_cost[pos - 1] = self.calculate_error(np.concatenate((seg_ts[pos - 1], seg_ts[pos]))) + + if((pos + 1) < len(seg_ts)): + merge_cost[pos] = self.calculate_error(np.concatenate((seg_ts[pos], seg_ts[pos + 1]))) + + + return seg_ts \ No newline at end of file diff --git a/tsml_eval/segmentation/_sw.py b/tsml_eval/segmentation/_sw.py new file mode 100644 index 00000000..6a4fa4ff --- /dev/null +++ b/tsml_eval/segmentation/_sw.py @@ -0,0 +1,40 @@ + +from base import BasePLA +import numpy as np +__maintainer__ = [] +__all__ = ["SlidingWindow"] + +class SlidingWindow(BasePLA): + + def __init__(self, max_error): + super().__init__(max_error) + + """work in progress + def sliding_window(self, time_series): + seg_ts = [] + anchor = 0 + for i in range(1, 
len(time_series)): + if self.calculate_error(time_series[anchor:i]) > self.max_error: + seg_ts.append(self.create_segment(time_series[anchor: i - 1])) + anchor = i - 1 + if(anchor < i): + seg_ts.append(self.create_segment(time_series[anchor: i - 1])) + return np.concatenate(seg_ts) """ + + #! clean this up, the while loops are not done in a good manner. This is from the pseudocode + def sliding_window(self, time_series): + seg_ts = [] + anchor = 0 + while anchor < len(time_series): + i = 2 + while anchor + i -1 < len(time_series) and self.calculate_error(time_series[anchor:anchor + i]) < self.max_error: + i = i + 1 + seg_ts.append(self.create_segment(time_series[anchor:anchor + i - 1])) + anchor = anchor + i - 1 + return seg_ts + + def segment(time_series): + return None + + def pla(time_series): + return None \ No newline at end of file diff --git a/tsml_eval/segmentation/_swab.py b/tsml_eval/segmentation/_swab.py new file mode 100644 index 00000000..a3f82dd1 --- /dev/null +++ b/tsml_eval/segmentation/_swab.py @@ -0,0 +1,38 @@ +from base import BasePLA +import numpy as np +import sys +import BottomUp + +__maintainer__ = [] +__all__ = ["SWAB"] + +class SWAB(BasePLA): + + def __init__(self, max_error, seg_num = 6): + self.seg_num = seg_num + self.bottomup = BottomUp(max_error) + super().__init__(max_error) + + + def swab(self, time_series): + seg_ts = [] + buffer = np.empty(self.seg_num, dtype=object) + sw_lower_bound = len(buffer) / 2 + sw_upper_bound = len(buffer) * 2 + while len(buffer) < 3: + t = self.bottomup(time_series) + seg_ts.append(t[0]) + buffer = buffer[len(t) - 1:] + return None + + + #finds the next potential segment + def best_line(self, time_series, current_data_point, sw_lower_bound, sw_upper_bound): + seg_ts = [] + error = 0 + while error < self.max_error: + seg_ts.append = time_series[current_data_point] + error = self.calculate_error(seg_ts) + current_data_point = current_data_point + 1 + return seg_ts + \ No newline at end of file diff --git a/tsml_eval/segmentation/_td.py b/tsml_eval/segmentation/_td.py new file mode 100644 index 00000000..092d1981 --- /dev/null +++ b/tsml_eval/segmentation/_td.py @@ -0,0 +1,48 @@ +from base import BasePLA +import numpy as np +import sys + +__maintainer__ = [] +__all__ = ["TopDown"] + +class TopDown(BasePLA): + + def __init__(self, max_error): + super().__init__(max_error) + + #Implement a cache system for this + def topDown(self, time_series): + seg_ts = [] + best_so_far = sys.float_info.max + breakpoint = None + for i in range(2, len(time_series -2)): + improvement_in_approximation = self.improvement_splitting_here(time_series, i) + if(improvement_in_approximation < best_so_far): + breakpoint = i + best_so_far = improvement_in_approximation + + if breakpoint == None: + return [time_series] + + left_segment = time_series[:breakpoint] + right_segment = time_series[breakpoint:] + + if self.calculate_error(left_segment) > self.max_error: + seg_ts.extend(self.topDown(left_segment)) + else: + seg_ts.append(left_segment) + + + if self.calculate_error(right_segment) > self.max_error: + seg_ts.extend(self.topDown(right_segment)) + else: + seg_ts.append(right_segment) + + return seg_ts + + + def improvement_splitting_here(self, time_series, breakpoint): + left_segment = time_series[:breakpoint] + right_segment = time_series[breakpoint:] + return self.calculate_error(left_segment) + self.calculate_error(right_segment) + \ No newline at end of file diff --git a/tsml_eval/segmentation/base.py b/tsml_eval/segmentation/base.py new file mode 100644 
index 00000000..62c4109a --- /dev/null +++ b/tsml_eval/segmentation/base.py @@ -0,0 +1,36 @@ +"""Abstract base class""" + +__maintainer__ = [] +__all__ = ["BasePLA"] + +import numpy as np +import pandas as pd +from sklearn.linear_model import LinearRegression + +class BasePLA(): + "Base class for piecewise linear approximation (PLA)" + + def __init__(self, max_error): + self.max_error = max_error + + def linear_regression(self, time_series, sequence = None): + n = len(time_series) + Y = np.array(time_series) + X = np.arange(n).reshape(-1 , 1) + linearRegression = LinearRegression() + linearRegression.fit(X, Y) + regression_line = np.array(linearRegression.predict(X)) + return regression_line + + def sum_squared_error(self, time_series, linear_regression_time_series): + "formula: sse = the sum of the differences of the original series against the predicted series squared" + error = np.sum((time_series - linear_regression_time_series) ** 2) + return error + + def calculate_error(self, time_series): + lrts = self.linear_regression(time_series) + sse = self.sum_squared_error(time_series, lrts) + return sse + + def create_segment(self, time_series): + return self.linear_regression(time_series) \ No newline at end of file diff --git a/tsml_eval/segmentation/manual_test.py b/tsml_eval/segmentation/manual_test.py new file mode 100644 index 00000000..683b06c5 --- /dev/null +++ b/tsml_eval/segmentation/manual_test.py @@ -0,0 +1,17 @@ +from _sw import SlidingWindow +from _bu import BottomUp +from _td import TopDown +from aeon.datasets import load_electric_devices_segmentation +from aeon.visualisation import plot_series_with_change_points, plot_series_with_profiles +import matplotlib.pyplot as plt +import numpy as np + + +ts, period_size, true_cps = load_electric_devices_segmentation() +ts = ts[0:20] +ts = ts.values +sw = TopDown(100) +results = sw.topDown(ts) +print(len(results)) + +print(results) \ No newline at end of file From b69aeecfe84035f4cceea2b5ec10890f94e7adbd Mon Sep 17 00:00:00 2001 From: Daniele Carli <47296443+Moonzyyy@users.noreply.github.com> Date: Mon, 8 Jul 2024 10:19:18 +0100 Subject: [PATCH 2/2] [SEG] Comments (#251) * Initial Commit * Changed SW to np.array * Changed TD to np.array * Fixed TD error, changed BU to np.array * Revert some changes due to errors caused from numpy array * Added Swab and added dense findings of segmentations * Fixed dense findings for segmentation forPLA * Added comments to each function and class * Commits for meeting tomorrow * Moved to _wip folder, changed from segmentation to series transformer and fixed issue with top down algorithm * Fixed bottom down algorithm for pla * Deletion of folders * Implementing PiecewiseLinearApproximation as part of BaseSeriesTransformer * Initial finish in implementing PLA and its transformers into base series transformer, some efficiencies can be made * Fixed Errors * Tests added * Finished Progress --- tsml_eval/_wip/series_transformer/__init__.py | 7 + tsml_eval/_wip/series_transformer/_bu_old.py | 86 ++++ tsml_eval/_wip/series_transformer/_pla.py | 398 ++++++++++++++++++ tsml_eval/_wip/series_transformer/_sw_old.py | 70 +++ .../_wip/series_transformer/_swab_old.py | 117 +++++ tsml_eval/_wip/series_transformer/_td_old.py | 110 +++++ .../_wip/series_transformer/manual_test.py | 39 ++ tsml_eval/_wip/series_transformer/test_pla.py | 69 +++ tsml_eval/segmentation/__init__.py | 12 - tsml_eval/segmentation/_bu.py | 41 -- tsml_eval/segmentation/_sw.py | 40 -- tsml_eval/segmentation/_swab.py | 38 -- 
tsml_eval/segmentation/_td.py | 48 --- tsml_eval/segmentation/base.py | 36 -- tsml_eval/segmentation/manual_test.py | 17 - 15 files changed, 896 insertions(+), 232 deletions(-) create mode 100644 tsml_eval/_wip/series_transformer/__init__.py create mode 100644 tsml_eval/_wip/series_transformer/_bu_old.py create mode 100644 tsml_eval/_wip/series_transformer/_pla.py create mode 100644 tsml_eval/_wip/series_transformer/_sw_old.py create mode 100644 tsml_eval/_wip/series_transformer/_swab_old.py create mode 100644 tsml_eval/_wip/series_transformer/_td_old.py create mode 100644 tsml_eval/_wip/series_transformer/manual_test.py create mode 100644 tsml_eval/_wip/series_transformer/test_pla.py delete mode 100644 tsml_eval/segmentation/__init__.py delete mode 100644 tsml_eval/segmentation/_bu.py delete mode 100644 tsml_eval/segmentation/_sw.py delete mode 100644 tsml_eval/segmentation/_swab.py delete mode 100644 tsml_eval/segmentation/_td.py delete mode 100644 tsml_eval/segmentation/base.py delete mode 100644 tsml_eval/segmentation/manual_test.py diff --git a/tsml_eval/_wip/series_transformer/__init__.py b/tsml_eval/_wip/series_transformer/__init__.py new file mode 100644 index 00000000..6b521d9f --- /dev/null +++ b/tsml_eval/_wip/series_transformer/__init__.py @@ -0,0 +1,7 @@ +"""Piecewise Linear Approximation.""" + +__all__ = [ + "PiecewiseLinearApproximation", +] + +from _pla import PiecewiseLinearApproximation diff --git a/tsml_eval/_wip/series_transformer/_bu_old.py b/tsml_eval/_wip/series_transformer/_bu_old.py new file mode 100644 index 00000000..dec5cb6a --- /dev/null +++ b/tsml_eval/_wip/series_transformer/_bu_old.py @@ -0,0 +1,86 @@ +from base import BasePLA +import numpy as np +import math +__maintainer__ = [] +__all__ = ["BottomUp"] + +class BottomUp(BasePLA): + """ + Piecewise Linear Bottom-Up. + + Uses a bottom-up algorithm to traverse the dataset in an offline manner. + + Parameters + ---------- + max_error: float + The maximum error valuefor the function to find before segmenting the dataset + + References + ---------- + .. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November. + An online algorithm for segmenting time series. (pp. 289-296). + """ + + def __init__(self, max_error): + super().__init__(max_error) + + #clean the code + def transform(self, time_series): + """Transform a time series + + Parameters + ---------- + time_series : np.array + 1D time series to be transformed. 
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        seg_ts = []
+        merge_cost = []
+        for i in range(0, len(time_series), 2):
+            seg_ts.append(self.create_segment(time_series[i: i + 2]))
+        for i in range(len(seg_ts) - 1):
+            merge_cost.append(self.calculate_error(seg_ts[i] + seg_ts[i + 1]))
+
+        merge_cost = np.array(merge_cost)
+
+        # repeatedly merge the cheapest pair of adjacent segments
+        while len(merge_cost) != 0 and min(merge_cost) < self.max_error:
+            pos = np.argmin(merge_cost)
+            seg_ts[pos] = self.create_segment(np.concatenate((seg_ts[pos], seg_ts[pos + 1])))
+            seg_ts.pop(pos + 1)
+            if (pos + 1) < len(merge_cost):
+                merge_cost = np.delete(merge_cost, pos + 1)
+            else:
+                merge_cost = np.delete(merge_cost, pos)
+
+            if pos != 0:
+                merge_cost[pos - 1] = self.calculate_error(np.concatenate((seg_ts[pos - 1], seg_ts[pos])))
+
+            if (pos + 1) < len(seg_ts):
+                merge_cost[pos] = self.calculate_error(np.concatenate((seg_ts[pos], seg_ts[pos + 1])))
+
+        return seg_ts
+
+    def transform_flatten(self, time_series):
+        """Transform a time series and return a 1D array.
+
+        Parameters
+        ----------
+        time_series : np.array
+            1D time series to be transformed.
+
+        Returns
+        -------
+        np.array
+            Flattened transformed time series
+        """
+        pla_timeseries = self.transform(time_series)
+        return np.concatenate(pla_timeseries)
\ No newline at end of file
diff --git a/tsml_eval/_wip/series_transformer/_pla.py b/tsml_eval/_wip/series_transformer/_pla.py
new file mode 100644
index 00000000..db16d31c
--- /dev/null
+++ b/tsml_eval/_wip/series_transformer/_pla.py
@@ -0,0 +1,398 @@
+__maintainer__ = []
+__all__ = ["PiecewiseLinearApproximation"]
+
+from enum import Enum
+import numpy as np
+from sklearn.linear_model import LinearRegression
+from aeon.transformations.series.base import BaseSeriesTransformer
+
+class PiecewiseLinearApproximation(BaseSeriesTransformer):
+    """Piecewise Linear Approximation (PLA) for time series transformation.
+
+    Takes a univariate time series as input and approximates it with linear
+    segments fitted by linear regression, using the sum of squared errors (SSE)
+    as the segmentation criterion. The algorithms available are two offline
+    algorithms, TopDown and BottomUp, and two online algorithms, SlidingWindow
+    and SWAB (Sliding Window And Bottom-up).
+
+    Parameters
+    ----------
+    transformer : Transformer
+        The segmentation algorithm to be used
+    max_error : float
+        The maximum error a segment may have before the series is split
+    buffer_size : int
+        The buffer size, used only for SWAB
+
+    Attributes
+    ----------
+    segment_dense : np.array
+        The end index of each segment found during the transformation
+
+    References
+    ----------
+    .. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
+        An online algorithm for segmenting time series. (pp. 289-296).
+    """
+
+    class Transformer(Enum):
+        """An enum of the segmentation algorithms available for PLA."""
+
+        SlidingWindow = "SlidingWindow"
+        TopDown = "TopDown"
+        BottomUp = "BottomUp"
+        SWAB = "Swab"
+
+    _tags = {
+        "fit_is_empty": True,
+        "python_dependencies": "sklearn",
+    }
+
+    def __init__(self, transformer, max_error, buffer_size=None):
+        if not isinstance(transformer, self.Transformer):
+            raise ValueError("Invalid transformer: please use Transformer class.")
+        if not isinstance(max_error, (int, float, complex)):
+            raise ValueError("Invalid max_error: it has to be a number.")
+        if not (buffer_size is None or isinstance(buffer_size, (int, float, complex))):
+            raise ValueError("Invalid buffer_size: use a number only or keep empty.")
+        self.transformer = transformer
+        self.max_error = max_error
+        self.buffer_size = buffer_size
+        self.segment_dense = np.array([])
+        super().__init__(axis=0)
+
+    def _transform(self, X, y=None):
+        """Transform X and return a transformed version.
+
+        Private _transform containing the core logic, called from transform.
+
+        Parameters
+        ----------
+        X : np.ndarray
+            1D time series to be transformed
+        y : ignored argument for interface compatibility
+
+        Returns
+        -------
+        np.ndarray
+            1D transform of X
+        """
+        if self.transformer == self.Transformer.SlidingWindow:
+            results = self._sliding_window(X)
+        elif self.transformer == self.Transformer.TopDown:
+            results = self._top_down(X)
+        elif self.transformer == self.Transformer.BottomUp:
+            results = self._bottom_up(X)
+        elif self.transformer == self.Transformer.SWAB:
+            results = self._SWAB(X)
+        else:
+            raise RuntimeError("No transformer was called.")
+
+        if len(results) > 1:
+            # cumulative end index of each segment in the concatenated output
+            segment_dense = np.zeros([len(results) - 1])
+            segment_dense[0] = len(results[0])
+            for i in range(1, len(results) - 1):
+                segment_dense[i] = segment_dense[i - 1] + len(results[i])
+            self.segment_dense = segment_dense
+
+        return np.concatenate(results)
+
+    def _sliding_window(self, X):
+        """Transform a time series using the sliding window algorithm (online).
+
+        Parameters
+        ----------
+        X : np.ndarray
+            1D time series to be transformed.
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        seg_ts = []
+        anchor = 0
+        while anchor < len(X):
+            i = 2
+            # grow the window from the anchor until the linear fit error is too large
+            while anchor + i - 1 < len(X) and self._calculate_error(X[anchor:anchor + i]) < self.max_error:
+                i = i + 1
+            seg_ts.append(self._create_segment(X[anchor:anchor + i - 1]))
+            anchor = anchor + i - 1
+        return seg_ts
+
+    def _top_down(self, X):
+        """Transform a time series using the top down algorithm (offline).
+
+        Parameters
+        ----------
+        X : np.ndarray
+            1D time series to be transformed.
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        best_so_far = float("inf")
+        breakpoint = None
+
+        # find the breakpoint whose split gives the lowest combined SSE
+        for i in range(2, len(X)):
+            improvement_in_approximation = self.improvement_splitting_here(X, i)
+            if improvement_in_approximation < best_so_far:
+                breakpoint = i
+                best_so_far = improvement_in_approximation
+
+        if breakpoint is None:
+            return [self._create_segment(X)]
+
+        left_found_segment = X[:breakpoint]
+        right_found_segment = X[breakpoint:]
+
+        if self._calculate_error(left_found_segment) > self.max_error:
+            left_segment = self._top_down(left_found_segment)
+        else:
+            left_segment = [self._create_segment(left_found_segment)]
+
+        if self._calculate_error(right_found_segment) > self.max_error:
+            right_segment = self._top_down(right_found_segment)
+        else:
+            right_segment = [self._create_segment(right_found_segment)]
+
+        return left_segment + right_segment
+
+    def improvement_splitting_here(self, X, breakpoint):
+        """Return the SSE of the left and right segments split at a particular point.
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series.
+        breakpoint : int
+            the break point within the time series array
+
+        Returns
+        -------
+        error : float
+            the combined sum of squared errors of the two segments
+        """
+        left_segment = X[:breakpoint]
+        right_segment = X[breakpoint:]
+        return self._calculate_error(left_segment) + self._calculate_error(right_segment)
+
+    def _bottom_up(self, X):
+        """Transform a time series using the bottom up algorithm (offline).
+
+        Parameters
+        ----------
+        X : np.ndarray
+            1D time series to be transformed.
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        seg_ts = []
+        merge_cost = []
+        for i in range(0, len(X), 2):
+            seg_ts.append(self._create_segment(X[i: i + 2]))
+        for i in range(len(seg_ts) - 1):
+            merge_cost.append(self._calculate_error(seg_ts[i] + seg_ts[i + 1]))
+
+        merge_cost = np.array(merge_cost)
+
+        # repeatedly merge the cheapest pair of adjacent segments
+        while len(merge_cost) != 0 and min(merge_cost) < self.max_error:
+            pos = np.argmin(merge_cost)
+            seg_ts[pos] = self._create_segment(np.concatenate((seg_ts[pos], seg_ts[pos + 1])))
+            seg_ts.pop(pos + 1)
+            if (pos + 1) < len(merge_cost):
+                merge_cost = np.delete(merge_cost, pos + 1)
+            else:
+                merge_cost = np.delete(merge_cost, pos)
+
+            if pos != 0:
+                merge_cost[pos - 1] = self._calculate_error(np.concatenate((seg_ts[pos - 1], seg_ts[pos])))
+
+            if (pos + 1) < len(seg_ts):
+                merge_cost[pos] = self._calculate_error(np.concatenate((seg_ts[pos], seg_ts[pos + 1])))
+
+        return seg_ts
+
+    def _SWAB(self, X):
+        """Transform a time series using the SWAB algorithm (online).
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series to be transformed.
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        seg_ts = []
+        if self.buffer_size is None:
+            self.buffer_size = int(len(X) ** 0.5)
+
+        lower_boundary_window = int(self.buffer_size / 2)
+        upper_boundary_window = int(self.buffer_size * 2)
+
+        seg = self._best_line(X, 0, lower_boundary_window, upper_boundary_window)
+        current_data_point = len(seg)
+        buffer = np.array(seg)
+
+        while len(buffer) > 0:
+            t = self._bottom_up(X)
+            seg_ts.append(t[0])
+            buffer = buffer[len(t[0]):]
+            if current_data_point >= len(X):
+                seg = self._best_line(X, current_data_point, lower_boundary_window, upper_boundary_window)
+                current_data_point = current_data_point + len(seg)
+                buffer = np.append(buffer, seg)
+            else:
+                buffer = np.array([])
+                t = t[1:]
+                for i in range(len(t)):
+                    seg_ts.append(t[i])
+        return seg_ts
+
+    def _best_line(self, X, current_data_point, lower_boundary_window, upper_boundary_window):
+        """Use a sliding window to find the next best segmentation candidate.
+
+        Used inside of the SWAB algorithm.
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series to be segmented.
+        current_data_point : int
+            the current data point we are observing
+        lower_boundary_window : int
+            the lower boundary of the window
+        upper_boundary_window : int
+            the upper boundary of the window
+
+        Returns
+        -------
+        np.array
+            newly found segmentation candidate
+        """
+        max_window_length = current_data_point + upper_boundary_window
+        seg_ts = np.array(X[current_data_point: current_data_point + lower_boundary_window])
+        current_data_point = current_data_point + lower_boundary_window
+        error = 0
+        while current_data_point < max_window_length and current_data_point < len(X) and error < self.max_error:
+            seg_ts = np.append(seg_ts, X[current_data_point])
+            error = self._calculate_error(seg_ts)
+            current_data_point = current_data_point + 1
+        return seg_ts
+
+    # TODO: write our own linear regression, using sklearn's here is inefficient
+    def _linear_regression(self, time_series):
+        """Fit a linear regression line to the given time series.
+
+        Parameters
+        ----------
+        time_series : np.array
+            1D time series to be fitted.
+
+        Returns
+        -------
+        np.array
+            the fitted regression line, one value per input point
+        """
+        n = len(time_series)
+        Y = np.array(time_series)
+        X = np.arange(n).reshape(-1, 1)
+        linearRegression = LinearRegression()
+        linearRegression.fit(X, Y)
+        regression_line = np.array(linearRegression.predict(X))
+        return regression_line
+
+    def _sum_squared_error(self, X, p_X):
+        """Return the SSE between a series and its predicted series.
+
+        formula: SSE = ∑i (Xi - p_Xi)^2
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series.
+        p_X : np.array
+            1D predicted series obtained from linear regression
+
+        Returns
+        -------
+        error : float
+            the SSE
+        """
+        error = np.sum((X - p_X) ** 2)
+        return error
+
+    def _calculate_error(self, X):
+        """Return the SSE of a time series against its linear regression.
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series.
+
+        Returns
+        -------
+        error : float
+            the SSE
+        """
+        lrts = self._linear_regression(X)
+        sse = self._sum_squared_error(X, lrts)
+        return sse
+
+    def _create_segment(self, X):
+        """Create a linear segment of a given time series.
+
+        Parameters
+        ----------
+        X : np.array
+            1D time series.
+
+        Returns
+        -------
+        np.array
+            the linear regression of the time series.
+        """
+        return self._linear_regression(X)
+
+    @classmethod
+    def get_test_params(cls, parameter_set="default"):
+        """Return testing parameter settings for the estimator.
+
+        Parameters
+        ----------
+        parameter_set : str, default="default"
+
+        Returns
+        -------
+        params : dict or list of dict, default = {}
+            Parameters to create testing instances of the class.
+            Each dict are parameters to construct an "interesting" test instance, i.e.,
+            `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance.
+            `create_test_instance` uses the first (or only) dictionary in `params`
+        """
+        params = {
+            "transformer": PiecewiseLinearApproximation.Transformer.SWAB,
+            "max_error": 5,
+        }
+
+        return params
\ No newline at end of file
diff --git a/tsml_eval/_wip/series_transformer/_sw_old.py b/tsml_eval/_wip/series_transformer/_sw_old.py
new file mode 100644
index 00000000..b64515b2
--- /dev/null
+++ b/tsml_eval/_wip/series_transformer/_sw_old.py
@@ -0,0 +1,70 @@
+
+from base import BasePLA
+import numpy as np
+__maintainer__ = []
+__all__ = ["SlidingWindow"]
+
+class SlidingWindow(BasePLA):
+    """Piecewise Linear Sliding Window.
+
+    Uses a sliding window algorithm to traverse the dataset in an online manner.
+
+    Parameters
+    ----------
+    max_error : float
+        The maximum error a segment may have before the series is split
+
+    References
+    ----------
+    .. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
+        An online algorithm for segmenting time series. (pp. 289-296).
+    """
+
+    def __init__(self, max_error):
+        super().__init__(max_error)
+
+    # TODO: clean this up, the while loops follow the pseudocode too literally
+    def transform(self, time_series):
+        """Transform a time series.
+
+        Parameters
+        ----------
+        time_series : np.array
+            1D time series to be transformed.
+
+        Returns
+        -------
+        list
+            List of transformed segmented time series
+        """
+        seg_ts = []
+        anchor = 0
+        while anchor < len(time_series):
+            i = 2
+            while anchor + i - 1 < len(time_series) and self.calculate_error(time_series[anchor:anchor + i]) < self.max_error:
+                i = i + 1
+            seg_ts.append(self.create_segment(time_series[anchor:anchor + i - 1]))
+            anchor = anchor + i - 1
+        return seg_ts
+
+    def transform_flatten(self, time_series):
+        """Transform a time series and return a 1D array.
+
+        Parameters
+        ----------
+        time_series : np.array
+            1D time series to be transformed.
+
+        Returns
+        -------
+        np.array
+            Flattened transformed time series
+        """
+        pla_timeseries = self.transform(time_series)
+        print(pla_timeseries)
+        return np.concatenate(pla_timeseries)
\ No newline at end of file
diff --git a/tsml_eval/_wip/series_transformer/_swab_old.py b/tsml_eval/_wip/series_transformer/_swab_old.py
new file mode 100644
index 00000000..162df541
--- /dev/null
+++ b/tsml_eval/_wip/series_transformer/_swab_old.py
@@ -0,0 +1,117 @@
+from base import BasePLA
+import numpy as np
+import sys
+from _bu_old import BottomUp
+
+__maintainer__ = []
+__all__ = ["SWAB"]
+
+class SWAB(BasePLA):
+    """
+    SWAB (Sliding Window And Bottom-Up) Segmentation.
+
+    Uses the SWAB algorithm as described in [1] to traverse the dataset in an online manner.
+
+    Parameters
+    ----------
+    max_error : float
+        The maximum error a segment may have before the series is split
+
+    References
+    ----------
+    .. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
+        An online algorithm for segmenting time series. (pp. 289-296).
+ """ + + def __init__(self, max_error, sequence_num): + self.bottomup = BottomUp(max_error) + self.sequence_num = sequence_num + super().__init__(max_error) + + #need to check buffer, i think it does grow exponantionally large + def transform(self, time_series): + """Transform a time series + + Parameters + ---------- + time_series : np.array + 1D time series to be transformed. + + Returns + ------- + list + List of transformed segmented time series + """ + seg_ts = [] + + lower_boundary_window = int(self.sequence_num / 2) + upper_boundary_window = self.sequence_num * 2 + + seg = self.best_line(time_series, 0, lower_boundary_window, upper_boundary_window) + current_data_point = len(seg) + buffer = np.array(seg) + + while len(buffer) > 0: + t = self.bottomup.transform(time_series) + seg_ts.append(t[0]) + buffer = buffer[len(t[0]):] + if(current_data_point >= len(time_series)): + seg = self.best_line(time_series, current_data_point, lower_boundary_window, upper_boundary_window) + current_data_point = current_data_point + len(seg) + buffer = np.append(buffer, seg) + else: + buffer = np.array([]) + t = t[1:] + for i in range(len(t)): + seg_ts.append(t[i]) + return seg_ts + + + #finds the next potential segment + def best_line(self, time_series, current_data_point, lower_boundary_window, upper_boundary_window): + """Uses sliding window to find the next best segmentation candidate + + Parameters + ---------- + time_series : np.array + 1D time series to be segmented. + current_data_point : int + the current_data_point we are observing + lower_boundary_window: int + the lower boundary of the window + upper_boundary_window: int + the uppoer boundary of the window + + Returns + ------- + np.array + new found segmentation candidates + """ + + max_window_length = current_data_point + upper_boundary_window + seg_ts = np.array(time_series[current_data_point: current_data_point + lower_boundary_window]) + current_data_point = current_data_point + lower_boundary_window + error = 0 + while current_data_point < max_window_length and current_data_point < len(time_series) and error < self.max_error: + seg_ts = np.append(seg_ts, time_series[current_data_point]) + error = self.calculate_error(seg_ts) + current_data_point = current_data_point + 1 + return seg_ts + + + def transform_flatten(self, time_series): + """Transform a time series and return a 1d array + + Parameters + ---------- + time_series : np.array + 1D time series to be transformed. + + Returns + ------- + list + List of a flattened transformed time series + """ + + pla_timeseries = self.transform(time_series) + return np.concatenate(pla_timeseries) \ No newline at end of file diff --git a/tsml_eval/_wip/series_transformer/_td_old.py b/tsml_eval/_wip/series_transformer/_td_old.py new file mode 100644 index 00000000..47a3d5f5 --- /dev/null +++ b/tsml_eval/_wip/series_transformer/_td_old.py @@ -0,0 +1,110 @@ +from base import BasePLA +import numpy as np +import sys + +__maintainer__ = [] +__all__ = ["TopDown"] + +class TopDown(BasePLA): + """ + Top-Down Segmentation. + + Uses a top-down algorithm to traverse the dataset in an online manner. + + Parameters + ---------- + max_error: float + The maximum error valuefor the function to find before segmenting the dataset + + References + ---------- + .. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November. + An online algorithm for segmenting time series. (pp. 289-296). 
+ """ + + def __init__(self, max_error): + super().__init__(max_error) + + #Implement a cache system for this + def transform(self, time_series): + """Transform a time series + + Parameters + ---------- + time_series : np.array + 1D time series to be transformed. + + Returns + ------- + list + List of transformed segmented time series + """ + + best_so_far = sys.float_info.max + breakpoint = None + + for i in range(2, len(time_series -2)): + improvement_in_approximation = self.improvement_splitting_here(time_series, i) + if(improvement_in_approximation < best_so_far): + breakpoint = i + best_so_far = improvement_in_approximation + + left_found_segment = time_series[:breakpoint] + right_found_segment = time_series[breakpoint:] + + left_segment = None + right_segment = None + + if self.calculate_error(left_found_segment) > self.max_error: + left_segment = self.transform(left_found_segment) + else: + left_segment = [self.create_segment(left_found_segment)] + + if self.calculate_error(right_found_segment) > self.max_error: + right_segment = self.transform(right_found_segment) + else: + right_segment = [self.create_segment(right_found_segment)] + + return left_segment + right_segment + + + def improvement_splitting_here(self, time_series, breakpoint): + """Returns the squared sum error of the left and right segment + splitted off at a particual point in a time series + + Parameters + ---------- + time_series : np.array + 1D time series. + breakpoint : int + the break point within the time series array + + Returns + ------- + error + the squared sum error of the split segmentations + """ + + left_segment = time_series[:breakpoint] + right_segment = time_series[breakpoint:] + return self.calculate_error(left_segment) + self.calculate_error(right_segment) + + + def transform_flatten(self, time_series): + """Transform a time series and return a 1d array + + Parameters + ---------- + time_series : np.array + 1D time series to be transformed. 
+ + Returns + ------- + list + List of a flattened transformed time series + """ + + pla_timeseries = self.transform(time_series) + print(pla_timeseries) + return np.concatenate(pla_timeseries) + \ No newline at end of file diff --git a/tsml_eval/_wip/series_transformer/manual_test.py b/tsml_eval/_wip/series_transformer/manual_test.py new file mode 100644 index 00000000..17f03e34 --- /dev/null +++ b/tsml_eval/_wip/series_transformer/manual_test.py @@ -0,0 +1,39 @@ +from _pla import PiecewiseLinearApproximation +from aeon.datasets import load_electric_devices_segmentation +from aeon.visualisation import plot_series_with_change_points, plot_series_with_profiles +import matplotlib.pyplot as plt +import numpy as np +from sklearn.preprocessing import MinMaxScaler + + +ts, period_size, true_cps = load_electric_devices_segmentation() +ts = ts[:30] +ts = ts.values + + +ts = np.array([573.0,375.0,301.0,212.0,55.0,34.0,25.0,33.0,113.0,143.0,303.0,615.0,1226.0,1281.0,1221.0,1081.0,866.0,1096.0,1039.0,975.0,746.0,581.0,409.0,182.0]) + +pla = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.SlidingWindow, float("inf")) +results = pla.fit_transform(ts) + +print("Original: ", ts) +print("PLA : ", results) + +plt.subplot(2, 1, 1) # (rows, columns, subplot_number) +plt.plot(np.arange(len(ts)), ts) +plt.title('Original') +plt.xlabel('x') +plt.ylabel('y1') + +# Create the second subplot (lower plot) +plt.subplot(2, 1, 2) # (rows, columns, subplot_number) +plt.plot(np.arange(len(ts)), results) +plt.title('PLA') +plt.xlabel('x') +plt.ylabel('y2') + +# Adjust layout to prevent overlapping +plt.tight_layout() + +# Display the plot +plt.show() \ No newline at end of file diff --git a/tsml_eval/_wip/series_transformer/test_pla.py b/tsml_eval/_wip/series_transformer/test_pla.py new file mode 100644 index 00000000..4ce41195 --- /dev/null +++ b/tsml_eval/_wip/series_transformer/test_pla.py @@ -0,0 +1,69 @@ +import pytest +import numpy as np +import pandas as pd +from _pla import PiecewiseLinearApproximation + + +@pytest.fixture +def X(): + return np.array([573.0,375.0,301.0,212.0,55.0,34.0,25.0,33.0,113.0,143.0,303.0, + 615.0,1226.0,1281.0,1221.0,1081.0,866.0,1096.0,1039.0,975.0, + 746.0,581.0,409.0,182.0]) + +def test_piecewise_linear_approximation_sliding_window(X): + pla = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.SlidingWindow, 100) + result = pla.fit_transform(X) + expected = np.array([573., 375., 301., 212., 53., 38., 23., 33., 113., 143., 303., + 615., 1226., 1281., 1221., 1081., 866., 1097.16666667, + 1036.66666667, 976.16666667, 747.16666667, + 578.66666667, 410.16666667, 182.]) + np.testing.assert_array_almost_equal(result, expected) + +def test_piecewise_linear_approximation_top_down(X): + pla = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.TopDown, 100) + result = pla.fit_transform(X) + expected = np.array([573., 375., 301., 212., 53., 38., 23., 33., 113., 143., 303., + 615., 1226., 1281., 1221., 1081., 866., 1097.16666667, + 1036.66666667, 976.16666667, 746., 581., 409., 182.]) + np.testing.assert_array_almost_equal(result, expected) + +def test_piecewise_linear_approximation_bottom_up(X): + result = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.BottomUp, 5).fit_transform(X) + expected = np.array([538.8, 423.1, 307.4, 191.7, 48., 40.5, 33., 25.5, 43.6, + 210.2,376.8, 543.4, 1276.5, 1227., 1177.5, 1128., 953.5, + 980.5, 1007.5, 1034.5, 759.1, 572.7, 386.3, 199.9]) + np.testing.assert_array_almost_equal(result, 
expected) + +def test_piecewise_linear_approximation_SWAB(X): + result = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.SWAB, 5).fit_transform(X) + expected = np.array([538.8, 423.1, 307.4, 191.7, 48., 40.5, 33., 25.5, 43.6, 210.2, + 376.8, 543.4, 1276.5, 1227., 1177.5, 1128., 953.5, 980.5, + 1007.5, 1034.5, 759.1, 572.7, 386.3, 199.9]) + np.testing.assert_array_almost_equal(result, expected) + +def test_piecewise_linear_approximation_check_diff_in_params(X): + transformers = [PiecewiseLinearApproximation.Transformer.SlidingWindow, + PiecewiseLinearApproximation.Transformer.TopDown, + PiecewiseLinearApproximation.Transformer.BottomUp, + PiecewiseLinearApproximation.Transformer.SWAB] + for i in range(len(transformers)): + low_error_pla = PiecewiseLinearApproximation(transformers[i], 1) + high_error_pla = PiecewiseLinearApproximation(transformers[i], float("inf")) + low_error_result = low_error_pla.fit_transform(X) + high_error_result = high_error_pla.fit_transform(X) + assert not np.allclose(low_error_result, high_error_result) + +def test_piecewise_linear_approximation_wrong_parameters(X): + with pytest.raises(ValueError): + PiecewiseLinearApproximation("Fake Transformer", 100) + with pytest.raises(ValueError): + PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.SWAB, "max_error") + with pytest.raises(ValueError): + PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.SWAB, 100, "buffer_size") + +def test_piecewise_linear_approximation_one_segment(X): + X = X[:2] + pla = PiecewiseLinearApproximation(PiecewiseLinearApproximation.Transformer.BottomUp, 10) + result = pla.fit_transform(X) + assert 0 == len(pla.segment_dense) + np.testing.assert_array_almost_equal(X, result, decimal=1) \ No newline at end of file diff --git a/tsml_eval/segmentation/__init__.py b/tsml_eval/segmentation/__init__.py deleted file mode 100644 index 1602ecbe..00000000 --- a/tsml_eval/segmentation/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Piecewise Linear Approximation.""" - -__all__ = [ - "BasePLA", - "SlidingWindow", - "TopDown", - "BottomUp" -] -from base import BasePLA -from _sw import SlidingWindow -from _td import TopDown -from _bu import BottomUp \ No newline at end of file diff --git a/tsml_eval/segmentation/_bu.py b/tsml_eval/segmentation/_bu.py deleted file mode 100644 index fca63ee8..00000000 --- a/tsml_eval/segmentation/_bu.py +++ /dev/null @@ -1,41 +0,0 @@ -from base import BasePLA -import numpy as np -import math -__maintainer__ = [] -__all__ = ["BottomUp"] - -class BottomUp(BasePLA): - - def __init__(self, max_error): - super().__init__(max_error) - - #clean the code - def bottomUp(self, time_series): - seg_ts = [] - merge_cost = [] - for i in range(0, len(time_series), 2): - seg_ts.append(self.create_segment(time_series[i: i + 2])) - for i in range(len(seg_ts) - 1): - merge_cost.append(self.calculate_error(seg_ts[i] + seg_ts[i + 1])) - - merge_cost = np.array(merge_cost) - - while len(merge_cost != 0) and min(merge_cost) < self.max_error: - if(len(merge_cost) == len(seg_ts)): - print("error") - pos = np.argmin(merge_cost) - seg_ts[pos] = np.concatenate((seg_ts[pos], seg_ts[pos + 1])) - seg_ts.pop(pos + 1) - if (pos + 1) < len(merge_cost): - merge_cost = np.delete(merge_cost, pos + 1) - else: - merge_cost= np.delete(merge_cost, pos) - - if pos != 0: - merge_cost[pos - 1] = self.calculate_error(np.concatenate((seg_ts[pos - 1], seg_ts[pos]))) - - if((pos + 1) < len(seg_ts)): - merge_cost[pos] = 
self.calculate_error(np.concatenate((seg_ts[pos], seg_ts[pos + 1]))) - - - return seg_ts \ No newline at end of file diff --git a/tsml_eval/segmentation/_sw.py b/tsml_eval/segmentation/_sw.py deleted file mode 100644 index 6a4fa4ff..00000000 --- a/tsml_eval/segmentation/_sw.py +++ /dev/null @@ -1,40 +0,0 @@ - -from base import BasePLA -import numpy as np -__maintainer__ = [] -__all__ = ["SlidingWindow"] - -class SlidingWindow(BasePLA): - - def __init__(self, max_error): - super().__init__(max_error) - - """work in progress - def sliding_window(self, time_series): - seg_ts = [] - anchor = 0 - for i in range(1, len(time_series)): - if self.calculate_error(time_series[anchor:i]) > self.max_error: - seg_ts.append(self.create_segment(time_series[anchor: i - 1])) - anchor = i - 1 - if(anchor < i): - seg_ts.append(self.create_segment(time_series[anchor: i - 1])) - return np.concatenate(seg_ts) """ - - #! clean this up, the while loops are not done in a good manner. This is from the pseudocode - def sliding_window(self, time_series): - seg_ts = [] - anchor = 0 - while anchor < len(time_series): - i = 2 - while anchor + i -1 < len(time_series) and self.calculate_error(time_series[anchor:anchor + i]) < self.max_error: - i = i + 1 - seg_ts.append(self.create_segment(time_series[anchor:anchor + i - 1])) - anchor = anchor + i - 1 - return seg_ts - - def segment(time_series): - return None - - def pla(time_series): - return None \ No newline at end of file diff --git a/tsml_eval/segmentation/_swab.py b/tsml_eval/segmentation/_swab.py deleted file mode 100644 index a3f82dd1..00000000 --- a/tsml_eval/segmentation/_swab.py +++ /dev/null @@ -1,38 +0,0 @@ -from base import BasePLA -import numpy as np -import sys -import BottomUp - -__maintainer__ = [] -__all__ = ["SWAB"] - -class SWAB(BasePLA): - - def __init__(self, max_error, seg_num = 6): - self.seg_num = seg_num - self.bottomup = BottomUp(max_error) - super().__init__(max_error) - - - def swab(self, time_series): - seg_ts = [] - buffer = np.empty(self.seg_num, dtype=object) - sw_lower_bound = len(buffer) / 2 - sw_upper_bound = len(buffer) * 2 - while len(buffer) < 3: - t = self.bottomup(time_series) - seg_ts.append(t[0]) - buffer = buffer[len(t) - 1:] - return None - - - #finds the next potential segment - def best_line(self, time_series, current_data_point, sw_lower_bound, sw_upper_bound): - seg_ts = [] - error = 0 - while error < self.max_error: - seg_ts.append = time_series[current_data_point] - error = self.calculate_error(seg_ts) - current_data_point = current_data_point + 1 - return seg_ts - \ No newline at end of file diff --git a/tsml_eval/segmentation/_td.py b/tsml_eval/segmentation/_td.py deleted file mode 100644 index 092d1981..00000000 --- a/tsml_eval/segmentation/_td.py +++ /dev/null @@ -1,48 +0,0 @@ -from base import BasePLA -import numpy as np -import sys - -__maintainer__ = [] -__all__ = ["TopDown"] - -class TopDown(BasePLA): - - def __init__(self, max_error): - super().__init__(max_error) - - #Implement a cache system for this - def topDown(self, time_series): - seg_ts = [] - best_so_far = sys.float_info.max - breakpoint = None - for i in range(2, len(time_series -2)): - improvement_in_approximation = self.improvement_splitting_here(time_series, i) - if(improvement_in_approximation < best_so_far): - breakpoint = i - best_so_far = improvement_in_approximation - - if breakpoint == None: - return [time_series] - - left_segment = time_series[:breakpoint] - right_segment = time_series[breakpoint:] - - if self.calculate_error(left_segment) > 
self.max_error: - seg_ts.extend(self.topDown(left_segment)) - else: - seg_ts.append(left_segment) - - - if self.calculate_error(right_segment) > self.max_error: - seg_ts.extend(self.topDown(right_segment)) - else: - seg_ts.append(right_segment) - - return seg_ts - - - def improvement_splitting_here(self, time_series, breakpoint): - left_segment = time_series[:breakpoint] - right_segment = time_series[breakpoint:] - return self.calculate_error(left_segment) + self.calculate_error(right_segment) - \ No newline at end of file diff --git a/tsml_eval/segmentation/base.py b/tsml_eval/segmentation/base.py deleted file mode 100644 index 62c4109a..00000000 --- a/tsml_eval/segmentation/base.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Abstract base class""" - -__maintainer__ = [] -__all__ = ["BasePLA"] - -import numpy as np -import pandas as pd -from sklearn.linear_model import LinearRegression - -class BasePLA(): - "Base class for piecewise linear approximation (PLA)" - - def __init__(self, max_error): - self.max_error = max_error - - def linear_regression(self, time_series, sequence = None): - n = len(time_series) - Y = np.array(time_series) - X = np.arange(n).reshape(-1 , 1) - linearRegression = LinearRegression() - linearRegression.fit(X, Y) - regression_line = np.array(linearRegression.predict(X)) - return regression_line - - def sum_squared_error(self, time_series, linear_regression_time_series): - "formula: sse = the sum of the differences of the original series against the predicted series squared" - error = np.sum((time_series - linear_regression_time_series) ** 2) - return error - - def calculate_error(self, time_series): - lrts = self.linear_regression(time_series) - sse = self.sum_squared_error(time_series, lrts) - return sse - - def create_segment(self, time_series): - return self.linear_regression(time_series) \ No newline at end of file diff --git a/tsml_eval/segmentation/manual_test.py b/tsml_eval/segmentation/manual_test.py deleted file mode 100644 index 683b06c5..00000000 --- a/tsml_eval/segmentation/manual_test.py +++ /dev/null @@ -1,17 +0,0 @@ -from _sw import SlidingWindow -from _bu import BottomUp -from _td import TopDown -from aeon.datasets import load_electric_devices_segmentation -from aeon.visualisation import plot_series_with_change_points, plot_series_with_profiles -import matplotlib.pyplot as plt -import numpy as np - - -ts, period_size, true_cps = load_electric_devices_segmentation() -ts = ts[0:20] -ts = ts.values -sw = TopDown(100) -results = sw.topDown(ts) -print(len(results)) - -print(results) \ No newline at end of file
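
For quick reference, the snippet below is a minimal usage sketch of the transformer added by this patch series; it is not part of the patches themselves. It only uses calls already exercised in manual_test.py and test_pla.py above (the Transformer enum, fit_transform inherited from aeon's BaseSeriesTransformer, and the segment_dense attribute), and the max_error value of 100 is an arbitrary illustrative tolerance.

import numpy as np

from _pla import PiecewiseLinearApproximation

# A short univariate series (the same data as the test fixture).
ts = np.array([573.0, 375.0, 301.0, 212.0, 55.0, 34.0, 25.0, 33.0, 113.0, 143.0,
               303.0, 615.0, 1226.0, 1281.0, 1221.0, 1081.0, 866.0, 1096.0,
               1039.0, 975.0, 746.0, 581.0, 409.0, 182.0])

# BottomUp is one of the two offline algorithms; max_error=100 is an assumed example tolerance.
pla = PiecewiseLinearApproximation(
    PiecewiseLinearApproximation.Transformer.BottomUp, max_error=100
)

approx = pla.fit_transform(ts)   # piecewise linear approximation, same length as ts
print(approx)
print(pla.segment_dense)         # end index of each fitted segment in the output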