Analyzing Reddit Comments with Dask and Castra

by Jim Crist

The scientific Python ecosystem is great for doing data analysis. Packages like NumPy and Pandas provide an excellent interface for doing complicated computations on datasets. With only a few lines of code one can load some data into a Pandas DataFrame, run some analysis, and generate a plot of the results. However, this workflow starts to falter when working with data that's larger than the RAM on your computer. At this point people often move from a Python-based workflow to a larger system like Spark or Hadoop. These are great at what they do, but are overkill for small problems.

Dask is designed to fit the space between in memory tools like NumPy/Pandas and distributed tools like Spark/Hadoop. By using blocked algorithms and the existing Python ecosystem, it's able to work efficiently on large arrays or dataframes - often in parallel. People have been writing about Dask a lot lately. In this blog post we'll show a complete workflow of using Dask to analyze a large dataset. In it we'll cover:

  • Efficient on disk storage using castra
  • Larger than memory dataframes using dask.dataframe
  • Parallelizing computations on Python objects using dask.bag
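
To make the idea of a blocked algorithm concrete, here is a minimal sketch in plain Python. It is not Dask's implementation; the helper names `iter_blocks` and `blocked_mean` are made up for illustration. It computes a mean while holding only one block of values in memory at a time.

```python
from itertools import islice


def iter_blocks(values, block_size):
    """Yield successive lists of at most `block_size` items from an iterable."""
    it = iter(values)
    while True:
        block = list(islice(it, block_size))
        if not block:
            return
        yield block


def blocked_mean(values, block_size=1000):
    """Compute a mean while holding only one block in memory at a time."""
    total = 0.0
    count = 0
    for block in iter_blocks(values, block_size):
        # Each block is reduced to two small numbers before the next is read
        total += sum(block)
        count += len(block)
    return total / count if count else float("nan")


# A generator stands in for data that is too large to load at once
result = blocked_mean((x for x in range(1, 1001)), block_size=64)
print(result)
```

Because each block is reduced independently, the per-block work could also be handed to separate threads or processes, which is roughly where the parallelism comes from.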

The Dataset

In July, Reddit user u/Stuck_in_the_Matrix released a torrent of every publicly available Reddit comment. The data is stored in monthly bzip2-compressed files of newline-delimited JSON. In this post we'll look at just the data from May 2015, which is around 5 GB compressed and 32 GB uncompressed. You can get the data for yourself here.

As a first step, we'll examine one of the json blobs:

In [1]:
from __future__ import division, print_function
from bz2 import BZ2File
import ujson

with BZ2File('RC_2015-05.bz2') as f:
    line = f.readline()
ujson.loads(line)
Out[1]:
{u'archived': False,
 u'author': u'rx109',
 u'author_flair_css_class': None,
 u'author_flair_text': None,
 u'body': u'\u304f\u305d\n\u8aad\u307f\u305f\u3044\u304c\u8cb7\u3063\u305f\u3089\u8ca0\u3051\u306a\u6c17\u304c\u3059\u308b\n\u56f3\u66f8\u9928\u306b\u51fa\u306d\u30fc\u304b\u306a',
 u'controversiality': 0,
 u'created_utc': u'1430438400',
 u'distinguished': None,
 u'downs': 0,
 u'edited': False,
 u'gilded': 0,
 u'id': u'cqug90g',
 u'link_id': u't3_34di91',
 u'name': u't1_cqug90g',
 u'parent_id': u't3_34di91',
 u'removal_reason': None,
 u'retrieved_on': 1432703079,
 u'score': 4,
 u'score_hidden': False,
 u'subreddit': u'soccer_jp',
 u'subreddit_id': u't5_378oi',
 u'ups': 4}

Each blob is composed of 22 fields, and each field holds either some text (the body of the comment), an integer (the number of upvotes), a boolean (whether the comment has been archived), or a timestamp (when the comment was made). In particular, we can see that the json structure is flat, meaning that it could also be represented in a tabular structure (e.g. a csv or database table).

This is important, because bzipped json files are slow to read. Bzip is a good compression algorithm if you want to reduce the disk size of the data, but poor for quick access to the data. Furthermore, because of how the data is stored, every query on the dataset has to:

  1. Decompress the data
  2. Parse the json blob into a python dictionary
  3. Perform the computation
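
The three steps above can be sketched with only the standard library. The sample records, the temporary file, and the `count_comments` helper are all hypothetical stand-ins for the real dump; the point is that every query has to repeat the decompress and parse steps for every line.

```python
import bz2
import json
import os
import tempfile

# Hypothetical sample records standing in for the real comment dump
sample = [
    {"subreddit": "nba", "ups": 4},
    {"subreddit": "soccer_jp", "ups": 4},
    {"subreddit": "nba", "ups": 5},
]

# Write a tiny bzip2-compressed, newline-delimited JSON file to query
path = os.path.join(tempfile.mkdtemp(), "sample.bz2")
with bz2.open(path, "wt", encoding="utf-8") as f:
    for record in sample:
        f.write(json.dumps(record) + "\n")


def count_comments(path, subreddit):
    """Count comments in one subreddit by streaming the compressed file."""
    n = 0
    with bz2.open(path, "rt", encoding="utf-8") as f:  # 1. decompress
        for line in f:
            blob = json.loads(line)                    # 2. parse
            if blob["subreddit"] == subreddit:         # 3. compute
                n += 1
    return n


nba_count = count_comments(path, "nba")
print(nba_count)
```

Note that the query only needs the 'subreddit' field, yet every field of every record is still decompressed and parsed.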

For a single exploration of the dataset, this may be fine, but for repetitive access it'd be much more efficient to convert the data into a better format for quick reads. One such format is Castra. Castra is:

  • A partitioned, on disk column-store

    Storing data by column allows us to only read in data for columns we care about. Partitioning data along the index allows us to ignore irrelevant rows. Together, they reduce the amount of I/O that needs to be done for each query.

  • That uses compression to improve disk access

    Castra uses blosc to compress data, further reducing the amount of needed I/O. Blosc is a modern compressor that is much faster than bzip/gzip.

  • And knows about pandas categoricals

    Storing repetitive strings as categoricals both reduces I/O (Blosc can achieve better compression ratios) and improves computational efficiency in Pandas.

These design decisions make it ideal for fast access to timeseries data. You can read more about Castra in this blogpost.
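
As a rough illustration of why a partitioned column store reduces I/O, consider the toy in-memory layout below. This is not Castra's on-disk format; the structure, the `read_column` helper, and the values are assumptions made for the sketch. A query for one column over an index range skips whole partitions and never touches the other columns.

```python
from bisect import bisect_left, bisect_right

# Toy store: each partition records its index range and one list per column.
# All names and values here are hypothetical.
partitions = [
    {"start": 0, "end": 9,
     "columns": {"index": [0, 3, 9], "ups": [4, 5, 4], "body": ["a", "b", "c"]}},
    {"start": 10, "end": 19,
     "columns": {"index": [10, 15], "ups": [1, 7], "body": ["d", "e"]}},
    {"start": 20, "end": 29,
     "columns": {"index": [21, 28], "ups": [2, 3], "body": ["f", "g"]}},
]


def read_column(partitions, column, lo, hi):
    """Return values of `column` whose index lies in [lo, hi].

    Also returns how many partitions had to be opened.
    """
    values = []
    touched = 0
    for part in partitions:
        if part["end"] < lo or part["start"] > hi:
            continue  # partition pruning: irrelevant rows are never read
        touched += 1
        index = part["columns"]["index"]
        i, j = bisect_left(index, lo), bisect_right(index, hi)
        values.extend(part["columns"][column][i:j])  # only the requested column
    return values, touched


ups, touched = read_column(partitions, "ups", lo=12, hi=25)
print(ups, touched)
```

Here only two of the three partitions are opened, and the 'body' column (the largest one in the real dataset) is never read.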

Moving to Castra

To move data into Castra, we need to clean up the json blobs and convert them into consistent datatypes for each column. This includes turning timestamp strings into pandas.Timestamp objects, replacing missing data designators with the appropriate types, and removing a few unneeded fields. We then need to convert these json blobs into pandas.DataFrame objects, and push them into Castra. As our dataset is large, we'll do this in a batched, streaming fashion:

In [2]:
from pandas import Timestamp, NaT, DataFrame
from toolz import dissoc


def to_json(line):
    """Convert a line of json into a cleaned up dict."""
    blob = ujson.loads(line)
    
    # Convert timestamps into Timestamp objects
    date = blob['created_utc']
    blob['created_utc'] = Timestamp.utcfromtimestamp(int(date))
    edited = blob['edited']
    blob['edited'] = Timestamp.utcfromtimestamp(int(edited)) if edited else NaT
    
    # Convert deleted posts into `None`s (missing text data)
    if blob['author'] == '[deleted]':
        blob['author'] = None
    if blob['body'] == '[deleted]':
        blob['body'] = None
        
    # Remove 'id', and 'subreddit_id' as they're redundant
    # Remove 'retrieved_on' as it's irrelevant
    return dissoc(blob, 'id', 'subreddit_id', 'retrieved_on')


columns = ['archived', 'author', 'author_flair_css_class', 'author_flair_text',
           'body', 'controversiality', 'created_utc', 'distinguished', 'downs',
           'edited', 'gilded', 'link_id', 'name', 'parent_id',
           'removal_reason', 'score', 'score_hidden', 'subreddit', 'ups']


def to_df(batch):
    """Convert a list of json strings into a dataframe"""
    blobs = map(to_json, batch)
    df = DataFrame.from_records(blobs, columns=columns)
    return df.set_index('created_utc')
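
To see what the cleanup does to a single record, here is a standard-library sketch of the same steps. It is an illustration rather than the code used in this post: the `clean` helper and the sample record are hypothetical, and it uses timezone-aware `datetime` objects and `None` where the code above uses `pandas.Timestamp` and `NaT`.

```python
from datetime import datetime, timezone


def clean(blob):
    """Stdlib sketch of the cleanup: parse timestamps, blank out deletions."""
    out = dict(blob)

    # Timestamps arrive as strings of seconds since the epoch
    out["created_utc"] = datetime.fromtimestamp(int(blob["created_utc"]),
                                                tz=timezone.utc)
    edited = blob["edited"]
    out["edited"] = (datetime.fromtimestamp(int(edited), tz=timezone.utc)
                     if edited else None)

    # '[deleted]' is a placeholder for missing text
    for key in ("author", "body"):
        if out[key] == "[deleted]":
            out[key] = None

    # Drop the redundant and irrelevant fields
    for key in ("id", "subreddit_id", "retrieved_on"):
        out.pop(key, None)
    return out


# Hypothetical record with only the fields the cleanup touches
raw = {"author": "[deleted]", "body": "hello", "created_utc": "1430438400",
       "edited": False, "id": "cqug90g", "subreddit_id": "t5_378oi",
       "retrieved_on": 1432703079}
cleaned = clean(raw)
print(cleaned)
```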

To actually create a Castra, we need to provide a filepath, a "template" dataframe (to get the columns and datatypes from), and a list of columns that should be stored as pandas categoricals. We'll categorize 'distinguished', 'subreddit', and 'removal_reason' because they're objects (no need to categorize integers), and there's a small set of them relative to the length of the series. This will help with both IO and compute performance in pandas.
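
The benefit of categoricals can be sketched without pandas. The hypothetical `categorize` helper below encodes a column of repeated strings as a short list of distinct values plus small integer codes, which is the general idea behind a categorical column. Unlike pandas, this sketch keeps categories in first-seen order; like pandas, it uses the code -1 for missing values.

```python
def categorize(values):
    """Encode a list of strings as (categories, integer codes).

    Missing values (None) are given the code -1.
    """
    categories = []
    lookup = {}
    codes = []
    for v in values:
        if v is None:
            codes.append(-1)
            continue
        if v not in lookup:
            lookup[v] = len(categories)
            categories.append(v)
        codes.append(lookup[v])
    return categories, codes


def decode(categories, codes):
    """Recover the original values from categories and codes."""
    return [categories[c] if c >= 0 else None for c in codes]


subreddits = ["nba", "AskReddit", "nba", None, "AskReddit", "nba"]
categories, codes = categorize(subreddits)
print(categories, codes)
```

Each distinct string is stored once, and comparisons such as "is this row in r/nba?" become integer comparisons on the codes.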

We can then put all the functions together to iterate through batches of 200,000 blobs and add them to Castra, partitioned every 3 hours. This takes a while:

In [3]:
from castra import Castra
from toolz import peek, partition_all

categories = ['distinguished', 'subreddit', 'removal_reason']

with BZ2File('RC_2015-05.bz2') as f:
    batches = partition_all(200000, f)
    df, frames = peek(map(to_df, batches))
    castra = Castra('reddit_data.castra', template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')
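
The streaming pattern above relies on two small helpers from toolz. The sketch below mimics their behavior with itertools, under the hypothetical names `batches_of` and `peek_first`, to show what `partition_all` and `peek` are doing: the first splits a stream into fixed-size batches without loading it all, and the second looks at the first element while handing back an iterator that still includes it.

```python
from itertools import chain, islice


def batches_of(n, seq):
    """Yield tuples of at most `n` items, like toolz.partition_all."""
    it = iter(seq)
    while True:
        batch = tuple(islice(it, n))
        if not batch:
            return
        yield batch


def peek_first(seq):
    """Return the first item and an iterator over all items, like toolz.peek."""
    it = iter(seq)
    first = next(it)
    return first, chain([first], it)


# A generator stands in for the lines of the compressed file
lines = ("line %d" % i for i in range(7))
first, all_batches = peek_first(batches_of(3, lines))
collected = list(all_batches)
print(first)
print([len(b) for b in collected])
```

In the cell above, the peeked first dataframe serves as the template for the Castra, and no batch is lost because the returned iterator starts again from that first element.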

Querying the Data

Castra's partitioned design makes it work well with Dask. Using dask.dataframe, we can access this large dataset quickly, using a familiar pandas interface:

In [4]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Start a progress bar for all computations
pbar = ProgressBar()
pbar.register()

# Load data into a dask dataframe:
df = dd.from_castra('reddit_data.castra/')
df.head(3)
[########################################] | 100% Completed |  1.9s
Out[4]:
archived author author_flair_css_class author_flair_text body controversiality distinguished downs edited gilded link_id name parent_id removal_reason score score_hidden subreddit ups
created_utc
2015-05-01 False rx109 None None くそ\n読みたいが買ったら負けな気がする\n図書館に出ねーかな 0 None 0 NaT 0 t3_34di91 t1_cqug90g t3_34di91 None 4 False soccer_jp 4
2015-05-01 False BEE_REAL_ fan vp Virtus.pro Fan Nihilum and LG are significantly better off in... 0 None 0 NaT 0 t3_34gmag t1_cqug90p t3_34gmag None 5 False GlobalOffensive 5
2015-05-01 False WyaOfWade Heat Heat gg this one's over. off to watch the NFL draft... 0 None 0 NaT 0 t3_34g8mx t1_cqug90h t3_34g8mx None 4 False nba 4

Let's start with some simple queries. First, we'll count total comments:

In [5]:
df.ups.count().compute()
[########################################] | 100% Completed |  0.9s
Out[5]:
54504410

So there are around 54.5 million comments, which were counted in around 1 second. This ran significantly faster than the tens of minutes it took going directly from the bzip file, as the data is stored in a format more amenable to quick access.

How about a more complicated query - getting the top 10 subreddits by comments:

In [6]:
df.subreddit.value_counts().nlargest(10).compute()
[########################################] | 100% Completed |  3.3s
Out[6]:
AskReddit          4234970
leagueoflegends    1223184
nba                 756195
funny               745916
pics                630925
nfl                 566656
pcmasterrace        557307
videos              556065
news                548287
todayilearned       519910
dtype: int64
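
One way a query like this can be evaluated block by block is to count values within each partition and then merge the small per-partition results. The sketch below shows that idea with `collections.Counter`; it is an illustration under that assumption, not a description of Dask's internals, and the data and `top_values` helper are hypothetical.

```python
from collections import Counter

# Hypothetical partitions of a 'subreddit' column
partitions = [
    ["AskReddit", "nba", "AskReddit"],
    ["funny", "AskReddit", "nba"],
    ["pics", "nba", "AskReddit"],
]


def top_values(partitions, n):
    """Count values per partition, merge the counts, and keep the top n."""
    total = Counter()
    for part in partitions:
        # Each partition is summarized on its own, so this step could run
        # in parallel; only the small Counter needs to be combined
        total.update(Counter(part))
    return total.most_common(n)


top2 = top_values(partitions, 2)
print(top2)
```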

Dask also includes operations for timeseries computations, such as resample. Here we'll resample all comments over the month into 12 hour increments:

In [7]:
from bokeh.charts import TimeSeries, output_notebook, show
output_notebook()
BokehJS successfully loaded.
In [8]:
counts_12h = df.ups.resample('12h', how='count').compute()

# Plot the results
fig = TimeSeries(counts_12h.values, counts_12h.index, title='All Comments',
                 xlabel='Date', ylabel='Comments')
fig.left[0].formatter.precision = 0
show(fig)
[########################################] | 100% Completed |  2.2s

Here you can see the daily differences between hemispheres (Reddit is used more often when the western hemisphere is awake), as well as the increased traffic on weekdays vs weekends.
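
For readers unfamiliar with resampling, the following standard-library sketch shows what counting per 12 hour bin means. The `count_per_bin` helper and the timestamps are hypothetical, and bins are aligned to midnight, so `hours` should divide 24. Unlike a pandas-style resample, bins that contain no comments are simply absent from the result.

```python
from collections import Counter
from datetime import datetime, timedelta


def count_per_bin(timestamps, hours=12):
    """Count timestamps per fixed-width bin, with bins aligned to midnight."""
    width = hours * 3600
    counts = Counter()
    for ts in timestamps:
        midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        # Round the time of day down to the start of its bin
        offset = int((ts - midnight).total_seconds()) // width * width
        counts[midnight + timedelta(seconds=offset)] += 1
    return dict(sorted(counts.items()))


# Hypothetical comment timestamps
stamps = [
    datetime(2015, 5, 1, 1, 0),
    datetime(2015, 5, 1, 11, 59),
    datetime(2015, 5, 1, 12, 0),
    datetime(2015, 5, 2, 23, 0),
]
bins = count_per_bin(stamps, hours=12)
for start, n in bins.items():
    print(start, n)
```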

Another interesting one is a plot of all comments in r/IAmA over time, resampled into hour increments:

In [9]:
ama_by_h = df.subreddit[df.subreddit=='IAmA'].resample('h', how='count').compute()

# Plot the results
fig = TimeSeries(ama_by_h.values, ama_by_h.index, title='Comments in r/IAmA',
                 xlabel='Date', ylabel='Comments')
show(fig)
[########################################] | 100% Completed | 21.3s

From this we can see that there was a huge spike in AmA traffic sometime around May 20th. We can investigate further to get the id of the top AmA, and then use PRAW (the Python reddit API wrapper) to get the title of that post:

In [10]:
# Use the two busiest hours in IAmA as the start and end of the window
# (this assumes they are adjacent and come back in chronological order)
s, e = ama_by_h.nlargest(2).index

# Select all comments in IAmA in this range
in_range = df.loc[s:e]

# Get the link id for the top post in IAmA in this period
ama_ids = in_range[in_range.subreddit=='IAmA'].link_id
top_ama = ama_ids[ama_ids.str.startswith('t3_')].value_counts().compute().index[0]
[########################################] | 100% Completed |  3.9s
In [11]:
import praw
reddit = praw.Reddit(user_agent='python')
url = 'https://www.reddit.com/r/IAmA/comments/{0}'.format(top_ama[3:])
thread = reddit.get_submission(url)
thread.title
Out[11]:
u'I am Senator Bernie Sanders, Democratic candidate for President of the United States \u2014 AMA'

So the most popular thread in r/IAmA in May 2015 was the AmA with Bernie Sanders.

What about Non-DataFrame Computations?

So far we've shown the ease and speed with which Castra and Dask allow you to query large datasets using a familiar pandas interface. Some calculations can't easily be done with dataframes though, and you need to drop into Python. One such computation is creating word clouds from Reddit comments, as shown by Daniel Rodriguez in this blog post. Going directly from the bzip file, it took him roughly 23 minutes to create one word cloud. Here we'll repeat that same computation, but pulling the data from Castra instead.

In [12]:
# Create a set of stopwords to ignore
stopwords = set(["a", "able", "about", "across", "after", "all", "almost",
                 "also", "am", "among", "an", "and", "any", "are", "as", "at",
                 "be", "because", "been", "but", "by", "can", "cannot",
                 "could", "dear", "did", "do", "does", "either", "else",
                 "ever", "every", "for", "from", "get", "got", "had", "has",
                 "have", "he", "her", "hers", "him", "his", "how", "however",
                 "i", "if", "in", "into", "is", "it", "its", "just", "least",
                 "let", "like", "likely", "may", "me", "might", "most",
                 "must", "my", "neither", "no", "nor", "not", "of", "off",
                 "often", "on", "only", "or", "other", "our", "own", "rather",
                 "said", "say", "says", "she", "should", "since", "so",
                 "some", "than", "that", "the", "their", "them", "then",
                 "there", "these", "they", "this", "tis", "to", "too", "twas",
                 "us", "wants", "was", "we", "were", "what", "when", "where",
                 "which", "while", "who", "whom", "why", "will", "with",
                 "would", "yet", "you", "your"])

# Some domain specific words/word-like things to ignore from reddit
stopwords.update(['https', 'http', 'deleted', 'gt'])
stopwords.update(map(str, range(10)))
In [13]:
import re
import nltk

isword = re.compile("^[0-9a-zA-Z]+$").search

def to_words(body):
    """Convert a comment to a list of words."""
    if not body:
        return []
    words = [w.lower() for w in nltk.word_tokenize(body)]
    return [w for w in words if w not in stopwords and isword(w)]
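
If nltk is not available, a rough stand-in for `to_words` can be written with a regular expression. This swaps in a different technique: the hypothetical `to_words_simple` below just extracts runs of ASCII letters and digits, so it will split contractions and other edge cases differently from `nltk.word_tokenize`. The stopword set here is a small made-up subset for the example.

```python
import re

STOPWORDS = {"the", "is", "a", "of", "and"}  # small hypothetical subset
TOKEN = re.compile(r"[0-9a-zA-Z]+")


def to_words_simple(body):
    """Convert a comment to a list of lowercase alphanumeric words."""
    if not body:
        return []
    lowered = (w.lower() for w in TOKEN.findall(body))
    return [w for w in lowered if w not in STOPWORDS]


words = to_words_simple("The data is BIG, and more data is better!")
print(words)
```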

To perform the actual computation, we'll use the dask.bag abstraction, which is a list-like collection good for computing on arbitrary python objects.

In [14]:
import dask.bag as db

# Create a bag of tuples of (subreddit, body)
b = db.from_castra('reddit_data.castra', columns=['subreddit', 'body'], npartitions=8)

# Filter out comments not in r/MachineLearning
matches_subreddit = b.filter(lambda x: x[0] == 'MachineLearning')

# Convert each comment into a list of words, and concatenate
words = matches_subreddit.pluck(1).map(to_words).concat()

# Count the frequencies for each word, and take the top 100
top_words = words.frequencies().topk(100, key=1)
top_words = top_words.compute()
[########################################] | 100% Completed | 34.5s

This took only 34 s, around 40 times faster than going from the bzip file. The storage format used for accessing data is critical, and can make a huge difference in computation speed.
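
To clarify what each step of the bag pipeline computes, here is a sequential standard-library sketch of the same chain of operations on a few made-up records. The `split_words` helper is a simplified placeholder for `to_words`, and nothing here runs in parallel; dask.bag applies the same kind of steps across partitions.

```python
from collections import Counter
from heapq import nlargest
from itertools import chain

# Hypothetical (subreddit, body) tuples
records = [
    ("MachineLearning", "more data beats clever algorithms"),
    ("nba", "great game tonight"),
    ("MachineLearning", "more data more problems"),
    ("MachineLearning", None),
]


def split_words(body):
    """Simplified placeholder for to_words: lowercase and split on spaces."""
    return body.lower().split() if body else []


matches = (r for r in records if r[0] == "MachineLearning")   # filter
bodies = (r[1] for r in matches)                               # pluck(1)
words = chain.from_iterable(map(split_words, bodies))          # map, then concat
frequencies = Counter(words).items()                           # frequencies
top_words = nlargest(2, frequencies, key=lambda kv: kv[1])     # topk(2, key=1)
print(top_words)
```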

We can make a word cloud of these results using the wordcloud package from Andreas Mueller:

In [15]:
from wordcloud import WordCloud
WordCloud(width=500, height=300).generate_from_frequencies(top_words).to_image()
Out[15]:

Interestingly, it seems the top 2 words in r/MachineLearning are "more" and "data". No one ever says "less data".

Summary

For problems that are too big for NumPy or Pandas to handle, you don't immediately need to move to a distributed system. Often a mix of efficient disk storage and a clever use of blocked algorithms on a single machine can get the job done.

Dask provides the blocked algorithms in familiar pandas.DataFrame and numpy.array like interfaces. Using dask.dataframe we were able to analyze a larger-than-RAM dataset efficiently without switching from a Python-based workflow. For computations that don't fit a tabular model, we were able to use dask.bag to work directly with Python objects and functions in parallel. All of this was done directly from an IPython Notebook, without any need to set up an external database or distributed system. We believe these tools are a nice fit for many medium-sized problems.


Do Dask & Castra fit your needs? Let us know in the comments below.
