Skip to content

Commit 91d8453

Browse files
committed
ENH: use cython bin groupers, fix bug in DatetimeIndex.__getitem__ causing slowness, some timeseries vbenches
1 parent 30dd412 commit 91d8453

File tree

6 files changed

+95
-156
lines changed

6 files changed

+95
-156
lines changed

pandas/core/groupby.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,39 @@ def agg_series(self, obj, func):
846846
grouper = lib.SeriesBinGrouper(obj, func, self.bins, dummy)
847847
return grouper.get_result()
848848

849+
#----------------------------------------------------------------------
850+
# cython aggregation
851+
852+
_cython_functions = {
853+
'add' : lib.group_add_bin,
854+
'mean' : lib.group_mean_bin,
855+
'var' : lib.group_var_bin,
856+
'std' : lib.group_var_bin
857+
}
858+
859+
def aggregate(self, values, how):
860+
agg_func = self._cython_functions[how]
861+
if values.ndim == 1:
862+
squeeze = True
863+
values = values[:, None]
864+
out_shape = (self.ngroups, 1)
865+
else:
866+
squeeze = False
867+
out_shape = (self.ngroups, values.shape[1])
868+
869+
trans_func = self._cython_transforms.get(how, lambda x: x)
870+
871+
# will be filled in Cython function
872+
result = np.empty(out_shape, dtype=np.float64)
873+
counts = np.zeros(self.ngroups, dtype=np.int32)
874+
875+
agg_func(result, counts, values, self.bins)
876+
result = trans_func(result)
877+
878+
if squeeze:
879+
result = result.squeeze()
880+
881+
return result, counts
849882

850883
class Grouping(object):
851884
"""
@@ -1901,7 +1934,7 @@ def picker(arr):
19011934
return arr[-1] if arr is not None and len(arr) else np.nan
19021935
return picker
19031936

1904-
raise ValueError("Unrecognized method: %s" % how)
1937+
return how
19051938

19061939

19071940
from pandas.util import py3compat

pandas/core/index.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ class DatetimeIndex(Int64Index):
11861186
def __new__(cls, data=None,
11871187
freq=None, start=None, end=None, periods=None,
11881188
dtype=None, copy=False, name=None, tz=None,
1189-
**kwds):
1189+
verify_integrity=True, **kwds):
11901190

11911191
warn = False
11921192
if 'offset' in kwds and kwds['offset']:
@@ -1292,11 +1292,12 @@ def __new__(cls, data=None,
12921292
# TODO: this is horribly inefficient. If user passes data + offset, we
12931293
# need to make sure data points conform. Punting on this
12941294

1295-
if offset is not None:
1296-
for i, ts in enumerate(subarr):
1297-
if not offset.onOffset(Timestamp(ts)):
1298-
val = Timestamp(offset.rollforward(ts)).value
1299-
subarr[i] = val
1295+
if verify_integrity:
1296+
if offset is not None:
1297+
for i, ts in enumerate(subarr):
1298+
if not offset.onOffset(Timestamp(ts)):
1299+
val = Timestamp(offset.rollforward(ts)).value
1300+
subarr[i] = val
13001301

13011302
subarr = subarr.view(cls)
13021303
subarr.name = name
@@ -1305,6 +1306,15 @@ def __new__(cls, data=None,
13051306

13061307
return subarr
13071308

1309+
@classmethod
1310+
def _simple_new(cls, values, name, offset, tz):
1311+
result = values.view(cls)
1312+
result.name = name
1313+
result.offset = offset
1314+
result.tz = tz
1315+
1316+
return result
1317+
13081318
@property
13091319
def tzinfo(self):
13101320
"""
@@ -1740,16 +1750,15 @@ def __getitem__(self, key):
17401750
if result.ndim > 1:
17411751
return result
17421752

1743-
return DatetimeIndex(result, name=self.name, freq=new_offset,
1744-
tz=self.tz)
1753+
return self._simple_new(result, self.name, new_offset, self.tz)
17451754

17461755
# Try to run function on index first, and then on elements of index
17471756
# Especially important for group-by functionality
1748-
def map(self, func_to_map):
1757+
def map(self, f):
17491758
try:
1750-
return func_to_map(self)
1759+
return f(self)
17511760
except:
1752-
return super(DatetimeIndex, self).map(func_to_map)
1761+
return Index.map(self, f)
17531762

17541763
# alias to offset
17551764
@property

