Profiling Data Throughput
This work is supported by Continuum Analytics and the XDATA Program as part of the Blaze Project
Disclaimer: This post is on experimental/buggy code.
tl;dr We measure the costs of processing semi-structured data like JSON blobs.
Semi-structured Data
Semi-structured data is ubiquitous and computationally painful. Consider the following JSON blobs:
```python
{'name': 'Alice',   'payments': [1, 2, 3]}
{'name': 'Bob',     'payments': [4, 5]}
{'name': 'Charlie', 'payments': None}
```
This data doesn’t fit nicely into NumPy or Pandas and so we fall back to dynamic pure-Python data structures like dicts and lists. Python’s core data structures are surprisingly good, about as good as compiled languages like Java, but dynamic data structures present some challenges for efficient parallel computation.
Volume
Semi-structured data is often at the beginning of our data pipeline and so often has the greatest size. We may start with 100GB of raw data, reduce to 10GB to load into a database, and finally aggregate down to 1GB for analysis, machine learning, etc., 1kB of which becomes a plot or table.
| | Data Bandwidth (MB/s) | In Parallel (MB/s) |
|---|---|---|
| Disk I/O | 500 | 500 |
| Decompression | 100 | 500 |
| Deserialization | 50 | 250 |
| In-memory computation | 2000 | oo |
| Shuffle | 9 | 30 |
Common solutions for large semi-structured data include Python iterators, multiprocessing, Hadoop, and Spark, as well as proper databases like MongoDB and ElasticSearch. Two months ago we built `dask.bag`, a toy dask experiment for semi-structured data. Today we’ll strengthen the `dask.bag` project and look more deeply at performance in this space.
We measure performance with data bandwidth, usually in megabytes per second (MB/s). We’ll build intuition for why dealing with this data is costly.
Dataset
As a test dataset we play with a dump of GitHub data from https://www.githubarchive.org/. This data records every public GitHub event (commit, comment, pull request, etc.) in the form of a JSON blob. This data is fairly representative of a broader class of problems. Often people want to do fairly simple analytics, like find the top ten committers to a particular repository, or clean the data before they load it into a database.
We’ll play around with this data using `dask.bag`. This is both to get a feel for what is expensive and to provide a cohesive set of examples. In truth we won’t do any real analytics on the GitHub dataset; we’ll find that the expensive parts come well before analytic computation.
Items in our data look like this:
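Roughly, a heavily trimmed sketch of one event (field names follow the GitHub Archive format; the values here are invented for illustration):

```python
{'type': 'PushEvent',
 'created_at': '2015-01-01T15:00:00Z',
 'actor': {'login': 'alice'},
 'repo': {'name': 'alice/some-repo'},
 'payload': {'size': 3}}
```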
Disk I/O and Decompression – 100-500 MB/s
| | Data Bandwidth (MB/s) |
|---|---|
| Read from disk with `open` | 500 |
| Read from disk with `gzip.open` | 100 |
| Parallel read from disk with `gzip.open` | 500 |
A modern laptop hard drive can theoretically read data from disk to memory at 800 MB/s. So we could burn through a 10GB dataset in fifteen seconds on our laptop. Workstations with RAID arrays can do a couple GB/s. In practice I get around 500 MB/s on my personal laptop.
To reduce storage and transfer costs we often compress data. This requires CPU effort whenever we want to operate on the stored values. This can limit data bandwidth.
So we lose some data bandwidth through compression. Where we could previously process 500 MB/s we’re now down to only 100 MB/s. If we count bytes in terms of the amount stored on disk then we’re only hitting 18 MB/s. We’ll get around this with multiprocessing.
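As a rough sketch of how these numbers can be measured, we can time a plain read against a gzipped read of the same data (the filenames here are hypothetical):

```python
import gzip
import time

def read_bandwidth(path, opener):
    """Return read bandwidth in MB/s of uncompressed bytes."""
    start = time.time()
    with opener(path, 'rb') as f:
        nbytes = len(f.read())
    return nbytes / (time.time() - start) / 1e6

print(read_bandwidth('data/2015-01-01.json', open))          # roughly 500 MB/s
print(read_bandwidth('data/2015-01-01.json.gz', gzip.open))  # roughly 100 MB/s
```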
Decompression and Parallel processing – 500 MB/s
Fortunately we often have more cores than we know what to do with. Parallelizing reads can hide much of the decompression cost.
Dask.bag infers that we need to use gzip from the filename. Dask.bag currently uses `multiprocessing` to distribute work, allowing us to reclaim our 500 MB/s throughput on compressed data. We also could have done this with `multiprocessing`, straight Python, and a little elbow grease.
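A minimal sketch of the same read with `dask.bag`’s `read_text` (the filename pattern is hypothetical); each file becomes a partition and decompression happens in parallel across processes:

```python
import dask.bag as db

# Gzip decompression is inferred from the .gz extension
b = db.read_text('data/2015-01-01-*.json.gz')

# Pull the uncompressed lines through in parallel,
# counting characters as a proxy for bytes read
nbytes = b.map(len).sum().compute()
```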
Deserialization – 30 MB/s
| | Data Bandwidth (MB/s) |
|---|---|
| `json.loads` | 30 |
| `ujson.loads` | 50 |
| Parallel `ujson.loads` | 150 |
Once we decompress our data we still need to turn bytes into meaningful data structures (dicts, lists, etc.). Our GitHub data comes to us as JSON. This JSON contains various encodings and bad characters so, just for today, we’re going to punt on bad lines. Converting JSON text to Python objects expands it in memory a bit, so we’ll consider a smaller subset for this part: a single day.
So in terms of actual bytes of JSON we can only convert about 30MB per second. If we count in terms of the compressed data we store on disk then this looks more bleak at only 6 MB/s.
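A sketch of the per-line deserialization, punting on bad lines as described above (the filename is hypothetical):

```python
import json

def try_loads(line):
    """Parse one line of JSON, returning None for bad records."""
    try:
        return json.loads(line)
    except ValueError:  # includes json.JSONDecodeError
        return None

with open('data/2015-01-01.json') as f:
    records = [r for r in map(try_loads, f) if r is not None]
```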
This can be improved by using faster libraries – 50 MB/s
The ultrajson library, `ujson`, is pretty slick and can improve our performance a bit. This is what Pandas uses under the hood.
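Swapping it in is a one-line change, since `ujson.loads` mirrors `json.loads` (the sample line is invented):

```python
import ujson

line = '{"name": "Alice", "payments": [1, 2, 3]}'
record = ujson.loads(line)  # drop-in replacement for json.loads
```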
Or through Parallelism – 150 MB/s
This can also be accelerated through parallelism, just like decompression. It’s a bit cumbersome to show parallel deserialization in isolation, so instead we’ll show all of the steps together. This will underestimate performance but is much easier to code up.
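A sketch of the combined pipeline with `dask.bag`: read, decompress, and deserialize in parallel, then force the work with a simple reduction (the pattern and the reduction are illustrative):

```python
import ujson
import dask.bag as db

# Read and gunzip in parallel, then deserialize each line
js = db.read_text('data/2015-01-01-*.json.gz').map(ujson.loads)

# A cheap reduction to force the whole pipeline to run
print(js.count().compute())
```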
Mapping and Grouping - 2000 MB/s
| | Data Bandwidth (MB/s) |
|---|---|
| Simple Python operations | 1400 |
| Complex CyToolz operations | 2600 |
Once we have data in memory, pure Python is relatively fast; CyToolz more so.
So slicing and logic are essentially free. The cost of compression and deserialization dominates actual computation time. Don’t bother optimizing fast per-record code, especially if CyToolz has already done so for you. Of course, you might be doing something expensive per record. If so then most of this post isn’t relevant for you.
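To illustrate the kind of cheap per-record work meant here, a small sketch using `cytoolz` on records already in memory (the records and field names are invented):

```python
from cytoolz import frequencies, pluck

records = [{'type': 'PushEvent', 'actor': 'alice'},
           {'type': 'IssuesEvent', 'actor': 'bob'},
           {'type': 'PushEvent', 'actor': 'alice'}]

# Count how many events of each type we have
counts = frequencies(pluck('type', records))
# {'PushEvent': 2, 'IssuesEvent': 1}
```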
Shuffling - 5-50 MB/s
| | Data Bandwidth (MB/s) |
|---|---|
| Naive groupby with on-disk shuffle | 25 |
| Clever foldby without shuffle | 250 |
For complex logic, like full groupbys and joins, we need to communicate large amounts of data between workers. This communication forces us to go through another full serialization/write/deserialization/read cycle. This hurts. And so, the single most important message from this post:
Avoid communication-heavy operations on semi-structured data. Structure your data and load into a database instead.
That being said, people will inevitably ignore this advice so we need to have a not-terrible fallback.
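For concreteness, here is a sketch of such a naive groupby with `dask.bag`, counting records per event type (the grouping key and filename pattern are illustrative):

```python
import ujson
import dask.bag as db

js = db.read_text('data/2015-01-01-*.json.gz').map(ujson.loads)

# A full groupby forces an on-disk shuffle of all of the data
counts = (js.groupby(lambda d: d['type'])
            .map(lambda kv: (kv[0], len(kv[1])))
            .compute())
```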
This groupby operation goes through the following steps:
- Read from disk
- Decompress GZip
- Deserialize with `ujson`
- Do in-memory groupbys on chunks of the data
- Reserialize with `msgpack` (a bit faster)
- Append group parts to disk
- Read in new full groups from disk
- Deserialize `msgpack` back to Python objects
- Apply length function per group
Some of these steps have great data bandwidths, some less so.
When we compound many steps together our bandwidth suffers.
We get about 25 MB/s total. This is about what PySpark gets (although today `pyspark` can parallelize across multiple machines while `dask.bag` cannot).
Disclaimer: the numbers above are for `dask.bag` and could very easily be due to implementation flaws rather than inherent challenges.
I would be interested in hearing from people who use full groupby on BigData. I’m quite curious to hear how this is used in practice and how it performs.
Creative Groupbys - 250 MB/s
Don’t use groupby. You can often work around it with cleverness. Our example above can be handled with streaming grouping reductions (see toolz docs.) This requires more thinking from the programmer but avoids the costly shuffle process.
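A sketch of the same count-per-group spelled as a streaming `foldby`, which avoids the shuffle entirely (the key function and filename pattern are illustrative):

```python
import ujson
import dask.bag as db

js = db.read_text('data/2015-01-01-*.json.gz').map(ujson.loads)

# Fold within each partition, then combine the per-partition counts
counts = js.foldby(key=lambda d: d['type'],
                   binop=lambda total, _: total + 1,
                   initial=0,
                   combine=lambda a, b: a + b,
                   combine_initial=0).compute()
```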
We can also spell this with PySpark which performs about the same.
Use a Database
By the time you’re grouping or joining datasets you probably have structured data that could fit into a dataframe or database. You should transition from dynamic data structures (dicts/lists) to dataframes or databases as early as possible. DataFrames and databases compactly represent data in formats that don’t require serialization; this improves performance. Databases are also very clever about reducing communication.
Tools like `pyspark`, `toolz`, and `dask.bag` are great for the initial cleaning of semi-structured data into a structured format, but they’re relatively inefficient at complex analytics. For inconveniently large data you should consider a database as soon as possible. That could be some big-data solution or often just Postgres.
Better data structures for semi-structured data?
Dynamic data structures (dicts, lists) are overkill for semi-structured data. We don’t need or use their full power but we inherit all of their limitations (e.g. serialization costs.) Could we build something NumPy/Pandas-like that could handle the blob-of-JSON use-case? Probably.
DyND is one such project. DyND is a C++ project with Python bindings, written by Mark Wiebe and Irwin Zaid and historically funded largely by Continuum and XDATA under the same banner as Blaze/Dask. It could probably handle the semi-structured data case if given a bit of love. It handles variable-length arrays, text data, and missing values, all with NumPy-like semantics.
Sadly DyND has functionality gaps which limit usability.
I would like to see DyND mature to the point where it could robustly handle semi-structured data. I think that this would be a big win for productivity and would make projects like `dask.bag` and `pyspark` obsolete for a large class of use cases. If you know Python and C++ and would like to help DyND grow, I’m sure that Mark and Irwin would love the help.
Comparison with PySpark
Dask.bag pros:
- Doesn’t engage the JVM, no heap errors or fiddly flags to set
- Conda/pip installable. You could have it in less than twenty seconds from now.
- Slightly faster in-memory implementations thanks to `cytoolz`; this isn’t important though
- Good handling of lazy results per-partition
- Faster / lighter weight start-up times
- (Subjective) I find the API marginally cleaner
PySpark pros:
- Supports distributed computation (this is obviously huge)
- More mature, more filled out API
- HDFS integration
Dask.bag reinvents a wheel; why bother?
- Given the machinery inherited from `dask.array` and `toolz`, `dask.bag` is very cheap to build and maintain. It’s around 500 significant lines of code.
- PySpark throws Python processes inside a JVM ecosystem, which can cause some confusion among users and a performance hit. A task scheduling system in the native code ecosystem would be valuable.
- Comparison and competition is healthy
- I’ve been asked to make a distributed array. I suspect that distributed bag is a good first step.