diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx
index 68c05f2bb2c98..a81c9182c04d2 100644
--- a/pandas/_libs/window/aggregations.pyx
+++ b/pandas/_libs/window/aggregations.pyx
@@ -14,6 +14,7 @@ import numpy as np
 
 cimport numpy as cnp
 from numpy cimport (
+    complex64_t,
     float32_t,
     float64_t,
     int64_t,
@@ -65,6 +66,10 @@ cdef bint is_monotonic_increasing_start_end_bounds(
 ):
     return is_monotonic(start, False)[0] and is_monotonic(end, False)[0]
 
 
+ctypedef fused float_complex_types:
+    float64_t
+    complex64_t
+
 # ----------------------------------------------------------------------
 # Rolling sum
@@ -129,7 +134,7 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x,
         sum_x[0] = t
 
 
-def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
+def roll_sum(float_complex_types[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
         Py_ssize_t i, j
@@ -151,25 +156,39 @@ def roll_sum(const float64_t[:] values, ndarray[int64_t] start,
             e = end[i]
 
             if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
 
                 # setup
-                prev_value = values[s]
+                if float_complex_types is complex64_t:
+                    prev_value = values[s].real
+                else:
+                    prev_value = values[s]
                 num_consecutive_same_value = 0
                 sum_x = compensation_add = compensation_remove = 0
                 nobs = 0
                 for j in range(s, e):
-                    add_sum(values[j], &nobs, &sum_x, &compensation_add,
-                            &num_consecutive_same_value, &prev_value)
+                    if float_complex_types is complex64_t:
+                        add_sum(values[j].real, &nobs, &sum_x, &compensation_add,
+                                &num_consecutive_same_value, &prev_value)
+                    else:
+                        add_sum(values[j], &nobs, &sum_x, &compensation_add,
+                                &num_consecutive_same_value, &prev_value)
 
             else:
 
                 # calculate deletes
                 for j in range(start[i - 1], s):
-                    remove_sum(values[j], &nobs, &sum_x, &compensation_remove)
+                    if float_complex_types is complex64_t:
+                        remove_sum(values[j].real, &nobs, &sum_x, &compensation_remove)
+                    else:
+                        remove_sum(values[j], &nobs, &sum_x, &compensation_remove)
 
                 # calculate adds
                 for j in range(end[i - 1], e):
-                    add_sum(values[j], &nobs, &sum_x, &compensation_add,
-                            &num_consecutive_same_value, &prev_value)
+                    if float_complex_types is complex64_t:
+                        add_sum(values[j].real, &nobs, &sum_x, &compensation_add,
+                                &num_consecutive_same_value, &prev_value)
+                    else:
+                        add_sum(values[j], &nobs, &sum_x, &compensation_add,
+                                &num_consecutive_same_value, &prev_value)
 
             output[i] = calc_sum(minp, nobs, sum_x, num_consecutive_same_value, prev_value)
@@ -251,7 +270,7 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
         neg_ct[0] = neg_ct[0] - 1
 
 
-def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
+def roll_mean(float_complex_types[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
         float64_t val, compensation_add, compensation_remove, sum_x, prev_value
@@ -276,10 +295,18 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
                 # setup
                 compensation_add = compensation_remove = sum_x = 0
                 nobs = neg_ct = 0
-                prev_value = values[s]
+
+                if float_complex_types is complex64_t:
+                    prev_value = values[s].real
+                else:
+                    prev_value = values[s]
+
                 num_consecutive_same_value = 0
 
                 for j in range(s, e):
-                    val = values[j]
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
                     add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add,
                              &num_consecutive_same_value, &prev_value)
@@ -287,12 +314,18 @@ def roll_mean(const float64_t[:] values, ndarray[int64_t] start,
 
                 # calculate deletes
                 for j in range(start[i - 1], s):
-                    val = values[j]
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
                     remove_mean(val, &nobs, &sum_x, &neg_ct,
                                 &compensation_remove)
 
                 # calculate adds
                 for j in range(end[i - 1], e):
-                    val = values[j]
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
                     add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add,
                              &num_consecutive_same_value, &prev_value)
@@ -387,14 +420,14 @@ cdef inline void remove_var(float64_t val, float64_t *nobs, float64_t *mean_x,
         ssqdm_x[0] = 0
 
 
-def roll_var(const float64_t[:] values, ndarray[int64_t] start,
+def roll_var(float_complex_types[:] values, ndarray[int64_t] start,
             ndarray[int64_t] end, int64_t minp, int ddof=1) -> np.ndarray:
     """
     Numerically stable implementation using Welford's method.
    """
     cdef:
         float64_t mean_x, ssqdm_x, nobs, compensation_add,
-        float64_t compensation_remove, prev_value
+        float64_t compensation_remove, prev_value, val
         int64_t s, e, num_consecutive_same_value
         Py_ssize_t i, j, N = len(start)
         ndarray[float64_t] output
@@ -417,12 +450,20 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
             # never removed
             if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
 
-                prev_value = values[s]
+                if float_complex_types is complex64_t:
+                    prev_value = values[s].real
+                else:
+                    prev_value = values[s]
+
                 num_consecutive_same_value = 0
 
                 mean_x = ssqdm_x = nobs = compensation_add = compensation_remove = 0
                 for j in range(s, e):
-                    add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add,
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
+                    add_var(val, &nobs, &mean_x, &ssqdm_x, &compensation_add,
                             &num_consecutive_same_value, &prev_value)
 
             else:
@@ -432,12 +473,20 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
 
                 # calculate deletes
                 for j in range(start[i - 1], s):
-                    remove_var(values[j], &nobs, &mean_x, &ssqdm_x,
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
+                    remove_var(val, &nobs, &mean_x, &ssqdm_x,
                                &compensation_remove)
 
                 # calculate adds
                 for j in range(end[i - 1], e):
-                    add_var(values[j], &nobs, &mean_x, &ssqdm_x, &compensation_add,
+                    if float_complex_types is complex64_t:
+                        val = values[j].real
+                    else:
+                        val = values[j]
+                    add_var(val, &nobs, &mean_x, &ssqdm_x, &compensation_add,
                             &num_consecutive_same_value, &prev_value)
 
             output[i] = calc_var(minp, ddof, nobs, ssqdm_x, num_consecutive_same_value)
@@ -562,7 +611,7 @@ cdef inline void remove_skew(float64_t val, int64_t *nobs,
         xxx[0] = t
 
 
-def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
+def roll_skew(ndarray[float_complex_types] fused_values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
         Py_ssize_t i, j
@@ -572,10 +621,16 @@ def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
         float64_t compensation_x_add, compensation_x_remove
         float64_t x, xx, xxx
         float64_t prev_value
-        int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
+        ndarray[float64_t] values
+        int64_t nobs = 0, N = len(start), V = len(fused_values), nobs_mean = 0
         int64_t s, e, num_consecutive_same_value
         ndarray[float64_t] output, mean_array, values_copy
         bint is_monotonic_increasing_bounds
+
+    if float_complex_types is complex64_t:
+        values = fused_values.real.astype(float)
+    else:
+        values = fused_values
 
     minp = max(minp, 3)
     is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
@@ -775,7 +830,7 @@ cdef inline void remove_kurt(float64_t val, int64_t *nobs,
         xxxx[0] = t
 
 
-def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
+def roll_kurt(ndarray[float_complex_types] fused_values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp) -> np.ndarray:
     cdef:
         Py_ssize_t i, j
@@ -786,11 +841,17 @@ def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start,
         float64_t compensation_x_remove, compensation_x_add
         float64_t x, xx, xxx, xxxx
         float64_t prev_value
+        ndarray[float64_t] values
         int64_t nobs, s, e, num_consecutive_same_value
-        int64_t N = len(start), V = len(values), nobs_mean = 0
+        int64_t N = len(start), V = len(fused_values), nobs_mean = 0
         ndarray[float64_t] output, values_copy
         bint is_monotonic_increasing_bounds
 
+    if float_complex_types is complex64_t:
+        values = fused_values.real.astype(float)
+    else:
+        values = fused_values
+
     minp = max(minp, 4)
     is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
         start, end
diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index 9e8f95cf340c4..e6b20cb085539 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -40,6 +40,7 @@
 from pandas.core.dtypes.common import (
     ensure_float64,
     is_bool,
+    is_complex_dtype,
     is_integer,
     is_list_like,
     is_scalar,
@@ -363,7 +364,10 @@ def _prep_values(self, values: ArrayLike) -> np.ndarray:
             if isinstance(values, ExtensionArray):
                 values = values.to_numpy(np.float64, na_value=np.nan)
             else:
-                values = ensure_float64(values)
+                if is_complex_dtype(values):
+                    values = values.astype(np.complex64)
+                else:
+                    values = ensure_float64(values)
         except (ValueError, TypeError) as err:
             raise TypeError(f"cannot handle this type -> {values.dtype}") from err
 
diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py
index 4c26cfb95fd85..49eefa66f802b 100644
--- a/pandas/tests/window/test_rolling.py
+++ b/pandas/tests/window/test_rolling.py
@@ -1871,3 +1871,18 @@ def test_rolling_skew_kurt_floating_artifacts():
     assert (result[-2:] == 0).all()
     result = r.kurt()
     assert (result[-2:] == -3).all()
+
+
+def test_rolling_imaginary_part_of_complex(arithmetic_win_operators):
+    # GH 46619: complex input is reduced to its real part before aggregating
+    func_name = arithmetic_win_operators
+    df = DataFrame([1j, 1 + 2j])
+    result = getattr(
+        df.rolling(2),
+        func_name,
+    )()
+    expected = getattr(
+        DataFrame([0, 1]).rolling(2),
+        func_name,
+    )()
+    tm.assert_frame_equal(result, expected)
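A quick manual check of the behaviour the new test encodes (a sketch only, not part of the patch; it assumes a pandas build with the changes above installed):

    import pandas as pd

    # Complex input: the patched rolling kernels aggregate only the real part,
    # so rolling over [1j, 1 + 2j] matches rolling over the real parts [0, 1].
    s = pd.Series([1j, 1 + 2j])
    print(s.rolling(2).mean())                       # NaN, then 0.5
    print(pd.Series([0.0, 1.0]).rolling(2).mean())   # same values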