This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project

A screencast version of this post is available here: https://youtu.be/KGlhU9kSfVk

Summary

Copy-pasting the following commands gives you a Dask cluster on EC2.

pip install dec2
# Provision nine nodes, with eight separate worker processes per node
dec2 up --keyname YOUR-AWS-KEY-NAME \
        --keypair ~/.ssh/YOUR-AWS-KEY-FILE.pem \
        --count 9 \
        --nprocs 8

dec2 ssh            # SSH into head node
ipython             # Start IPython console on head node
from distributed import Executor, s3, progress
e = Executor('127.0.0.1:8786')
df = s3.read_csv('dask-data/nyc-taxi/2015', lazy=False)
progress(df)
df.head()

You will have to use your own AWS credentials, but you’ll get fast distributed Pandas access on the NYCTaxi data across a cluster, loaded from S3.

Motivation

Reducing barriers to entry enables curious play.

Curiosity drives us to play with new tools. We love the idea that previously difficult tasks will suddenly become easy, expanding our abilities and opening up a range of newly solvable problems.

However, as our problems grow more complex our tools grow more cumbersome and setup costs increase. This cost stops us from playing around, which is a shame, because playing is good both for the education of the user and for the development of the tool. Tool makers who want feedback are strongly incentivized to decrease setup costs, especially for the play case.

In February we introduced dask.distributed, a lightweight distributed computing framework for Python. We focused on processing data with high level abstractions like dataframes and arrays in the following blogposts:

  1. Analyze GitHub JSON record data in S3
  2. Use Dask DataFrames on CSV data in HDFS
  3. Process NetCDF data with Dask arrays on a traditional cluster

Today we present a simple setup script to launch dask.distributed on EC2, enabling any user with AWS credentials to repeat these experiments easily.

dec2

Devops tooling and EC2 to the rescue

DEC2 does the following:

  1. Provisions nodes on EC2 using your AWS credentials
  2. Installs Anaconda on those nodes
  3. Deploys a dask.distributed Scheduler on the head node and Workers on the rest of the nodes
  4. Helps you to SSH into the head node or connect from your local machine

$ pip install dec2
$ dec2 up --help
Usage: dec2 up [OPTIONS]

Options:
  --keyname TEXT                Keyname on EC2 console  [required]
  --keypair PATH                Path to the keypair that matches the keyname [required]
  --name TEXT                   Tag name on EC2
  --region-name TEXT            AWS region  [default: us-east-1]
  --ami TEXT                    EC2 AMI  [default: ami-d05e75b8]
  --username TEXT               User to SSH to the AMI  [default: ubuntu]
  --type TEXT                   EC2 Instance Type  [default: m3.2xlarge]
  --count INTEGER               Number of nodes  [default: 4]
  --security-group TEXT         Security Group Name  [default: dec2-default]
  --volume-type TEXT            Root volume type  [default: gp2]
  --volume-size INTEGER         Root volume size (GB)  [default: 500]
  --file PATH                   File to save the metadata  [default: cluster.yaml]
  --provision / --no-provision  Provision salt on the nodes  [default: True]
  --dask / --no-dask            Install Dask.Distributed in the cluster [default: True]
  --nprocs INTEGER              Number of processes per worker  [default: 1]
  -h, --help                    Show this message and exit.

Note: dec2 was largely built by Daniel Rodriguez

Run

As an example we use dec2 to create a new cluster of nine nodes. Each worker node will run eight separate worker processes rather than relying on threads.

# Provision nine nodes, with eight separate worker processes per node
dec2 up --keyname my-key-name \
        --keypair ~/.ssh/my-key-file.pem \
        --count 9 \
        --nprocs 8
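
The same invocation can also be assembled from Python, which sidesteps shell line-continuation pitfalls. The sketch below only builds the argument list, using flags that appear in the help output above. The helper name `dec2_up_command` is hypothetical and not part of dec2. Actually running the command (for example with `subprocess.run`) would provision real, billable EC2 instances, so that step is deliberately left out.

```python
import os
import shlex


def dec2_up_command(keyname, keypair, count=4, nprocs=1):
    """Build the argv list for `dec2 up` (hypothetical helper, not part of dec2)."""
    return [
        "dec2", "up",
        "--keyname", keyname,
        "--keypair", os.path.expanduser(keypair),  # expand ~ ourselves; no shell involved
        "--count", str(count),
        "--nprocs", str(nprocs),
    ]


cmd = dec2_up_command("my-key-name", "~/.ssh/my-key-file.pem", count=9, nprocs=8)
print(shlex.join(cmd))  # single-line form that can be pasted into a shell
```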

Connect

We ssh into the head node and start playing in an IPython terminal:

localmachine:~$ dec2 ssh                          # SSH into head node
ec2-machine:~$ ipython                            # Start IPython console
In [1]: from distributed import Executor, s3, progress
In [2]: e = Executor('127.0.0.1:8786')
In [3]: e
Out[3]: <Executor: scheduler=127.0.0.1:8786 workers=64 threads=64>
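
The `workers=64` figure follows from the layout described earlier: one of the nine nodes is the head node running the Scheduler, and each of the remaining eight nodes runs eight worker processes. A small sketch of that arithmetic, where the function name is purely illustrative and the head node is assumed to host no workers:

```python
def expected_workers(count, nprocs):
    """Worker processes for a cluster of `count` nodes, one of which is the head node."""
    worker_nodes = count - 1  # the head node hosts the Scheduler only
    return worker_nodes * nprocs


print(expected_workers(count=9, nprocs=8))  # 64, matching the Executor repr above
```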

Notebooks

Alternatively we set up a globally visible Jupyter notebook server:

localmachine:~$ dec2 dask-distributed address    # Get location of head node
Scheduler Address: XXX:XXX:XXX:XXX:8786

localmachine:~$ dec2 ssh                          # SSH into head node
ec2-machine:~$ jupyter notebook --ip="*"          # Start Jupyter Server

Then navigate to http://XXX:XXX:XXX:XXX:8888 from your local browser. Note that this method is not secure; see the Jupyter docs for a better solution.
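
To avoid retyping the address, the notebook URL can be derived from the scheduler address line. This is a minimal sketch that assumes the output has exactly the `Scheduler Address: HOST:PORT` shape shown above and that Jupyter listens on its default port 8888. `notebook_url` is a hypothetical helper, and `203.0.113.10` is a documentation-only example address.

```python
def notebook_url(address_line, notebook_port=8888):
    """Turn 'Scheduler Address: HOST:PORT' into the notebook URL on the same host."""
    _, _, address = address_line.partition("Scheduler Address:")
    # Split on the last colon so only the trailing scheduler port is dropped.
    host, _, _scheduler_port = address.strip().rpartition(":")
    if not host:
        raise ValueError("unexpected address line: %r" % address_line)
    return "http://%s:%d" % (host, notebook_port)


print(notebook_url("Scheduler Address: 203.0.113.10:8786"))
# http://203.0.113.10:8888
```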

Public datasets

We repeat the experiments from our earlier blogpost on NYCTaxi data.

df = s3.read_csv('dask-data/nyc-taxi/2015', lazy=False)
progress(df)
df.head()
   VendorID  tpep_pickup_datetime  tpep_dropoff_datetime  passenger_count  trip_distance  pickup_longitude  pickup_latitude  RateCodeID  store_and_fwd_flag  dropoff_longitude  dropoff_latitude  payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount
0         2   2015-01-15 19:05:39    2015-01-15 19:23:42                1           1.59        -73.993896        40.750111           1                   N         -73.974785         40.750618             1         12.0    1.0      0.5        3.25             0                    0.3         17.05
1         1   2015-01-10 20:33:38    2015-01-10 20:53:28                1           3.30        -74.001648        40.724243           1                   N         -73.994415         40.759109             1         14.5    0.5      0.5        2.00             0                    0.3         17.80
2         1   2015-01-10 20:33:38    2015-01-10 20:43:41                1           1.80        -73.963341        40.802788           1                   N         -73.951820         40.824413             2          9.5    0.5      0.5        0.00             0                    0.3         10.80
3         1   2015-01-10 20:33:39    2015-01-10 20:35:31                1           0.50        -74.009087        40.713818           1                   N         -74.004326         40.719986             2          3.5    0.5      0.5        0.00             0                    0.3          4.80
4         1   2015-01-10 20:33:39    2015-01-10 20:52:58                1           3.00        -73.971176        40.762428           1                   N         -74.004181         40.742653             2         15.0    0.5      0.5        0.00             0                    0.3         16.30
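
As a quick sanity check on the rows above, each `total_amount` should equal the sum of its fare components. The sketch below hard-codes the five displayed rows rather than querying the cluster, and uses `Decimal` to avoid floating-point noise. It is only an illustration of the check, not something taken from the original experiments.

```python
from decimal import Decimal

# (fare_amount, extra, mta_tax, tip_amount, tolls_amount,
#  improvement_surcharge, total_amount), copied from the five rows shown above
rows = [
    ("12.0", "1.0", "0.5", "3.25", "0", "0.3", "17.05"),
    ("14.5", "0.5", "0.5", "2.00", "0", "0.3", "17.80"),
    ("9.5", "0.5", "0.5", "0.00", "0", "0.3", "10.80"),
    ("3.5", "0.5", "0.5", "0.00", "0", "0.3", "4.80"),
    ("15.0", "0.5", "0.5", "0.00", "0", "0.3", "16.30"),
]

bad_rows = []
for i, row in enumerate(rows):
    # The last value is the reported total; everything before it is a component.
    *components, total = (Decimal(value) for value in row)
    if sum(components) != total:
        bad_rows.append(i)

print(bad_rows)  # [] means every total matches its components
```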

Acknowledgments

The dec2 startup script is largely the work of Daniel Rodriguez. Daniel usually works on Anaconda for cluster management, which is normally part of an Anaconda subscription but is also free for moderate use (4 nodes). It does things similar to dec2, but much more maturely.

DEC2 was inspired by the excellent spark-ec2 setup script, which is how most Spark users, myself included, were first able to try out the library. The spark-ec2 script empowered many new users to try out a distributed system for the first time.

The S3 work was largely done by Hussain Sultan (Capital One) and Martin Durant (Continuum).

What didn’t work

  • Originally we automatically started an IPython console on the local machine and connected it to the remote scheduler. This felt slick, but was error-prone due to mismatches between the user’s environment and the remote cluster’s environment.
  • It’s tricky to replicate functionality that’s part of a proprietary product (Anaconda for cluster management). Fortunately, Continuum management has been quite supportive.
  • There aren’t many people in the data science community who know Salt, the system that backs dec2. I expect maintenance to be a bit tricky moving forward, especially during periods when Daniel and other developers are occupied.
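
The environment mismatch described in the first point can at least be detected early. A minimal sketch, assuming the same snippet can be run on both the local machine and the head node and the two results compared. The helper names and the package list are illustrative and not part of dec2.

```python
import sys
from importlib import metadata


def environment_summary(packages=("distributed", "dask", "pandas")):
    """Map 'python' and each package name to its installed version (None if missing)."""
    summary = {"python": "%d.%d" % sys.version_info[:2]}
    for name in packages:
        try:
            summary[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            summary[name] = None
    return summary


def mismatches(local, remote):
    """Return {name: (local_version, remote_version)} for entries that differ."""
    names = set(local) | set(remote)
    return {n: (local.get(n), remote.get(n))
            for n in sorted(names) if local.get(n) != remote.get(n)}


local = environment_summary()
# `remote` would come from running environment_summary() on the head node;
# here it is faked with a hypothetical different Python version to show the output shape.
remote = dict(local, python="2.7")
print(mismatches(local, remote))
```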
