tl;dr: We evaluate dask graphs with a variety of schedulers and introduce a new distributed memory scheduler.
Dask.distributed is new and is not battle-tested. Use at your own risk and adjust expectations accordingly.
Evaluate dask graphs
Most dask users use the dask collections, Array, Bag, and DataFrame. These collections are convenient ways to produce
dask graphs. A dask graph is a dictionary of tasks. A task is a tuple with a
function and arguments.
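Here is a minimal sketch of such a graph in plain Python, with no dask import, to make the definition concrete. The helpers `inc`, `istask`, and `dependencies` are hypothetical names for illustration. The sketch also simplifies the real graph format: it assumes string keys and no nested tasks.

```python
from operator import add


def inc(x):
    return x + 1


# Hypothetical toy graph. Each value is either literal data or a task:
# a tuple whose first element is a callable and whose remaining elements
# are arguments. An argument that names another key stands for that key's result.
dsk = {
    "x": 1,
    "y": (inc, "x"),      # y = inc(x)
    "z": (add, "y", 10),  # z = add(y, 10); 10 is a literal, not a key
}


def istask(value):
    """True if value is a non-empty tuple whose first element is callable."""
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def dependencies(dsk, key):
    """Keys of `dsk` that the task stored at `key` refers to (top-level args only)."""
    value = dsk[key]
    if not istask(value):
        return set()
    return {arg for arg in value[1:] if isinstance(arg, str) and arg in dsk}
```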
The graph comprising a dask collection (like a dask.array) is available through its .dask attribute.
Further operations on a collection create more complex graphs.
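One way to picture how a collection grows its graph is the toy class below. `Lazy` is a hypothetical stand-in, not a dask collection. It keeps a graph in a `.dask` attribute together with the key of its result. Every operation adds one new task that points at its inputs' keys and merges their graphs. The key-naming scheme is an assumption made for this sketch.

```python
import itertools
from operator import add, mul

# Hypothetical global counter that gives every new task a unique key.
_counter = itertools.count()


class Lazy:
    """Hypothetical toy collection: a graph (`.dask`) plus the key of its result."""

    def __init__(self, dsk, key):
        self.dask = dsk
        self.key = key

    @classmethod
    def from_value(cls, value):
        """Wrap a literal as a one-entry graph."""
        key = f"value-{next(_counter)}"
        return cls({key: value}, key)

    def _binop(self, func, other):
        # Wrap plain Python values so that both operands carry a graph.
        if not isinstance(other, Lazy):
            other = Lazy.from_value(other)
        key = f"{func.__name__}-{next(_counter)}"
        # New graph = both input graphs + one task that refers to their keys.
        dsk = {**self.dask, **other.dask, key: (func, self.key, other.key)}
        return Lazy(dsk, key)

    def __add__(self, other):
        return self._binop(add, other)

    def __mul__(self, other):
        return self._binop(mul, other)
```

For example, `(Lazy.from_value(3) + 1) * 2` holds a five-entry graph: three literal values, one `add` task, and one `mul` task. Nothing is computed until some scheduler evaluates that graph.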
Hand-made dask graphs
We can make dask graphs by hand without dask collections. This involves creating a dictionary of tuples of functions.
We then evaluate these graphs with one of the dask schedulers.
We separate the evaluation of the graphs from their construction.
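The following minimal sketch illustrates that separation. It uses one graph and two interchangeable toy schedulers, and it assumes the same simplified graph format as before (string keys, no nested tasks). `get_sync` recurses through dependencies in a single thread. `get_threaded` runs each wave of ready tasks on a thread pool. Both are hypothetical illustrations, not dask's own schedulers such as `dask.threaded.get`.

```python
from concurrent.futures import ThreadPoolExecutor
from operator import add


def inc(x):
    return x + 1


def istask(value):
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def deps(dsk, key):
    """Keys of `dsk` referenced by the task at `key`."""
    value = dsk[key]
    if not istask(value):
        return set()
    return {a for a in value[1:] if isinstance(a, str) and a in dsk}


def run_task(dsk, value, results):
    """Run one task, replacing key arguments with already-computed results."""
    if not istask(value):
        return value
    func, *args = value
    return func(*[results[a] if isinstance(a, str) and a in dsk else a for a in args])


def get_sync(dsk, key, results=None):
    """Hypothetical single-threaded scheduler: dependencies first, then the task."""
    if results is None:
        results = {}
    if key not in results:
        for dep in deps(dsk, key):
            get_sync(dsk, dep, results)
        results[key] = run_task(dsk, dsk[key], results)
    return results[key]


def get_threaded(dsk, key, num_workers=4):
    """Hypothetical thread-pool scheduler: run each wave of ready tasks in parallel.

    For simplicity it evaluates the graph wave by wave and stops once `key`
    is done, rather than pruning tasks that `key` does not need.
    """
    results = {}
    pending = set(dsk)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        while key not in results:
            ready = [k for k in pending if deps(dsk, k).issubset(results)]
            if not ready:
                raise ValueError(f"cannot compute {key!r}: missing key or cycle")
            futures = {k: pool.submit(run_task, dsk, dsk[k], results) for k in ready}
            # Wait for the whole wave before mutating `results`.
            results.update({k: fut.result() for k, fut in futures.items()})
            pending.difference_update(ready)
    return results[key]
```

Neither function knows how the graph was built. A new scheduler can therefore be added without touching any code that constructs graphs.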
The separation of graphs from evaluation allows us to create new schedulers. In particular, there is now a scheduler that operates on multiple machines in parallel, communicating over ZeroMQ.
This system has a single centralized scheduler, several workers, and potentially several clients.
Clients send graphs to the central scheduler, which farms out those tasks to workers and coordinates the execution of the graph. While the scheduler centralizes metadata, the workers themselves handle the transfer of intermediate data in a peer-to-peer fashion. Once the graph completes, the workers send data to the scheduler, which passes it through to the appropriate user/client.
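The division of labor described above can be simulated in a single process. This hypothetical sketch has no ZeroMQ or networking. `Scheduler` keeps only metadata (which worker holds which key) and places tasks round-robin. Each `Worker` copies any missing inputs directly from the peer that holds them. Tasks run one at a time here. The class and method names are assumptions for illustration, not dask.distributed's API.

```python
from operator import add


def inc(x):
    return x + 1


def istask(value):
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def deps(dsk, key):
    value = dsk[key]
    if not istask(value):
        return set()
    return {a for a in value[1:] if isinstance(a, str) and a in dsk}


class Worker:
    """Hypothetical toy worker: stores results locally, pulls missing inputs from peers."""

    def __init__(self, name):
        self.name = name
        self.data = {}      # key -> value held on this worker
        self.transfers = 0  # values copied in from other workers

    def execute(self, dsk, key, who_has, peers):
        # Peer-to-peer transfer: the scheduler only says where an input
        # lives, and the worker fetches it directly from that peer.
        for dep in deps(dsk, key):
            if dep not in self.data:
                self.data[dep] = peers[who_has[dep]].data[dep]
                self.transfers += 1
        value = dsk[key]
        if istask(value):
            func, *args = value
            value = func(*[self.data[a] if isinstance(a, str) and a in dsk else a for a in args])
        self.data[key] = value


class Scheduler:
    """Hypothetical toy central scheduler: tracks metadata only, never the data."""

    def __init__(self, workers):
        self.workers = {w.name: w for w in workers}
        self.names = [w.name for w in workers]

    def get(self, dsk, key):
        who_has = {}  # key -> name of the worker holding its result
        turn = 0
        while key not in who_has:
            ready = sorted(k for k in dsk if k not in who_has and deps(dsk, k).issubset(who_has))
            if not ready:
                raise ValueError(f"cannot compute {key!r}: missing key or cycle")
            for k in ready:
                name = self.names[turn % len(self.names)]  # naive round-robin placement
                turn += 1
                self.workers[name].execute(dsk, k, who_has, self.workers)
                who_has[k] = name
        # Read the final value from the worker holding it, standing in for
        # the worker -> scheduler -> client hand-off.
        return self.workers[who_has[key]].data[key]
```

For example, consider the graph `{"a": 1, "b": 2, "c": (inc, "a"), "d": (inc, "b"), "e": (add, "c", "d")}` computed on two workers. It returns 5, and only one value crosses between workers: the input of the final `add` that lives on the other worker.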
And so now we can execute our dask graphs in parallel across multiple machines.
Choose Your Scheduler
This graph is small. We didn’t need a distributed network of machines to compute it (a single thread would have been much faster), but this simple example extends easily to more important cases, including generic use with the dask collections (Array, Bag, DataFrame). You can control the scheduler with a keyword argument to any compute call.
Alternatively, you can set the default scheduler globally in dask, so that every compute call uses it without the keyword.
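Both options reduce to a simple dispatch rule: an explicit `get=` keyword wins, and otherwise a module-level default applies. The sketch below shows that rule using hypothetical names (`compute`, `set_default_get`, `get_sync`, `get_recording`), not dask's actual API.

```python
from operator import add


def inc(x):
    return x + 1


def get_sync(dsk, key):
    """Hypothetical minimal recursive scheduler (no caching of shared inputs)."""
    value = dsk[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        return func(*[get_sync(dsk, a) if isinstance(a, str) and a in dsk else a for a in args])
    return value


used = []  # records calls to the second toy scheduler


def get_recording(dsk, key):
    """Hypothetical second scheduler: note that it was chosen, then delegate."""
    used.append(key)
    return get_sync(dsk, key)


_default_get = get_sync  # hypothetical module-level default


def set_default_get(get):
    """Change the scheduler used when no `get=` keyword is passed."""
    global _default_get
    _default_get = get


def compute(dsk, key, get=None):
    """Evaluate `key`: an explicit `get=` wins, otherwise use the default."""
    scheduler = get if get is not None else _default_get
    return scheduler(dsk, key)
```

Passing a scheduler per call suits one-off experiments. Changing the default suits a whole session that should run on, say, a cluster.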
We intentionally made the simplest and dumbest distributed scheduler we could think of. Because dask separates graphs from schedulers, we can iterate on this problem many times, building better schedulers after learning what is important. This scheduler builds on lessons from our single-memory schedulers, but it is the first dask scheduler that has to think about distributed memory. As a result it has the following known limitations:
- It does not consider data locality. While linear chains of tasks will execute on the same machine, we don’t think much about executing multi-input tasks on nodes where only some of the data is local.
- In particular, this scheduler isn’t optimized for data-local file-systems like HDFS. It’s still happy to read data from HDFS, but this results in unnecessary network communication. We’ve found that it’s great when paired with S3.
- This scheduler is new and hasn’t yet had its tires kicked. Vocal beta users are most welcome.
- We haven’t thought much about deployment. For example, you need to ssh into a set of machines and start up workers, then tear them down when you’re done. Dask.distributed can bootstrap off of an IPython Parallel cluster, and we’ve integrated it into anaconda-cluster, but deployment remains a tough problem.
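The placement behavior in the first limitation can be made concrete with a small, hypothetical policy function. It illustrates the idea and is not dask.distributed's code. A task with exactly one input runs wherever that input lives, so linear chains stay on one machine. A task with several inputs goes to the next worker round-robin, with no check on where those inputs are.

```python
from itertools import cycle


def place_tasks(order, dependencies, workers):
    """Hypothetical locality-naive placement policy.

    order: task keys, with every dependency listed before its dependents.
    dependencies: dict mapping key -> list of input keys (missing = no inputs).
    Returns (placement, transfers): key -> worker, and how many inputs had
    to be copied from another worker.
    """
    rr = cycle(workers)
    placement = {}
    transfers = 0
    for key in order:
        inputs = dependencies.get(key, [])
        if len(inputs) == 1:
            # Linear chain: run where the single input already lives.
            worker = placement[inputs[0]]
        else:
            # Zero or several inputs: take the next worker round-robin,
            # ignoring where the inputs already live.
            worker = next(rr)
        transfers += sum(1 for d in inputs if placement[d] != worker)
        placement[key] = worker
    return placement, transfers
```

Take workers `w1` and `w2`. Tasks `p`, `q`, and `r` land round-robin on `w1`, `w2`, and `w1`, and a chain step `p2 = f(p)` stays on `w1`. A multi-input task `s = g(p2, r)`, however, is sent to `w2`. Both of its inputs must then be copied, even though running it on `w1` would have needed no transfer.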
The dask.distributed module is available in the last release, but I suggest using the development master branch. There will be another release in early
Blake Griffith has been playing with dask.distributed and dask.bag together on data from http://githubarchive.org. He plans to write a blog post to give a better demonstration of the use of dask.distributed on real world problems. Look for that post in the next week or two.
You can read more about the internal design of
dask.distributed at the
Also thanks to Blake Griffith for serving as original user/developer and for smoothing over the user experience.