Analyzing 1.7 Billion Reddit Comments with Blaze and Impala

by Daniel Rodriguez and Kristopher Overholt

Blaze is a Python library and interface for querying data on different storage systems. It works by translating a subset of NumPy- and Pandas-like syntax into queries against databases and other computing systems. Blaze gives Python users a familiar interface to query data living in other data storage systems such as SQL databases, NoSQL data stores, Spark, Hive, Impala, and raw data files such as CSV, JSON, and HDF5. Hive and Impala are distributed SQL engines that can perform queries on data stored in the Hadoop Distributed File System (HDFS).
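To give a flavor of the syntax, the minimal sketch below runs a Blaze expression against an in-memory pandas DataFrame; when the data lives in a database or in Impala, the expression stays the same and only the data source changes.

import blaze as bz
import pandas as pd

# A small in-memory DataFrame stands in for a larger backend (SQL, Hive, Impala, ...)
df = pd.DataFrame({'subreddit': ['python', 'python', 'movies'],
                   'score': [10, 25, 7]})

comments = bz.Data(df)                                       # wrap the data source in a Blaze expression
top = bz.by(comments.subreddit, total=comments.score.sum())  # Pandas-like group-by syntax
print(bz.compute(top))                                       # Blaze pushes the computation to the backend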

In this post, we'll use Blaze and Impala to interactively query and explore a data set of approximately 1.7 billion comments (975 GB uncompressed) from the reddit website, spanning October 2007 to May 2015. This data set was made available in July 2015 in a reddit post. The data set is in JSON format (one comment per line) and consists of the comment body, author, subreddit, timestamp of creation, and other fields.

We downloaded the reddit comment data set, extracted it, and moved it to Amazon S3. We'll create a Hadoop cluster on Amazon EC2, move the data set onto the cluster and into Impala, then query it interactively with Blaze from a client machine (a laptop).

This post describes the use of Blaze and Impala on a Hadoop cluster. Two related blog posts use Dask with larger-than-memory data sets to efficiently analyze one month of reddit comments on a single machine: Analyzing Reddit Comments with Dask and Castra and ReproduceIt: Reddit word count.

Ibis is a related Python data analysis framework that is currently focused on Impala, but is actively expanding to use other compute engines. Ibis also has the ability to perform queries on an Impala cluster without requiring you to switch back and forth between Python code and the Impala shell.

Dependencies

The Python packages required for this analysis can be installed on the client machine using the Anaconda Python distribution.

For convenience, the environment.yml file shown below contains the information required to create a new Conda environment (named blaze-impala) and install all of the dependencies used in this analysis.

name: blaze-impala
channels:
  - blaze
dependencies:
  - blaze
  - bokeh=0.9
  - impyla
  - jupyter
  - pandas
  - pyhive
  - python=2.7

To create this environment, save the above contents to a file named environment.yml, then execute the command conda env create in the same directory as the environment.yml file. You can then run source activate blaze-impala to work in the new environment, as shown below.
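For reference, the two commands look like this when run from the directory that contains environment.yml:

$ conda env create
$ source activate blaze-impala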

Cluster Setup

The Hive and Impala servers used in this analysis were run on a Hadoop cluster with HDFS, YARN, Hive, Impala, and Jupyter Notebook installed. Anaconda Cluster can create and provision a cluster with these data analysis tools available as plugins.

We used Anaconda Cluster to create a Linux cluster on Amazon EC2 with 8 nodes (m3.2xlarge instances, Ubuntu 14.04) and install the Hive, YARN, Impala, HDFS, and Jupyter Notebook plugins.

Preparing the Data

With our Hadoop cluster up and running, we can move the reddit comment data from Amazon S3 to HDFS.

We can SSH into the head node of the cluster and run the following command with valid AWS credentials, which will transfer the reddit comment data (975 GB of JSON data) from a public Amazon S3 bucket to the HDFS data store on the cluster:

hadoop distcp s3n://{{ AWS_KEY }}:{{ AWS_SECRET }}@blaze-data/reddit/json/*/*.json /user/ubuntu

The time required to move the data from Amazon S3 to HDFS was about 1 hour and 45 minutes.

We can run the following command on the head node to download the JSON serializer/deserializer (SerDe) module to enable Hive to read and write in JSON format:

$ wget http://s3.amazonaws.com/elasticmapreduce/samples/hive-ads/libs/jsonserde.jar

On the head node, we open the interactive Hive shell using the hive command, load the JSON SerDe module, create a table, and load the data into Hive:

$ hive
hive > ADD JAR jsonserde.jar;
hive > CREATE TABLE reddit_json (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int
)
ROW FORMAT
    serde 'com.amazon.elasticmapreduce.JsonSerde'
    with serdeproperties ('paths'='archived,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,downs,edited,gilded,id,link_id,name,parent_id,removal_reason,retrieved_on,score,score_hidden,subreddit,subreddit_id,ups');

hive > LOAD DATA INPATH '/user/ubuntu/*.json' INTO TABLE reddit_json;

The time required to load the data into Hive was less than 1 minute.

Once the data is loaded into Hive, we can query it using SQL statements such as SELECT count(*) FROM reddit_json, but these queries will be fairly slow because the data is stored in JSON format.
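For example, a full count of the comments can be run directly from the Hive shell (expect it to take a while against the JSON-backed table):

hive > SELECT count(*) FROM reddit_json;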

Alternatively, we can convert the data to Parquet format. Apache Parquet is a columnar storage format that was designed for HDFS and performs very well in many cases, particularly for analytical queries that touch only a few columns.

We can use the following commands in the interactive Hive shell to create a new table and convert the data to Parquet format:

hive > CREATE TABLE reddit_parquet (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int,
  created_utc_t            timestamp
)
STORED AS PARQUET;

hive > SET dfs.block.size=1g;

hive > INSERT OVERWRITE TABLE reddit_parquet select *, cast(cast(created_utc as double) as timestamp) as created_utc_t FROM reddit_json;

The time required to convert the data to Parquet format was about 50 minutes.

Note that we added a new column in timestamp format (created_utc_t) based on the original created_utc column. The original column stores the Unix timestamp (seconds since the epoch) as a string of digits, which Hive cannot cast directly to a timestamp, so we first cast it to a double and then cast the resulting double to a timestamp.
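As a quick sanity check (not part of the original workflow), we can compare the original string and the converted timestamp side by side from the Hive shell:

hive > SELECT created_utc, created_utc_t FROM reddit_parquet LIMIT 1;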

Finally, we SSH into one of the compute nodes and execute the following command from the interactive Impala shell so that Impala picks up the new tables from the Hive metastore:

$ sudo impala-shell
impala> invalidate metadata;

Data Analysis and Queries

Now that we've loaded the data into Impala and converted it to Parquet format, we can close the SSH connection to the cluster, return to our laptop, and use Blaze to interactively query the data set. Note that we will need the IP addresses of the head node and one of the compute nodes, which are running the Hive and Impala servers, respectively.

First, we import the Blaze, pandas, and Bokeh Python packages.

In [1]:
from __future__ import division, print_function

import blaze as bz
import pandas as pd
import bokeh
from bokeh.charts import TimeSeries, output_notebook, show

output_notebook()
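With the imports in place, pointing Blaze at the cluster takes a single URI per engine. The sketch below uses placeholder addresses (substitute the public IPs of the compute node running Impala and the head node running Hive); under the hood, the impala:// URI is handled by impyla and the hive:// URI by PyHive, and the exact URI forms may vary with library versions.

# Placeholder addresses -- replace with the IPs of your own nodes
impala_comments = bz.Data('impala://<impala-node-ip>/default::reddit_parquet')
hive_comments = bz.Data('hive://<head-node-ip>/default::reddit_parquet')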