pandas/src/groupby.pyx

Lines changed: 10 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ def generate_bins_dt64(ndarray[int64_t] values, ndarray[int64_t] binner,
481481
@cython.boundscheck(False)
482482
@cython.wraparound(False)
483483
def group_add_bin(ndarray[float64_t, ndim=2] out,
484+
ndarray[int32_t] counts,
484485
ndarray[float64_t, ndim=2] values,
485486
ndarray[int32_t] bins):
486487
'''
@@ -503,6 +504,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
503504
if b < ngroups - 1 and i >= bins[b]:
504505
b += 1
505506

507+
counts[b] += 1
506508
for j in range(K):
507509
val = values[i, j]
508510

@@ -515,6 +517,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
515517
if b < ngroups - 1 and i >= bins[b]:
516518
b += 1
517519

520+
counts[b] += 1
518521
val = values[i, 0]
519522

520523
# not nan
@@ -532,6 +535,7 @@ def group_add_bin(ndarray[float64_t, ndim=2] out,
532535
@cython.boundscheck(False)
533536
@cython.wraparound(False)
534537
def group_mean_bin(ndarray[float64_t, ndim=2] out,
538+
ndarray[int32_t] counts,
535539
ndarray[float64_t, ndim=2] values,
536540
ndarray[int32_t] bins):
537541
cdef:
@@ -551,6 +555,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
551555
if b < ngroups - 1 and i >= bins[b]:
552556
b += 1
553557

558+
counts[b] += 1
554559
for j in range(K):
555560
val = values[i, j]
556561

@@ -563,6 +568,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
563568
if b < ngroups - 1 and i >= bins[b]:
564569
b += 1
565570

571+
counts[b] += 1
566572
val = values[i, 0]
567573

568574
# not nan
@@ -581,6 +587,7 @@ def group_mean_bin(ndarray[float64_t, ndim=2] out,
581587
@cython.boundscheck(False)
582588
@cython.wraparound(False)
583589
def group_var_bin(ndarray[float64_t, ndim=2] out,
590+
ndarray[int32_t] counts,
584591
ndarray[float64_t, ndim=2] values,
585592
ndarray[int32_t] bins):
586593

@@ -602,6 +609,8 @@ def group_var_bin(ndarray[float64_t, ndim=2] out,
602609
if b < ngroups - 1 and i >= bins[b]:
603610
b += 1
604611

612+
counts[b] += 1
613+
605614
for j in range(K):
606615
val = values[i, j]
607616

@@ -615,6 +624,7 @@ def group_var_bin(ndarray[float64_t, ndim=2] out,
615624
if b < ngroups - 1 and i >= bins[b]:
616625
b += 1
617626

627+
counts[b] += 1
618628
val = values[i, 0]
619629

620630
# not nan
@@ -793,145 +803,6 @@ def generate_slices(ndarray[int32_t] labels, Py_ssize_t ngroups):
793803

794804
return starts, ends
795805

796-
'''
797-
798-
def ts_upsample_mean(ndarray[object] indices,
799-
ndarray[object] buckets,
800-
ndarray[float64_t] values,
801-
inclusive=False):
802-
cdef:
803-
Py_ssize_t i, j, nbuckets, nvalues
804-
ndarray[float64_t] output
805-
object next_bound
806-
float64_t the_sum, val, nobs
807-
808-
nbuckets = len(buckets)
809-
nvalues = len(indices)
810-
811-
assert(len(values) == len(indices))
812-
813-
output = np.empty(nbuckets, dtype=float)
814-
output.fill(np.NaN)
815-
816-
j = 0
817-
for i from 0 <= i < nbuckets:
818-
next_bound = buckets[i]
819-
the_sum = 0
820-
nobs = 0
821-
if inclusive:
822-
while j < nvalues and indices[j] <= next_bound:
823-
val = values[j]
824-
# not NaN
825-
if val == val:
826-
the_sum += val
827-
nobs += 1
828-
j += 1
829-
else:
830-
while j < nvalues and indices[j] < next_bound:
831-
832-
cdef:
833-
Py_ssize_t i, j, nbuckets, nvalues
834-
ndarray[float64_t] output
835-
object next_bound
836-
float64_t the_sum, val, nobs
837-
838-
nbuckets = len(buckets)
839-
nvalues = len(indices)
840-
841-
assert(len(values) == len(indices))
842-
843-
output = np.empty(nbuckets, dtype=float)
844-
output.fill(np.NaN)
845-
846-
j = 0
847-
for i from 0 <= i < nbuckets:
848-
next_bound = buckets[i]
849-
the_sum = 0
850-
nobs = 0
851-
if inclusive:
852-
while j < nvalues and indices[j] <= next_bound:
853-
val = values[j]
854-
# not NaN
855-
if val == val:
856-
the_sum += val
857-
nobs += 1
858-
j += 1
859-
else:
860-
while j < nvalues and indices[j] < next_bound:
861-
val = values[j]
862-
# not NaN
863-
if val == val:
864-
the_sum += val
865-
nobs += 1
866-
j += 1
867-
868-
if nobs > 0:
869-
output[i] = the_sum / nobs
870-
871-
if j >= nvalues:
872-
break
873-
874-
return output
875-
val = values[j]
876-
# not NaN
877-
if val == val:
878-
the_sum += val
879-
nobs += 1
880-
j += 1
881-
882-
if nobs > 0:
883-
output[i] = the_sum / nobs
884-
885-
if j >= nvalues:
886-
break
887-
888-
return output
889-
'''
890-
891-
def ts_upsample_generic(ndarray[object] indices,
892-
ndarray[object] buckets,
893-
ndarray[float64_t] values,
894-
object aggfunc,
895-
inclusive=False):
896-
'''
897-
put something here
898-
'''
899-
cdef:
900-
Py_ssize_t i, j, jstart, nbuckets, nvalues
901-
ndarray[float64_t] output
902-
object next_bound
903-
float64_t the_sum, val, nobs
904-
905-
nbuckets = len(buckets)
906-
nvalues = len(indices)
907-
908-
assert(len(values) == len(indices))
909-
910-
output = np.empty(nbuckets, dtype=float)
911-
output.fill(np.NaN)
912-
913-
j = 0
914-
for i from 0 <= i < nbuckets:
915-
next_bound = buckets[i]
916-
the_sum = 0
917-
nobs = 0
918-
919-
jstart = j
920-
if inclusive:
921-
while j < nvalues and indices[j] <= next_bound:
922-
j += 1
923-
else:
924-
while j < nvalues and indices[j] < next_bound:
925-
j += 1
926-
927-
if nobs > 0:
928-
output[i] = aggfunc(values[jstart:j])
929-
930-
if j >= nvalues:
931-
break
932-
933-
return output
934-
935806

936807
def groupby_arrays(ndarray index, ndarray _labels):
937808
cdef:

pandas/tests/test_timeseries.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,8 +473,9 @@ def test_custom_grouper(self):
473473
idx = idx.append(DatetimeIndex([np.datetime64(dti[-1])]))
474474
expect = Series(arr, index=idx)
475475

476+
# cython returns float for now
476477
result = g.agg(np.sum)
477-
assert_series_equal(result, expect)
478+
assert_series_equal(result, expect.astype(float))
478479

479480
data = np.random.rand(len(dti), 10)
480481
df = DataFrame(data, index=dti)

pandas/tests/test_tseries.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ def test_group_add_bin():
317317
# bin-based group_add
318318
bins = np.array([3, 6], dtype=np.int32)
319319
out = np.zeros((3, 1), np.float64)
320-
lib.group_add_bin(out, obj, bins)
320+
counts = np.empty(len(out), dtype=np.int32)
321+
lib.group_add_bin(out, counts, obj, bins)
321322

322323
assert_almost_equal(out, exp)
323324

@@ -333,7 +334,8 @@ def test_group_mean_bin():
333334
# bin-based group_mean
334335
bins = np.array([3, 6], dtype=np.int32)
335336
out = np.zeros((3, 1), np.float64)
336-
lib.group_mean_bin(out, obj, bins)
337+
counts = np.empty(len(out), dtype=np.int32)
338+
lib.group_mean_bin(out, counts, obj, bins)
337339

338340
assert_almost_equal(out, exp)
339341

@@ -349,7 +351,9 @@ def test_group_var_bin():
349351
# bin-based group_var
350352
bins = np.array([3, 6], dtype=np.int32)
351353
out = np.zeros((3, 1), np.float64)
352-
lib.group_var_bin(out, obj, bins)
354+
counts = np.empty(len(out), dtype=np.int32)
355+
356+
lib.group_var_bin(out, counts, obj, bins)
353357

354358
assert_almost_equal(out, exp)
355359

0 commit comments

Comments
 (0)