Skip to content
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ Groupby/resample/rolling
- Bug in :meth:`RollingGroupby.count` where a ``ValueError`` was raised when specifying the ``closed`` parameter (:issue:`35869`)
- Bug in :meth:`DataFrame.groupby.rolling` returning wrong values with partial centered window (:issue:`36040`).
- Bug in :meth:`DataFrameGroupBy.rolling` returning wrong values with a time-aware window containing ``NaN``; a ``ValueError`` is now raised because the windows are not monotonic (:issue:`34617`)
- Bug in :meth:`Rolling.median` and :meth:`Rolling.quantile` returning wrong values for custom ``BaseIndexer`` subclasses with non-monotonic starting or ending points for windows (:issue:`37153`)

Reshaping
^^^^^^^^^
Expand Down
91 changes: 61 additions & 30 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -698,22 +698,38 @@ def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,

else:

# calculate adds
for j in range(end[i - 1], e):
val = values[j]
if notnan(val):
nobs += 1
err = skiplist_insert(sl, val) != 1
if err:
break

# calculate deletes
for j in range(start[i - 1], s):
val = values[j]
if notnan(val):
skiplist_remove(sl, val)
nobs -= 1

if end[i - 1] > e:
for j in range(e, end[i - 1]):
val = values[j]
if notnan(val):
skiplist_remove(sl, val)
nobs -= 1
else:
# calculate adds
for j in range(end[i - 1], e):
val = values[j]
if notnan(val):
nobs += 1
err = skiplist_insert(sl, val) != 1
if err:
break

# if start was shifted back, add these again
if start[i -1] > s:
for j in range(s, start[i -1]):
val = values[j]
if notnan(val):
nobs += 1
err = skiplist_insert(sl, val) != 1
if err:
break
else:
# calculate deletes if start is shifted forward
for j in range(start[i - 1], s):
val = values[j]
if notnan(val):
skiplist_remove(sl, val)
nobs -= 1
if nobs >= minp:
midpoint = <int>(nobs / 2)
if nobs % 2:
Expand Down Expand Up @@ -955,20 +971,35 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
skiplist_insert(skiplist, val)

else:

# calculate adds
for j in range(end[i - 1], e):
val = values[j]
if notnan(val):
nobs += 1
skiplist_insert(skiplist, val)

# calculate deletes
for j in range(start[i - 1], s):
val = values[j]
if notnan(val):
skiplist_remove(skiplist, val)
nobs -= 1
# Remove values again if end was moved back
if end[i - 1] > e:
for j in range(e, end[i - 1]):
val = values[j]
if notnan(val):
skiplist_remove(skiplist, val)
nobs -= 1
else:
# calculate adds
for j in range(end[i - 1], e):
val = values[j]
if notnan(val):
nobs += 1
skiplist_insert(skiplist, val)

# if start was shifted back, add these again
if start[i -1] > s:
for j in range(s, start[i -1]):
val = values[j]
if notnan(val):
nobs += 1
skiplist_insert(skiplist, val)
else:
# calculate deletes if start is shifted forward
for j in range(start[i - 1], s):
val = values[j]
if notnan(val):
skiplist_remove(skiplist, val)
nobs -= 1

if nobs >= minp:
if nobs == 1:
Expand Down
28 changes: 28 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,31 @@ def test_fixed_forward_indexer_count():
result = df.rolling(window=indexer, min_periods=0).count()
expected = DataFrame({"b": [0.0, 0.0, 1.0, 1.0]})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    ("end_value", "values"), [(1, [0.0, 1, 1, 3, 2]), (-1, [0.0, 1, 0, 3, 1])]
)
@pytest.mark.parametrize(("func", "args"), [("median", []), ("quantile", [0.5])])
def test_indexer_quantile_sum(end_value, values, func, args):
    """Check median/quantile when window bounds move backwards (GH 37153).

    The indexer alternates between an expanding window (start reset to 0)
    and a one-element window anchored at the current row, so the start
    bounds are not monotonic; with ``end_value=-1`` the end bounds are not
    monotonic either.
    """

    # GH 37153
    class CustomIndexer(BaseIndexer):
        def get_window_bounds(self, num_values, min_periods, center, closed):
            lower = np.empty(num_values, dtype=np.int64)
            upper = np.empty(num_values, dtype=np.int64)
            for pos in range(num_values):
                if self.use_expanding[pos]:
                    # Expanding window: always starts at the first row and
                    # is never allowed to be empty.
                    bounds = (0, max(pos + end_value, 1))
                else:
                    # Fixed-size window beginning at the current row.
                    bounds = (pos, pos + self.window_size)
                lower[pos], upper[pos] = bounds
            return lower, upper

    expanding_flags = [True, False, True, False, True]
    frame = DataFrame({"values": range(5)})

    window = frame.rolling(
        CustomIndexer(window_size=1, use_expanding=expanding_flags)
    )
    result = getattr(window, func)(*args)
    expected = DataFrame({"values": values})
    tm.assert_frame_equal(result, expected)