Dask Release 0.14.1
This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
I’m pleased to announce the release of Dask version 0.14.1. This release contains a variety of performance and feature improvements. This blogpost includes some notable features and changes since the last release on February 27th.
As always you can conda install from conda-forge
conda install -c conda-forge dask distributed
or you can pip install from PyPI
pip install dask[complete] --upgrade
Arrays
Recent work in distributed computing and machine learning has motivated new performance-oriented and usability changes to how we handle arrays.
Automatic chunking and operation on NumPy arrays
Many interactions between Dask arrays and NumPy arrays work smoothly. NumPy arrays are made lazy and are appropriately chunked to match the operation and the Dask array.
>>> x = np.ones(10) # a numpy array
>>> y = da.arange(10, chunks=(5,)) # a dask array
>>> z = x + y # combined become a dask.array
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>
>>> z.compute()
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])
Reshape
Reshaping distributed arrays is simple in simple cases, and can be quite complex in complex cases. Reshape now supports a much broader set of shape transformations where any dimension is collapsed or merged into other dimensions.
>>> x = da.ones((2, 3, 4, 5, 6), chunks=(2, 2, 2, 2, 2))
>>> x.reshape((6, 2, 2, 30, 1))
dask.array<reshape, shape=(6, 2, 2, 30, 1), dtype=float64, chunksize=(3, 1, 2, 6, 1)>
This operation ends up being quite useful in a number of distributed array cases.
Optimize Slicing to Minimize Communication
Dask.array slicing optimizations are now careful to produce graphs that avoid situations that could cause excess inter-worker communication. The details of how they do this are a bit out of scope for a short blogpost, but the history here is interesting.
Historically dask.arrays were used almost exclusively by researchers with large on-disk arrays stored as HDF5 or NetCDF files. These users primarily used the single machine multi-threaded scheduler. We heavily tailored Dask array optimizations to this situation and made that community pretty happy. Now as some of that community switches to cluster computing on larger datasets the optimization goals shift a bit. We have tons of distributed disk bandwidth but really want to avoid communicating large results between workers. Supporting both use cases is possible and I think that we’ve achieved that in this release so far, but it’s starting to require increasing levels of care.
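To make this concrete, here is the sort of slicing operation involved; this particular array and slice are made up for illustration:
>>> x = da.ones((10000, 10000), chunks=(1000, 1000))
>>> y = x[:2000, 5000:]  # lazy: only the chunks this slice touches appear in the graph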
Micro-optimizations
With distributed computing also comes larger graphs and a growing importance of graph-creation overhead. This has been optimized somewhat in this release. We expect this to be a focus going forward.
DataFrames
Set_index
Set_index is smarter in two ways:
- If you set_index on a column that happens to be sorted then we'll identify that and avoid a costly shuffle. This was always possible with the sorted= keyword but users rarely used this feature. Now this is automatic (see the sketch after this list).
- Similarly, when setting the index we can look at the size of the data and determine if there are too many or too few partitions and rechunk the data while shuffling. This can significantly improve performance if there are too many partitions (a common case).
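For example, something like the following sketch, with a made-up dataframe and hypothetical column names:
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> pdf = pd.DataFrame({'timestamp': pd.date_range('2017-01-01', periods=1000, freq='T'),
...                     'value': range(1000)})
>>> ddf = dd.from_pandas(pdf, npartitions=10)
>>> ddf.set_index('timestamp', sorted=True)  # before: tell Dask the column is sorted
>>> ddf.set_index('timestamp')               # now: sortedness is detected automatically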
Shuffle performance
We’ve micro-optimized some parts of dataframe shuffles. Big thanks to the Pandas developers for the help here. This accelerates set_index, joins, groupby-applies, and so on.
Fastparquet
The fastparquet library has seen a lot of use lately and has undergone a number of community bugfixes.
Importantly, Fastparquet now supports Python 2.
We strongly recommend Parquet as the standard data storage format for Dask dataframes (and Pandas DataFrames).
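For example, given a Dask dataframe df, a minimal round trip looks something like this (the path is hypothetical and fastparquet must be installed):
>>> import dask.dataframe as dd
>>> df.to_parquet('mydata.parquet')          # write out as a Parquet dataset
>>> df2 = dd.read_parquet('mydata.parquet')  # lazily read it back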
Distributed Scheduler
Replay remote exceptions
Debugging is hard in part because exceptions happen on remote machines where normal debugging tools like pdb can't reach. Previously we were able to bring back the traceback and exception, but you couldn't dive into the stack trace to investigate what went wrong:
def div(x, y):
return x / y
>>> future = client.submit(div, 1, 0)
>>> future
<Future: status: error, key: div-4a34907f5384bcf9161498a635311aeb>
>>> future.result() # getting result re-raises exception locally
<ipython-input-3-398a43a7781e> in div()
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
Now Dask can bring a failing task and all necessary data back to the local machine and rerun it so that users can leverage the normal Python debugging toolchain.
>>> client.recreate_error_locally(future)
<ipython-input-3-398a43a7781e> in div(x, y)
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
Now if you're in IPython or a Jupyter notebook you can use the %debug magic to jump into the stacktrace, investigate local variables, and so on.
In [8]: %debug
> <ipython-input-3-398a43a7781e>(2)div()
1 def div(x, y):
----> 2 return x / y
ipdb> pp x
1
ipdb> pp y
0
Async/await syntax
Dask.distributed uses Tornado for network communication and Tornado coroutines for concurrency. Normal users rarely interact with Tornado coroutines; they aren’t familiar to most people so we opted instead to copy the concurrent.futures API. However some complex situations are much easier to solve if you know a little bit of async programming.
Fortunately, the Python ecosystem seems to be embracing this change towards native async code with the async/await syntax in Python 3. In an effort to motivate people to learn async programming and to gently nudge them towards Python 3, Dask.distributed now supports async/await in a few cases.
You can wait on a Dask Future:
async def f():
future = client.submit(func, *args, **kwargs)
result = await future
You can put the as_completed iterator into an async for loop:
async for future in as_completed(futures):
result = await future
... do stuff with result ...
And, because Tornado supports the await protocols you can also use the existing shadow concurrency API (everything prepended with an underscore) with await. (This was doable before.)
results = client.gather(futures) # synchronous
...
results = await client._gather(futures) # asynchronous
If you're in Python 2 you can always do this with normal yield and the tornado.gen.coroutine decorator.
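For example, a rough Python 2 equivalent of the Future example above might look like this (just a sketch; client and func are as before):
from tornado import gen

@gen.coroutine
def f():
    futures = client.map(func, range(10))    # submit work as usual
    results = yield client._gather(futures)  # yield instead of await
    raise gen.Return(results)                # Python 2 coroutines return via gen.Return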
Inproc transport
In the last release we enabled Dask to communicate over more things than just TCP. In practice this doesn’t come up (TCP is pretty useful). However in this release we now support single-machine “clusters” where the clients, scheduler, and workers are all in the same process and transfer data cost-free over in-memory queues.
This allows the in-memory user community to use some of the more advanced features (asynchronous computation, spill-to-disk support, web-diagnostics) that are only available in the distributed scheduler.
This is on by default if you create a cluster with LocalCluster without using Nanny processes.
>>> from dask.distributed import LocalCluster, Client
>>> cluster = LocalCluster(nanny=False)
>>> client = Client(cluster)
>>> client
<Client: scheduler='inproc://192.168.1.115/8437/1' processes=1 cores=4>
>>> from threading import Lock # Not serializable
>>> lock = Lock() # Won't survive going over a socket
>>> [future] = client.scatter([lock]) # Yet we can send to a worker
>>> future.result() # ... and back
<unlocked _thread.lock object at 0x7fb7f12d08a0>
Connection pooling for inter-worker communications
Workers now maintain a pool of sustained connections between each other. This pool is of a fixed size and removes connections with a least-recently-used policy. It avoids re-connection delays when transferring data between workers. In practice this shaves off a millisecond or two from every communication.
This is actually a revival of an old feature that we had turned off last year when it became clear that the performance here wasn’t a problem.
Along with other enhancements, this takes our round-trip latency down to 11ms on my laptop.
In [10]: %%time
...: for i in range(1000):
...: future = client.submit(inc, i)
...: result = future.result()
...:
CPU times: user 4.96 s, sys: 348 ms, total: 5.31 s
Wall time: 11.1 s
There may be room for improvement here though. For comparison, here is the same test with concurrent.futures.ProcessPoolExecutor:
In [13]: from concurrent.futures import ProcessPoolExecutor
In [14]: e = ProcessPoolExecutor(8)
In [15]: %%time
...: for i in range(1000):
...: future = e.submit(inc, i)
...: result = future.result()
...:
CPU times: user 320 ms, sys: 56 ms, total: 376 ms
Wall time: 442 ms
Also, just to be clear, this measures total roundtrip latency, not overhead. Dask’s distributed scheduler overhead remains in the low hundreds of microseconds.
Related Projects
There has been activity around Dask and machine learning:
- dask-learn is undergoing some performance enhancements. It turns out that when you offer distributed grid search people quickly want to scale up their computations to hundreds of thousands of trials.
- dask-glm now has a few decent algorithms for convex optimization. The authors of this wrote a blogpost very recently if you’re interested: Developing Convex Optimization Algorithms in Dask
- dask-xgboost lets you hand off distributed data in Dask dataframes or arrays and hand it directly to a distributed XGBoost system (that Dask will nicely set up and tear down for you). This was a nice example of easy hand-off between two distributed services running in the same processes.
Acknowledgements
The following people contributed to the dask/dask repository since the 0.14.0 release on February 27th
- Antoine Pitrou
- Brian Martin
- Elliott Sales de Andrade
- Erik Welch
- Francisco de la Peña
- jakirkham
- Jim Crist
- Jitesh Kumar Jha
- Julien Lhermitte
- Martin Durant
- Matthew Rocklin
- Markus Gonser
- Talmaj
The following people contributed to the dask/distributed repository since the 1.16.0 release on February 27th
- Antoine Pitrou
- Ben Schreck
- Elliott Sales de Andrade
- Martin Durant
- Matthew Rocklin
- Phil Elson