
Commit 4123445

DOC: Add scaling to large datasets section
Closes pandas-dev#28315
1 parent 1285938 commit 4123445

File tree

6 files changed (+399 −0 lines)


doc/.gitignore

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
data/
timeseries.csv
timeseries.parquet
timeseries_wide.parquet

doc/source/conf.py

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@
     "sphinx.ext.doctest",
     "sphinx.ext.extlinks",
     "sphinx.ext.todo",
+    # "sphinxcontrib.youtube",
     "numpydoc",  # handle NumPy documentation formatted docstrings
     "IPython.sphinxext.ipython_directive",
     "IPython.sphinxext.ipython_console_highlighting",

doc/source/index.rst.template

Lines changed: 1 addition & 0 deletions
@@ -83,6 +83,7 @@ See the :ref:`overview` for more detail about what's in the library.
 * :doc:`user_guide/style`
 * :doc:`user_guide/options`
 * :doc:`user_guide/enhancingperf`
+* :doc:`user_guide/scale`
 * :doc:`user_guide/sparse`
 * :doc:`user_guide/gotchas`
 * :doc:`user_guide/cookbook`

doc/source/user_guide/scale.rst

Lines changed: 343 additions & 0 deletions
@@ -0,0 +1,343 @@
.. _scale:

*************************
Scaling to large datasets
*************************

Pandas provides data structures for in-memory analytics. This makes using pandas
to analyze datasets that are larger than memory somewhat tricky.

This document provides a few recommendations for scaling to larger datasets.
It's a complement to :ref:`enhancingperf`, which focuses on speeding up analysis
for datasets that fit in memory.

But first, it's worth considering *not using pandas*. Pandas isn't the right
tool for all situations. If you're working with very large datasets and a tool
like PostgreSQL fits your needs, then you should probably be using that.
Assuming you want or need the expressivity and power of pandas, let's carry on.

.. ipython:: python

    import pandas as pd
    import numpy as np
    from pandas.util.testing import make_timeseries

Use more efficient file formats
-------------------------------

Depending on your workload, data loading may be a bottleneck. In these cases you
might consider switching from a slow format like CSV to a faster format like
Parquet. Loading from a file format like Parquet will also use less memory,
letting you load larger datasets into pandas before running out of memory.

.. ipython:: python

    # Make a random in-memory dataset
    ts = make_timeseries(freq="30S", seed=0)
    ts

We'll now write and read the file using CSV and Parquet.

.. ipython:: python

    %time ts.to_csv("timeseries.csv")

.. ipython:: python

    %time ts2 = pd.read_csv("timeseries.csv", index_col="timestamp", parse_dates=["timestamp"])

.. ipython:: python

    %time ts.to_parquet("timeseries.parquet")

.. ipython:: python

    %time _ = pd.read_parquet("timeseries.parquet")

Notice that Parquet gives much higher performance for reading and writing, both
in terms of speed and lower peak memory usage. See :ref:`io` for more.

Load less data
--------------

Suppose our raw dataset on disk has many columns, but we need just a subset
for our analysis. To get those columns, we can either:

1. Load the entire dataset, then select those columns.
2. Just load the columns we need.

Loading just the columns you need can be much faster and requires less memory.

.. ipython:: python

    # Make a similar dataset with many columns
    timeseries = [
        make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
        for i in range(10)
    ]
    ts_wide = pd.concat(timeseries, axis=1)
    ts_wide.head()
    ts_wide.to_parquet("timeseries_wide.parquet")

Option 1 loads in all the data and then filters to what we need.

.. ipython:: python

    columns = ['id_0', 'name_0', 'x_0', 'y_0']

    %time _ = pd.read_parquet("timeseries_wide.parquet")[columns]

Option 2 only loads the columns we request. This is faster and has a lower peak
memory usage, since the entire dataset isn't in memory at once.

.. ipython:: python

    %time _ = pd.read_parquet("timeseries_wide.parquet", columns=columns)

With :func:`pandas.read_csv`, you can specify ``usecols`` to limit the columns
read into memory.

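For instance, a minimal sketch reusing the ``timeseries.csv`` file written above
(the ``ts_names`` variable is just for illustration):

.. code-block:: python

    # Only parse and allocate the "timestamp" and "name" columns;
    # the other columns in the CSV are skipped entirely.
    ts_names = pd.read_csv(
        "timeseries.csv",
        usecols=["timestamp", "name"],
        index_col="timestamp",
        parse_dates=["timestamp"],
    )
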
Use efficient datatypes
-----------------------

The default pandas data types are not the most memory efficient. This is
especially true for text data columns with relatively few unique values
(low-cardinality data). By using more efficient data types, you can store
larger datasets in memory.

.. ipython:: python

    ts.dtypes

.. ipython:: python

    ts.memory_usage(deep=True)  # memory usage in bytes

The ``name`` column is taking up much more memory than any other. It has just a
few unique values, so it's a good candidate for converting to a
:class:`Categorical`. With a Categorical, we store each unique name once and use
space-efficient integers to know which specific name is used in each row.

.. ipython:: python

    ts2 = ts.copy()
    ts2['name'] = ts2['name'].astype('category')
    ts2.memory_usage(deep=True)

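As an aside (not part of the original example), you can peek at how the
Categorical is stored: the unique values are kept once in ``.cat.categories``,
while each row holds only a small integer code.

.. code-block:: python

    # The unique names are stored once ...
    ts2['name'].cat.categories
    # ... and each row stores only a small integer code pointing at them.
    ts2['name'].cat.codes.dtype  # int8 for a handful of categories
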
We can go a bit further and downcast the numeric columns to their smallest types
using :func:`pandas.to_numeric`.

.. ipython:: python

    ts2['id'] = pd.to_numeric(ts2['id'], downcast='unsigned')
    ts2[['x', 'y']] = ts2[['x', 'y']].apply(pd.to_numeric, downcast='float')
    ts2.dtypes

.. ipython:: python

    ts2.memory_usage(deep=True)

.. ipython:: python

    reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
    print(f"{reduction:0.2f}")

In all, we've reduced the in-memory footprint of this dataset to 1/5 of its
original size.

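If you know the target dtypes ahead of time, you can often apply them while
reading instead of converting afterwards. A rough sketch reusing
``timeseries.csv`` from above; the specific dtype choices here are illustrative:

.. code-block:: python

    ts3 = pd.read_csv(
        "timeseries.csv",
        index_col="timestamp",
        parse_dates=["timestamp"],
        # Store the low-cardinality names as a Categorical and the
        # floating-point columns at single precision from the start.
        dtype={"name": "category", "x": "float32", "y": "float32"},
    )
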
See :ref:`categorical` for more on ``Categorical`` and :ref:`basics.dtypes`
for an overview of all of pandas' dtypes.

Use other libraries
-------------------

Pandas is just one library offering a DataFrame API. Because of its popularity,
pandas' API has become something of a standard that other libraries implement.

For example, `Dask`_, a parallel computing library, has `dask.dataframe`_, a
pandas-like API for working with larger-than-memory datasets in parallel. Dask
can use multiple threads or processes on a single machine, or a cluster of
machines, to process data in parallel.

Let's make a larger dataset on disk (as parquet files) that's split into chunks,
one per year.

.. ipython:: python

    import pathlib

    N = 12
    starts = [f'20{i:>02d}-01-01' for i in range(N)]
    ends = [f'20{i:>02d}-12-13' for i in range(N)]

    pathlib.Path("data/timeseries").mkdir(parents=True, exist_ok=True)

    for i, (start, end) in enumerate(zip(starts, ends)):
        ts = make_timeseries(start=start, end=end, freq='1T', seed=i)
        ts.to_parquet(f"data/timeseries/ts-{i}.parquet")

We'll import ``dask.dataframe`` and notice that the API feels similar to pandas.
We can use Dask's ``read_parquet`` function, but provide a globstring of files to read in.

.. ipython:: python

    import dask.dataframe as dd

    ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
    ddf

Inspecting the ``ddf`` object, we see a few things:

* There are familiar attributes like ``.columns`` and ``.dtypes``
* There are familiar methods like ``.groupby``, ``.sum``, etc.
* There are new attributes like ``.npartitions`` and ``.divisions``

The partitions and divisions are how Dask parallelizes computation. A **Dask**
DataFrame is made up of many **pandas** DataFrames. A single method call on a
Dask DataFrame ends up making many pandas method calls, and Dask knows how to
coordinate everything to get the result.

.. ipython:: python

    ddf.columns
    ddf.dtypes
    ddf.npartitions

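To make the "many pandas DataFrames" idea concrete, here is a small illustrative
aside (not in the original walkthrough): individual partitions can be pulled out
with ``get_partition``, and computing one yields an ordinary pandas DataFrame.

.. code-block:: python

    # Each partition here wraps one of the parquet files read in above.
    first = ddf.get_partition(0)   # still a lazy Dask DataFrame
    type(first.compute())          # pandas.core.frame.DataFrame
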
One major difference: the ``dask.dataframe`` API is *lazy*. If you look at the
repr above, you'll notice that the values aren't actually printed out; just the
column names and dtypes. That's because Dask hasn't actually read the data yet.
Rather than executing immediately, operations build up a **task graph**.

.. ipython:: python

    ddf
    ddf['name']
    ddf['name'].value_counts()

Each of these calls is instant because the result isn't being computed yet.
We're just building up a list of computations to do when someone needs the
result. Dask knows that the return type of ``pandas.Series.value_counts``
is a pandas Series with a certain dtype and a certain name. So the Dask version
returns a Dask Series with the same dtype and the same name.

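You can check this metadata without triggering any computation; a quick
illustrative aside:

.. code-block:: python

    lazy_counts = ddf['name'].value_counts()
    lazy_counts.dtype  # known up front, no data has been read
    lazy_counts.name   # likewise available before computing
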
To get the actual result you can call ``.compute()``.

.. ipython:: python

    %time ddf['name'].value_counts().compute()

At that point, the full task graph (reading in data, selecting the columns,
doing the ``value_counts``) is executed *in parallel*. You get back the same
thing you'd get back from pandas, in this case a concrete pandas Series with the
count of each ``name``.

By default, ``dask.dataframe`` operations use a threadpool to do operations in
parallel. We can also connect to a cluster to distribute the work on many
machines. In this case we'll connect to a local "cluster" made up of several
processes on this single machine.

.. ipython:: python

    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster()
    client = Client(cluster)
    client

Once this ``client`` is created, all of Dask's computation will take place on
the cluster (which is just processes in this case).

Dask implements the most commonly used parts of the pandas API. For example, we
can do a familiar groupby aggregation.

.. ipython:: python

    %time ddf.groupby('name')[['x', 'y']].mean().compute().head()

The grouping and aggregation are done out-of-core and in parallel.

When Dask knows the ``divisions`` of a dataset, certain optimizations are
possible. When reading parquet datasets written by Dask, the divisions will be
known automatically. In this case, since we created the parquet files manually,
we need to supply the divisions manually.

.. ipython:: python

    divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
    ddf.divisions = divisions
    ddf

Now we can do things like fast random access with ``.loc``.

.. ipython:: python

    ddf.loc['2002-01-01 12:01':'2002-01-01 12:05'].compute()

Dask knows to look only in the 3rd partition for selecting values in ``2002``. It
doesn't need to look at any other data.

Many workflows involve reading in a large amount of data and processing it in a
way that reduces the size to something that fits in memory. In this case, we'll
resample to daily frequency and take the mean. Once we've taken the mean, we know
the results will fit in memory, so we can safely call ``compute`` without running
out of memory. At that point it's just a regular pandas object.

.. ipython:: python

    @savefig dask_resample.png
    ddf[['x', 'y']].resample("1D").mean().cumsum().compute().plot()

These Dask examples have all been done using multiple processes on a single
machine. Dask can be `deployed on a cluster
<https://docs.dask.org/en/latest/setup.html>`_ to scale up to even larger
datasets.

You can see more Dask examples at https://examples.dask.org.

Use chunking
------------

If using another library like Dask is not an option, you can achieve similar
results with a bit of work.

For example, we can recreate the out-of-core ``value_counts`` we did earlier
with Dask. The peak memory usage of this approach will be the size of the single
largest DataFrame.

.. ipython:: python

    files = list(pathlib.Path("data/timeseries/").glob("ts*.parquet"))
    files

.. ipython:: python

    %%time
    counts = pd.Series(dtype=int)
    for path in files:
        df = pd.read_parquet(path)
        counts = counts.add(df['name'].value_counts(), fill_value=0)
    counts.astype(int)

This matches the counts we saw above with Dask.

Some readers, like :func:`pandas.read_csv`, offer parameters to control the
``chunksize`` when reading a single file. Manually chunking is an OK option for
workflows that don't require overly sophisticated operations. Some operations,
like ``groupby``, are much harder to do chunkwise. In these cases, you may be
better off switching to a library like Dask, which implements these chunked
algorithms for you.

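For instance, a rough sketch of the same counting workflow using ``chunksize``
on the ``timeseries.csv`` file written earlier (the chunk size of 100,000 rows
is arbitrary):

.. code-block:: python

    # Process the CSV 100,000 rows at a time; only one chunk is in memory at once.
    csv_counts = pd.Series(dtype=int)
    for chunk in pd.read_csv("timeseries.csv", chunksize=100_000):
        csv_counts = csv_counts.add(chunk["name"].value_counts(), fill_value=0)
    csv_counts.astype(int)
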
.. ipython:: python

    del client, cluster

.. _Dask: https://dask.org
.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html

environment.yml

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ dependencies:
   - nbconvert>=5.4.1
   - nbsphinx
   - pandoc
+  - dask
+  - distributed

   # web (jinja2 is also needed, but it's also an optional pandas dependency)
   - markdown
