diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index 9e42a9033fb66..df907a2a26d5d 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -2,12 +2,13 @@
 Provide a generic structure to support window functions,
 similar to how we have a Groupby object.
 """
+
 from __future__ import annotations
 
 import copy
 from datetime import timedelta
 from functools import partial
 import inspect
 from textwrap import dedent
 from typing import (
     TYPE_CHECKING,
@@ -539,66 +540,117 @@ def _apply_pairwise(
         return flex_binary_moment(target, other, func, pairwise=bool(pairwise))
 
     def _apply(
         self,
         func: Callable[..., Any],
         name: str,
         numeric_only: bool = False,
         numba_args: tuple[Any, ...] = (),
+        multi_column: bool = False,
         **kwargs,
     ):
         """
         Rolling statistical measure using supplied function.
 
         Designed to be used with passed-in Cython array-based functions.
 
         Parameters
         ----------
         func : callable function to apply
         name : str,
         numba_args : tuple
             args to be passed when func is a numba func
+        multi_column : bool, default False
+            If True, ``func`` is called once per window with the rows of the
+            selected object (all columns at once) and must return a scalar.
         **kwargs
             additional arguments for rolling function and window function
 
         Returns
         -------
         y : type of input
         """
         window_indexer = self._get_window_indexer()
         min_periods = (
             self.min_periods
             if self.min_periods is not None
             else window_indexer.window_size
         )
 
+        if multi_column:
+            return self._apply_multi_column(func, window_indexer, min_periods)
+
         def homogeneous_func(values: np.ndarray):
             # calculation function
 
             if values.size == 0:
                 return values.copy()
 
             def calc(x):
                 start, end = window_indexer.get_window_bounds(
                     num_values=len(x),
                     min_periods=min_periods,
                     center=self.center,
                     closed=self.closed,
                     step=self.step,
                 )
                 self._check_window_bounds(start, end, len(x))
 
                 return func(x, start, end, min_periods, *numba_args)
 
             with np.errstate(all="ignore"):
                 result = calc(values)
 
             return result
 
         if self.method == "single":
             return self._apply_columnwise(homogeneous_func, name, numeric_only)
         else:
             return self._apply_tablewise(homogeneous_func, name, numeric_only)
 
+    def _apply_multi_column(
+        self,
+        func: Callable[..., Any],
+        window_indexer,
+        min_periods: int,
+    ) -> Series:
+        """
+        Apply ``func`` to each window taken across all selected columns.
+
+        Parameters
+        ----------
+        func : callable
+            Called with the rows of each window; must return a scalar.
+        window_indexer : BaseIndexer
+            Indexer that supplies the window bounds.
+        min_periods : int
+            Minimum number of non-NA observations required in every column
+            of a window; windows that fall short yield NaN.
+
+        Returns
+        -------
+        Series
+        """
+        from pandas import Series
+
+        obj = self._selected_obj
+        start, end = window_indexer.get_window_bounds(
+            num_values=len(obj),
+            min_periods=min_periods,
+            center=self.center,
+            closed=self.closed,
+            step=self.step,
+        )
+        self._check_window_bounds(start, end, len(obj))
+
+        results = []
+        for lower, upper in zip(start, end):
+            window_obj = obj.iloc[lower:upper]
+            if window_obj.notna().sum().min() >= min_periods:
+                results.append(func(window_obj))
+            else:
+                results.append(np.nan)
+        return Series(results, index=obj.index[:: self.step], dtype="float64")
+
     def _numba_apply(
         self,
         func: Callable[..., Any],
@@ -694,6 +746,13 @@ def _apply(
         name: str,
         numeric_only: bool = False,
         numba_args: tuple[Any, ...] = (),
+        multi_column: bool = False,
         **kwargs,
     ) -> DataFrame | Series:
+        if multi_column:
+            # The multi-column path windows over the whole selected object
+            # and is not group-aware, so reject it instead of mis-computing.
+            raise NotImplementedError(
+                "multi_column=True is not supported for groupby windows"
+            )
         result = super()._apply(
@@ -872,8 +931,8 @@ class Window(BaseWindow):
         If a timedelta, str, or offset, the time period of each window. Each
         window will be a variable sized based on the observations included in
         the time-period. This is only valid for datetimelike indexes.
-        To learn more about the offsets & frequency strings, please see `this link
-        <https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
+        To learn more about the offsets & frequency strings, please see
+        :ref:`this link<timeseries.offset_aliases>`.
 
         If a BaseIndexer subclass, the window boundaries
         based on the defined ``get_window_bounds`` method. Additional rolling
@@ -924,13 +983,12 @@ class Window(BaseWindow):
         Default ``None`` (``'right'``).
 
     step : int, default None
-
-        .. versionadded:: 1.5.0
-
         Evaluate the window at every ``step`` result, equivalent to slicing as
         ``[::step]``. ``window`` must be an integer. Using a step argument other
         than None or 1 will produce a result with a different shape than the input.
 
+        .. versionadded:: 1.5.0
+
     method : str {'single', 'table'}, default 'single'
 
         .. versionadded:: 1.3.0
@@ -1216,8 +1274,8 @@ def calc(x):
             """
         See Also
         --------
