forked from jcrobak/parquet-python
-
-
Notifications
You must be signed in to change notification settings - Fork 183
Closed
Description
While running the `dask` test suite against the upcoming `pandas` 2.0 release, we got the following `fastparquet`-related error due to `Categorical._codes` being removed in `pandas=2.0` (setting it was deprecated in `pandas=1.3`).
_____________________________________________________ test_categories_unnamed_index[fastparquet] ______________________________________________________
tmpdir = '/private/var/folders/k_/lx1rdvqn253gd1wrcx__5frm0000gn/T/pytest-of-james/pytest-32/test_categories_unnamed_index_0', engine = 'fastparquet'
def test_categories_unnamed_index(tmpdir, engine):
# Check that we can handle an unnamed categorical index
# https://github.com/dask/dask/issues/6885
tmpdir = str(tmpdir)
df = pd.DataFrame(
data={"A": [1, 2, 3], "B": ["a", "a", "b"]}, index=["x", "y", "y"]
)
ddf = dd.from_pandas(df, npartitions=1)
ddf = ddf.categorize(columns=["B"])
ddf.to_parquet(tmpdir, engine=engine)
ddf2 = dd.read_parquet(tmpdir, engine=engine)
> assert_eq(ddf.index, ddf2.index, check_divisions=False)
dask/dataframe/io/tests/test_parquet.py:1258:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/dataframe/utils.py:556: in assert_eq
b = _check_dask(
dask/dataframe/utils.py:464: in _check_dask
result = dsk.compute(scheduler=scheduler)
dask/base.py:315: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:600: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:557: in get_sync
return get_async(
dask/local.py:500: in get_async
for key, res_info, failed in queue_get(queue).result():
../../../mambaforge/envs/dask/lib/python3.10/concurrent/futures/_base.py:439: in result
return self.__get_result()
../../../mambaforge/envs/dask/lib/python3.10/concurrent/futures/_base.py:391: in __get_result
raise self._exception
dask/local.py:542: in submit
fut.set_result(fn(*args, **kwargs))
dask/local.py:238: in batch_execute_tasks
return [execute_task(*a) for a in it]
dask/local.py:238: in <listcomp>
return [execute_task(*a) for a in it]
dask/local.py:229: in execute_task
result = pack_exception(e, dumps)
dask/local.py:224: in execute_task
result = _execute_task(task, data)
dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:990: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:149: in get
result = _execute_task(task, cache)
dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:119: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:119: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/dataframe/io/parquet/core.py:97: in __call__
return read_parquet_part(
dask/dataframe/io/parquet/core.py:644: in read_parquet_part
dfs = [
dask/dataframe/io/parquet/core.py:645: in <listcomp>
func(
dask/dataframe/io/parquet/fastparquet.py:1005: in read_partition
return cls.pf_to_pandas(
dask/dataframe/io/parquet/fastparquet.py:1045: in pf_to_pandas
df, views = pf.pre_allocate(size, columns, categories, index)
../../../mambaforge/envs/dask/lib/python3.10/site-packages/fastparquet/api.py:780: in pre_allocate
df, arrs = _pre_allocate(size, columns, categories, index, cats,
../../../mambaforge/envs/dask/lib/python3.10/site-packages/fastparquet/api.py:1018: in _pre_allocate
df, views = dataframe.empty(dtypes, size, cols=cols, index_names=index,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
types = [dtype('int64'), 'category'], size = 3, cats = {'B': 2, '__null_dask_index__': 2}, cols = ['A', 'B'], index_types = ['category']
index_names = ['__null_dask_index__'], timezones = {}
def empty(types, size, cats=None, cols=None, index_types=None, index_names=None,
timezones=None):
"""
Create empty DataFrame to assign into
In the simplest case, will return a Pandas dataframe of the given size,
with columns of the given names and types. The second return value `views`
is a dictionary of numpy arrays into which you can assign values that
show up in the dataframe.
For categorical columns, you get two views to assign into: if the
column name is "col", you get both "col" (the category codes) and
"col-catdef" (the category labels).
For a single categorical index, you should use the `.set_categories`
method of the appropriate "-catdef" columns, passing an Index of values
``views['index-catdef'].set_categories(pd.Index(newvalues), fastpath=True)``
Multi-indexes work a lot like categoricals, even if the types of each
index are not themselves categories, and will also have "-catdef" entries
in the views. However, these will be Dummy instances, providing only a
``.set_categories`` method, to be used as above.
Parameters
----------
types: like np record structure, 'i4,u2,f4,f2,f4,M8,m8', or using tuples
applies to non-categorical columns. If there are only categorical
columns, an empty string of None will do.
size: int
Number of rows to allocate
cats: dict {col: labels}
Location and labels for categorical columns, e.g., {1: ['mary', 'mo]}
will create column index 1 (inserted amongst the numerical columns)
with two possible values. If labels is an integers, `{'col': 5}`,
will generate temporary labels using range. If None, or column name
is missing, will assume 16-bit integers (a reasonable default).
cols: list of labels
assigned column names, including categorical ones.
index_types: list of str
For one of more index columns, make them have this type. See general
description, above, for caveats about multi-indexing. If None, the
index will be the default RangeIndex.
index_names: list of str
Names of the index column(s), if using
timezones: dict {col: timezone_str}
for timestamp type columns, apply this timezone to the pandas series;
the numpy view will be UTC.
Returns
-------
- dataframe with correct shape and data-types
- list of numpy views, in order, of the columns of the dataframe. Assign
to this.
"""
views = {}
timezones = timezones or {}
if isinstance(types, str):
types = types.split(',')
cols = cols if cols is not None else range(len(types))
def cat(col):
if cats is None or col not in cats:
return RangeIndex(0, 2**14)
elif isinstance(cats[col], int):
return RangeIndex(0, cats[col])
else: # explicit labels list
return cats[col]
df = OrderedDict()
for t, col in zip(types, cols):
if str(t) == 'category':
df[str(col)] = Categorical([], categories=cat(col), fastpath=True)
elif isinstance(t, BaseMaskedDtype):
# pandas masked types
arr_type = t.construct_array_type()
df[str(col)] = arr_type(
values=np.empty(0, dtype=t.numpy_dtype),
mask=np.empty(0, dtype=np.bool_),
copy=False
)
else:
if hasattr(t, 'base'):
# funky pandas not-dtype
t = t.base
d = np.empty(0, dtype=t)
if d.dtype.kind == "M" and str(col) in timezones:
try:
z = tz_to_dt_tz(timezones[str(col)])
d = Series(d).dt.tz_localize(z)
except:
warnings.warn("Inferring time-zone from %s in column %s "
"failed, using time-zone-agnostic"
"" % (timezones[str(col)], col))
df[str(col)] = d
df = DataFrame(df)
if not index_types:
index = RangeIndex(size)
elif len(index_types) == 1:
t, col = index_types[0], index_names[0]
if col is None:
raise ValueError('If using an index, must give an index name')
if str(t) == 'category':
c = Categorical([], categories=cat(col), fastpath=True)
vals = np.zeros(size, dtype=c.codes.dtype)
index = CategoricalIndex(c)
> index._data._codes = vals
E AttributeError: can't set attribute '_codes'
../../../mambaforge/envs/dask/lib/python3.10/site-packages/fastparquet/dataframe.py:129: AttributeError
Metadata
Metadata
Assignees
Labels
No labels