##### by Jim Crist

The scientific Python ecosystem is great for doing data analysis. Packages like NumPy and Pandas provide an excellent interface to doing complicated computations on datasets. With only a few lines of code one can load some data into a Pandas DataFrame, run some analysis, and generate a plot of the results. However, this workflow starts to falter when working with data that's larger than the RAM on your computer. At this point people often move their workflow from a Python based one into some other larger system like Spark or Hadoop. These are great at what they do, but for small problems are a bit overkill.

Dask is designed to fit the space between in memory tools like NumPy/Pandas and distributed tools like Spark/Hadoop. By using blocked algorithms and the existing Python ecosystem, it's able to work efficiently on large arrays or dataframes - often in parallel. People have been writing about Dask a lot lately. In this blog post we'll show a complete workflow of using Dask to analyze a large dataset. In it we'll cover:

• Efficient on disk storage using castra
• Larger than memory dataframes using dask.dataframe
• Parallelizing computations using python using dask.bag

## The Dataset¶

In July, Reddit user u/Stuck_in_the_Matrix released a torrent of every publically available Reddit comment. The data is stored in monthly bzip2 compressed files of new line delimited json. In this post we'll look at just the data from May 2015 - this is around 5 GB compressed, and 32 GB uncompressed. You can get the data for yourself here.

As a first step, we'll examine one of the json blobs:

In [1]:
from __future__ import division, print_function
from bz2 import BZ2File
import ujson

with BZ2File('RC_2015-05.bz2') as f:

Out[1]:
{u'archived': False,
u'author': u'rx109',
u'author_flair_css_class': None,
u'author_flair_text': None,
u'controversiality': 0,
u'created_utc': u'1430438400',
u'distinguished': None,
u'downs': 0,
u'edited': False,
u'gilded': 0,
u'id': u'cqug90g',
u'name': u't1_cqug90g',
u'parent_id': u't3_34di91',
u'removal_reason': None,
u'retrieved_on': 1432703079,
u'score': 4,
u'score_hidden': False,
u'subreddit': u'soccer_jp',
u'subreddit_id': u't5_378oi',
u'ups': 4}

Each blob is composed of 22 fields, and each field holds either some text (the body of the comment), an integer (the number of upvotes), a boolean (whether the comment has been archived), or a timestamp (when the comment was made). In particular, we can see that the json structure is flat, meaning that it could also be represented in a tabular structure (e.g. a csv or database table).

This is important, because bzipped json files are slow to read. Bzip is a good compression algorithm if you want to reduce the disk size of the data, but poor for quick access to the data. Furthermore due to how it's stored, all queries on the dataset will have to:

1. Decompress the data
2. Parse the json blob into a python dictionary
3. Perform the computation

For a single exploration of the dataset, this may be fine, but for repetitive access it'd be much more efficient to convert the data into a better format for quick reads. One such format is Castra. Castra is:

• A partitioned, on disk column-store

Storing data by column allow us to only read in data for columns we care about. Partitioning data along the index allows us to ignore irrelevant rows. Together, they reduce the amount of I/O that needs to be done for each query.

• That uses compression to improve disk access

Castra uses blosc to compress data, further reducing the amount of needed I/O. Blosc is a modern compressor that is much faster than bzip/gzip.

• And knows about pandas categoricals

Storing repetitive strings as categoricals both reduce I/O (Blosc can achieve better compression ratios), and also improves computational efficiency in Pandas.

## Moving to Castra¶

To move data into Castra, we need to cleanup the json blobs and convert them into consistent datatypes for each column. This includes turning timestamp strings into pandas.Timestamp objects, replacing missing data designators with the appropriate types, and removing a few unneeded fields. We then need to convert these json blobs into pandas.DataFrame objects, and push them into Castra. As our dataset is large, we'll do this in a batched, streaming fashion:

In [2]:
from pandas import Timestamp, NaT, DataFrame
from toolz import dissoc

def to_json(line):
"""Convert a line of json into a cleaned up dict."""

