From 00e927da09b94b639ec38590cdc22c2664c3c6dc Mon Sep 17 00:00:00 2001 From: immerrr Date: Thu, 13 Mar 2014 00:09:15 +0400 Subject: [PATCH 1/2] CLN: move some functions from tools.merge to core.internals --- pandas/core/internals.py | 447 +++++++++++++++++++++++++++++++++- pandas/tools/merge.py | 505 ++++----------------------------------- 2 files changed, 486 insertions(+), 466 deletions(-) diff --git a/pandas/core/internals.py b/pandas/core/internals.py index d32664559f7fc..30ac1c6824a67 100644 --- a/pandas/core/internals.py +++ b/pandas/core/internals.py @@ -11,21 +11,24 @@ from pandas.core.common import (_possibly_downcast_to_dtype, isnull, notnull, _NS_DTYPE, _TD_DTYPE, ABCSeries, is_list_like, ABCSparseSeries, _infer_dtype_from_scalar, - _values_from_object, _is_null_datelike_scalar) + _values_from_object, _is_null_datelike_scalar, + is_timedelta64_dtype, is_datetime64_dtype,) from pandas.core.index import (Index, MultiIndex, _ensure_index, - _handle_legacy_indexes) + _handle_legacy_indexes, _all_indexes_same) from pandas.core.indexing import (_maybe_convert_indices, _length_of_indexer) import pandas.core.common as com from pandas.sparse.array import _maybe_to_sparse, SparseArray import pandas.lib as lib import pandas.tslib as tslib import pandas.computation.expressions as expressions +from pandas.util.decorators import cache_readonly from pandas.tslib import Timestamp from pandas import compat from pandas.compat import range, lrange, lmap, callable, map, zip, u from pandas.tseries.timedeltas import _coerce_scalar_to_timedelta_type + class Block(PandasObject): """ @@ -4239,3 +4242,443 @@ def _possibly_compare(a, b, op): raise TypeError("Cannot compare types %r and %r" % tuple(type_names)) return res + + +def may_need_upcasting(blocks): + """ + Check if any of the blocks may need upcasting before merging. + + Returns + ------- + bool + True if there's an IntBlock (but not TimeDeltaBlock) or BoolBlock. 
+ + """ + for block in blocks: + if (isinstance(block, (IntBlock, BoolBlock)) and + not isinstance(block, TimeDeltaBlock)): + return True + return False + + +def upcast_blocks(blocks): + """ + Upcast and consolidate if necessary + """ + new_blocks = [] + for block in blocks: + if isinstance(block, TimeDeltaBlock): + # these are int blocks underlying, but are ok + newb = block + elif isinstance(block, IntBlock): + newb = make_block(block.values.astype(float), block.items, + block.ref_items, placement=block._ref_locs) + elif isinstance(block, BoolBlock): + newb = make_block(block.values.astype(object), block.items, + block.ref_items, placement=block._ref_locs) + else: + newb = block + new_blocks.append(newb) + + # use any ref_items + return _consolidate(new_blocks, newb.ref_items) + + +class BlockJoinOperation(object): + """ + BlockJoinOperation made generic for N DataFrames + + Object responsible for orchestrating efficient join operation between two + BlockManager data structures + """ + def __init__(self, data_list, join_index, indexers, copy=True): + if len(data_list) != len(indexers): + raise AssertionError("data_list and indexers must have the same " + "length") + + self.units = [] + for data, indexer in zip(data_list, indexers): + if not data.is_consolidated(): + data = data.consolidate() + data._set_ref_locs() + self.units.append(_JoinUnit(data.blocks, indexer)) + + self.join_index = join_index + self.axis = 1 + self.copy = copy + self.offsets = None + + # do NOT sort + self.result_items = _concat_indexes([d.items for d in data_list]) + self.result_axes = list(data_list[0].axes) + self.result_axes[0] = self.result_items + self.result_axes[self.axis] = self.join_index + + def _prepare_blocks(self): + blockmaps = [] + + for unit in self.units: + join_blocks = unit.get_upcasted_blocks() + type_map = {} + for blk in join_blocks: + type_map.setdefault(blk.ftype, []).append(blk) + blockmaps.append((unit, type_map)) + + return blockmaps + + def get_result(self): + """ + Returns + ------- + merged : BlockManager + """ + blockmaps = self._prepare_blocks() + kinds = _get_merge_block_kinds(blockmaps) + + # maybe want to enable flexible copying <-- what did I mean? 
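+        # group the blocks of every join unit by ftype so that blocks of
+        # the same kind from all inputs are merged into a single result
+        # block per kind below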
+ kind_blocks = [] + for klass in kinds: + klass_blocks = [] + for unit, mapping in blockmaps: + if klass in mapping: + klass_blocks.extend((unit, b) for b in mapping[klass]) + + # blocks that we are going to merge + kind_blocks.append(klass_blocks) + + # create the merge offsets, essentially where the resultant blocks go in the result + if not self.result_items.is_unique: + + # length of the merges for each of the klass blocks + self.offsets = np.zeros(len(blockmaps)) + for kb in kind_blocks: + kl = list(b.get_merge_length() for unit, b in kb) + self.offsets += np.array(kl) + + # merge the blocks to create the result blocks + result_blocks = [] + for klass_blocks in kind_blocks: + res_blk = self._get_merged_block(klass_blocks) + result_blocks.append(res_blk) + + return BlockManager(result_blocks, self.result_axes) + + def _get_merged_block(self, to_merge): + if len(to_merge) > 1: + + # placement set here + return self._merge_blocks(to_merge) + else: + unit, block = to_merge[0] + blk = unit.reindex_block(block, self.axis, + self.result_items, copy=self.copy) + + # set placement / invalidate on a unique result + if self.result_items.is_unique and blk._ref_locs is not None: + if not self.copy: + blk = blk.copy() + blk.set_ref_locs(None) + + return blk + + def _merge_blocks(self, merge_chunks): + """ + merge_chunks -> [(_JoinUnit, Block)] + """ + funit, fblock = merge_chunks[0] + fidx = funit.indexer + + out_shape = list(fblock.get_values().shape) + + n = len(fidx) if fidx is not None else out_shape[self.axis] + + merge_lengths = list(blk.get_merge_length() for unit, blk in merge_chunks) + out_shape[0] = sum(merge_lengths) + out_shape[self.axis] = n + + # Should use Fortran order?? + block_dtype = _get_block_dtype([x[1] for x in merge_chunks]) + out = np.empty(out_shape, dtype=block_dtype) + + sofar = 0 + for unit, blk in merge_chunks: + out_chunk = out[sofar: sofar + len(blk)] + com.take_nd(blk.get_values(), unit.indexer, self.axis, out=out_chunk) + sofar += len(blk) + + # does not sort + new_block_items = _concat_indexes([b.items for _, b in merge_chunks]) + + # need to set placement if we have a non-unique result + # calculate by the existing placement plus the offset in the result set + placement = None + if not self.result_items.is_unique: + placement = [] + offsets = np.append(np.array([0]),self.offsets.cumsum()[:-1]) + for (unit, blk), offset in zip(merge_chunks,offsets): + placement.extend(blk.ref_locs+offset) + + return make_block(out, new_block_items, self.result_items, placement=placement) + + +class _JoinUnit(object): + """ + Blocks plus indexer + """ + + def __init__(self, blocks, indexer): + self.blocks = blocks + self.indexer = indexer + + @cache_readonly + def mask_info(self): + if self.indexer is None or not may_need_upcasting(self.blocks): + return None + else: + mask = self.indexer == -1 + needs_masking = mask.any() + return (mask, needs_masking) + + def get_upcasted_blocks(self): + # will short-circuit and not compute needs_masking if indexer is None + if self.mask_info is not None and self.mask_info[1]: + return upcast_blocks(self.blocks) + return self.blocks + + def reindex_block(self, block, axis, ref_items, copy=True): + if self.indexer is None: + result = block.copy() if copy else block + else: + result = block.reindex_axis(self.indexer, axis=axis, + mask_info=self.mask_info) + result.ref_items = ref_items + return result + + +def _concat_indexes(indexes): + return indexes[0].append(indexes[1:]) + + +def _get_merge_block_kinds(blockmaps): + kinds = set() + for _, 
mapping in blockmaps: + kinds |= set(mapping) + return kinds + + +def _get_block_dtype(blocks): + if len(blocks) == 0: + return object + blk1 = blocks[0] + dtype = blk1.dtype + + if issubclass(dtype.type, np.floating): + for blk in blocks: + if blk.dtype.type == np.float64: + return blk.dtype + + return dtype + + +def concat_blocks(blocks, axis, new_axes, objs): + values_list = [b.get_values() for b in blocks if b is not None] + concat_values = com._concat_compat(values_list, axis=axis) + + if axis > 0: + # Not safe to remove this check, need to profile + if not _all_indexes_same([b.items for b in blocks]): + # TODO: Either profile this piece or remove. + # FIXME: Need to figure out how to test whether this line exists or does not...(unclear if even possible + # or maybe would require performance test) + raise com.PandasError('dtypes are not consistent throughout ' + 'DataFrames') + return make_block(concat_values, + blocks[0].items, + new_axes[0], + placement=blocks[0]._ref_locs) + else: + offsets = np.r_[0, np.cumsum([len(x._data.axes[0]) for + x in objs])] + indexer = np.concatenate([offsets[i] + b.ref_locs + for i, b in enumerate(blocks) + if b is not None]) + + concat_items = new_axes[0].take(indexer) + block = make_block(concat_values, concat_items, new_axes[0]) + + # we need to set the ref_locs in this block so we have the mapping + # as we now have a non-unique index across dtypes, and we need to + # map the column location to the block location + # GH3602 + if not new_axes[0].is_unique: + block.set_ref_locs(indexer) + + return block + + + + +def _get_reindexed_data(objs, new_axes, axis): + """ + Prepare BlockManagers of specified objs for merging. + + Parameters + ---------- + objs : list of NDFrame + new_axes : list of Index + axis : int + + """ + # HACK: ugh + + reindexed_data = [] + axes_to_reindex = list(enumerate(new_axes)) + axes_to_reindex.pop(axis) + + for obj in objs: + data = obj._data.prepare_for_merge() + for i, ax in axes_to_reindex: + data = data.reindex_axis(ax, axis=i, copy=False) + reindexed_data.append(data) + + return reindexed_data + + +def _prepare_blocks(objs, new_axes, axis): + reindexed_data = _get_reindexed_data(objs, new_axes, axis) + + # we are consolidating as we go, so just add the blocks, no-need for dtype mapping + blockmaps = [] + for data in reindexed_data: + data = data.consolidate() + data._set_ref_locs() + blockmaps.append(data.get_block_map(typ='dict')) + return blockmaps, reindexed_data + + +def _concat_single_item(original_objs, reindexed_objs, item, axis): + # this method only gets called with axis >= 1 + if axis < 1: + raise AssertionError("axis must be >= 1, input was {0}".format(axis)) + + # this is called if we don't have consistent dtypes in a row-wise append + all_values = [] + dtypes = [] + alls = set() + + # figure out the resulting dtype of the combination + for data, orig in zip(reindexed_objs, original_objs): + d = dict([ (t,False) for t in ['object','datetime','timedelta','other'] ]) + if item in orig: + values = data.get(item) + if hasattr(values,'to_dense'): + values = values.to_dense() + all_values.append(values) + + dtype = values.dtype + + if issubclass(dtype.type, (np.object_, np.bool_)): + d['object'] = True + alls.add('object') + elif is_datetime64_dtype(dtype): + d['datetime'] = True + alls.add('datetime') + elif is_timedelta64_dtype(dtype): + d['timedelta'] = True + alls.add('timedelta') + else: + d['other'] = True + alls.add('other') + + else: + all_values.append(None) + d['other'] = True + alls.add('other') + + 
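+        # remember which dtype categories this object contributed so the
+        # datetime/timedelta special-casing below can consult them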
dtypes.append(d) + + if 'datetime' in alls or 'timedelta' in alls: + + if 'object' in alls or 'other' in alls: + + for v, d in zip(all_values,dtypes): + if d.get('datetime') or d.get('timedelta'): + pass + + # if we have all null, then leave a date/time like type + # if we have only that type left + elif v is None or isnull(v).all(): + + alls.discard('other') + alls.discard('object') + + # create the result + if 'object' in alls: + empty_dtype, fill_value = np.object_, np.nan + elif 'other' in alls: + empty_dtype, fill_value = np.float64, np.nan + elif 'datetime' in alls: + empty_dtype, fill_value = 'M8[ns]', tslib.iNaT + elif 'timedelta' in alls: + empty_dtype, fill_value = 'm8[ns]', tslib.iNaT + else: # pragma + raise AssertionError("invalid dtype determination in concat_single_item") + + to_concat = [] + for obj, item_values in zip(reindexed_objs, all_values): + if item_values is None or isnull(item_values).all(): + shape = obj.shape[1:] + missing_arr = np.empty(shape, dtype=empty_dtype) + missing_arr.fill(fill_value) + to_concat.append(missing_arr) + else: + to_concat.append(item_values) + + return com._concat_compat(to_concat, axis=axis - 1) + + +def get_concatenated_data(objs, new_axes, axis): + # need to conform to same other (joined) axes for block join + blockmaps, rdata = _prepare_blocks(objs, new_axes, axis) + kinds = _get_all_block_kinds(blockmaps) + + try: + # need to conform to same other (joined) axes for block join + new_blocks = [] + for kind in kinds: + klass_blocks = [] + for mapping in blockmaps: + l = mapping.get(kind) + if l is None: + l = [ None ] + klass_blocks.extend(l) + stacked_block = concat_blocks( + blocks=klass_blocks, axis=axis, + new_axes=new_axes, objs=objs) + new_blocks.append(stacked_block) + + for blk in new_blocks: + blk.ref_items = new_axes[0] + + return BlockManager(new_blocks, new_axes).post_merge(objs) + + # Eventual goal would be to move everything to PandasError or other explicit error + except (Exception, com.PandasError): # EAFP + + # should not be possible to fail here for the expected reason with + # axis = 0 + if axis == 0: # pragma: no cover + raise + + new_data = {} + for item in new_axes[0]: + new_data[item] = _concat_single_item(objs, rdata, item, axis) + + return new_data + + +def _get_all_block_kinds(blockmaps): + kinds = set() + for mapping in blockmaps: + kinds |= set(mapping) + return kinds diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 90e713d72bdda..391828697502d 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -14,11 +14,9 @@ from pandas.core.index import (Index, MultiIndex, _get_combined_index, _ensure_index, _get_consensus_names, _all_indexes_same) -from pandas.core.internals import (TimeDeltaBlock, IntBlock, BoolBlock, BlockManager, - make_block, _consolidate) -from pandas.util.decorators import cache_readonly, Appender, Substitution +from pandas.core.internals import BlockJoinOperation, get_concatenated_data +from pandas.util.decorators import Appender, Substitution from pandas.core.common import (PandasError, ABCSeries, - is_timedelta64_dtype, is_datetime64_dtype, is_integer_dtype, isnull) import pandas.core.common as com @@ -190,9 +188,9 @@ def get_result(self): # TODO: more efficiently handle group keys to avoid extra # consolidation! 
- join_op = _BlockJoinOperation([ldata, rdata], join_index, - [left_indexer, right_indexer], axis=1, - copy=self.copy) + join_op = BlockJoinOperation([ldata, rdata], join_index, + [left_indexer, right_indexer], + copy=self.copy) result_data = join_op.get_result() result = DataFrame(result_data) @@ -409,14 +407,14 @@ def _validate_specification(self): if self.right_index: if len(self.left_on) != self.right.index.nlevels: raise ValueError('len(left_on) must equal the number ' - 'of levels in the index of "right"') + 'of levels in the index of "right"') self.right_on = [None] * n elif self.right_on is not None: n = len(self.right_on) if self.left_index: if len(self.right_on) != self.left.index.nlevels: raise ValueError('len(right_on) must equal the number ' - 'of levels in the index of "left"') + 'of levels in the index of "left"') self.left_on = [None] * n if len(self.right_on) != len(self.left_on): raise ValueError("len(right_on) must equal len(left_on)") @@ -495,9 +493,9 @@ def get_result(self): left_join_indexer = left_indexer right_join_indexer = right_indexer - join_op = _BlockJoinOperation([ldata, rdata], join_index, - [left_join_indexer, right_join_indexer], - axis=1, copy=self.copy) + join_op = BlockJoinOperation([ldata, rdata], join_index, + [left_join_indexer, right_join_indexer], + copy=self.copy) result_data = join_op.get_result() result = DataFrame(result_data) @@ -639,238 +637,6 @@ def _sort_labels(uniques, left, right): return new_left, new_right -class _BlockJoinOperation(object): - """ - BlockJoinOperation made generic for N DataFrames - - Object responsible for orchestrating efficient join operation between two - BlockManager data structures - """ - def __init__(self, data_list, join_index, indexers, axis=1, copy=True): - if axis <= 0: # pragma: no cover - raise MergeError('Only axis >= 1 supported for this operation') - - if len(data_list) != len(indexers): - raise AssertionError("data_list and indexers must have the same " - "length") - - self.units = [] - for data, indexer in zip(data_list, indexers): - if not data.is_consolidated(): - data = data.consolidate() - data._set_ref_locs() - self.units.append(_JoinUnit(data.blocks, indexer)) - - self.join_index = join_index - self.axis = axis - self.copy = copy - self.offsets = None - - # do NOT sort - self.result_items = _concat_indexes([d.items for d in data_list]) - self.result_axes = list(data_list[0].axes) - self.result_axes[0] = self.result_items - self.result_axes[axis] = self.join_index - - def _prepare_blocks(self): - blockmaps = [] - - for unit in self.units: - join_blocks = unit.get_upcasted_blocks() - type_map = {} - for blk in join_blocks: - type_map.setdefault(blk.ftype, []).append(blk) - blockmaps.append((unit, type_map)) - - return blockmaps - - def get_result(self): - """ - Returns - ------- - merged : BlockManager - """ - blockmaps = self._prepare_blocks() - kinds = _get_merge_block_kinds(blockmaps) - - # maybe want to enable flexible copying <-- what did I mean? 
- kind_blocks = [] - for klass in kinds: - klass_blocks = [] - for unit, mapping in blockmaps: - if klass in mapping: - klass_blocks.extend((unit, b) for b in mapping[klass]) - - # blocks that we are going to merge - kind_blocks.append(klass_blocks) - - # create the merge offsets, essentially where the resultant blocks go in the result - if not self.result_items.is_unique: - - # length of the merges for each of the klass blocks - self.offsets = np.zeros(len(blockmaps)) - for kb in kind_blocks: - kl = list(b.get_merge_length() for unit, b in kb) - self.offsets += np.array(kl) - - # merge the blocks to create the result blocks - result_blocks = [] - for klass_blocks in kind_blocks: - res_blk = self._get_merged_block(klass_blocks) - result_blocks.append(res_blk) - - return BlockManager(result_blocks, self.result_axes) - - def _get_merged_block(self, to_merge): - if len(to_merge) > 1: - - # placement set here - return self._merge_blocks(to_merge) - else: - unit, block = to_merge[0] - blk = unit.reindex_block(block, self.axis, - self.result_items, copy=self.copy) - - # set placement / invalidate on a unique result - if self.result_items.is_unique and blk._ref_locs is not None: - if not self.copy: - blk = blk.copy() - blk.set_ref_locs(None) - - return blk - - - def _merge_blocks(self, merge_chunks): - """ - merge_chunks -> [(_JoinUnit, Block)] - """ - funit, fblock = merge_chunks[0] - fidx = funit.indexer - - out_shape = list(fblock.get_values().shape) - - n = len(fidx) if fidx is not None else out_shape[self.axis] - - merge_lengths = list(blk.get_merge_length() for unit, blk in merge_chunks) - out_shape[0] = sum(merge_lengths) - out_shape[self.axis] = n - - # Should use Fortran order?? - block_dtype = _get_block_dtype([x[1] for x in merge_chunks]) - out = np.empty(out_shape, dtype=block_dtype) - - sofar = 0 - for unit, blk in merge_chunks: - out_chunk = out[sofar: sofar + len(blk)] - com.take_nd(blk.get_values(), unit.indexer, self.axis, out=out_chunk) - sofar += len(blk) - - # does not sort - new_block_items = _concat_indexes([b.items for _, b in merge_chunks]) - - # need to set placement if we have a non-unique result - # calculate by the existing placement plus the offset in the result set - placement = None - if not self.result_items.is_unique: - placement = [] - offsets = np.append(np.array([0]),self.offsets.cumsum()[:-1]) - for (unit, blk), offset in zip(merge_chunks,offsets): - placement.extend(blk.ref_locs+offset) - - return make_block(out, new_block_items, self.result_items, placement=placement) - - -class _JoinUnit(object): - """ - Blocks plus indexer - """ - - def __init__(self, blocks, indexer): - self.blocks = blocks - self.indexer = indexer - - @cache_readonly - def mask_info(self): - if self.indexer is None or not _may_need_upcasting(self.blocks): - return None - else: - mask = self.indexer == -1 - needs_masking = mask.any() - return (mask, needs_masking) - - def get_upcasted_blocks(self): - # will short-circuit and not compute needs_masking if indexer is None - if self.mask_info is not None and self.mask_info[1]: - return _upcast_blocks(self.blocks) - return self.blocks - - def reindex_block(self, block, axis, ref_items, copy=True): - if self.indexer is None: - result = block.copy() if copy else block - else: - result = block.reindex_axis(self.indexer, axis=axis, - mask_info=self.mask_info) - result.ref_items = ref_items - return result - - -def _may_need_upcasting(blocks): - for block in blocks: - if isinstance(block, (IntBlock, BoolBlock)) and not isinstance(block, 
TimeDeltaBlock): - return True - return False - - -def _upcast_blocks(blocks): - """ - Upcast and consolidate if necessary - """ - new_blocks = [] - for block in blocks: - if isinstance(block, TimeDeltaBlock): - # these are int blocks underlying, but are ok - newb = block - elif isinstance(block, IntBlock): - newb = make_block(block.values.astype(float), block.items, - block.ref_items, placement=block._ref_locs) - elif isinstance(block, BoolBlock): - newb = make_block(block.values.astype(object), block.items, - block.ref_items, placement=block._ref_locs) - else: - newb = block - new_blocks.append(newb) - - # use any ref_items - return _consolidate(new_blocks, newb.ref_items) - - -def _get_all_block_kinds(blockmaps): - kinds = set() - for mapping in blockmaps: - kinds |= set(mapping) - return kinds - - -def _get_merge_block_kinds(blockmaps): - kinds = set() - for _, mapping in blockmaps: - kinds |= set(mapping) - return kinds - - -def _get_block_dtype(blocks): - if len(blocks) == 0: - return object - blk1 = blocks[0] - dtype = blk1.dtype - - if issubclass(dtype.type, np.floating): - for blk in blocks: - if blk.dtype.type == np.float64: - return blk.dtype - - return dtype - #---------------------------------------------------------------------- # Concatenate DataFrame objects @@ -1060,221 +826,23 @@ def __init__(self, objs, axis=0, join='outer', join_axes=None, self.new_axes = self._get_new_axes() def get_result(self): - if self._is_series and self.axis == 0: - new_data = com._concat_compat([x.get_values() for x in self.objs]) - name = com._consensus_name_attr(self.objs) - new_data = self._post_merge(new_data) - return Series(new_data, index=self.new_axes[0], name=name) - elif self._is_series: - data = dict(zip(range(len(self.objs)), self.objs)) - index, columns = self.new_axes - tmpdf = DataFrame(data, index=index) - if columns is not None: - tmpdf.columns = columns - return tmpdf + if self._is_series: + if self.axis == 0: + new_data = com._concat_compat([x.get_values() for x in self.objs]) + name = com._consensus_name_attr(self.objs) + return Series(new_data, index=self.new_axes[0], name=name) + else: + data = dict(zip(range(len(self.objs)), self.objs)) + index, columns = self.new_axes + tmpdf = DataFrame(data, index=index) + if columns is not None: + tmpdf.columns = columns + return tmpdf else: - new_data = self._get_concatenated_data() - new_data = self._post_merge(new_data) + new_data = get_concatenated_data(self.objs, self.new_axes, + self.axis) return self.objs[0]._from_axes(new_data, self.new_axes) - def _post_merge(self, data): - if isinstance(data, BlockManager): - data = data.post_merge(self.objs) - return data - - def _get_fresh_axis(self): - return Index(np.arange(len(self._get_concat_axis()))) - - def _prepare_blocks(self): - reindexed_data = self._get_reindexed_data() - - # we are consolidating as we go, so just add the blocks, no-need for dtype mapping - blockmaps = [] - for data in reindexed_data: - data = data.consolidate() - data._set_ref_locs() - blockmaps.append(data.get_block_map(typ='dict')) - return blockmaps, reindexed_data - - def _get_concatenated_data(self): - # need to conform to same other (joined) axes for block join - blockmaps, rdata = self._prepare_blocks() - kinds = _get_all_block_kinds(blockmaps) - - try: - # need to conform to same other (joined) axes for block join - new_blocks = [] - for kind in kinds: - klass_blocks = [] - for mapping in blockmaps: - l = mapping.get(kind) - if l is None: - l = [ None ] - klass_blocks.extend(l) - stacked_block = 
self._concat_blocks(klass_blocks) - new_blocks.append(stacked_block) - - if self.axis == 0 and self.ignore_index: - self.new_axes[0] = self._get_fresh_axis() - - for blk in new_blocks: - blk.ref_items = self.new_axes[0] - - new_data = BlockManager(new_blocks, self.new_axes) - - # Eventual goal would be to move everything to PandasError or other explicit error - except (Exception, PandasError): # EAFP - - # should not be possible to fail here for the expected reason with - # axis = 0 - if self.axis == 0: # pragma: no cover - raise - - new_data = {} - for item in self.new_axes[0]: - new_data[item] = self._concat_single_item(rdata, item) - - return new_data - - def _get_reindexed_data(self): - # HACK: ugh - - reindexed_data = [] - axes_to_reindex = list(enumerate(self.new_axes)) - axes_to_reindex.pop(self.axis) - - for obj in self.objs: - data = obj._data.prepare_for_merge() - for i, ax in axes_to_reindex: - data = data.reindex_axis(ax, axis=i, copy=False) - reindexed_data.append(data) - - return reindexed_data - - def _concat_blocks(self, blocks): - - values_list = [b.get_values() for b in blocks if b is not None] - concat_values = com._concat_compat(values_list, axis=self.axis) - - if self.axis > 0: - # Not safe to remove this check, need to profile - if not _all_indexes_same([b.items for b in blocks]): - # TODO: Either profile this piece or remove. - # FIXME: Need to figure out how to test whether this line exists or does not...(unclear if even possible - # or maybe would require performance test) - raise PandasError('dtypes are not consistent throughout ' - 'DataFrames') - return make_block(concat_values, - blocks[0].items, - self.new_axes[0], - placement=blocks[0]._ref_locs) - else: - - offsets = np.r_[0, np.cumsum([len(x._data.axes[0]) for - x in self.objs])] - indexer = np.concatenate([offsets[i] + b.ref_locs - for i, b in enumerate(blocks) - if b is not None]) - if self.ignore_index: - concat_items = indexer - else: - concat_items = self.new_axes[0].take(indexer) - - if self.ignore_index: - ref_items = self._get_fresh_axis() - return make_block(concat_values, concat_items, ref_items) - - block = make_block(concat_values, concat_items, self.new_axes[0]) - - # we need to set the ref_locs in this block so we have the mapping - # as we now have a non-unique index across dtypes, and we need to - # map the column location to the block location - # GH3602 - if not self.new_axes[0].is_unique: - block.set_ref_locs(indexer) - - return block - - def _concat_single_item(self, objs, item): - # this is called if we don't have consistent dtypes in a row-wise append - all_values = [] - dtypes = [] - alls = set() - - # figure out the resulting dtype of the combination - for data, orig in zip(objs, self.objs): - d = dict([ (t,False) for t in ['object','datetime','timedelta','other'] ]) - if item in orig: - values = data.get(item) - if hasattr(values,'to_dense'): - values = values.to_dense() - all_values.append(values) - - dtype = values.dtype - - if issubclass(dtype.type, (np.object_, np.bool_)): - d['object'] = True - alls.add('object') - elif is_datetime64_dtype(dtype): - d['datetime'] = True - alls.add('datetime') - elif is_timedelta64_dtype(dtype): - d['timedelta'] = True - alls.add('timedelta') - else: - d['other'] = True - alls.add('other') - - else: - all_values.append(None) - d['other'] = True - alls.add('other') - - dtypes.append(d) - - if 'datetime' in alls or 'timedelta' in alls: - - if 'object' in alls or 'other' in alls: - - for v, d in zip(all_values,dtypes): - if d.get('datetime') or 
d.get('timedelta'): - pass - - # if we have all null, then leave a date/time like type - # if we have only that type left - elif v is None or isnull(v).all(): - - alls.discard('other') - alls.discard('object') - - # create the result - if 'object' in alls: - empty_dtype, fill_value = np.object_, np.nan - elif 'other' in alls: - empty_dtype, fill_value = np.float64, np.nan - elif 'datetime' in alls: - empty_dtype, fill_value = 'M8[ns]', tslib.iNaT - elif 'timedelta' in alls: - empty_dtype, fill_value = 'm8[ns]', tslib.iNaT - else: # pragma - raise AssertionError("invalid dtype determination in concat_single_item") - - to_concat = [] - for obj, item_values in zip(objs, all_values): - if item_values is None or isnull(item_values).all(): - shape = obj.shape[1:] - missing_arr = np.empty(shape, dtype=empty_dtype) - missing_arr.fill(fill_value) - to_concat.append(missing_arr) - else: - to_concat.append(item_values) - - # this method only gets called with axis >= 1 - if self.axis < 1: - raise AssertionError("axis must be >= 1, input was" - " {0}".format(self.axis)) - return com._concat_compat(to_concat, axis=self.axis - 1) - def _get_result_dim(self): if self._is_series and self.axis == 1: return 2 @@ -1302,13 +870,7 @@ def _get_new_axes(self): for i, ax in zip(indices, self.join_axes): new_axes[i] = ax - if self.ignore_index: - concat_axis = None - else: - concat_axis = self._get_concat_axis() - - new_axes[self.axis] = concat_axis - + new_axes[self.axis] = self._get_concat_axis() return new_axes def _get_comb_axis(self, i): @@ -1324,9 +886,16 @@ def _get_comb_axis(self, i): return _get_combined_index(all_indexes, intersect=self.intersect) def _get_concat_axis(self): + """ + Return index to be used along concatenation axis. + """ if self._is_series: if self.axis == 0: indexes = [x.index for x in self.objs] + elif self.ignore_index: + idx = Index(np.arange(len(self.objs))) + idx.is_unique = True # arange is always unique + return idx elif self.keys is None: names = [] for x in self.objs: @@ -1337,13 +906,21 @@ def _get_concat_axis(self): if x.name is not None: names.append(x.name) else: - return Index(np.arange(len(self.objs))) + idx = Index(np.arange(len(self.objs))) + idx.is_unique = True + return idx + return Index(names) else: return _ensure_index(self.keys) else: indexes = [x._data.axes[self.axis] for x in self.objs] + if self.ignore_index: + idx = Index(np.arange(sum(len(i) for i in indexes))) + idx.is_unique = True + return idx + if self.keys is None: concat_axis = _concat_indexes(indexes) else: From 150a847cf902daee84840ddc7f4d010bc9faa152 Mon Sep 17 00:00:00 2001 From: immerrr Date: Sat, 29 Mar 2014 11:24:37 +0400 Subject: [PATCH 2/2] CLN: calculate & use _ref_locs for unique items as well --- pandas/core/format.py | 6 +- pandas/core/frame.py | 33 +- pandas/core/generic.py | 49 +- pandas/core/internals.py | 1832 +++++++++++++----------------- pandas/tests/test_frame.py | 9 +- pandas/tests/test_internals.py | 39 +- pandas/tools/merge.py | 17 +- pandas/tools/tests/test_merge.py | 5 +- pandas/tseries/resample.py | 3 +- 9 files changed, 869 insertions(+), 1124 deletions(-) diff --git a/pandas/core/format.py b/pandas/core/format.py index a2a68b23c2018..ce59437517d37 100644 --- a/pandas/core/format.py +++ b/pandas/core/format.py @@ -1034,7 +1034,6 @@ def __init__(self, obj, path_or_buf=None, sep=",", na_rep='', float_format=None, self.blocks = self.obj._data.blocks ncols = sum(len(b.items) for b in self.blocks) self.data = [None] * ncols - self.column_map = 
self.obj._data.get_items_map(use_cached=False) if chunksize is None: chunksize = (100000 / (len(self.cols) or 1)) or 1 @@ -1307,10 +1306,9 @@ def _save_chunk(self, start_i, end_i): float_format=self.float_format, date_format=self.date_format) - for i, item in enumerate(b.items): - + for col_loc, col in zip(b.ref_locs, d): # self.data is a preallocated list - self.data[self.column_map[b][i]] = d[i] + self.data[col_loc] = col ix = data_index.to_native_types(slicer=slicer, na_rep=self.na_rep, float_format=self.float_format, diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 5ecdd4d8b351d..e4fa03a0da01d 100755 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -1952,7 +1952,9 @@ def _ensure_valid_index(self, value): raise ValueError('Cannot set a frame with no defined index ' 'and a value that cannot be converted to a ' 'Series') - self._data.set_axis(1, value.index.copy(), check_axis=False) + + self._data = self._data.reindex_axis(value.index.copy(), axis=1, + fill_value=np.nan) # we are a scalar # noop @@ -2646,17 +2648,15 @@ def trans(v): else: indexer = _nargsort(labels, kind=kind, ascending=ascending, na_position=na_position) - + + bm_axis = self._get_block_manager_axis(axis) + new_data = self._data.take(indexer, axis=bm_axis, + convert=False, verify=False) + if inplace: - if axis == 1: - new_data = self._data.reindex_items( - self._data.items[indexer], - copy=False) - elif axis == 0: - new_data = self._data.take(indexer) - self._update_inplace(new_data) + return self._update_inplace(new_data) else: - return self.take(indexer, axis=axis, convert=False, is_copy=False) + return self._constructor(new_data).__finalize__(self) def sortlevel(self, level=0, axis=0, ascending=True, inplace=False): """ @@ -2691,16 +2691,13 @@ def sortlevel(self, level=0, axis=0, ascending=True, inplace=False): else: return self.take(indexer, axis=axis, convert=False) + bm_axis = self._get_block_manager_axis(axis) + new_data = self._data.take(indexer, axis=bm_axis, + convert=False, verify=False) if inplace: - if axis == 1: - new_data = self._data.reindex_items( - self._data.items[indexer], - copy=False) - elif axis == 0: - new_data = self._data.take(indexer) - self._update_inplace(new_data) + return self._update_inplace(new_data) else: - return self.take(indexer, axis=axis, convert=False, is_copy=False) + return self._constructor(new_data).__finalize__(self) def swaplevel(self, i, j, axis=0): """ diff --git a/pandas/core/generic.py b/pandas/core/generic.py index fc7883f789703..d1d95d8965fd7 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -565,7 +565,7 @@ def f(x): f = _get_rename_function(v) baxis = self._get_block_manager_axis(axis) - result._data = result._data.rename(f, axis=baxis, copy=copy) + result._data = result._data.rename_axis(f, axis=baxis, copy=copy) result._clear_item_cache() if inplace: @@ -1217,21 +1217,9 @@ def take(self, indices, axis=0, convert=True, is_copy=True): taken : type of caller """ - # check/convert indicies here - if convert: - axis = self._get_axis_number(axis) - indices = _maybe_convert_indices( - indices, len(self._get_axis(axis))) - - baxis = self._get_block_manager_axis(axis) - if baxis == 0: - labels = self._get_axis(axis) - new_items = labels.take(indices) - new_data = self._data.reindex_axis(new_items, indexer=indices, - axis=baxis) - else: - new_data = self._data.take(indices, axis=baxis) - + new_data = self._data.take(indices, + axis=self._get_block_manager_axis(axis), + convert=True, verify=True) result = 
self._constructor(new_data).__finalize__(self) # maybe set copy if we didn't actually change the index @@ -1719,30 +1707,15 @@ def _reindex_with_indexers(self, reindexers, method=None, if index is None: continue - index = _ensure_index(index) - # reindex the axis - if method is not None: - new_data = new_data.reindex_axis( - index, indexer=indexer, method=method, axis=baxis, - fill_value=fill_value, limit=limit, copy=copy) - - elif indexer is not None: - # TODO: speed up on homogeneous DataFrame objects + index = _ensure_index(index) + if indexer is not None: indexer = com._ensure_int64(indexer) - new_data = new_data.reindex_indexer(index, indexer, axis=baxis, - fill_value=fill_value, - allow_dups=allow_dups) - - elif (baxis == 0 and index is not None and - index is not new_data.axes[baxis]): - new_data = new_data.reindex_items(index, copy=copy, - fill_value=fill_value) - - elif (baxis > 0 and index is not None and - index is not new_data.axes[baxis]): - new_data = new_data.copy(deep=copy) - new_data.set_axis(baxis, index) + + # TODO: speed up on homogeneous DataFrame objects + new_data = new_data.reindex_indexer(index, indexer, axis=baxis, + fill_value=fill_value, + allow_dups=allow_dups) if copy and new_data is self._data: new_data = new_data.copy() diff --git a/pandas/core/internals.py b/pandas/core/internals.py index 30ac1c6824a67..11ccb30367056 100644 --- a/pandas/core/internals.py +++ b/pandas/core/internals.py @@ -3,7 +3,7 @@ import operator from datetime import datetime, timedelta import copy -from collections import defaultdict +from collections import defaultdict, OrderedDict import numpy as np from pandas.core.base import PandasObject @@ -103,15 +103,31 @@ def ref_locs(self): if self._is_single_block or self.ref_items.equals(self.items): indexer = np.arange(len(self.items)) else: + if not self.ref_items.is_unique: + raise AssertionError("ref_locs undefined for non-unique ref_items") indexer = self.ref_items.get_indexer(self.items) indexer = com._ensure_platform_int(indexer) if (indexer == -1).any(): + # this means that we may have nan's in our block + nan_positions = isnull(self.items).nonzero()[0] + if len(nan_positions) == 0: + # This specific check is caused by a bug in NumPy (up + # to 1.8.1) that silently consumes invalid broadcasting + # of empty to non-empty if setitem is fancy-indexed: + # + # >>> arr = np.array([1]) + # >>> arr[:1] = np.array([]) + # Traceback (most recent call last): + # File "", line 1, in + # ValueError: could not broadcast input array from shape (0) into shape (1) + # >>> arr[[0]] = np.array([]) + # >>> arr[[True]] = np.array([]) + raise AssertionError('Some block items were not in ' + 'block ref_items') - # this means that we have nan's in our block try: - indexer[indexer == -1] = np.arange( - len(self.items))[isnull(self.items)] + indexer[indexer == -1] = nan_positions except: raise AssertionError('Some block items were not in ' 'block ref_items') @@ -208,6 +224,22 @@ def _slice(self, slicer): """ return a slice of my values """ return self.values[slicer] + def _getitem_block(self, slicer): + """ + Perform __getitem__-like, return result as block. 
+ """ + if isinstance(slicer, tuple): + axis0_slicer = slicer[0] + else: + axis0_slicer = slicer + + return self.__class__(values=self.values[slicer], + items=self.items[axis0_slicer], + ref_items=self.ref_items, + ndim=self.ndim, + fastpath=True, + placement=self.ref_locs[axis0_slicer]) + @property def shape(self): return self.values.shape @@ -501,24 +533,33 @@ def prepare_for_merge(self, **kwargs): def post_merge(self, items, **kwargs): """ we are non-sparse block, try to convert to a sparse block(s) """ - overlap = set(items.keys()) & set(self.items) - if len(overlap): - overlap = _ensure_index(overlap) + sparsified_mask = self.items.isin(items.keys()) - new_blocks = [] - for item in overlap: - dtypes = set(items[item]) + if not sparsified_mask.any(): + return self - # this is a safe bet with multiple dtypes - dtype = list(dtypes)[0] if len(dtypes) == 1 else np.float64 + new_blocks = [] + for i in sparsified_mask.nonzero()[0]: + item = self.items[i] + ref_loc = self.ref_locs[i] - b = make_block(SparseArray(self.get(item), dtype=dtype), - [item], self.ref_items) - new_blocks.append(b) + dtypes = set(items[item]) + # this is a safe bet with multiple dtypes + dtype = list(dtypes)[0] if len(dtypes) == 1 else np.float64 - return new_blocks + new_blocks.append(make_block( + values=SparseArray(self.iget(i), dtype=dtype), items=[item], + ref_items=self.ref_items, placement=[ref_loc])) - return self + nonsparsified_locs = (~sparsified_mask).nonzero()[0] + if len(nonsparsified_locs): + new_blocks.append(make_block( + values=self.values[nonsparsified_locs], + items=self.items[nonsparsified_locs], + ref_items=self.ref_items, + placement=self.ref_locs[nonsparsified_locs])) + + return new_blocks def _can_hold_element(self, value): raise NotImplementedError() @@ -1790,6 +1831,12 @@ def __init__(self, values, items, ref_items, ndim=None, fastpath=False, self.ndim = ndim self._ref_locs = None + if placement is not None: + if len(items) != len(placement): + raise AssertionError("Length mismatch between" + " items and placement.") + self._ref_locs = np.asanyarray(placement, dtype=np.int_) + self.values = values if fastpath: self.items = items @@ -1828,6 +1875,11 @@ def sp_values(self, v): kind=self.kind, dtype=v.dtype, fill_value=self.fill_value, copy=False) + def iget(self, col): + if col != 0: + raise IndexError("SparseBlock only contains one item") + return self.values + @property def sp_index(self): return self.values.sp_index @@ -1847,7 +1899,8 @@ def should_store(self, value): def prepare_for_merge(self, **kwargs): """ create a dense block """ - return make_block(self.get_values(), self.items, self.ref_items) + return make_block(self.get_values(), self.items, self.ref_items, + placement=self.ref_locs) def post_merge(self, items, **kwargs): return self @@ -1876,9 +1929,19 @@ def get_values(self, dtype=None): def get_merge_length(self): return 1 + def copy(self, deep=True, ref_items=None): + if ref_items is None: + ref_items = self.ref_items + + return self.make_block(values=self.values, + items=self.items, ref_items=ref_items, + sparse_index=self.sp_index, + kind=self.kind, copy=deep, + placement=self.ref_locs) + def make_block(self, values, items=None, ref_items=None, sparse_index=None, kind=None, dtype=None, fill_value=None, copy=False, - fastpath=True): + fastpath=True, placement=None): """ return a new block """ if dtype is None: dtype = self.dtype @@ -1892,7 +1955,7 @@ def make_block(self, values, items=None, ref_items=None, sparse_index=None, kind=kind or self.kind, dtype=dtype, 
fill_value=fill_value, copy=copy) return make_block(new_values, items, ref_items, ndim=self.ndim, - fastpath=fastpath) + fastpath=fastpath, placement=placement) def interpolate(self, method='pad', axis=0, inplace=False, limit=None, fill_value=None, **kwargs): @@ -2076,6 +2139,42 @@ class BlockManager(PandasObject): lightweight blocked set of labeled data to be manipulated by the DataFrame public API class + Attributes + ---------- + shape + ndim + axes + values + items + + Methods + ------- + set_axis(axis, new_labels) + copy(deep=True) + + get_dtype_counts + get_ftype_counts + get_dtypes + get_ftypes + + apply(func, axes, block_filter_fn) + + get_bool_data + get_numeric_data + + get_slice(slice_like, axis) + get(label) + iget(loc) + get_scalar(label_tup) + + take(indexer, axis) + reindex_axis(new_labels, axis) + reindex_indexer(new_labels, indexer, axis) + + delete(label) + insert(loc, label, value) + set(label, value) + Parameters ---------- @@ -2085,18 +2184,21 @@ class BlockManager(PandasObject): This is *not* a public API class """ __slots__ = ['axes', 'blocks', '_ndim', '_shape', '_known_consolidated', - '_is_consolidated', '_has_sparse', '_ref_locs', '_items_map'] + '_is_consolidated', '_has_sparse', '_ref_locs'] def __init__(self, blocks, axes, do_integrity_check=True, fastpath=True): self.axes = [_ensure_index(ax) for ax in axes] self.blocks = blocks - ndim = self.ndim for block in blocks: - if not block.is_sparse and ndim != block.ndim: - raise AssertionError(('Number of Block dimensions (%d) must ' - 'equal number of axes (%d)') - % (block.ndim, ndim)) + if block.is_sparse: + if len(block.ref_locs) != 1: + raise AssertionError("Sparse block refers to multiple items") + else: + if self.ndim != block.ndim: + raise AssertionError(('Number of Block dimensions (%d) must ' + 'equal number of axes (%d)') + % (block.ndim, self.ndim)) if do_integrity_check: self._verify_integrity() @@ -2104,9 +2206,7 @@ def __init__(self, blocks, axes, do_integrity_check=True, fastpath=True): self._has_sparse = False self._consolidate_check() - # we have a duplicate items index, setup the block maps - if not self.items.is_unique: - self._set_ref_locs(do_refs=True) + self._rebuild_ref_locs() def make_empty(self, axes=None): """ return an empty BlockManager with the items axis of len 0 """ @@ -2130,225 +2230,63 @@ def __nonzero__(self): @property def shape(self): - if getattr(self, '_shape', None) is None: - self._shape = tuple(len(ax) for ax in self.axes) - return self._shape + return tuple(len(ax) for ax in self.axes) @property def ndim(self): - if getattr(self, '_ndim', None) is None: - self._ndim = len(self.axes) - return self._ndim + return len(self.axes) - def _set_axis(self, axis, value, check_axis=True): - cur_axis = self.axes[axis] - value = _ensure_index(value) + def set_axis(self, axis, new_labels): + new_labels = _ensure_index(new_labels) + old_len = len(self.axes[axis]) + new_len = len(new_labels) - if check_axis and len(value) != len(cur_axis): + if new_len != old_len: raise ValueError('Length mismatch: Expected axis has %d elements, ' - 'new values have %d elements' % (len(cur_axis), - len(value))) - - self.axes[axis] = value - self._shape = None - return cur_axis, value - - def set_axis(self, axis, value, maybe_rename=True, check_axis=True): - cur_axis, value = self._set_axis(axis, value, check_axis) + 'new values have %d elements' % (old_len, new_len)) + self.axes[axis] = new_labels if axis == 0: - - # set/reset ref_locs based on the current index - # and map the new index if needed - 
self._set_ref_locs(labels=cur_axis) - - # take via ref_locs for block in self.blocks: - block.set_ref_items(self.items, maybe_rename=maybe_rename) - - # set/reset ref_locs based on the new index - self._set_ref_locs(labels=value, do_refs=True) - - def _reset_ref_locs(self): - """ take the current _ref_locs and reset ref_locs on the blocks - to correctly map, ignoring Nones; - reset both _items_map and _ref_locs """ - - # let's reset the ref_locs in individual blocks - if self.items.is_unique: - for b in self.blocks: - b._ref_locs = None - else: - for b in self.blocks: - b.reset_ref_locs() - self._rebuild_ref_locs() - - self._ref_locs = None - self._items_map = None + block.ref_items = new_labels + block.items = new_labels[block.ref_locs] def _rebuild_ref_locs(self): - """Take _ref_locs and set the individual block ref_locs, skipping Nones - no effect on a unique index """ - if getattr(self, '_ref_locs', None) is not None: - item_count = 0 - for v in self._ref_locs: - if v is not None: - block, item_loc = v - if block._ref_locs is None: - block.reset_ref_locs() - block._ref_locs[item_loc] = item_count - item_count += 1 - - def _set_ref_locs(self, labels=None, do_refs=False): + Update mgr._ref_locs according to blk.ref_locs. """ - if we have a non-unique index on this axis, set the indexers - we need to set an absolute indexer for the blocks - return the indexer if we are not unique - - labels : the (new) labels for this manager - ref : boolean, whether to set the labels (one a 1-1 mapping) - - """ - - if labels is None: - labels = self.items - - # we are unique, and coming from a unique - is_unique = labels.is_unique - if is_unique and not do_refs: - - if not self.items.is_unique: - - # reset our ref locs - self._ref_locs = None - for b in self.blocks: - b._ref_locs = None - - return None - - # we are going to a non-unique index - # we have ref_locs on the block at this point - if (not is_unique and do_refs) or do_refs == 'force': - - # create the items map - im = getattr(self, '_items_map', None) - if im is None: + blocks = np.empty(self.shape[0], dtype=np.object_) + blk_locs = np.empty(self.shape[0], dtype=np.int_) + blk_locs.fill(-1) - im = dict() - for block in self.blocks: - - # if we have a duplicate index but - # _ref_locs have not been set - try: - rl = block.ref_locs - except: - raise AssertionError( - 'Cannot create BlockManager._ref_locs because ' - 'block [%s] with duplicate items [%s] does not ' - 'have _ref_locs set' % (block, labels)) - - m = maybe_create_block_in_items_map(im, block) - for i, item in enumerate(block.items): - m[i] = rl[i] - - self._items_map = im - - # create the _ref_loc map here - rl = [None] * len(labels) - for block, items in im.items(): - for i, loc in enumerate(items): - rl[loc] = (block, i) - self._ref_locs = rl - return rl - - elif do_refs: - self._reset_ref_locs() - - # return our cached _ref_locs (or will compute again - # when we recreate the block manager if needed - return getattr(self, '_ref_locs', None) - - def get_items_map(self, use_cached=True): - """ - return an inverted ref_loc map for an item index - block -> item (in that block) location -> column location - - use_cached : boolean, use the cached items map, or recreate - """ - - # cache check - if use_cached: - im = getattr(self, '_items_map', None) - if im is not None: - return im - - im = dict() - rl = self._set_ref_locs() - - # we have a non-duplicative index - if rl is None: - - axis = self.axes[0] - for block in self.blocks: - - m = maybe_create_block_in_items_map(im, block) - for 
i, item in enumerate(block.items): - m[i] = axis.get_loc(item) - - # use the ref_locs to construct the map - else: - - for i, (block, idx) in enumerate(rl): + for blk in self.blocks: + rl = blk.ref_locs + blocks[rl] = blk + blk_locs[rl] = np.arange(len(rl)) - m = maybe_create_block_in_items_map(im, block) - m[idx] = i + if (blk_locs == -1).any(): + raise AssertionError("Gaps in blk ref_locs") - self._items_map = im - return im + self._ref_locs = lib.fast_zip([blocks, blk_locs]) # make items read only for now def _get_items(self): return self.axes[0] items = property(fget=_get_items) - def _get_counts(self, f): - """ return a dict of the counts of the function in BlockManager """ - self._consolidate_inplace() - counts = dict() - for b in self.blocks: - v = f(b) - counts[v] = counts.get(v, 0) + b.shape[0] - return counts - - def _get_types(self, f): - """ return a list of the f per item """ - self._consolidate_inplace() - - # unique - if self.items.is_unique: - l = [ None ] * len(self.items) - for b in self.blocks: - v = f(b) - for rl in b.ref_locs: - l[rl] = v - return l - - # non-unique - ref_locs = self._set_ref_locs() - return [ f(ref_locs[i][0]) for i, item in enumerate(self.items) ] - def get_dtype_counts(self): - return self._get_counts(lambda b: b.dtype.name) + from collections import Counter + return dict(Counter(d.name for d in self.get_dtypes())) def get_ftype_counts(self): - return self._get_counts(lambda b: b.ftype) + from collections import Counter + return dict(Counter(self.get_ftypes())) def get_dtypes(self): - return self._get_types(lambda b: b.dtype) + return [rl[0].dtype for rl in self._ref_locs] def get_ftypes(self): - return self._get_types(lambda b: b.ftype) + return [rl[0].ftype for rl in self._ref_locs] def __getstate__(self): block_values = [b.values for b in self.blocks] @@ -2380,6 +2318,7 @@ def __setstate__(self, state): def _post_setstate(self): self._is_consolidated = False self._known_consolidated = False + self._rebuild_ref_locs() self._set_has_sparse() def __len__(self): @@ -2404,9 +2343,8 @@ def _verify_integrity(self): if block.ref_items is not self.items: raise AssertionError("Block ref_items must be BlockManager " "items") - if not block.is_sparse and block.values.shape[1:] != mgr_shape[1:]: - construction_error( - tot_items, block.values.shape[1:], self.axes) + if not block.is_sparse and block.shape[1:] != mgr_shape[1:]: + construction_error(tot_items, block.shape[1:], self.axes) if len(self.items) != tot_items: raise AssertionError('Number of manager items must equal union of ' 'block items\n# manager items: {0}, # ' @@ -2594,25 +2532,16 @@ def is_datelike_mixed_type(self): self._consolidate_inplace() return any([block.is_datelike for block in self.blocks]) - def get_block_map(self, copy=False, typ=None, columns=None, - is_numeric=False, is_bool=False): - """ return a dictionary mapping the ftype -> block list + def get_block_list(self, copy=False, columns=None, + is_numeric=False, is_bool=False): + """ return a block list Parameters ---------- - typ : return a list/dict copy : copy if indicated columns : a column filter list filter if the type is indicated """ - # short circuit - mainly for merging - if (typ == 'dict' and columns is None and not is_numeric and - not is_bool and not copy): - bm = defaultdict(list) - for b in self.blocks: - bm[str(b.ftype)].append(b) - return bm - self._consolidate_inplace() if is_numeric: @@ -2636,25 +2565,13 @@ def maybe_copy(b): b = b.copy() return b - if typ == 'list': - bm = [] - for b in self.blocks: - if 
filter_blocks(b): - b = filter_columns(b) - if b is not None: - bm.append(maybe_copy(b)) + bm = [] + for b in self.blocks: + if filter_blocks(b): + b = filter_columns(b) + if b is not None: + bm.append(maybe_copy(b)) - else: - if typ == 'dtype': - key = lambda b: b.dtype - else: - key = lambda b: b.ftype - bm = defaultdict(list) - for b in self.blocks: - if filter_blocks(b): - b = filter_columns(b) - if b is not None: - bm[str(key(b))].append(maybe_copy(b)) return bm def get_bool_data(self, **kwargs): @@ -2672,84 +2589,74 @@ def get_data(self, copy=False, columns=None, **kwargs): copy : boolean, default False Whether to copy the blocks """ - blocks = self.get_block_map( - typ='list', copy=copy, columns=columns, **kwargs) - if len(blocks) == 0: - return self.make_empty() + blocks = self.get_block_list( + copy=copy, columns=columns, **kwargs) return self.combine(blocks, copy=copy) def combine(self, blocks, copy=True): """ return a new manager with the blocks """ + if len(blocks) == 0: + return self.make_empty() + indexer = np.sort(np.concatenate([b.ref_locs for b in blocks])) + inv_indexer = _invert_reordering(indexer) new_items = self.items.take(indexer) new_blocks = [] for b in blocks: - b = b.reindex_items_from(new_items, copy=copy) - new_blocks.extend(_valid_blocks(b)) + b = b.copy(deep=False) + b._ref_locs = inv_indexer.take(b.ref_locs) + b.ref_items = new_items + new_blocks.append(b) + new_axes = list(self.axes) new_axes[0] = new_items return self.__class__(new_blocks, new_axes, do_integrity_check=False) def get_slice(self, slobj, axis=0): new_axes = list(self.axes) - new_axes[axis] = new_axes[axis][slobj] if axis == 0: new_items = new_axes[0] # we want to preserver the view of a single-block - if len(self.blocks) == 1: - + if (len(self.blocks) == 1 and + (self.blocks[0]._ref_locs is None + or (self.blocks[0]._ref_locs == np.arange(self.shape[0])).all())): blk = self.blocks[0] - ref_locs = blk.take_ref_locs(slobj) newb = make_block(blk._slice(slobj), new_items, new_items, klass=blk.__class__, fastpath=True, - placement=ref_locs) + placement=None) new_blocks = [newb] else: - return self.reindex_items( - new_items, indexer=np.arange(len(self.items))[slobj]) + return self.reindex_indexer( + new_items, indexer=np.arange(len(self.items))[slobj], + axis=0, allow_dups=True) else: - new_blocks = self._slice_blocks(slobj, axis) + slicer = [slice(None)] * self.ndim + slicer[axis] = slobj + + new_blocks = [make_block(block._slice(slicer), + block.items, + block.ref_items, + klass=block.__class__, + fastpath=True, + placement=block.ref_locs) + for block in self.blocks] bm = self.__class__(new_blocks, new_axes, do_integrity_check=False) bm._consolidate_inplace() return bm - def _slice_blocks(self, slobj, axis): - """ - slice the blocks using the provided slice object - this is only for slicing on axis != 0 - """ - - if axis == 0: - raise AssertionError("cannot _slice_blocks on axis=0") - - slicer = [slice(None, None) for _ in range(self.ndim)] - slicer[axis] = slobj - slicer = tuple(slicer) - is_unique = self.axes[0].is_unique - - def place(block): - if not is_unique: - return block._ref_locs - return None - - return [ make_block(block._slice(slicer), - block.items, - block.ref_items, - klass=block.__class__, - fastpath=True, - placement=place(block) - ) for block in self.blocks ] - def get_series_dict(self): - # For DataFrame - return _blocks_to_series_dict(self.blocks, self.axes[1]) + # FIXME: move to DataFrame class (or delete?) 
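+        # pair each item with its (block, block-row location) entry from
+        # self._ref_locs and wrap the corresponding block row in a Series
+        # indexed by axes[1]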
+ from pandas.core.series import Series + return {item: Series(blk.values[blk_loc], index=self.axes[1], + name=item) + for item, (blk, blk_loc) in zip(self.items, self._ref_locs)} def __contains__(self, item): return item in self.items @@ -2780,51 +2687,34 @@ def copy(self, deep=True): def as_matrix(self, items=None): if len(self.blocks) == 0: - mat = np.empty(self.shape, dtype=float) - elif len(self.blocks) == 1: - blk = self.blocks[0] - if items is None or blk.items.equals(items): - # if not, then just call interleave per below - mat = blk.get_values() - else: - mat = self.reindex_items(items).as_matrix() + return np.empty(self.shape, dtype=float) + + if items is not None: + mgr = self.reindex_axis(items, axis=0) else: - if items is None: - mat = self._interleave(self.items) - else: - mat = self.reindex_items(items).as_matrix() + mgr = self - return mat + if (len(mgr.blocks) == 1 and + (mgr.blocks[0]._ref_locs is None or + (mgr.blocks[0]._ref_locs == np.arange(mgr.shape[0])).all())): + return mgr.blocks[0].get_values() + else: + return mgr._interleave() - def _interleave(self, items): + def _interleave(self): """ Return ndarray from blocks with specified item order Items must be contained in the blocks """ dtype = _interleaved_dtype(self.blocks) - items = _ensure_index(items) result = np.empty(self.shape, dtype=dtype) - itemmask = np.zeros(len(items), dtype=bool) - - # By construction, all of the item should be covered by one of the - # blocks - if items.is_unique: + itemmask = np.zeros(self.shape[0]) - for block in self.blocks: - indexer = items.get_indexer(block.items) - if (indexer == -1).any(): - raise AssertionError('Items must contain all block items') - result[indexer] = block.get_values(dtype) - itemmask[indexer] = 1 - - else: - - # non-unique, must use ref_locs - rl = self._set_ref_locs() - for i, (block, idx) in enumerate(rl): - result[i] = block.get_values(dtype)[idx] - itemmask[i] = 1 + for blk in self.blocks: + rl = blk.ref_locs + result[rl] = blk.get_values(dtype) + itemmask[rl] = 1 if not itemmask.all(): raise AssertionError('Some items were not contained in blocks') @@ -2892,22 +2782,9 @@ def fast_xs(self, loc, copy=False): result = result.copy() return result, copy - items = self.items - # non-unique (GH4726) - if not items.is_unique: - return self._interleave(items).ravel(), True - - # unique - dtype = _interleaved_dtype(self.blocks) - n = len(items) - result = np.empty(n, dtype=dtype) - for blk in self.blocks: - for j, item in enumerate(blk.items): - i = items.get_loc(item) - result[i] = blk._try_coerce_result(blk.iget((j, loc))) - - return result, True + # FIXME: performance + return self._interleave()[:, loc], True def consolidate(self): """ @@ -2928,110 +2805,85 @@ def _consolidate_inplace(self): if not self.is_consolidated(): self.blocks = _consolidate(self.blocks, self.items) - # reset our mappings - if not self.items.is_unique: - self._ref_locs = None - self._items_map = None - self._set_ref_locs(do_refs=True) - self._is_consolidated = True self._known_consolidated = True self._set_has_sparse() + self._rebuild_ref_locs() def get(self, item): + """ + Return values for selected item (ndarray or BlockManager). 
+ """ if self.items.is_unique: - if isnull(item): + if not isnull(item): + loc = self.items.get_loc(item) + else: indexer = np.arange(len(self.items))[isnull(self.items)] - return self.get_for_nan_indexer(indexer) - _, block = self._find_block(item) - return block.get(item) + # allow a single nan location indexer + if not np.isscalar(indexer): + if len(indexer) == 1: + loc = indexer.item() + else: + raise ValueError("cannot label index with a null key") + + return self.iget(loc) else: if isnull(item): raise ValueError("cannot label index with a null key") - indexer = self.items.get_loc(item) - ref_locs = np.array(self._set_ref_locs()) - - # duplicate index but only a single result - if com.is_integer(indexer): - - b, loc = ref_locs[indexer] - values = [b.iget(loc)] - index = Index([self.items[indexer]]) - - # we have a multiple result, potentially across blocks - else: - - values = [block.iget(i) for block, i in ref_locs[indexer]] - index = self.items[indexer] - - # create and return a new block manager - axes = [index] + self.axes[1:] - blocks = form_blocks(values, index, axes) - mgr = BlockManager(blocks, axes) - mgr._consolidate_inplace() - return mgr + indexer = self.items.get_indexer_for([item]) + return self.reindex_indexer(new_axis=self.items[indexer], + indexer=indexer, axis=0, allow_dups=True) def iget(self, i): - item = self.items[i] - - # unique - if self.items.is_unique: - if notnull(item): - return self.get(item) - return self.get_for_nan_indexer(i) - - ref_locs = self._set_ref_locs() - b, loc = ref_locs[i] - return b.iget(loc) - - def get_for_nan_indexer(self, indexer): - - # allow a single nan location indexer - if not np.isscalar(indexer): - if len(indexer) == 1: - indexer = indexer.item() - else: - raise ValueError("cannot label index with a null key") - - # take a nan indexer and return the values - ref_locs = self._set_ref_locs(do_refs='force') - b, loc = ref_locs[indexer] + b, loc = self._ref_locs[i] return b.iget(loc) def get_scalar(self, tup): """ Retrieve single item """ - item = tup[0] - _, blk = self._find_block(item) - - # this could obviously be seriously sped up in cython - item_loc = blk.items.get_loc(item), - full_loc = item_loc + tuple(ax.get_loc(x) - for ax, x in zip(self.axes[1:], tup[1:])) - return blk.values[full_loc] + full_loc = list(ax.get_loc(x) + for ax, x in zip(self.axes, tup)) + blk, blk_loc = self._ref_locs[full_loc[0]] + full_loc[0] = blk_loc + return blk.values[tuple(full_loc)] def delete(self, item): + """ + Delete selected item (items if non-unique) in-place. 
+ """ + indexer = self.items.get_loc(item) - is_unique = self.items.is_unique - loc = self.items.get_loc(item) + is_deleted = np.zeros(self.shape[0], dtype=np.bool_) + is_deleted[indexer] = True + ref_loc_offset = is_deleted.cumsum() - # dupe keys may return mask - loc = _possibly_convert_to_indexer(loc) - self._delete_from_all_blocks(loc, item) + new_items = self.items[~is_deleted] + new_blocks = [] - # _ref_locs, and _items_map are good here - new_items = self.items.delete(loc) - self.set_items_norename(new_items) + for blk in self.blocks: + brl = blk.ref_locs + blk_del = is_deleted[brl] + blk_del_count = np.count_nonzero(blk_del) - self._known_consolidated = False + if blk_del_count == len(brl): + continue - if not is_unique: - self._consolidate_inplace() + blk._ref_locs -= ref_loc_offset[brl] + blk.ref_items = new_items + if blk_del_count != 0: + blk = blk._getitem_block(~blk_del) + + new_blocks.append(blk) + + self.axes[0] = new_items + self.blocks = new_blocks + self._shape = None + self._rebuild_ref_locs() def set(self, item, value, check=False): """ @@ -3039,428 +2891,250 @@ def set(self, item, value, check=False): contained in the current set of items if check, then validate that we are not setting the same data in-place """ - if not isinstance(value, SparseArray): + # FIXME: refactor, clearly separate broadcasting & zip-like assignment + is_sparse = isinstance(value, SparseArray) + + if is_sparse: + assert self.ndim == 2 + + def value_getitem(locs): + return value + else: if value.ndim == self.ndim - 1: value = value.reshape((1,) + value.shape) + + def value_getitem(locs): + return value + else: + def value_getitem(locs): + return value[locs] if value.shape[1:] != self.shape[1:]: raise AssertionError('Shape of new values must be compatible ' 'with manager shape') - def _set_item(item, arr): - i, block = self._find_block(item) - if not block.should_store(value): - # delete from block, create and append new block - self._delete_from_block(i, item) - self._add_new_block(item, arr, loc=None) - else: - block.set(item, arr, check=check) - try: - loc = self.items.get_loc(item) - if isinstance(loc, int): - _set_item(self.items[loc], value) - else: - subset = self.items[loc] - if len(value) != len(subset): - raise AssertionError( - 'Number of items to set did not match') - - # we are inserting multiple non-unique items as replacements - # we are inserting one by one, so the index can go from unique - # to non-unique during the loop, need to have _ref_locs defined - # at all times - if np.isscalar(item) and (com.is_list_like(loc) or isinstance(loc, slice)): - - # first delete from all blocks - self.delete(item) - - loc = _possibly_convert_to_indexer(loc) - for i, (l, k, arr) in enumerate(zip(loc, subset, value)): + except KeyError: + # This item wasn't present, just insert at end + self.insert(len(self.items), item, value) + return - # insert the item - self.insert( - l, k, arr[None, :], allow_duplicates=True) + if isinstance(loc, int): + loc = [loc] - # reset the _ref_locs on indiviual blocks - # rebuild ref_locs - if self.items.is_unique: - self._reset_ref_locs() - self._set_ref_locs(do_refs='force') + ref_locs = self._ref_locs[loc] - self._rebuild_ref_locs() + unfit_mgr_locs = [] + unfit_val_locs = [] + for blk, blk_locs, val_locs in ref_loc_groupby_block(ref_locs): + if blk.should_store(value): + sliced = value_getitem(val_locs) + if check: + try: + if (blk.values[blk_locs] == sliced).all(): + continue + except Exception: + pass + if is_sparse: + assert blk_locs.tolist() == [0] + 
blk.values = sliced + else: + blk.values[blk_locs] = sliced + else: + unfit_mgr_locs.append(blk.ref_locs[blk_locs]) + unfit_val_locs.append(val_locs) + new_blk_ref_locs = np.delete(blk.ref_locs, blk_locs, axis=0) + new_blk_len = len(new_blk_ref_locs) + if not new_blk_len: + self.blocks.remove(blk) else: - for i, (item, arr) in enumerate(zip(subset, value)): - _set_item(item, arr[None, :]) - except KeyError: - # insert at end - self.insert(len(self.items), item, value) + blk.values = np.delete(blk.values, blk_locs, axis=0) + blk._ref_locs = new_blk_ref_locs + blk.items = blk.ref_items[blk._ref_locs] + self._ref_locs[new_blk_ref_locs] = \ + lib.fast_zip([np.array([blk] * new_blk_len), + np.arange(new_blk_len)]) + + if unfit_val_locs: + unfit_val_locs = np.concatenate(unfit_val_locs) + unfit_mgr_locs = np.concatenate(unfit_mgr_locs) + unfit_count = len(unfit_val_locs) + + if is_sparse: + for mgr_loc in unfit_mgr_locs: + new_block = make_block(values=value.copy(), + items=self.items[[mgr_loc]], + ref_items=self.items, + ndim=self.ndim, + placement=[mgr_loc]) + self.blocks.append(new_block) + self._ref_locs[mgr_loc] = (new_block, 0) + else: + new_block = make_block(values=value[unfit_val_locs], + items=self.items.take(unfit_mgr_locs), + ref_items=self.items, + ndim=self.ndim, + placement=unfit_mgr_locs) - self._known_consolidated = False + self.blocks.append(new_block) + self._ref_locs[unfit_mgr_locs] = lib.fast_zip([ + np.array([new_block] * unfit_count, dtype=np.object_), + np.arange(unfit_count)]) + + # Newly created block's dtype may already be present. + self._known_consolidated = False def insert(self, loc, item, value, allow_duplicates=False): + """ + Insert item at selected position. + + Parameters + ---------- + loc : int + item : hashable + value : array_like + allow_duplicates: bool + If False, trying to insert non-unique item will raise + """ if not allow_duplicates and item in self.items: # Should this be a different kind of error?? 
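# --- Editor's aside, not part of the patch ---------------------------------
# The ref_locs bookkeeping the new ``insert`` performs just below, in
# isolation: every existing manager location at or beyond ``loc`` is shifted
# up by one, which frees slot ``loc`` for the single-item block being added.
# The arrays are hypothetical stand-ins for block _ref_locs.
import numpy as np

loc = 1
block_ref_locs = [np.array([0, 2]), np.array([1, 3])]   # two existing blocks
shifted = [np.where(rl >= loc, rl + 1, rl) for rl in block_ref_locs]
print(shifted)   # [array([0, 3]), array([2, 4])] -- slot 1 is now free
# ---------------------------------------------------------------------------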
raise ValueError('cannot insert %s, already exists' % item) - try: - new_items = self.items.insert(loc, item) - self.set_items_norename(new_items) + if not isinstance(loc, int): + raise TypeError("loc must be int") - # new block - self._add_new_block(item, value, loc=loc) + new_items = self.items.insert(loc, item) + block = make_block(values=value, items=new_items[[loc]], + ref_items=new_items, ndim=self.ndim, + placement=[loc]) + new_ref_locs = np.insert(self._ref_locs, loc, None, axis=0) + new_ref_locs[loc] = (block, 0) - except: + for blk in self.blocks: + blk._ref_locs[blk._ref_locs >= loc] += 1 + blk.ref_items = new_items - # so our insertion operation failed, so back out of the new items - # GH 3010 - new_items = self.items.delete(loc) - self.set_items_norename(new_items) + self.blocks.append(block) + self.axes[0] = new_items + self._shape = None + self._ref_locs = new_ref_locs - # re-raise - raise + self._known_consolidated = False if len(self.blocks) > 100: self._consolidate_inplace() - self._known_consolidated = False - - # clear the internal ref_loc mappings if necessary - if loc != len(self.items) - 1 and new_items.is_unique: - self.set_items_clear(new_items) - - def set_items_norename(self, value): - self.set_axis(0, value, maybe_rename=False, check_axis=False) - self._shape = None - - def set_items_clear(self, value): - """ clear the ref_locs on all blocks """ - self.set_axis(0, value, maybe_rename='clear', check_axis=False) - - def _delete_from_all_blocks(self, loc, item): - """ delete from the items loc the item - the item could be in multiple blocks which could - change each iteration (as we split blocks) """ - - # possibily convert to an indexer - loc = _possibly_convert_to_indexer(loc) - - if isinstance(loc, (list, tuple, np.ndarray)): - for l in loc: - for i, b in enumerate(self.blocks): - if item in b.items: - self._delete_from_block(i, item) - - else: - i, _ = self._find_block(item) - self._delete_from_block(i, item) - - def _delete_from_block(self, i, item): - """ - Delete and maybe remove the whole block - - Remap the split blocks to there old ranges, - so after this function, _ref_locs and _items_map (if used) - are correct for the items, None fills holes in _ref_locs - """ - block = self.blocks.pop(i) - ref_locs = self._set_ref_locs() - prev_items_map = self._items_map.pop( - block) if ref_locs is not None else None - - # if we can't consolidate, then we are removing this block in its - # entirey - if block._can_consolidate: - - # compute the split mask - loc = block.items.get_loc(item) - if type(loc) == slice or com.is_integer(loc): - mask = np.array([True] * len(block)) - mask[loc] = False - else: # already a mask, inverted - mask = -loc - - # split the block - counter = 0 - for s, e in com.split_ranges(mask): - - sblock = make_block(block.values[s:e], - block.items[s:e].copy(), - block.ref_items, - klass=block.__class__, - fastpath=True) - - self.blocks.append(sblock) - - # update the _ref_locs/_items_map - if ref_locs is not None: - - # fill the item_map out for this sub-block - m = maybe_create_block_in_items_map( - self._items_map, sblock) - for j, itm in enumerate(sblock.items): - - # is this item masked (e.g. was deleted)? 
- while (True): - - if counter > len(mask) or mask[counter]: - break - else: - counter += 1 - - # find my mapping location - m[j] = prev_items_map[counter] - counter += 1 - - # set the ref_locs in this block - sblock.set_ref_locs(m) + def reindex_axis(self, new_axis, axis, method=None, limit=None, + fill_value=None, copy=True): + mgr = self if not copy else self.copy(deep=True) - # reset the ref_locs to the new structure - if ref_locs is not None: - - # items_map is now good, with the original locations - self._set_ref_locs(do_refs=True) - - # reset the ref_locs based on the now good block._ref_locs - self._reset_ref_locs() - - def _add_new_block(self, item, value, loc=None): - # Do we care about dtype at the moment? - - # hm, elaborate hack? - if loc is None: - loc = self.items.get_loc(item) - new_block = make_block(value, self.items[loc:loc + 1].copy(), - self.items, fastpath=True) - self.blocks.append(new_block) - - # set ref_locs based on the this new block - # and add to the ref/items maps - if not self.items.is_unique: - - # insert into the ref_locs at the appropriate location - # _ref_locs is already long enough, - # but may need to shift elements - new_block.set_ref_locs([0]) - - # need to shift elements to the right - if self._ref_locs[loc] is not None: - for i in reversed(lrange(loc + 1, len(self._ref_locs))): - self._ref_locs[i] = self._ref_locs[i - 1] - - self._ref_locs[loc] = (new_block, 0) - - # and reset - self._reset_ref_locs() - self._set_ref_locs(do_refs=True) - - def _find_block(self, item): - self._check_have(item) - for i, block in enumerate(self.blocks): - if item in block: - return i, block - - def _check_have(self, item): - if item not in self.items: - raise KeyError('no item named %s' % com.pprint_thing(item)) - - def reindex_axis(self, new_axis, indexer=None, method=None, axis=0, - fill_value=None, limit=None, copy=True): new_axis = _ensure_index(new_axis) - cur_axis = self.axes[axis] - - if new_axis.equals(cur_axis): - if copy: - result = self.copy(deep=True) - result.axes[axis] = new_axis - result._shape = None - - if axis == 0: - # patch ref_items, #1823 - for blk in result.blocks: - blk.ref_items = new_axis - - return result - else: - return self - - if axis == 0: - if method is not None or limit is not None: - return self.reindex_axis0_with_method( - new_axis, indexer=indexer, method=method, - fill_value=fill_value, limit=limit, copy=copy - ) - return self.reindex_items(new_axis, indexer=indexer, copy=copy, - fill_value=fill_value) + new_axis, indexer = mgr.axes[axis].reindex( + new_axis, method=method, limit=limit, copy_if_needed=True) - new_axis, indexer = cur_axis.reindex( - new_axis, method, copy_if_needed=True) - return self.reindex_indexer(new_axis, indexer, axis=axis, - fill_value=fill_value) + return mgr.reindex_indexer(new_axis, indexer, axis=axis, + fill_value=fill_value) - def reindex_axis0_with_method(self, new_axis, indexer=None, method=None, - fill_value=None, limit=None, copy=True): - raise AssertionError('method argument not supported for ' - 'axis == 0') - - def reindex_indexer(self, new_axis, indexer, axis=1, fill_value=None, + def reindex_indexer(self, new_axis, indexer, axis, fill_value=None, allow_dups=False): """ pandas-indexer with -1's only. 
""" # trying to reindex on an axis with duplicates - if not allow_dups and not self.axes[axis].is_unique and len(indexer): + if (not allow_dups and not self.axes[axis].is_unique + and indexer is not None and len(indexer)): raise ValueError("cannot reindex from a duplicate axis") - if not self.is_consolidated(): - self = self.consolidate() - - if axis == 0: - return self._reindex_indexer_items(new_axis, indexer, fill_value) - - new_blocks = [] - for block in self.blocks: - newb = block.reindex_axis( - indexer, axis=axis, fill_value=fill_value) - new_blocks.append(newb) - - new_axes = list(self.axes) - new_axes[axis] = new_axis - return self.__class__(new_blocks, new_axes) + if axis >= self.ndim: + raise AssertionError("Requested axis not found in manager") - def _reindex_indexer_items(self, new_items, indexer, fill_value): - # TODO: less efficient than I'd like - - item_order = com.take_1d(self.items.values, indexer) - new_axes = [new_items] + self.axes[1:] - new_blocks = [] - is_unique = new_items.is_unique + # FIXME: this code comes from generic.py, see if any of that is needed + # elif (baxis == 0 and + # index is not new_data.axes[baxis]): + # new_data = new_data.reindex_items(index, copy=copy, + # fill_value=fill_value) - # we have duplicates in the items and what we are reindexing - if not is_unique and not self.items.is_unique: + # elif (baxis > 0 and index is not None and + # index is not new_data.axes[baxis]): + # new_data = new_data.copy(deep=copy) + # new_data.set_axis(baxis, index) - rl = self._set_ref_locs(do_refs='force') - for i, idx in enumerate(indexer): - item = new_items.take([i]) - if idx >= 0: - blk, lidx = rl[idx] - blk = make_block(_block_shape(blk.iget(lidx)), item, - new_items, ndim=self.ndim, fastpath=True, - placement=[i]) - - # a missing value - else: - blk = self._make_na_block(item, - new_items, - placement=[i], - fill_value=fill_value) - new_blocks.append(blk) - new_blocks = _consolidate(new_blocks, new_items) - - - # keep track of what items aren't found anywhere + if axis == 0: + new_blocks = self._get_blocks_for_items_indexer(new_axis, indexer, + fill_value) else: - l = np.arange(len(item_order)) - mask = np.zeros(len(item_order), dtype=bool) - - for blk in self.blocks: - blk_indexer = blk.items.get_indexer(item_order) - selector = blk_indexer != -1 - - # update with observed items - mask |= selector - - if not selector.any(): - continue - - new_block_items = new_items.take(selector.nonzero()[0]) - new_values = com.take_nd(blk.values, blk_indexer[selector], axis=0, - allow_fill=False) - placement = l[selector] if not is_unique else None - new_blocks.append(make_block(new_values, - new_block_items, - new_items, - placement=placement, - fastpath=True)) - - if not mask.all(): - na_items = new_items[-mask] - placement = l[-mask] if not is_unique else None - na_block = self._make_na_block(na_items, - new_items, - placement=placement, - fill_value=fill_value) - new_blocks.append(na_block) - new_blocks = _consolidate(new_blocks, new_items) + # TODO: is this faster than blk.reindex_axis? 
+ # return self.apply('take', + # axes=new_axes, + # indexer=indexer, + # ref_items=new_axes[0], + # new_axis=new_axes[axis], + # axis=axis) + new_blocks = [blk.reindex_axis(indexer, axis=axis, + fill_value=fill_value) + for blk in self.blocks] + new_axes = list(self.axes) + new_axes[axis] = new_axis return self.__class__(new_blocks, new_axes) - def reindex_items(self, new_items, indexer=None, copy=True, - fill_value=None): - """ - + def _get_blocks_for_items_indexer(self, new_items, indexer, fill_value): """ - new_items = _ensure_index(new_items) - data = self - if not data.is_consolidated(): - data = data.consolidate() - return data.reindex_items(new_items, copy=copy, - fill_value=fill_value) + Reindex blocks at axis=0 (overloaded for SingleBlockManager). - if indexer is None: - new_items, indexer = self.items.reindex(new_items, - copy_if_needed=True) - new_axes = [new_items] + self.axes[1:] + Returns + ------- + new_blocks : list of Block - # could have so me pathological (MultiIndex) issues here + """ + # fill_value[0] == None will group soon-to-be-added items under None + # fill_value[1] is an arbitrary integer (it's ignored) + new_ref_locs = com.take_1d(self._ref_locs, indexer, + fill_value=(None, 0)) new_blocks = [] - if indexer is None: - for blk in self.blocks: - if copy: - blk = blk.reindex_items_from(new_items) - else: - blk.ref_items = new_items - new_blocks.extend(_valid_blocks(blk)) - else: - - # unique - if self.axes[0].is_unique and new_items.is_unique: - - # ok to use the global indexer if only 1 block - i = indexer if len(self.blocks) == 1 else None - - for block in self.blocks: - blk = block.reindex_items_from(new_items, indexer=i, copy=copy) - new_blocks.extend(_valid_blocks(blk)) - - # non-unique + for blk, blk_locs, mgr_locs in ref_loc_groupby_block(new_ref_locs): + if blk is None: + new_blocks.append(self._make_na_block( + items=new_items[mgr_locs], ref_items=new_items, + placement=mgr_locs, fill_value=fill_value)) else: - rl = self._set_ref_locs(do_refs='force') - for i, idx in enumerate(indexer): - blk, lidx = rl[idx] - item = new_items.take([i]) - blk = make_block(_block_shape(blk.iget(lidx)), item, - new_items, ndim=self.ndim, fastpath=True, - placement=[i]) - new_blocks.append(blk) - - # add a na block if we are missing items - mask = indexer == -1 - if mask.any(): - extra_items = new_items[mask] - na_block = self._make_na_block(extra_items, new_items, - fill_value=fill_value) - new_blocks.append(na_block) - new_blocks = _consolidate(new_blocks, new_items) - - # consolidate - # import for non-unique which creates a block for each item - # and they must be consolidated before passing on - new_blocks = _consolidate(new_blocks, new_items) + # Otherwise, slicing along items axis is necessary. + if blk.is_sparse: + # If it's a sparse block, it's easy: + # + # - it can only contain 1 item + # - if blk is here, the item wasn't deleted + # - if blk wasn't handled above, the item is multiplied + # + # Hence the block is replicated. + for mgr_loc in mgr_locs: + newblk = blk.copy(deep=True, ref_items=new_items) + newblk._ref_locs = np.array([mgr_loc]) + newblk.items = new_items[newblk._ref_locs] + new_blocks.append(newblk) - return self.__class__(new_blocks, new_axes) + else: + # FIXME: this hack makes sure post-reindex blocks enumerate + # manager locs in ascending order. It was implemented to + # make pytables serialization test happy and should be + # removed once the codebase successfully switches to + # axis-oblivious blocks & blockmanagers. 
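# --- Editor's aside, not part of the patch ---------------------------------
# What the argsort just below does, in isolation: the source positions
# (blk_locs) and the destination positions (mgr_locs) are permuted together
# so the reindexed block lists its manager locations in ascending order.
# The values are made up for illustration.
import numpy as np

blk_locs = np.array([2, 0, 1])     # rows to take from the old block
mgr_locs = np.array([5, 3, 4])     # where those rows land in the new manager

order = np.argsort(mgr_locs)
print(blk_locs[order], mgr_locs[order])   # [0 1 2] [3 4 5]
# ---------------------------------------------------------------------------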
+ order = np.argsort(mgr_locs) + blk_locs = blk_locs.take(order) + mgr_locs = mgr_locs.take(order) + + new_values = blk.values[blk_locs] + newblk = blk.__class__(values=new_values, + items=new_items[mgr_locs], + ref_items=new_items, + ndim=blk.ndim, + fastpath=True, + placement=mgr_locs,) + new_blocks.append(newblk) + + return new_blocks def _make_na_block(self, items, ref_items, placement=None, fill_value=None): @@ -3476,71 +3150,46 @@ def _make_na_block(self, items, ref_items, placement=None, block_values.fill(fill_value) return make_block(block_values, items, ref_items, placement=placement) - def take(self, indexer, new_index=None, axis=1, verify=True): - if axis < 1: - raise AssertionError('axis must be at least 1, got %d' % axis) - + def take(self, indexer, axis=1, verify=True, convert=True): + """ + Take items along any axis. + """ self._consolidate_inplace() - if isinstance(indexer, list): - indexer = np.array(indexer) + indexer = np.asanyarray(indexer, dtype=np.int_) - indexer = com._ensure_platform_int(indexer) - n = len(self.axes[axis]) + n = self.shape[axis] + if convert: + indexer = _maybe_convert_indices(indexer, n) if verify: - indexer = _maybe_convert_indices(indexer, n) if ((indexer == -1) | (indexer >= n)).any(): raise Exception('Indices must be nonzero and less than ' 'the axis length') - new_axes = list(self.axes) - if new_index is None: - new_index = self.axes[axis].take(indexer) - - new_axes[axis] = new_index - return self.apply('take', - axes=new_axes, - indexer=indexer, - ref_items=new_axes[0], - new_axis=new_axes[axis], - axis=axis) - - def merge(self, other, lsuffix=None, rsuffix=None): + new_labels = self.axes[axis].take(indexer) + return self.reindex_indexer(new_axis=new_labels, indexer=indexer, + axis=axis, allow_dups=True) + + def merge(self, other, lsuffix='', rsuffix=''): if not self._is_indexed_like(other): raise AssertionError('Must have same axes to merge managers') - this, other = self._maybe_rename_join(other, lsuffix, rsuffix) - - cons_items = this.items + other.items - new_axes = list(this.axes) - new_axes[0] = cons_items + l, r = items_overlap_with_suffix(left=self.items, lsuffix=lsuffix, + right=other.items, rsuffix=rsuffix) + new_items = _concat_indexes([l, r]) - consolidated = _consolidate(this.blocks + other.blocks, cons_items) - return self.__class__(consolidated, new_axes) - - def _maybe_rename_join(self, other, lsuffix, rsuffix, copydata=True): - to_rename = self.items.intersection(other.items) - if len(to_rename) > 0: - if not lsuffix and not rsuffix: - raise ValueError('columns overlap but no suffix specified: %s' - % to_rename) - - def lrenamer(x): - if x in to_rename: - return '%s%s' % (x, lsuffix) - return x - - def rrenamer(x): - if x in to_rename: - return '%s%s' % (x, rsuffix) - return x + new_blocks = [] + for blocks, offset in [(self.blocks, 0), + (other.blocks, self.shape[0])]: + for blk in blocks: + blk = blk.copy(deep=False, ref_items=new_items) + blk._ref_locs += offset + new_blocks.append(blk) - this = self.rename_items(lrenamer, copy=copydata) - other = other.rename_items(rrenamer, copy=copydata) - else: - this = self + new_axes = list(self.axes) + new_axes[0] = new_items - return this, other + return self.__class__(_consolidate(new_blocks, new_items), new_axes) def _is_indexed_like(self, other): """ @@ -3554,82 +3203,39 @@ def _is_indexed_like(self, other): return False return True - def rename(self, mapper, axis, copy=False): - """ generic rename """ + def rename_axis(self, mapper, axis, copy=True): + """ + Rename one of axes. 
- if axis == 0: - return self.rename_items(mapper, copy=copy) - return self.rename_axis(mapper, axis=axis) + Parameters + ---------- + mapper : unary callable + axis : int + copy : boolean, default True - def rename_axis(self, mapper, axis=1): + """ + new_axis = _transform_index(self.axes[axis], mapper) - index = self.axes[axis] - if isinstance(index, MultiIndex): - new_axis = MultiIndex.from_tuples( - [tuple(mapper(y) for y in x) for x in index], - names=index.names) + if axis != 0: + new_blocks = self.blocks else: - new_axis = Index([mapper(x) for x in index], name=index.name) - - if not new_axis.is_unique: - raise AssertionError('New axis must be unique to rename') + new_blocks = [] + for block in self.blocks: + newb = block.copy(deep=copy, ref_items=new_axis) + newb.items = new_axis[newb.ref_locs] + new_blocks.append(newb) new_axes = list(self.axes) new_axes[axis] = new_axis - return self.__class__(self.blocks, new_axes) - - def rename_items(self, mapper, copy=True): - if isinstance(self.items, MultiIndex): - items = [tuple(mapper(y) for y in x) for x in self.items] - new_items = MultiIndex.from_tuples(items, names=self.items.names) - else: - items = [mapper(x) for x in self.items] - new_items = Index(items, name=self.items.name) - - new_blocks = [] - for block in self.blocks: - newb = block.copy(deep=copy) - newb.set_ref_items(new_items, maybe_rename=True) - new_blocks.append(newb) - new_axes = list(self.axes) - new_axes[0] = new_items return self.__class__(new_blocks, new_axes) def add_prefix(self, prefix): f = (('%s' % prefix) + '%s').__mod__ - return self.rename_items(f) + return self.rename_axis(f, axis=0) def add_suffix(self, suffix): f = ('%s' + ('%s' % suffix)).__mod__ - return self.rename_items(f) - - @property - def block_id_vector(self): - # TODO - result = np.empty(len(self.items), dtype=int) - result.fill(-1) - - for i, blk in enumerate(self.blocks): - indexer = self.items.get_indexer(blk.items) - if (indexer == -1).any(): - raise AssertionError('Block items must be in manager items') - result.put(indexer, i) - - if (result < 0).any(): - raise AssertionError('Some items were not in any block') - return result - - @property - def item_dtypes(self): - result = np.empty(len(self.items), dtype='O') - mask = np.zeros(len(self.items), dtype=bool) - for i, blk in enumerate(self.blocks): - indexer = self.items.get_indexer(blk.items) - result.put(indexer, blk.dtype.name) - mask.put(indexer, 1) - if not (mask.all()): - raise AssertionError('Some items were not in any block') - return result + return self.rename_axis(f, axis=0) def equals(self, other): self_axes, other_axes = self.axes, other.axes @@ -3642,6 +3248,18 @@ def equals(self, other): return all(block.equals(oblock) for block, oblock in zip(self.blocks, other.blocks)) + def group_blocks_by_ftype(self): + """ + Combine blocks into map: ftype -> [blk0, blk1, ...]. 
+ + """ + bm = defaultdict(list) + for b in self.blocks: + bm[str(b.ftype)].append(b) + return bm + + + class SingleBlockManager(BlockManager): """ manage a single block with """ @@ -3651,6 +3269,12 @@ class SingleBlockManager(BlockManager): __slots__ = ['axes', 'blocks', '_block', '_values', '_shape', '_has_sparse'] + # @property + # def _ref_locs(self): + # # FIXME: improve performance + # return lib.fast_zip([np.array([self._block] * self.shape[0]), + # np.arange(self.shape[0])]) + def __init__(self, block, axis, do_integrity_check=False, fastpath=True): if isinstance(axis, list): @@ -3696,22 +3320,21 @@ def __init__(self, block, axis, do_integrity_check=False, fastpath=True): block = make_block(block, axis, axis, ndim=1, fastpath=True) self.blocks = [block] - self._block = self.blocks[0] - self._values = self._block.values - self._has_sparse = self._block.is_sparse def _post_setstate(self): - self._block = self.blocks[0] - self._values = self._block.values + pass + + @property + def _block(self): + return self.blocks[0] - def _get_counts(self, f): - return { f(self._block) : 1 } + @property + def _values(self): + return self._block.values @property - def shape(self): - if getattr(self, '_shape', None) is None: - self._shape = tuple([len(self.axes[0])]) - return self._shape + def _has_sparse(self): + return self._block_is_sparse def apply(self, f, axes=None, do_integrity_check=False, **kwargs): """ @@ -3744,11 +3367,6 @@ def _reindex_indexer_items(self, new_items, indexer, fill_value): return self.reindex(new_items, indexer=indexer, fill_value=fill_value, copy=False) - def reindex_axis0_with_method(self, new_axis, indexer=None, method=None, - fill_value=None, limit=None, copy=True): - return self.reindex(new_axis, indexer=indexer, method=method, - fill_value=fill_value, limit=limit, copy=copy) - def _delete_from_block(self, i, item): super(SingleBlockManager, self)._delete_from_block(i, item) @@ -3766,17 +3384,11 @@ def _delete_from_block(self, i, item): block = make_block(np.array([], dtype=self._block.dtype), [], []) self.blocks = [block] - self._block = block - self._values = self._block.values def get_slice(self, slobj): return self.__class__(self._block._slice(slobj), self.index[slobj], fastpath=True) - def set_axis(self, axis, value, maybe_rename=True, check_axis=True): - cur_axis, value = self._set_axis(axis, value, check_axis) - self._block.set_ref_items(self.items, maybe_rename=maybe_rename) - def set_ref_items(self, ref_items, maybe_rename=True): """ we can optimize and our ref_locs are always equal to ref_items """ if maybe_rename: @@ -3800,6 +3412,12 @@ def dtype(self): def ftype(self): return self._block.ftype + def get_dtypes(self): + return [self._block.dtype] + + def get_ftypes(self): + return [self._block.ftype] + @property def values(self): return self._values.view() @@ -3821,12 +3439,44 @@ def _consolidate_check(self): def _consolidate_inplace(self): pass + def delete(self, item): + """ + Delete single item from SingleBlockManager. + + Ensures that self.blocks doesn't become empty. + """ + # Also, make sure dtype is preserved. 
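# --- Editor's aside, not part of the patch ---------------------------------
# Why the dtype is captured before delegating to the base-class delete:
# removing the only item leaves the manager without blocks, and the empty
# placeholder block must keep the original dtype instead of falling back to
# float64.  Plain numpy is used here purely for illustration.
import numpy as np

old_values = np.array([1, 2, 3], dtype=np.int64)
dtype = old_values.dtype
placeholder = np.empty(0, dtype=dtype)
print(placeholder.dtype)   # int64, not float64
# ---------------------------------------------------------------------------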
+ dtype = self._block.dtype + + super(SingleBlockManager, self).delete(item) + + if not self.blocks: + self.blocks = [make_block(np.empty(0, dtype=dtype), + self.items, self.items, + ndim=1, dtype=dtype, fastpath=True)] + def fast_xs(self, loc, copy=False): """ fast path for getting a cross-section """ - result = self._block.values[loc] - return result, False + return self._values[loc], False + + def _get_blocks_for_items_indexer(self, new_items, indexer, fill_value): + """ + Reindex blocks at axis=0 (overloaded for SingleBlockManager). + + Returns + ------- + new_blocks : list of Block + + """ + new_values = com.take_1d(self._values, indexer, + fill_value=fill_value) + + return [make_block(values=new_values, + items=new_items, ref_items=new_items, + ndim=self.ndim, fastpath=True)] + def construction_error(tot_items, block_shape, axes, e=None): """ raise a helpful message about our construction """ @@ -4069,17 +3719,6 @@ def _shape_compat(x): return items, stacked, placement -def _blocks_to_series_dict(blocks, index=None): - from pandas.core.series import Series - - series_dict = {} - - for block in blocks: - for item, vec in zip(block.items, block.values): - series_dict[item] = Series(vec, index=index, name=item) - return series_dict - - def _interleaved_dtype(blocks): if not len(blocks): return None @@ -4160,13 +3799,6 @@ def _consolidate(blocks, items): return new_blocks -def _valid_blocks(newb): - if newb is None: - return [] - if not isinstance(newb, list): - newb = [ newb ] - return [ b for b in newb if len(b.items) > 0 ] - def _merge_blocks(blocks, items, dtype=None, _can_consolidate=True): if len(blocks) == 1: return blocks[0] @@ -4178,22 +3810,15 @@ def _merge_blocks(blocks, items, dtype=None, _can_consolidate=True): raise AssertionError("_merge_blocks are invalid!") dtype = blocks[0].dtype - if not items.is_unique: - blocks = sorted(blocks, key=lambda b: b.ref_locs.tolist()) - + new_ref_locs = np.concatenate([b.ref_locs for b in blocks]) new_values = _vstack([b.values for b in blocks], dtype) - new_items = blocks[0].items.append([b.items for b in blocks[1:]]) - new_block = make_block(new_values, new_items, items) - # unique, can reindex - if items.is_unique: - return new_block.reindex_items_from(items) + argsort = np.argsort(new_ref_locs) + new_values = new_values[argsort] + new_ref_locs = new_ref_locs[argsort] - # merge the ref_locs - new_ref_locs = [b._ref_locs for b in blocks] - if all([x is not None for x in new_ref_locs]): - new_block.set_ref_locs(np.concatenate(new_ref_locs)) - return new_block + return make_block(new_values, items[new_ref_locs], items, + fastpath=True, placement=new_ref_locs) # no merge return blocks @@ -4297,16 +3922,16 @@ def __init__(self, data_list, join_index, indexers, copy=True): "length") self.units = [] + placement_offset = 0 for data, indexer in zip(data_list, indexers): if not data.is_consolidated(): data = data.consolidate() - data._set_ref_locs() - self.units.append(_JoinUnit(data.blocks, indexer)) + self.units.append(_JoinUnit(data.blocks, indexer, placement_offset)) + placement_offset += data.shape[0] self.join_index = join_index self.axis = 1 self.copy = copy - self.offsets = None # do NOT sort self.result_items = _concat_indexes([d.items for d in data_list]) @@ -4319,106 +3944,98 @@ def _prepare_blocks(self): for unit in self.units: join_blocks = unit.get_upcasted_blocks() + type_map = {} for blk in join_blocks: type_map.setdefault(blk.ftype, []).append(blk) + blockmaps.append((unit, type_map)) return blockmaps def get_result(self): """ + 
Perform join. + Returns ------- merged : BlockManager - """ - blockmaps = self._prepare_blocks() - kinds = _get_merge_block_kinds(blockmaps) + """ # maybe want to enable flexible copying <-- what did I mean? - kind_blocks = [] - for klass in kinds: - klass_blocks = [] - for unit, mapping in blockmaps: - if klass in mapping: - klass_blocks.extend((unit, b) for b in mapping[klass]) - # blocks that we are going to merge - kind_blocks.append(klass_blocks) + # Upcast & consolidate join units. + # + # blockmaps is [(unit: {ftype: [blk,...], ...}),...] + blockmaps = self._prepare_blocks() - # create the merge offsets, essentially where the resultant blocks go in the result - if not self.result_items.is_unique: + result_blocks = [] - # length of the merges for each of the klass blocks - self.offsets = np.zeros(len(blockmaps)) - for kb in kind_blocks: - kl = list(b.get_merge_length() for unit, b in kb) - self.offsets += np.array(kl) + # For each distinct ftype, merge blocks of that ftype and consolidate + # them to one result block. + unique_ftypes = set(k for unit, ftype_map in blockmaps + for k in ftype_map.keys()) + for ftype in unique_ftypes: + # Unit must accompany its blocks to the very end, because it + # contains reindexing and placement information. + unit_blocks_to_merge = [(unit, blk) + for unit, mapping in blockmaps + for blk in mapping.get(ftype, [])] - # merge the blocks to create the result blocks - result_blocks = [] - for klass_blocks in kind_blocks: - res_blk = self._get_merged_block(klass_blocks) - result_blocks.append(res_blk) + result_blocks.append(self._get_merged_block(unit_blocks_to_merge)) return BlockManager(result_blocks, self.result_axes) def _get_merged_block(self, to_merge): + """Merge blocks of same ftype.""" if len(to_merge) > 1: - - # placement set here return self._merge_blocks(to_merge) else: + # Only one block exists for given ftype, no merging is necessary, + # reindex it and return the result. unit, block = to_merge[0] - blk = unit.reindex_block(block, self.axis, - self.result_items, copy=self.copy) - - # set placement / invalidate on a unique result - if self.result_items.is_unique and blk._ref_locs is not None: - if not self.copy: - blk = blk.copy() - blk.set_ref_locs(None) - - return blk + return unit.reindex_block(block, self.axis, + self.result_items, copy=self.copy) def _merge_blocks(self, merge_chunks): """ - merge_chunks -> [(_JoinUnit, Block)] + Merge multiple blocks of same ftype. + + Performs reindexing & consolidating of data. + + Parameters + ---------- + merge_chunks : list of (_JoinUnit, Block)-tuples + """ funit, fblock = merge_chunks[0] fidx = funit.indexer out_shape = list(fblock.get_values().shape) - - n = len(fidx) if fidx is not None else out_shape[self.axis] - - merge_lengths = list(blk.get_merge_length() for unit, blk in merge_chunks) - out_shape[0] = sum(merge_lengths) - out_shape[self.axis] = n + # Collect items along "merge" axis. + out_shape[0] = sum(blk.shape[0] for unit, blk in merge_chunks) + # Extend (or reduce) shape along "alignment" axis if reindexing will + # take place. + if fidx is not None: + out_shape[self.axis] = len(fidx) # Should use Fortran order?? 
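# --- Editor's aside, not part of the patch ---------------------------------
# The fill loop just below, in miniature: blocks from different join units
# are stacked into one output array, each chunk is realigned along the join
# axis, and its manager locations are shifted by the unit's placement offset
# before being concatenated.  Fancy indexing stands in for com.take_nd and
# every value is made up.
import numpy as np

left_blk  = np.array([[1., 2., 3.]])      # one item, unit with offset 0
right_blk = np.array([[10., 20., 30.]])   # one item, unit with offset 1
indexer   = np.array([2, 0, 1])           # realignment along the join axis

out = np.empty((2, len(indexer)))
out[0:1] = left_blk[:, indexer]
out[1:2] = right_blk[:, indexer]

placement = np.concatenate([np.array([0]) + 0,    # left ref_locs + offset 0
                            np.array([0]) + 1])   # right ref_locs + offset 1
print(out)        # [[ 3.  1.  2.] [30. 10. 20.]]
print(placement)  # [0 1]
# ---------------------------------------------------------------------------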
block_dtype = _get_block_dtype([x[1] for x in merge_chunks]) out = np.empty(out_shape, dtype=block_dtype) sofar = 0 + placements = [] for unit, blk in merge_chunks: out_chunk = out[sofar: sofar + len(blk)] - com.take_nd(blk.get_values(), unit.indexer, self.axis, out=out_chunk) + com.take_nd(blk.get_values(), unit.indexer, self.axis, + out=out_chunk, mask_info=unit.mask_info) + placements.append(blk.ref_locs + unit.placement_offset) sofar += len(blk) - # does not sort - new_block_items = _concat_indexes([b.items for _, b in merge_chunks]) + new_placement = np.concatenate(placements) - # need to set placement if we have a non-unique result - # calculate by the existing placement plus the offset in the result set - placement = None - if not self.result_items.is_unique: - placement = [] - offsets = np.append(np.array([0]),self.offsets.cumsum()[:-1]) - for (unit, blk), offset in zip(merge_chunks,offsets): - placement.extend(blk.ref_locs+offset) - - return make_block(out, new_block_items, self.result_items, placement=placement) + return make_block(out, self.result_items[new_placement], + self.result_items, placement=new_placement) class _JoinUnit(object): @@ -4426,9 +4043,10 @@ class _JoinUnit(object): Blocks plus indexer """ - def __init__(self, blocks, indexer): + def __init__(self, blocks, indexer, placement_offset): self.blocks = blocks self.indexer = indexer + self.placement_offset = placement_offset @cache_readonly def mask_info(self): @@ -4447,11 +4065,17 @@ def get_upcasted_blocks(self): def reindex_block(self, block, axis, ref_items, copy=True): if self.indexer is None: - result = block.copy() if copy else block + # If no reindexing is necessary, block metadata should still be + # copied (or otherwise one block would belong to two dataframes + # with different items, input and output of the join). 
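# --- Editor's aside, not part of the patch ---------------------------------
# The aliasing problem the shallow copy below avoids, with a plain dict
# standing in for a block: if the join result reused the input's block
# object outright, retargeting ref_items for the output would also rewrite
# the input frame's metadata.
shared = {'ref_items': ['a', 'b']}

joined = shared                        # no copy
joined['ref_items'] = ['a_x', 'b_x']
print(shared['ref_items'])             # ['a_x', 'b_x'] -- input mutated too

joined = dict(shared)                  # a shallow copy keeps the input intact
joined['ref_items'] = ['a_y', 'b_y']
print(shared['ref_items'])             # still ['a_x', 'b_x']
# ---------------------------------------------------------------------------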
+ result = block.copy(deep=copy) else: result = block.reindex_axis(self.indexer, axis=axis, mask_info=self.mask_info) + + result._ref_locs = result.ref_locs + self.placement_offset result.ref_items = ref_items + return result @@ -4459,13 +4083,6 @@ def _concat_indexes(indexes): return indexes[0].append(indexes[1:]) -def _get_merge_block_kinds(blockmaps): - kinds = set() - for _, mapping in blockmaps: - kinds |= set(mapping) - return kinds - - def _get_block_dtype(blocks): if len(blocks) == 0: return object @@ -4480,7 +4097,7 @@ def _get_block_dtype(blocks): return dtype -def concat_blocks(blocks, axis, new_axes, objs): +def _concat_blocks(blocks, axis, new_axes, offsets): values_list = [b.get_values() for b in blocks if b is not None] concat_values = com._concat_compat(values_list, axis=axis) @@ -4497,25 +4114,15 @@ def concat_blocks(blocks, axis, new_axes, objs): new_axes[0], placement=blocks[0]._ref_locs) else: - offsets = np.r_[0, np.cumsum([len(x._data.axes[0]) for - x in objs])] - indexer = np.concatenate([offsets[i] + b.ref_locs - for i, b in enumerate(blocks) - if b is not None]) - - concat_items = new_axes[0].take(indexer) - block = make_block(concat_values, concat_items, new_axes[0]) - - # we need to set the ref_locs in this block so we have the mapping - # as we now have a non-unique index across dtypes, and we need to - # map the column location to the block location - # GH3602 - if not new_axes[0].is_unique: - block.set_ref_locs(indexer) - - return block + new_ref_locs = np.concatenate([offsets[i] + b.ref_locs + for i, b in enumerate(blocks) + if b is not None]) + concat_items = new_axes[0].take(new_ref_locs) + block = make_block(concat_values, concat_items, new_axes[0], + placement=new_ref_locs) + return block def _get_reindexed_data(objs, new_axes, axis): @@ -4551,8 +4158,8 @@ def _prepare_blocks(objs, new_axes, axis): blockmaps = [] for data in reindexed_data: data = data.consolidate() - data._set_ref_locs() - blockmaps.append(data.get_block_map(typ='dict')) + + blockmaps.append(data.group_blocks_by_ftype()) return blockmaps, reindexed_data @@ -4642,6 +4249,9 @@ def get_concatenated_data(objs, new_axes, axis): blockmaps, rdata = _prepare_blocks(objs, new_axes, axis) kinds = _get_all_block_kinds(blockmaps) + offsets = np.r_[0, np.cumsum([len(x._data.axes[axis]) for + x in objs])] + try: # need to conform to same other (joined) axes for block join new_blocks = [] @@ -4652,9 +4262,8 @@ def get_concatenated_data(objs, new_axes, axis): if l is None: l = [ None ] klass_blocks.extend(l) - stacked_block = concat_blocks( - blocks=klass_blocks, axis=axis, - new_axes=new_axes, objs=objs) + stacked_block = _concat_blocks(blocks=klass_blocks, axis=axis, + new_axes=new_axes, offsets=offsets) new_blocks.append(stacked_block) for blk in new_blocks: @@ -4670,7 +4279,8 @@ def get_concatenated_data(objs, new_axes, axis): if axis == 0: # pragma: no cover raise - new_data = {} + # Let's not lose column order here. + new_data = OrderedDict() for item in new_axes[0]: new_data[item] = _concat_single_item(objs, rdata, item, axis) @@ -4682,3 +4292,131 @@ def _get_all_block_kinds(blockmaps): for mapping in blockmaps: kinds |= set(mapping) return kinds + + +def _invert_reordering(reordering, minlength=None): + """ + Invert reordering operation. 
+ + Given array `reordering`, make `reordering_inv` of it, such that:: + + reordering_inv[reordering[x]] = x + + There are two types of indexers: + + source + is when element *s* at position *i* means that values to fill *i-th* + item of reindex operation should be taken from *s-th* item of the + original (this is what is returned by `pandas.Index.reindex`). + destination + is when element *d* at position *i* means that values from *i-th* item + of source should be used to fill *d-th* item of reindexing operation. + + This function will convert from *source* to *destination* and vice-versa. + + .. note:: trailing ``-1`` may be lost upon conversion (this is what + `minlength` is there for). + + .. note:: if *source* indexer is not unique, corresponding *destination* + indexer will have ``dtype=object`` and will contain lists. + + Examples: + + >>> _invert_reordering([3, -1, 2, 4, -1]) + array([-1, -1, 2, 0, 3]) + >>> _invert_reordering([-1, -1, 0, 2, 3]) + array([3, -1, 2, 4]) + >>> _invert_reordering([1,3,5]) + array([-1, 0, -1, 1, -1, 2]) + + """ + reordering = np.asanyarray(reordering) + if not com.is_integer_dtype(reordering): + raise ValueError("Only integer indexers are supported") + + nonneg_indices = reordering[reordering >= 0] + counts = np.bincount(nonneg_indices, minlength=minlength) + has_non_unique = (counts > 1).any() + + dtype = np.dtype(np.object_) if has_non_unique else np.dtype(np.int_) + inverted = np.empty_like(counts, dtype=dtype) + inverted.fill(-1) + + nonneg_positions = np.arange(len(reordering), dtype=np.int_)[reordering >= 0] + np.put(inverted, nonneg_indices, nonneg_positions) + + if has_non_unique: + nonunique_elements = np.arange(len(counts))[counts > 1] + for elt in nonunique_elements: + inverted[elt] = nonneg_positions[nonneg_indices == elt].tolist() + + return inverted + + +def ref_loc_groupby_block(ref_locs): + """ + Group given ref_locs by block. + + Returns + ------- + iterator + Yield (block, block_locs, original_locs) + + """ + if len(ref_locs) == 0: + return + + sort_order = np.argsort(ref_locs) + sorted_rl = ref_locs[sort_order] + blocks = lib.map_infer(sorted_rl, operator.itemgetter(0)) + indices = lib.map_infer(sorted_rl, operator.itemgetter(1)) + + sep_pos = np.r_[0, (blocks[1:] != blocks[:-1]).nonzero()[0] + 1, len(blocks)] + start = sep_pos[0] + for end in sep_pos[1:]: + yield blocks[start], indices[start:end], sort_order[start:end] + start = end + + +def items_overlap_with_suffix(left, lsuffix, right, rsuffix): + """ + If two indices overlap, add suffixes to overlapping entries. + + If corresponding suffix is empty, the entry is simply converted to string. + + """ + to_rename = left.intersection(right) + if len(to_rename) == 0: + return left, right + else: + if not lsuffix and not rsuffix: + raise ValueError('columns overlap but no suffix specified: %s' % + to_rename) + + def lrenamer(x): + if x in to_rename: + return '%s%s' % (x, lsuffix) + return x + + def rrenamer(x): + if x in to_rename: + return '%s%s' % (x, rsuffix) + return x + + return (_transform_index(left, lrenamer), + _transform_index(right, rrenamer)) + + +def _transform_index(index, func): + """ + Apply function to all values found in index. + + This includes transforming multiindex entries separately. 
+ + """ + if isinstance(index, MultiIndex): + items = [tuple(func(y) for y in x) for x in index] + return MultiIndex.from_tuples(items, names=index.names) + else: + items = [func(x) for x in index] + return Index(items, name=index.name) diff --git a/pandas/tests/test_frame.py b/pandas/tests/test_frame.py index 1bbcba0e4caad..66ec2fd42dc38 100644 --- a/pandas/tests/test_frame.py +++ b/pandas/tests/test_frame.py @@ -106,7 +106,6 @@ class CheckIndexing(object): def test_getitem(self): # slicing - sl = self.frame[:20] self.assertEqual(20, len(sl.index)) @@ -120,7 +119,7 @@ def test_getitem(self): self.assertIsNotNone(self.frame[key]) self.assertNotIn('random', self.frame) - with assertRaisesRegexp(KeyError, 'no item named random'): + with assertRaisesRegexp(KeyError, 'random'): self.frame['random'] df = self.frame.copy() @@ -3307,7 +3306,6 @@ def test_column_dups2(self): assert_frame_equal(result, expected) def test_column_dups_indexing(self): - def check(result, expected=None): if expected is not None: assert_frame_equal(result,expected) @@ -9711,7 +9709,7 @@ def test_reorder_levels(self): assert_frame_equal(result, expected) def test_sort_index(self): - frame = DataFrame(np.random.randn(4, 4), index=[1, 2, 3, 4], + frame = DataFrame(np.arange(16).reshape(4, 4), index=[1, 2, 3, 4], columns=['A', 'B', 'C', 'D']) # axis=0 @@ -11695,7 +11693,7 @@ def test_columns_with_dups(self): df_dt = DataFrame(Timestamp('20010101'),index=df_float.index,columns=df_float.columns) df = pd.concat([ df_float, df_int, df_bool, df_object, df_dt ], axis=1) - result = df._data._set_ref_locs() + result = df._data._ref_locs self.assertEqual(len(result), len(df.columns)) # testing iget @@ -12630,7 +12628,6 @@ def check_query_with_unnamed_multiindex(self, parser, engine): df = DataFrame(randn(10, 2), index=index) ind = Series(df.index.get_level_values(0).values, index=index) - #import ipdb; ipdb.set_trace() res1 = df.query('ilevel_0 == "red"', parser=parser, engine=engine) res2 = df.query('"red" == ilevel_0', parser=parser, engine=engine) exp = df[ind == 'red'] diff --git a/pandas/tests/test_internals.py b/pandas/tests/test_internals.py index 2c9c8a94a1902..f5cfd2b39924f 100644 --- a/pandas/tests/test_internals.py +++ b/pandas/tests/test_internals.py @@ -271,6 +271,7 @@ def test_is_indexed_like(self): self.assertFalse(self.mgr._is_indexed_like(mgr2)) def test_block_id_vector_item_dtypes(self): + raise nose.SkipTest('block_id_vector is being removed') expected = [0, 1, 0, 1, 0, 2, 3, 4] result = self.mgr.block_id_vector assert_almost_equal(expected, result) @@ -302,7 +303,7 @@ def test_duplicate_item_failure(self): b._ref_locs = None mgr._ref_locs = None mgr._items_map = None - self.assertRaises(AssertionError, mgr._set_ref_locs, do_refs=True) + self.assertRaises(AssertionError, mgr._rebuild_ref_locs) def test_contains(self): self.assertIn('a', self.mgr) @@ -328,9 +329,6 @@ def test_pickle(self): self.assertFalse(mgr2._is_consolidated) self.assertFalse(mgr2._known_consolidated) - def test_get(self): - pass - def test_get_scalar(self): for item in self.mgr.items: for i, index in enumerate(self.mgr.axes[1]): @@ -338,8 +336,37 @@ def test_get_scalar(self): exp = self.mgr.get(item)[i] assert_almost_equal(res, exp) + def test_get(self): + cols = Index(list('abc')) + values = np.random.rand(3, 3) + block = make_block(values=values.copy(), items=cols, ref_items=cols) + mgr = BlockManager(blocks=[block], axes=[cols, np.arange(3)]) + + assert_almost_equal(mgr.get('a'), values[0]) + assert_almost_equal(mgr.get('b'), values[1]) + 
assert_almost_equal(mgr.get('c'), values[2]) + def test_set(self): - pass + cols = Index(list('abc')) + values = np.c_[[[1, 2, 3]] * 3] + block = make_block(values=values.copy(), items=cols, ref_items=cols) + mgr = BlockManager(blocks=[block], axes=[cols, np.arange(3)]) + + mgr.set('d', np.array(['foo'] * 3)) + mgr.set('b', np.array(['bar'] * 3)) + assert_almost_equal(mgr.get('a'), [1, 2, 3]) + assert_almost_equal(mgr.get('b'), ['bar'] * 3) + assert_almost_equal(mgr.get('c'), [1, 2, 3]) + assert_almost_equal(mgr.get('d'), ['foo'] * 3) + + def test_insert(self): + self.mgr.insert(0, 'inserted', np.arange(N)) + + self.assertEqual(self.mgr.items[0], 'inserted') + assert_almost_equal(self.mgr.get('inserted'), np.arange(N)) + + for blk in self.mgr.blocks: + yield self.assertIs, self.mgr.items, blk.ref_items def test_set_change_dtype(self): self.mgr.set('baz', np.zeros(N, dtype=bool)) @@ -525,7 +552,7 @@ def _check_cols(before, after, cols): # not consolidated vals = randn(N) self.mgr.set('g', vals) - reindexed = self.mgr.reindex_items(['g', 'c', 'a', 'd']) + reindexed = self.mgr.reindex_axis(['g', 'c', 'a', 'd'], axis=0) self.assertEquals(reindexed.nblocks, 2) assert_almost_equal(reindexed.get('g'), vals.squeeze()) _check_cols(self.mgr, reindexed, ['c', 'a', 'd']) diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 391828697502d..bb7347f0fa50f 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -14,7 +14,8 @@ from pandas.core.index import (Index, MultiIndex, _get_combined_index, _ensure_index, _get_consensus_names, _all_indexes_same) -from pandas.core.internals import BlockJoinOperation, get_concatenated_data +from pandas.core.internals import (BlockJoinOperation, get_concatenated_data, + items_overlap_with_suffix) from pandas.util.decorators import Appender, Substitution from pandas.core.common import (PandasError, ABCSeries, is_integer_dtype, isnull) @@ -278,8 +279,18 @@ def _get_merge_data(self): """ ldata, rdata = self.left._data, self.right._data lsuf, rsuf = self.suffixes - ldata, rdata = ldata._maybe_rename_join(rdata, lsuf, rsuf, - copydata=False) + + llabels, rlabels = items_overlap_with_suffix( + ldata.items, lsuf, rdata.items, rsuf) + + if not llabels.equals(ldata.items): + ldata = ldata.copy(deep=False) + ldata.set_axis(0, llabels) + + if not rlabels.equals(rdata.items): + rdata = rdata.copy(deep=False) + rdata.set_axis(0, rlabels) + return ldata, rdata def _get_merge_keys(self): diff --git a/pandas/tools/tests/test_merge.py b/pandas/tools/tests/test_merge.py index c3fa5b49fa28b..3e392c54f4065 100644 --- a/pandas/tools/tests/test_merge.py +++ b/pandas/tools/tests/test_merge.py @@ -1765,11 +1765,14 @@ def test_panel_join_overlap(self): p1 = panel.ix[['ItemA', 'ItemB', 'ItemC']] p2 = panel.ix[['ItemB', 'ItemC']] + # Expected index is + # + # ItemA, ItemB_p1, ItemC_p1, ItemB_p2, ItemC_p2 joined = p1.join(p2, lsuffix='_p1', rsuffix='_p2') p1_suf = p1.ix[['ItemB', 'ItemC']].add_suffix('_p1') p2_suf = p2.ix[['ItemB', 'ItemC']].add_suffix('_p2') no_overlap = panel.ix[['ItemA']] - expected = p1_suf.join(p2_suf).join(no_overlap) + expected = no_overlap.join(p1_suf.join(p2_suf)) tm.assert_panel_equal(joined, expected) def test_panel_join_many(self): diff --git a/pandas/tseries/resample.py b/pandas/tseries/resample.py index 7f243c20fe56e..b4a9e0186181b 100644 --- a/pandas/tseries/resample.py +++ b/pandas/tseries/resample.py @@ -336,7 +336,8 @@ def _take_new_index(obj, indexer, new_index, axis=0): elif isinstance(obj, DataFrame): if axis == 1: raise 
NotImplementedError - return DataFrame(obj._data.take(indexer, new_index=new_index, axis=1)) + return DataFrame(obj._data.reindex_indexer( + new_axis=new_index, indexer=indexer, axis=1)) else: raise NotImplementedError
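Editor's aside (not part of the patch): the suffixing rule implemented by
items_overlap_with_suffix can be shown without any pandas internals. The
sketch below re-derives the behaviour on plain lists; add_suffixes and its
arguments are hypothetical names, and only the error message mirrors the
patch.

def add_suffixes(left, right, lsuffix, rsuffix):
    # Columns present on both sides get a suffix; everything else is kept.
    overlap = set(left) & set(right)
    if overlap and not lsuffix and not rsuffix:
        raise ValueError('columns overlap but no suffix specified: %s'
                         % sorted(overlap))
    left_out = ['%s%s' % (x, lsuffix) if x in overlap else x for x in left]
    right_out = ['%s%s' % (x, rsuffix) if x in overlap else x for x in right]
    return left_out, right_out

print(add_suffixes(['ItemA', 'ItemB'], ['ItemB', 'ItemC'], '_p1', '_p2'))
# (['ItemA', 'ItemB_p1'], ['ItemB_p2', 'ItemC'])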