-        pandas.DataFrame.aggregate : Similar DataFrame method.
-        pandas.Series.aggregate : Similar Series method.
+        DataFrame.aggregate : Similar DataFrame method.
+        Series.aggregate : Similar Series method.
         """
         ),
         examples=dedent(
@@ -1448,6 +1506,7 @@ def apply(
         engine_kwargs: dict[str, bool] | None = None,
         args: tuple[Any, ...] | None = None,
         kwargs: dict[str, Any] | None = None,
+        multi_column: bool = False,
     ):
         if args is None:
             args = ()
@@ -1459,10 +1518,18 @@ def apply(
 
         numba_args: tuple[Any, ...] = ()
-        if maybe_use_numba(engine):
+        if multi_column:
+            if maybe_use_numba(engine):
+                raise ValueError(
+                    "multi_column=True is not supported with the numba engine"
+                )
+            # Each window is handed to ``func`` as a whole object, so the
+            # array-based cython/numba wrappers are bypassed.
+            apply_func = partial(func, *args, **kwargs)
+        elif maybe_use_numba(engine):
             if raw is False:
                 raise ValueError("raw must be `True` when using the numba engine")
             numba_args = args
             if self.method == "single":
                 apply_func = generate_numba_apply_func(
                     func, **get_jit_arguments(engine_kwargs, kwargs)
                 )
@@ -1481,6 +1548,7 @@ def apply(
             apply_func,
             name="apply",
             numba_args=numba_args,
+            multi_column=multi_column,
         )
 
     def _generate_cython_apply_func(
@@ -1906,8 +1974,8 @@ def _raise_monotonic_error(self, msg: str):
             """
         See Also
         --------
-        pandas.Series.rolling : Calling object with Series data.
-        pandas.DataFrame.rolling : Calling object with DataFrame data.
+        Series.rolling : Calling object with Series data.
+        DataFrame.rolling : Calling object with DataFrame data.
         """
         ),
         examples=dedent(
@@ -2013,6 +2081,7 @@ def apply(
         engine_kwargs: dict[str, bool] | None = None,
         args: tuple[Any, ...] | None = None,
         kwargs: dict[str, Any] | None = None,
+        multi_column: bool = False,
     ):
         return super().apply(
             func,
@@ -2021,6 +2090,7 @@ def apply(
             engine_kwargs=engine_kwargs,
             args=args,
             kwargs=kwargs,
+            multi_column=multi_column,
         )
 
     @doc(
diff --git a/pandas/tests/window/test_rolling_apply.py b/pandas/tests/window/test_rolling_apply.py
new file mode 100644
index 0000000000000..65ae3f4fe09db
--- /dev/null
+++ b/pandas/tests/window/test_rolling_apply.py
@@ -0,0 +1,75 @@
+import numpy as np
+import pytest
+
+from pandas import (
+    DataFrame,
+    Series,
+)
+import pandas._testing as tm
+
+
+def weighted_sum(window_df):
+    # Reduce a whole window to a scalar using both columns at once
+    return (window_df["value"] * window_df["weight"]).sum()
+
+
+def calculate_expected_output(df, window_size, min_periods):
+    # Reference implementation over trailing windows, which are shorter than
+    # ``window_size`` at the start of the frame
+    results = []
+    for stop in range(1, len(df) + 1):
+        window_df = df.iloc[max(stop - window_size, 0) : stop]
+        if window_df.notna().sum().min() >= min_periods:
+            results.append(weighted_sum(window_df))
+        else:
+            results.append(np.nan)
+    return Series(results, index=df.index, dtype="float64")
+
+
+@pytest.mark.parametrize(
+    "window_size, expected_values",
+    [
+        (3, [np.nan, np.nan, 1.4, 2.9, 5.0]),
+        (5, [np.nan, np.nan, np.nan, np.nan, 5.5]),
+    ],
+)
+def test_rolling_apply_multi_column_simple(window_size, expected_values):
+    df = DataFrame({"value": [1, 2, 3, 4, 5], "weight": [0.1, 0.2, 0.3, 0.4, 0.5]})
+
+    result = df.rolling(window=window_size).apply(weighted_sum, multi_column=True)
+
+    expected = Series(expected_values, index=df.index)
+    tm.assert_series_equal(result, expected)
+
+
+@pytest.mark.parametrize(
+    "min_periods, expected_values",
+    [
+        (1, [0.1, 0.1, 0.1, 0.0, 2.5]),
+        (2, [np.nan, np.nan, 0.1, np.nan, 2.5]),
+    ],
+)
+def test_rolling_apply_multi_column_missing_values(min_periods, expected_values):
+    df = DataFrame(
+        {
+            "value": [1, np.nan, 3, np.nan, 5],
+            "weight": [0.1, 0.2, np.nan, 0.4, 0.5],
+        }
+    )
+
+    result = df.rolling(window=3, min_periods=min_periods).apply(
+        weighted_sum, multi_column=True
+    )
+
+    expected = Series(expected_values, index=df.index)
+    tm.assert_series_equal(result, expected)
+
+
+def test_rolling_apply_multi_column_large_dataframe():
+    # Integer-based construction avoids the length ambiguity of a float arange
+    df = DataFrame({"value": np.arange(1, 101), "weight": np.arange(1, 101) / 10})
+
+    result = df.rolling(window=5).apply(weighted_sum, multi_column=True)
+
+    expected = calculate_expected_output(df, window_size=5, min_periods=5)
+    tm.assert_series_equal(result, expected)