diff --git a/01_dask.delayed.ipynb b/01_dask.delayed.ipynb index ea19df2..5562d79 100644 --- a/01_dask.delayed.ipynb +++ b/01_dask.delayed.ipynb @@ -28,7 +28,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "As well see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a `dask.distributed.Client`. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later." + "As we'll see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a `dask.distributed.Client`. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later." ] }, { @@ -100,7 +100,7 @@ "\n", "Those two increment calls *could* be called in parallel, because they are totally independent of one-another.\n", "\n", - "We'll transform the `inc` and `add` functions using the `dask.delayed` function. When we call the delayed version by passing the arguments, exactly as before, but the original function isn't actually called yet - which is why the cell execution finishes very quickly.\n", + "We'll transform the `inc` and `add` functions using the `dask.delayed` function. When we call the delayed version by passing the arguments, exactly as before, the original function isn't actually called yet - which is why the cell execution finishes very quickly.\n", "Instead, a *delayed object* is made, which keeps track of the function to call and the arguments to pass to it.\n" ] }, diff --git a/01x_lazy.ipynb b/01x_lazy.ipynb index 97d5f65..da68b49 100644 --- a/01x_lazy.ipynb +++ b/01x_lazy.ipynb @@ -58,7 +58,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Dask allows you to construct a prescription for the calculation you want to carry out. That may sound strange, but a simple example will demonstrate that you can achieve this while programming with perfectly ordinary Python functions and for-loops. We saw this in Chapter 02." + "Dask allows you to construct a prescription for the calculation you want to carry out. That may sound strange, but a simple example will demonstrate that you can achieve this while programming with perfectly ordinary Python functions and for-loops. We saw this in the previous notebook." ] }, { @@ -98,15 +98,15 @@ "x = inc(15)\n", "y = inc(30)\n", "total = add(x, y)\n", - "# incx, incy and total are all delayed objects. \n", - "# They contain a prescription of how to execute" + "# x, y and total are all delayed objects. \n", + "# They contain a prescription of how to carry out the computation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Calling a delayed function created a delayed object (`incx, incy, total`) - examine these interactively. Making these objects is somewhat equivalent to constructs like the `lambda` or function wrappers (see below). Each holds a simple dictionary describing the task graph, a full specification of how to carry out the computation.\n", + "Calling a delayed function created a delayed object (`x, y, total`) which can be examined interactively. Making these objects is somewhat equivalent to constructs like the `lambda` or function wrappers (see below). 
Each holds a simple dictionary describing the task graph, a full specification of how to carry out the computation.\n", "\n", "We can visualize the chain of calculations that the object `total` corresponds to as follows; the circles are functions, rectangles are data/results." ] }, @@ -388,7 +388,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "There are many tasks that take a while to complete, but don't actually require much of the CPU, for example anything that requires communication over a network, or input from a user. In typical sequential programming, execution would need to halt while the process completes, and then continue execution. That would be dreadful for a user experience (imagine the slow progress bar that locks up the application and cannot be canceled), and wasteful of time (the CPU could have been doing useful work in the meantime.\n", + "There are many tasks that take a while to complete, but don't actually require much of the CPU, for example anything that requires communication over a network, or input from a user. In typical sequential programming, execution would need to halt while the process completes, and then continue execution. That would be dreadful for user experience (imagine the slow progress bar that locks up the application and cannot be canceled), and wasteful of time (the CPU could have been doing useful work in the meantime).\n", "\n", "For example, we can launch processes and get their output as follows:\n", "```python\n", @@ -556,9 +556,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Any Dask object, such as `total`, above, has an attribute which describes the calculations necessary to produce that result. Indeed, this is exactly the graph that we have been talking about, which can be visualized. We see that it is a simple dictionary, the keys are unique task identifiers, and the values are the functions and inputs for calculation.\n", + "Any Dask object, such as `total`, above, has an attribute which describes the calculations necessary to produce that result. Indeed, this is exactly the graph that we have been talking about, which can be visualized. We see that it is a simple dictionary, in which the keys are unique task identifiers, and the values are the functions and inputs for calculation.\n", "\n", - "`delayed` is a handy mechanism for creating the Dask graph, but the adventerous may wish to play with the full fexibility afforded by building the graph dictionaries directly. Detailed information can be found [here](http://dask.pydata.org/en/latest/graphs.html)." + "`delayed` is a handy mechanism for creating the Dask graph, but the adventurous may wish to play with the full flexibility afforded by building the graph dictionaries directly. Detailed information can be found [here](http://dask.pydata.org/en/latest/graphs.html)."
] }, { diff --git a/02_bag.ipynb b/02_bag.ipynb index c22f634..dc5da4a 100644 --- a/02_bag.ipynb +++ b/02_bag.ipynb @@ -320,7 +320,9 @@ "metadata": {}, "outputs": [], "source": [ - "js.filter(lambda record: record['name'] == 'Alice').pluck('transactions').take(3)" + "(js.filter(lambda record: record['name'] == 'Alice')\n", + " .pluck('transactions')\n", + " .take(3))" ] }, { @@ -455,8 +457,7 @@ "metadata": {}, "outputs": [], "source": [ - "is_even = lambda x: x % 2\n", - "b.foldby(is_even, binop=max, combine=max).compute()" + "b.foldby(lambda x: x % 2, binop=max, combine=max).compute()" ] }, { @@ -495,7 +496,7 @@ "# This one is comparatively fast and produces the same result.\n", "from operator import add\n", "def incr(tot, _):\n", - " return tot+1\n", + " return tot + 1\n", "\n", "result = js.foldby(key='name', \n", " binop=incr, \n", @@ -549,7 +550,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "For the same reasons that Pandas is often faster than pure Python, `dask.dataframe` can be faster than `dask.bag`. We will work more with DataFrames later, but from for the bag point of view, they are frequently the end-point of the \"messy\" part of data ingestion—once the data can be made into a data-frame, then complex split-apply-combine logic will become much more straight-forward and efficient.\n", + "For the same reasons that Pandas is often faster than pure Python, `dask.dataframe` can be faster than `dask.bag`. We will work more with DataFrames later, but from the point of view of a Bag, it is frequently the end-point of the \"messy\" part of data ingestion—once the data can be made into a data-frame, then complex split-apply-combine logic will become much more straight-forward and efficient.\n", "\n", "You can transform a bag with a simple tuple or flat dictionary structure into a `dask.dataframe` with the `to_dataframe` method." ] @@ -575,7 +576,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Using a Dask DataFrame, how long does it take to do our prior computation of numbers of people with the same name? It turns out that `dask.dataframe.groupby()` beats `dask.bag.groupby()` more than an order of magnitude; but it still cannot match `dask.bag.foldby()` for this case." + "Using a Dask DataFrame, how long does it take to do our prior computation of numbers of people with the same name? It turns out that `dask.dataframe.groupby()` beats `dask.bag.groupby()` by more than an order of magnitude; but it still cannot match `dask.bag.foldby()` for this case." ] }, { @@ -608,7 +609,7 @@ "outputs": [], "source": [ "def denormalize(record):\n", - " # returns a list for every nested item, each transaction of each person\n", + " # returns a list for each person, one item per transaction\n", " return [{'id': record['id'], \n", " 'name': record['name'], \n", " 'amount': transaction['amount'], \n", diff --git a/03_array.ipynb b/03_array.ipynb index 63724a1..306e3c8 100644 --- a/03_array.ipynb +++ b/03_array.ipynb @@ -115,7 +115,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Before using dask, lets consider the concept of blocked algorithms. We can compute the sum of a large number of elements by loading them chunk-by-chunk, and keeping a running total.\n", + "Before using dask, let's consider the concept of blocked algorithms. 
We can compute the sum of a large number of elements by loading them chunk-by-chunk, and keeping a running total.\n", "\n", "Here we compute the sum of this large array on disk by \n", "\n", @@ -133,8 +133,8 @@ "source": [ "# Compute sum of large array, one million numbers at a time\n", "sums = []\n", - "for i in range(0, 1000000000, 1000000):\n", - " chunk = dset[i: i + 1000000] # pull out numpy array\n", + "for i in range(0, 1_000_000_000, 1_000_000):\n", + " chunk = dset[i: i + 1_000_000] # pull out numpy array\n", " sums.append(chunk.sum())\n", "\n", "total = sum(sums)\n", @@ -152,7 +152,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now that we've seen the simple example above try doing a slightly more complicated problem, compute the mean of the array, assuming for a moment that we don't happen to already know how many elements are in the data. You can do this by changing the code above with the following alterations:\n", + "Now that we've seen the simple example above, try doing a slightly more complicated problem. Compute the mean of the array, assuming for a moment that we don't happen to already know how many elements are in the data. You can do this by changing the code above with the following alterations:\n", "\n", "1. Compute the sum of each block\n", "2. Compute the length of each block\n", @@ -182,8 +182,8 @@ "source": [ "sums = []\n", "lengths = []\n", - "for i in range(0, 1000000000, 1000000):\n", - " chunk = dset[i: i + 1000000] # pull out numpy array\n", + "for i in range(0, 1_000_000_000, 1_000_000):\n", + " chunk = dset[i: i + 1_000_000] # pull out numpy array\n", " sums.append(chunk.sum())\n", " lengths.append(len(chunk))\n", "\n", @@ -216,7 +216,7 @@ "You can create a `dask.array` `Array` object with the `da.from_array` function. This function accepts\n", "\n", "1. `data`: Any object that supports NumPy slicing, like `dset`\n", - "2. `chunks`: A chunk size to tell us how to block up our array, like `(1000000,)`" + "2. `chunks`: A chunk size to tell us how to block up our array, like `(1_000_000,)`" ] }, { @@ -226,7 +226,7 @@ "outputs": [], "source": [ "import dask.array as da\n", - "x = da.from_array(dset, chunks=(1000000,))\n", + "x = da.from_array(dset, chunks=(1_000_000,))\n", "x" ] }, @@ -234,7 +234,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "** Manipulate `dask.array` object as you would a numpy array**" + "**Manipulate `dask.array` object as you would a numpy array**" ] }, { @@ -243,7 +243,7 @@ "source": [ "Now that we have an `Array` we perform standard numpy-style computations like arithmetic, mathematics, slicing, reductions, etc..\n", "\n", - "The interface is familiar, but the actual work is different. dask_array.sum() does not do the same thing as numpy_array.sum()." + "The interface is familiar, but the actual work is different. `dask_array.sum()` does not do the same thing as `numpy_array.sum()`." ] }, { @@ -353,7 +353,7 @@ "1. Use multiple cores in parallel\n", "2. Chain operations on a single blocks before moving on to the next one\n", "\n", - "Dask.array translates your array operations into a graph of inter-related tasks with data dependencies between them. Dask then executes this graph in parallel with multiple threads. We'll discuss more about this in the next section.\n", + "`Dask.array` translates your array operations into a graph of inter-related tasks with data dependencies between them. Dask then executes this graph in parallel with multiple threads. 
We'll discuss more about this in the next section.\n", "\n" ] }, @@ -410,7 +410,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Performance comparision\n", + "Performance comparison\n", "---------------------------\n", "\n", "The following experiment was performed on a heavy personal laptop. Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory." ] }, @@ -544,13 +544,6 @@ "dsets[0]" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "code", "execution_count": null, @@ -597,7 +590,11 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "jupyter": { + "source_hidden": true + } + }, "outputs": [], "source": [ "arrays = [da.from_array(dset, chunks=(500, 500)) for dset in dsets]\n", "arrays" ] }, @@ -628,7 +625,11 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "jupyter": { + "source_hidden": true + } + }, "outputs": [], "source": [ "x = da.stack(arrays, axis=0)\n", "x" ] }, @@ -783,7 +784,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "The [Lennard-Jones](https://en.wikipedia.org/wiki/Lennard-Jones_potential) is used in partical simuluations in physics, chemistry and engineering. It is highly parallelizable.\n", + "The [Lennard-Jones potential](https://en.wikipedia.org/wiki/Lennard-Jones_potential) is used in particle simulations in physics, chemistry and engineering. It is highly parallelizable.\n", "\n", "First, we'll run and profile the Numpy version on 7,000 particles." ] }, diff --git a/04_dataframe.ipynb b/04_dataframe.ipynb index 4035ea1..5008291 100644 --- a/04_dataframe.ipynb +++ b/04_dataframe.ipynb @@ -27,7 +27,7 @@ "source": [ "\n", "\n", - "The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the Pandas `DataFrame`. One Dask `DataFrame` is comprised of many in-memory pandas `DataFrames` separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.\n", + "The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a large subset of the Pandas `DataFrame` API. One Dask `DataFrame` is comprised of many in-memory pandas `DataFrames` separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.\n", "\n", "**Related Documentation**\n", "\n", @@ -500,7 +500,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "But lets try by passing both to a single `compute` call." + "But let's try by passing both to a single `compute` call." ] }, { @@ -524,7 +524,7 @@ "- the filter (`df[~df.Cancelled]`)\n", "- some of the necessary reductions (`sum`, `count`)\n", "\n", - "To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function (we might want to use `filename='graph.pdf'` to zoom in on the graph better):" + "To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function (we might want to use `filename='graph.pdf'` to save the graph to disk so that we can zoom in more easily):" ] }, { @@ -560,7 +560,7 @@ "Dask.dataframe operations use `pandas` operations internally.
Generally they run at about the same speed except in the following two cases:\n", "\n", "1. Dask introduces a bit of overhead, around 1ms per task. This is usually negligible.\n", - "2. When Pandas releases the GIL (coming to `groupby` in the next version) `dask.dataframe` can call several pandas operations in parallel within a process, increasing speed somewhat proportional to the number of cores. For operations which don't release the GIL, multiple processes would be needed to get the same speedup." + "2. When Pandas releases the GIL `dask.dataframe` can call several pandas operations in parallel within a process, increasing speed somewhat proportional to the number of cores. For operations which don't release the GIL, multiple processes would be needed to get the same speedup." ] }, { diff --git a/05_distributed.ipynb b/05_distributed.ipynb index 2b8e82b..572268c 100644 --- a/05_distributed.ipynb +++ b/05_distributed.ipynb @@ -21,7 +21,7 @@ "As we have seen so far, Dask allows you to simply construct graphs of tasks with dependencies, as well as have graphs created automatically for you using functional, Numpy or Pandas syntax on data collections. None of this would be very useful, if there weren't also a way to execute these graphs, in a parallel and memory-aware way. So far we have been calling `thing.compute()` or `dask.compute(thing)` without worrying what this entails. Now we will discuss the options available for that execution, and in particular, the distributed scheduler, which comes with additional functionality.\n", "\n", "Dask comes with four available schedulers:\n", - "- \"threaded\": a scheduler backed by a thread pool\n", + "- \"threaded\" (aka \"threading\"): a scheduler backed by a thread pool\n", "- \"processes\": a scheduler backed by a process pool\n", "- \"single-threaded\" (aka \"sync\"): a synchronous scheduler, good for debugging\n", "- distributed: a distributed scheduler for executing graphs on multiple machines, see below.\n", @@ -29,24 +29,24 @@ "To select one of these for computation, you can specify at the time of asking for a result, e.g.,\n", "```python\n", "myvalue.compute(scheduler=\"single-threaded\") # for debugging\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "or set the current default, either temporarily or globally\n", + "```\n", + "\n", + "You can also set a default scheduler either temporarily\n", "```python\n", "with dask.config.set(scheduler='processes'):\n", " # set temporarily for this block only\n", + " # all compute calls within this block will use the specified scheduler\n", " myvalue.compute()\n", + " anothervalue.compute()\n", + "```\n", "\n", - "dask.config.set(scheduler='processes')\n", + "Or globally\n", + "```python\n", "# set until further notice\n", + "dask.config.set(scheduler='processes')\n", "```\n", "\n", - "Lets see the difference for the familiar case of the flights data" + "Let's try out a few schedulers on the familiar case of the flights data." ] }, { diff --git a/06_distributed_advanced.ipynb b/06_distributed_advanced.ipynb index 35112cc..108cf72 100644 --- a/06_distributed_advanced.ipynb +++ b/06_distributed_advanced.ipynb @@ -36,9 +36,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "In chapter Distributed, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. 
However, we now have access to additional functionality, and control over what data is held in memory.\n", + "In the previous chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.\n", "\n", - "To begin, the `futures` interface (derived from the built-in `concurrent.futures`) allow map-reduce like functionality. We can submit individual functions for evaluation with one set of inputs, or evaluated over a sequence of inputs with `submit()` and `map()`. Notice that the call returns immediately, giving one or more *futures*, whose status begins as \"pending\" and later becomes \"finished\". There is no blocking of the local Python session." + "To begin, the `futures` interface (derived from the built-in `concurrent.futures`) allows map-reduce like functionality. We can submit individual functions for evaluation with one set of inputs, or evaluate them over a sequence of inputs with `submit()` and `map()`. Notice that the call returns immediately, giving one or more *futures*, whose status begins as \"pending\" and later becomes \"finished\". There is no blocking of the local Python session." ] }, { @@ -540,13 +540,9 @@ "def ratio(a, b):\n", " return a // b\n", "\n", - "@delayed\n", - "def summation(*a):\n", - " return sum(*a)\n", - "\n", "ina = [5, 25, 30]\n", "inb = [5, 5, 6]\n", - "out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])\n", + "out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])\n", "f = c.compute(out)\n", "f" ] }, { @@ -571,7 +567,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "But if we introduce bad input, an exception is raised. The exception happens in `ratio`, but only comes to our attention when calculating `summation`." + "But if we introduce bad input, an exception is raised. The exception happens in `ratio`, but only comes to our attention when calculating the sum." ] }, { @@ -586,7 +582,7 @@ "source": [ "ina = [5, 25, 30]\n", "inb = [5, 0, 6]\n", - "out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])\n", + "out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])\n", "f = c.compute(out)\n", "c.gather(f)" ] }, @@ -634,10 +630,10 @@ "metadata": {}, "source": [ "The trouble with this approach is that Dask is meant for the execution of large datasets/computations - you probably can't simply run the whole thing \n", - "in one local thread, else you wouldn't have used Dask in the first place. So the code above should only be used on a small part of the data that also exchibits the error. \n", + "in one local thread, else you wouldn't have used Dask in the first place. So the code above should only be used on a small part of the data that also exhibits the error. \n", "Furthermore, the method will not work when you are dealing with futures (such as `f`, above, or after persisting) instead of delayed-based computations.\n", "\n", - "As alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependnecies locally for execution." + "As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependencies locally for execution." ] }, {