Wed 16 September 2015
Analyzing 1.7 Billion Reddit Comments with Blaze and Impala
by Daniel Rodriguez and Kristopher Overholt
Blaze is a Python library and interface for querying data on different storage systems. Blaze works by translating a subset of NumPy- and pandas-like syntax into queries for databases and other computing systems. This gives Python users a familiar interface for querying data that lives in other storage systems, such as SQL databases, NoSQL data stores, Spark, Hive, Impala, and raw data files such as CSV, JSON, and HDF5. Hive and Impala are distributed SQL engines that can perform queries on data stored in the Hadoop Distributed File System (HDFS).
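As a quick illustration of that interface, a Blaze expression reads much like a pandas one. The sketch below runs against a local CSV file with a hypothetical name and columns; it is not part of the cluster analysis that follows.
import blaze as bz

# Symbolic handle over a local CSV file (hypothetical file and columns)
comments = bz.Data('comments.csv')

# Lazy expression: count the comments with a positive score
positive = comments[comments.score > 0].count()

# Nothing is computed until a concrete result is requested
print(int(positive))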
In this post, we'll use Blaze and Impala to interactively query and explore a data set of approximately 1.7 billion comments (975 GB uncompressed) from the reddit website, spanning October 2007 to May 2015. This data set was made available in July 2015 in a reddit post. The data set is in JSON format (one comment per line) and consists of the comment body, author, subreddit, timestamp of creation, and other fields.
We downloaded the reddit comment data set, extracted it, and moved it to Amazon S3. We'll create a Hadoop cluster on Amazon EC2, move the data set onto the cluster and into Impala, then query it interactively with Blaze from a client machine (a laptop).
Related posts and tools
This post describes the use of Blaze and Impala on a Hadoop cluster. Two related blog posts use Dask with larger-than-memory data sets to efficiently analyze one month of reddit comments on a single machine: Analyzing Reddit Comments with Dask and Castra and ReproduceIt: Reddit word count.
Ibis is a related Python data analysis framework that is currently focused on Impala, but is actively expanding to use other compute engines. Ibis also has the ability to perform queries on an Impala cluster without requiring you to switch back and forth between Python code and the Impala shell.
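For reference, querying Impala from Ibis looks roughly like the following sketch. The host, port, and table name are placeholders, and the exact API may differ between Ibis versions.
import ibis

# Connect to the Impala daemon on one of the compute nodes (placeholder host/port)
con = ibis.impala.connect(host='54.81.251.205', port=21050)

# Build an expression and execute it without leaving Python
reddit = con.table('reddit_parquet')
print(reddit.count().execute())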
Dependencies
The Python packages required for this analysis can be installed on the client machine using the Anaconda Python distribution.
For convenience, the environment.yml file shown below contains the information required to create a new Conda environment (named blaze-impala) and install all of the dependencies used in this analysis.
name: blaze-impala
channels:
- blaze
dependencies:
- blaze
- bokeh=0.9
- impyla
- jupyter
- pandas
- pyhive
- python=2.7
To create this environment, create a file named environment.yml, paste the above contents into this file, then execute the command conda env create in the same directory as the environment.yml file. You can then source activate blaze-impala to work in the new environment.
Cluster Setup
The Hive and Impala servers used in this analysis were run on a Hadoop cluster with HDFS, YARN, Hive, Impala, and Jupyter Notebook installed. Anaconda Cluster can create and provision a cluster with these data analysis tools available as plugins.
We used Anaconda Cluster to create a Linux cluster on Amazon EC2 with 8 nodes (m3.2xlarge instances, Ubuntu 14.04) and install the Hive, YARN, Impala, HDFS, and Jupyter Notebook plugins.
Preparing the Data
With our Hadoop cluster up and running, we can move the reddit comment data from Amazon S3 to HDFS.
We can SSH into the head node of the cluster and run the following command with valid AWS credentials, which will transfer the reddit comment data (975 GB of JSON data) from a public Amazon S3 bucket to the HDFS data store on the cluster:
hadoop distcp s3n://{{ AWS_KEY }}:{{ AWS_SECRET }}@blaze-data/reddit/json/*/*.json /user/ubuntu
The time required to move the data from Amazon S3 to HDFS was about 1 hour and 45 minutes.
We can run the following command on the head node to download the JSON serializer/deserializer (SerDe) module to enable Hive to read and write in JSON format:
$ wget http://s3.amazonaws.com/elasticmapreduce/samples/hive-ads/libs/jsonserde.jar
On the head node, we open the interactive Hive shell using the hive command, load the JSON SerDe module, create a table, and load the data into Hive:
$ hive
hive > ADD JAR jsonserde.jar;
hive > CREATE TABLE reddit_json (
archived boolean,
author string,
author_flair_css_class string,
author_flair_text string,
body string,
controversiality int,
created_utc string,
distinguished string,
downs int,
edited boolean,
gilded int,
id string,
link_id string,
name string,
parent_id string,
removal_reason string,
retrieved_on timestamp,
score int,
score_hidden boolean,
subreddit string,
subreddit_id string,
ups int
)
ROW FORMAT
serde 'com.amazon.elasticmapreduce.JsonSerde'
with serdeproperties ('paths'='archived,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,downs,edited,gilded,id,link_id,name,parent_id,removal_reason,retrieved_on,score,score_hidden,subreddit,subreddit_id,ups');
hive > LOAD DATA INPATH '/user/ubuntu/*.json' INTO TABLE reddit_json;
The time required to load the data into Hive was less than 1 minute.
Once the data is loaded into Hive, we can query it using SQL statements such as SELECT count(*) FROM reddit_json;. However, these queries will be fairly slow because the data is stored in JSON format.
Alternatively, we can convert the data to Parquet format. Apache Parquet is a columnar storage format designed for the Hadoop ecosystem that performs very well in many cases.
We can use the following commands in the interactive Hive shell to create a new table and convert the data to Parquet format:
hive > CREATE TABLE reddit_parquet (
archived boolean,
author string,
author_flair_css_class string,
author_flair_text string,
body string,
controversiality int,
created_utc string,
distinguished string,
downs int,
edited boolean,
gilded int,
id string,
link_id string,
name string,
parent_id string,
removal_reason string,
retrieved_on timestamp,
score int,
score_hidden boolean,
subreddit string,
subreddit_id string,
ups int,
created_utc_t timestamp
)
STORED AS PARQUET;
hive > SET dfs.block.size=1g;
hive > INSERT OVERWRITE TABLE reddit_parquet select *, cast(cast(created_utc as double) as timestamp) as created_utc_t FROM reddit_json;
The time required to convert the data to Parquet format was about 50 minutes.
Note that we added a new column in timestamp format (created_utc_t) based on the original created_utc column. The original column was a string containing a Unix timestamp in seconds, so we first cast it to a double and then cast the resulting double to a timestamp.
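To see what this nested cast produces, the same conversion can be sketched in pandas for a single sample value (an illustrative check only, not part of the Hive job):
import pandas as pd

# created_utc is stored as a string of seconds since the Unix epoch (sample value)
created_utc = '1430438400'

# string -> double -> timestamp, mirroring cast(cast(created_utc as double) as timestamp)
print(pd.to_datetime(float(created_utc), unit='s'))  # 2015-05-01 00:00:00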
Finally, we SSH into one of the compute nodes and execute the following command from the interactive Impala shell so that Impala picks up the tables defined in the Hive metastore:
$ sudo impala-shell
impala> invalidate metadata;
Data Analysis and Queries
Now that we've loaded the data into Impala and converted it to Parquet format, we can close the SSH connection to the cluster, work from our laptop, and use Blaze to interactively query the data set. Note that we will need the IP addresses of the head node and one of the compute nodes, which are running the Hive and Impala servers, respectively.
First, we import the Blaze, pandas, and Bokeh Python packages.
from __future__ import division, print_function
import blaze as bz
import pandas as pd
import bokeh
from bokeh.charts import TimeSeries, output_notebook, show
output_notebook()
print(bz.__version__)
print(pd.__version__)
print(bokeh.__version__)
Querying the data with Hive
We can use Blaze along with the PyHive Python package to query Hive.
First, we set up a connection to Hive and the table that contains the reddit comment data in Parquet format.
data = bz.Data('hive://54.81.249.17/default::reddit_parquet')
We can use Blaze to query Hive and show the types of all columns in the table.
data.dshape
Counting the total number of comments
We can count the total number of rows in the data set (that is, the total number of comments) using the following query.
n_posts = data.count()
The previous Blaze expression will not execute anything on Hive. Rather, it is a lazy expression that we can build upon to obtain the results we need from the data. We can also view how this expression translates to SQL using bz.compute().
print(bz.compute(n_posts))
We can then execute the expression on Hive and time the result.
%time int(n_posts)
This query took about 4.5 minutes using Hive, and the result shows that the total number of reddit comments in this data set is about 1.66 billion.
Querying the data with Impala
We can also use Blaze along with the Impyla Python package to query Impala, another SQL-on-Hadoop engine that offers better performance for most queries because, unlike Hive, it does not rely on MapReduce jobs.
With Blaze, all of the queries will use the same syntax since they are abstracted to the user via expressions. The only difference in this case is that we will use the IP address of one of the compute nodes in the cluster instead of the head node because this is where the Impala server is running.
First, we set up a connection to Impala and the table that contains the reddit comment data in Parquet format.
data = bz.Data('impala://54.81.251.205/default::reddit_parquet')
Counting the total number of comments
Again, we can count the total number of rows in the data set with the following query. Note that the syntax of the Blaze expression is the same for both Hive and Impala.
n_posts = data.count()
Similar to Hive, we can also view how this expression translates to SQL using bz.compute(). Note that the SQL syntax is the same for both Hive and Impala.
print(bz.compute(n_posts))
We can then execute the expression on Impala and time the result.
%time int(n_posts)
Note the significant difference in performance between Hive (4.5 minutes) and Impala (5 seconds) for the same query. There are a few seconds of network delay as our client machine communicates with the Hadoop cluster on Amazon EC2, so the actual computation time in Impala is slightly less than 5 seconds.
Counting the total number of upvotes
We can perform another simple query to sum all of the upvotes for all of the comments.
n_up_votes = data.ups.sum()
print(bz.compute(n_up_votes))
%time int(n_up_votes)
This query took about 4 seconds, and the result shows that the total number of upvotes in this data set is about 8.72 billion.
Counting the total number of posts in the /r/soccer subreddit
Similar to pandas or NumPy, you can filter the data on particular columns using Blaze. For example, we can count the total number of comments in the /r/soccer subreddit.
n_posts_in_r_soccer = data[data.subreddit == 'soccer'].count()
%time int(n_posts_in_r_soccer)
This query took about 5 seconds, and the result shows that the total number of comments in the /r/soccer subreddit is about 11.6 million.
Listing the top 10 subreddits with the most comments
Group by and sort operations are also supported in Blaze. Suppose we want to identify the subreddits with the most comments. We can group by subreddit, count the number of rows in each group, sort using the new posts field, and display the first ten subreddits in the sorted list.
top_subreddits = bz.by(data.subreddit, posts=data.count()).sort('posts', ascending=False).head(10)
print(bz.compute(top_subreddits))
To convert the output into a pandas DataFrame, we can use odo, which is available in Blaze as bz.odo.
%time bz.odo(top_subreddits, pd.DataFrame)
This query took about 28 seconds, and the results show that the subreddit with the most comments was /r/AskReddit with about 185 million comments.
Counting the number of comments before a specific hour
While converting the data from JSON to Parquet format, we added an extra field called created_utc_t as a TIMESTAMP type in Impala. Using this column, we can filter comments by specific hours using bz.hour in Blaze.
before_1pm = data.ups[bz.hour(data.created_utc_t) < 13].count()
%time int(before_1pm)
This query took about 6 seconds, and the result shows that the total number of comments posted before 1 pm is about 739 million (or 45% of the total comments).
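That percentage can be recovered from the two lazy expressions defined earlier (a small illustrative check; both int() calls trigger queries on the cluster):
# Fraction of all comments posted before 1 pm (true division via the __future__ import above)
print(int(before_1pm) / int(n_posts))  # roughly 0.45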
Plotting the daily frequency of comments in the /r/IAmA subreddit
In a post on the /r/IAmA subreddit, a person shares details about who they are, and other people can ask the original poster questions.
We can build a query for the information that we need to plot the daily number of comments in the /r/IAmA subreddit over the entire date range of the data set.
First, we filter comments from the /r/IAmA subreddit.
iama = data[(data.subreddit == 'IAmA')]
We can view the dates of the first few posts on /r/IAmA.
iama.created_utc_t.sort('created_utc_t').head(5)
We can convert the timestamp field to a continuous day field using the following equation:
(Year - 2007) * 365 + (Month - 1) * 31 + Day
This equation oversimplifies by falsely assuming that all months have 31 days and ignoring the extra days in leap years such as 2012, but it will serve to demonstrate the functionality of Blaze and Impala.
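For concreteness, here is the formula evaluated in plain Python for one sample date, 2015-05-31 (an illustrative value):
y, m, d = 2015, 5, 31

# (Year - 2007) * 365 + (Month - 1) * 31 + Day
print((y - 2007) * 365 + (m - 1) * 31 + d)  # 3075
The corresponding Blaze expression over the created_utc_t column is: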
days = (bz.year(iama.created_utc_t) - 2007) * 365 + (bz.month(iama.created_utc_t) - 1) * 31 + bz.day(iama.created_utc_t)
We can reuse the previous expression to add a day column to the table using bz.transform.
with_day = bz.transform(iama, day=days)
We can then group by the new day column.
by_day = bz.by(with_day.day, posts=with_day.created_utc_t.count())
To convert the output into a pandas DataFrame, we can again use bz.odo.
%time by_day_result = bz.odo(by_day, pd.DataFrame)
We can use pandas to modify the data and sort it by day.
by_day_result = by_day_result.sort(columns=['day'])
We can create a date range in pandas with pd.date_range and use it as the index for the time series plot.
rng = pd.date_range('5/28/2009', periods=len(by_day_result), freq='D')
by_day_result.index = rng
Finally, we can use Bokeh to plot the daily number of comments in the /r/IAmA subreddit from May 2009 (the date of the first comment) to May 2015.
f = TimeSeries(by_day_result.posts, by_day_result.index,
title='Comments in /r/IAmA subreddit',
xlabel='Date', ylabel='Comments',
tools='pan,reset,save,wheel_zoom,box_zoom',
width=600, height=500)
show(f)
Summary
Blaze allows us to build queries and interactively explore data sets that exist in different storage systems and formats (Hive and Impala in this post) using a familiar Python interface and a consistent expression system, regardless of the backend.
We were able to run this analysis in a notebook on our local machine, interactively construct Blaze expressions, explore the data set of approximately 1.7 billion comments, and generate interactive plots, while the data queries themselves were executed on the cluster nodes using the Hive and Impala distributed SQL engines.
The tools in the Blaze ecosystem (Blaze, odo, datashape, etc.) give us the flexibility of interacting with different remote data storage systems while using a consistent expression syntax and leveraging the performance of Impala and Parquet.