1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -499,6 +499,7 @@ Groupby/resample/rolling
- Bug in :meth:`DataFrameGroupBy.rolling` returned wrong values with a time-aware window containing ``NaN``. Now raises a ``ValueError`` because such windows are not monotonic (:issue:`34617`)
- Bug in :meth:`Rolling.__iter__` where a ``ValueError`` was not raised when ``min_periods`` was larger than ``window`` (:issue:`37156`)
- :meth:`Rolling.corr` now uses :meth:`Rolling.var` instead of :meth:`Rolling.std`, avoiding numerical issues when :meth:`Rolling.var` is still within floating-point precision while :meth:`Rolling.std` is not (:issue:`31286`)
- Bug in :meth:`Rolling.median` and :meth:`Rolling.quantile` returned wrong values for :class:`BaseIndexer` subclasses with non-monotonic starting or ending points for windows (:issue:`37153`)

Reshaping
^^^^^^^^^
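To make the GH 37153 whatsnew entry above concrete, here is a small, self-contained sketch of the scenario it describes: a `BaseIndexer` subclass that alternates between expanding and fixed-size windows, so the `start` bounds it returns are not monotonically increasing. The class name and driver code are illustrative (the regression test added at the end of this PR uses an equivalent `CustomIndexer`), and the `get_window_bounds` signature assumes the pandas 1.2-era API targeted here; newer versions also pass a `step` argument.

```python
# Illustrative reproduction of the GH 37153 scenario (not part of the PR).
import numpy as np
import pandas as pd
from pandas.api.indexers import BaseIndexer


class AlternatingIndexer(BaseIndexer):
    # Alternate between an expanding window and a fixed window of size 1.
    def get_window_bounds(self, num_values, min_periods, center, closed):
        start = np.empty(num_values, dtype=np.int64)
        end = np.empty(num_values, dtype=np.int64)
        for i in range(num_values):
            if self.use_expanding[i]:
                start[i] = 0                      # expanding: jump back to 0 ...
                end[i] = i + 1
            else:
                start[i] = i                      # ... fixed: jump forward again
                end[i] = i + self.window_size
        return start, end                         # start is NOT monotonic


df = pd.DataFrame({"values": range(5)})
indexer = AlternatingIndexer(
    window_size=1, use_expanding=[True, False, True, False, True]
)

result = df.rolling(indexer).median()
# Per-window medians computed independently of the rolling machinery:
start, end = indexer.get_window_bounds(len(df), None, None, None)
expected = [float(np.median(df["values"].to_numpy()[s:e])) for s, e in zip(start, end)]
print(result["values"].tolist())  # [0.0, 1.0, 1.0, 3.0, 2.0] with this fix
print(expected)                   # [0.0, 1.0, 1.0, 3.0, 2.0]
```

Before this fix, the incremental skiplist bookkeeping in `roll_median_c` and `roll_quantile` assumed monotonically increasing bounds and returned wrong values for indexers like this one.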
24 changes: 18 additions & 6 deletions pandas/_libs/window/aggregations.pyx
@@ -686,6 +686,11 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,
int64_t nobs = 0, N = len(values), s, e, win
int midpoint
ndarray[float64_t] output
bint is_monotonic_increasing_bounds

is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
start, end
)

# we use the Fixed/Variable Indexer here as the
# actual skiplist ops outweigh any window computation costs
@@ -705,7 +710,7 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0:
if i == 0 or not is_monotonic_increasing_bounds:

# setup
for j in range(s, e):
@@ -733,7 +738,6 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,
if notnan(val):
skiplist_remove(sl, val)
nobs -= 1

if nobs >= minp:
midpoint = <int>(nobs / 2)
if nobs % 2:
@@ -748,6 +752,10 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,

output[i] = res

if not is_monotonic_increasing_bounds:
nobs = 0
sl = skiplist_init(<int>win)

skiplist_destroy(sl)
if err:
raise MemoryError("skiplist_insert failed")
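
The change to `roll_median_c` above gates the incremental add/remove bookkeeping on `is_monotonic_increasing_start_end_bounds` and, when the bounds are not monotonically increasing, resets `nobs` and the skiplist so that every window is built from scratch. A rough pure-Python sketch of that control flow, with a sorted list standing in for the C skiplist (illustrative only, not the pandas implementation):

```python
# Pure-Python sketch of the fixed roll_median_c control flow.
import bisect

import numpy as np


def rolling_median(values, start, end, minp):
    n = len(values)
    out = np.full(n, np.nan)
    # Mirrors is_monotonic_increasing_start_end_bounds(start, end)
    monotonic_bounds = (np.diff(start) >= 0).all() and (np.diff(end) >= 0).all()

    window = []  # kept sorted; stands in for the skiplist
    for i in range(n):
        s, e = start[i], end[i]
        if i == 0 or not monotonic_bounds:
            # setup: (re)build the window state from scratch
            window = sorted(v for v in values[s:e] if not np.isnan(v))
        else:
            # calculate adds on the right edge
            for v in values[end[i - 1]:e]:
                if not np.isnan(v):
                    bisect.insort(window, v)
            # calculate deletes on the left edge
            for v in values[start[i - 1]:s]:
                if not np.isnan(v):
                    window.remove(v)
        if window and len(window) >= minp:
            out[i] = np.median(window)
    return out


vals = np.arange(5, dtype=float)
print(rolling_median(vals, [0, 1, 0, 3, 0], [1, 2, 3, 4, 5], minp=1))
# [0. 1. 1. 3. 2.] -- the non-monotonic bounds from the new regression test
```

With monotonic bounds the incremental path is kept; with arbitrary bounds each window is simply recomputed, which is what makes the result correct for indexers like the one exercised in the new test.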
@@ -935,7 +943,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
cdef:
float64_t val, prev, midpoint, idx_with_fraction
skiplist_t *skiplist
int64_t nobs = 0, i, j, s, e, N = len(values)
int64_t nobs = 0, i, j, s, e, N = len(values), win
Py_ssize_t idx
ndarray[float64_t] output
float64_t vlow, vhigh
@@ -950,6 +958,9 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
except KeyError:
raise ValueError(f"Interpolation '{interpolation}' is not supported")

is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
start, end
)
# we use the Fixed/Variable Indexer here as the
# actual skiplist ops outweigh any window computation costs
output = np.empty(N, dtype=float)
@@ -967,7 +978,10 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
s = start[i]
e = end[i]

if i == 0:
if i == 0 or not is_monotonic_increasing_bounds:
if not is_monotonic_increasing_bounds:
nobs = 0
skiplist = skiplist_init(<int>win)

# setup
for j in range(s, e):
@@ -977,7 +991,6 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
skiplist_insert(skiplist, val)

else:

# calculate adds
for j in range(end[i - 1], e):
val = values[j]
@@ -991,7 +1004,6 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
if notnan(val):
skiplist_remove(skiplist, val)
nobs -= 1

if nobs >= minp:
if nobs == 1:
# Single value in skip list
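`roll_quantile` receives the same treatment, with the reset of `nobs` and the skiplist performed at the top of the loop body rather than at the bottom. The body of `is_monotonic_increasing_start_end_bounds` lies outside the visible part of this diff; the sketch below shows the check it is assumed to perform (both bounds arrays non-decreasing), evaluated on the bounds produced by the new regression test's indexer.

```python
# Assumed semantics of the is_monotonic_increasing_start_end_bounds helper.
import numpy as np


def is_monotonic_increasing_start_end_bounds(start, end):
    # True only if BOTH the start and end arrays are non-decreasing.
    return bool((np.diff(start) >= 0).all() and (np.diff(end) >= 0).all())


# Bounds from CustomIndexer(window_size=1, use_expanding=[True, False, True, False, True])
# with end_value=1 in the test below:
start = np.array([0, 1, 0, 3, 0], dtype=np.int64)
end = np.array([1, 2, 3, 4, 5], dtype=np.int64)
print(is_monotonic_increasing_start_end_bounds(start, end))  # False -> rebuild per window
```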
28 changes: 28 additions & 0 deletions pandas/tests/window/test_base_indexer.py
@@ -263,3 +263,31 @@ def test_fixed_forward_indexer_count():
result = df.rolling(window=indexer, min_periods=0).count()
expected = DataFrame({"b": [0.0, 0.0, 1.0, 1.0]})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
("end_value", "values"), [(1, [0.0, 1, 1, 3, 2]), (-1, [0.0, 1, 0, 3, 1])]
)
@pytest.mark.parametrize(("func", "args"), [("median", []), ("quantile", [0.5])])
def test_indexer_quantile_sum(end_value, values, func, args):
# GH 37153
class CustomIndexer(BaseIndexer):
def get_window_bounds(self, num_values, min_periods, center, closed):
start = np.empty(num_values, dtype=np.int64)
end = np.empty(num_values, dtype=np.int64)
for i in range(num_values):
if self.use_expanding[i]:
start[i] = 0
end[i] = max(i + end_value, 1)
else:
start[i] = i
end[i] = i + self.window_size
return start, end

use_expanding = [True, False, True, False, True]
df = DataFrame({"values": range(5)})

indexer = CustomIndexer(window_size=1, use_expanding=use_expanding)
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)
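
As a quick hand check (not part of the PR), the `end_value=1` parametrization can be reproduced outside the rolling machinery: the indexer's `start` bounds are non-monotonic, and taking each window's median independently gives exactly the expected column.

```python
# Independent verification of the end_value=1 expected values.
import numpy as np

vals = np.arange(5, dtype=float)
use_expanding = [True, False, True, False, True]
window_size, end_value = 1, 1

start, end = [], []
for i in range(5):
    if use_expanding[i]:
        start.append(0)
        end.append(max(i + end_value, 1))
    else:
        start.append(i)
        end.append(i + window_size)

print(start, end)  # [0, 1, 0, 3, 0] [1, 2, 3, 4, 5] -- start is not monotonic
print([float(np.median(vals[s:e])) for s, e in zip(start, end)])
# [0.0, 1.0, 1.0, 3.0, 2.0] -- matches expected = DataFrame({"values": values})
```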