# Convert timestamps into Timestamp objects
date = blob['created_utc']
blob['created_utc'] = Timestamp.utcfromtimestamp(int(date))
edited = blob['edited']
blob['edited'] = Timestamp.utcfromtimestamp(int(edited)) if edited else NaT

# Convert deleted posts into Nones (missing text data)
if blob['author'] == '[deleted]':
blob['author'] = None
if blob['body'] == '[deleted]':
blob['body'] = None

# Remove 'id', and 'subreddit_id' as they're redundant
# Remove 'retrieved_on' as it's irrelevant
return dissoc(blob, 'id', 'subreddit_id', 'retrieved_on')

columns = ['archived', 'author', 'author_flair_css_class', 'author_flair_text',
'body', 'controversiality', 'created_utc', 'distinguished', 'downs',
'removal_reason', 'score', 'score_hidden', 'subreddit', 'ups']

def to_df(batch):
"""Convert a list of json strings into a dataframe"""
blobs = map(to_json, batch)
df = DataFrame.from_records(blobs, columns=columns)
return df.set_index('created_utc')


To actually create a Castra, we need to provide a filepath, a "template" dataframe (to get the columns and datatypes from), and a list of columns that should be stored as pandas categoricals. We'll categorize 'distinguished', 'subreddit', and 'removal_reason' because they're objects (no need to categorize integers), and there's a small set of them relative to the length of the series. This will help with both IO and compute performance in pandas.

We can then put all the functions together to iterate through batches of 200,000 blobs and add them to Castra, partitioned every 3 hours. This takes a while:

In [3]:
from castra import Castra
from toolz import peek, partition_all

categories = ['distinguished', 'subreddit', 'removal_reason']

with BZ2File('RC_2015-05.bz2') as f:
batches = partition_all(200000, f)
df, frames = peek(map(to_df, batches))
castra = Castra('reddit_data.castra', template=df, categories=categories)
castra.extend_sequence(frames, freq='3h')


## Querying the Data¶

Castra's partitioned design makes it work well with Dask. Using dask.dataframe, we can access this large dataset quickly, using a familiar pandas interface:

In [4]:
import dask.dataframe as dd

# Start a progress bar for all computations
pbar = ProgressBar()
pbar.register()

df = dd.from_castra('reddit_data.castra/')

[########################################] | 100% Completed |  1.9s

Out[4]:
archived author author_flair_css_class author_flair_text body controversiality distinguished downs edited gilded link_id name parent_id removal_reason score score_hidden subreddit ups
created_utc
2015-05-01 False rx109 None None くそ\n読みたいが買ったら負けな気がする\n図書館に出ねーかな 0 None 0 NaT 0 t3_34di91 t1_cqug90g t3_34di91 None 4 False soccer_jp 4
2015-05-01 False BEE_REAL_ fan vp Virtus.pro Fan Nihilum and LG are significantly better off in... 0 None 0 NaT 0 t3_34gmag t1_cqug90p t3_34gmag None 5 False GlobalOffensive 5
2015-05-01 False WyaOfWade Heat Heat gg this one's over. off to watch the NFL draft... 0 None 0 NaT 0 t3_34g8mx t1_cqug90h t3_34g8mx None 4 False nba 4

In [5]:
df.ups.count().compute()

[########################################] | 100% Completed |  0.9s

Out[5]:
54504410

So there are around 54.5 million comments, which were counted in around 1 second. This ran significantly faster than the tens of minutes it took going directly from the bzip file, as the data is stored in a format more amenable to quick access.

How about a more complicated query - getting the top 10 subreddits by comments:

In [6]:
df.subreddit.value_counts().nlargest(10).compute()

[########################################] | 100% Completed |  3.3s

Out[6]:
AskReddit          4234970
leagueoflegends    1223184
nba                 756195
funny               745916
pics                630925
nfl                 566656
pcmasterrace        557307
videos              556065
news                548287
todayilearned       519910
dtype: int64

Dask also includes operations for timeseries computations, such as resample. Here we'll resample all comments over the month into 12 hour increments:

In [7]:
from bokeh.charts import TimeSeries, output_notebook, show
output_notebook()