Guide for Analysing Twitter Data in Hadoop

If a social network is a 'network of social interactions and personal relationships', then it's easy to understand how an online social network can be looked at simply as a vast source of data.

If every interaction on a social network, no matter how small, is considered a data point, then the amount of data available to be analysed is enormous. The possibilities this data offers to organisations are limitless, but one of the main challenges lies in making the data available to analysts. Also, with the data being renewed constantly, it's vital that the right systems are put in place to give analysts access to up-to-date data. If slow, cumbersome processes are used, the data is already out of date by the time it is analysed.

To demonstrate some of the capabilities of Red Sqirl, we've put together this guide to show how to access vast amounts of data from a social network in real-time and perform an analysis on it.

We will be pulling data from Twitter and performing an ongoing sentiment analysis on it. The goal is to have Red Sqirl produce a report every hour summing up the sentiment of an enormous amount of up-to-date Twitter activity.

This document is a guide rather than a step-by-step tutorial; its point is to show the capabilities of Red Sqirl. If you plan on recreating our technique, please make sure you understand everything you run, including the scripts.

For our analysis, we will track eight basic emotions (anger, fear, anticipation, trust, surprise, sadness, joy, and disgust) and two sentiments (negative and positive). We will take tweets organised by a given topic and track words from 41 different languages.

The steps involved for this process are as follows:

Step 1. Twitter streaming
Step 2. Load the data
Step 3. Calculate the sentiment per tweet
Step 4. Create your output
Step 5. Schedule the hourly job
Step 6. Create a daily job
Step 7. Schedule a daily job
Step 8. Save and run

Prerequisite

We’ll assume the following is available to you:

  • A Hadoop cluster with HDFS, Hive (HCatalog) and Oozie, and an edge node you can log into
  • A Red Sqirl installation connected to that cluster
  • A Twitter account, to generate the streaming API credentials

We assume you have a working knowledge of:

  • Red Sqirl
  • Hive and SQL
  • HDFS and the Linux command line

We will now start by setting up the environment.

Step 1 Twitter Streaming

We will use the Twitter streaming API to pull the tweets and Apache Flume to transfer them to HDFS in an orderly way. We will download the data in JSON format; see the Twitter developer documentation for an example of a tweet in JSON.

API Limitations

In streaming mode, Twitter caps the bandwidth at roughly 1% of the full stream of tweets. You can either filter the stream, to get all the tweets related to a set of keywords, or take a 1% random sample.

Create your OAuth credentials

To use the streaming API, you will have to generate OAuth credentials.

  1. Go to Twitter Apps and register your application.
  2. Go to Permissions and select Read Only.

Keep the browser window with your credentials open; we’ll use them in the next step.

Set up the streaming
  1. Download the latest Apache Flume binary tar.
  2. Copy the Flume tar file onto your edge node.
  3. Untar it; in the next steps we will assume you have the directory /opt/apache-flume-1.6.0-bin.
tar -xzf apache-flume-1.6.0-bin.tar.gz
  4. Download the Idiro twitter streaming project; you can find the code on GitHub if you’re interested.
  5. Copy the twitter streaming tar file onto your edge node.
  6. Untar it; in the next steps we will assume you have the directory /opt/streaming-0.0.1.
tar -xzf streaming-0.0.1.tar.gz
  7. Configure the file example.properties.
  8. Configure the file flume-hdfs-copy.conf.
  9. Open the executable script and change the top section:
###############################################################
#Change me

#Twitter Streaming vars
#PROXY_PORT=3000
#PROXY_HOST=myproxy.local.net
PROPERTY_FILE=example.properties

#FLUME VARS
FLUME_HOME=~/apache-flume-1.6.0-bin
FLUME_EXEC_CONF=flume-hdfs-copy.conf
###############################################################
  10. We will assume in the next steps that Flume will write into the folder ‘/share/twitter_json/%Y%m%d/%H’.
  11. Execute kickoff-streaming.sh:
$ cd /opt/streaming-0.0.1
$ ./kickoff-streaming.sh
  12. Wait a minute or so and check that the configured Flume folder has been created in HDFS.

Notes:

  1. Apache Flume ships with a Twitter source by default, but it only does sampling. More importantly, problems can arise when trying to read the data it generates in Hive (see here).
  2. CDH also provides Twitter sampling plugins, but there are library conflicts with the official Flume version 1.6.0.

The NRC Dictionary

When running a sentiment analysis you rely on a dictionary: the more accurate the dictionary, the better the result. In this sentiment analysis we will use the NRC Emotion Lexicon; please refer to the NRC website for their terms & conditions. The NRC Emotion Lexicon is a list of English words and their associations with eight basic emotions (anger, fear, anticipation, trust, surprise, sadness, joy, and disgust) and two sentiments (negative and positive). The NRC also provides translations for languages other than English, so we'll add those words to our dictionary as well. We're unsure of the quality of those translated dictionaries, and of whether the method of extracting words from tweets and comparing them against the dictionary is relevant for every language. If one of those languages is important for your analysis, you should check this yourself.

We’ve created a file to load those dictionaries easily into a Hive Database.

  1. Download the tar file provided.
  2. Copy the tar file to your edge node and untar it
  3. Check the script “load_into_hive.sql”
  4. Execute the script load_into_hive.sql
  5. Check if the nrc_lexicon table is populated
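For reference, below is a minimal sketch of what a load script like load_into_hive.sql might contain. The database name, column layout, file name and path here are assumptions for illustration only; the real script in the tar file may also carry language and translation columns, so always check it before running.

CREATE DATABASE IF NOT EXISTS utils;

-- One row per word, one 0/1 flag per emotion and sentiment (assumed layout)
CREATE TABLE IF NOT EXISTS utils.nrc_lexicon (
  word STRING,
  positive INT, negative INT,
  anger INT, anticipation INT, disgust INT, fear INT,
  joy INT, sadness INT, surprise INT, trust INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- Load a flattened, tab-separated copy of the lexicon (hypothetical path)
LOAD DATA LOCAL INPATH '/tmp/nrc_lexicon.tsv' INTO TABLE utils.nrc_lexicon;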

The JSON Library

Hive cannot read JSON files by default, and our streaming writes in JSON format. Therefore you will need to download a SerDe JAR library.

  1. Download the library from the rcongiu GitHub project; binaries (jar files) are provided for the standard distributions, so you don’t have to compile.
  2. Copy this library into HDFS and keep its path handy; in the next steps we will assume it is /share/lib/hive/json-serde-1.3.7-jar-with-dependencies.jar
  3. Check that everything is OK in a terminal window (don’t forget to change the Twitter data path):

ADD JAR /share/lib/hive/json-serde-1.3.7-jar-with-dependencies.jar;

CREATE EXTERNAL TABLE tweets_tmp (
 id BIGINT,
 created_at STRING,
 source STRING,
 favorited BOOLEAN,
 retweeted_status STRUCT<
  text:STRING,
  user:STRUCT<screen_name:STRING,name:STRING>,
  retweet_count:INT>,
 entities STRUCT<
  urls:ARRAY<STRUCT<expanded_url:STRING>>,
  user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
  hashtags:ARRAY<STRUCT<text:STRING>>>,
 text STRING,
 user STRUCT<
  screen_name:STRING,
  name:STRING,
  friends_count:INT,
  followers_count:INT,
  statuses_count:INT,
  verified:BOOLEAN,
  utc_offset:INT,
  time_zone:STRING>,
 in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION '/share/twitter_json/${DATE}/${HOUR}';

SELECT id,text FROM tweets_tmp LIMIT 10;

DROP TABLE tweets_tmp;

The Analysis

Now, using Red Sqirl, we will build a sentiment analysis. Red Sqirl will then produce hourly and daily summaries. These two tables can then be queried or integrated with third-party tools. You can find the output rs file here.

At each step of the process, we'll show a screenshot to demonstrate the workflow. This first screenshot is a view of what the final workflow in Red Sqirl will look like.



Step 2 Load the Data

Create the Twitter Source

In this case our source is the data generated by Flume, and the format is JSON. We’ll load it into Red Sqirl every hour as a text file.

  1. Drag & Drop a Synchronous Source
  2. Double Click on it
  3. Name it twitter
  4. Click OK
  5. Select Hadoop Distributed File System
  6. Click Next
  7. Select your path /share/twitter_json/20161015/00
  8. Give delimiter #1 to get one field
  9. Click Apply to check
  10. Click Next
  11. The template path should look like this: /share/twitter_json/${YEAR}${MONTH}${DAY}/${HOUR}
  12. Click Next
  13. The defaults are fine, except the offset, which should be set to -1
  14. Click OK

Create a temporary external table

This hourly file will be parsed in a script node, from which we will load it as an external table. In this analysis we will only need the following fields:

  1. id
  2. username
  3. location
  4. text
  1. Drag & Drop a Script Node
  2. Link “twitter” to it (choose the unnamed output)
  3. Double Click
  4. Name it twitter_extract
  5. Choose “template-hive2”
  6. Click Next
  7. Choose HCatalog
  8. Click Next
  9. In the header write “id LONG, username STRING, area STRING, content STRING”
  10. Click Next
  11. Choose HCatalog
  12. Click Next
  13. Write “id LONG,word STRING” in the header field
  14. Click Next
  15. Below is how the node looks for us. Starting from the original value, check:
    1. The job-xml field
    2. The jdbc-url field
    3. Remove the prepare field
    4. You can add namenode parameters, and change the INPUT_PATH parameter

<hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobtracker}</job-tracker>
        <name-node>${namenode}</name-node>
        <job-xml>/user/etienne/.redsqirl/conf/hive-site.xml</job-xml>
        <configuration>
          <property>
            <name>mapred.job.queue.name</name>
            <value>${default_action_queue}</value>
          </property>
          <property>
            <name>oozie.launcher.mapred.job.queue.name</name>
            <value>${default_launcher_queue}</value>
          </property>
        </configuration>
        <jdbc-url>jdbc:hive2://hdp2.local.net:10000/default</jdbc-url>
        <script>!{SCRIPT}</script>
        <param>namenode=${namenode}</param>
        <param>INPUT_PATH=${twitter}</param>
</hive2>

  16. Set the script extension to ‘.sql’
  17. Click on ignore warnings
  18. Click Next
  19. Copy the following script

ADD JAR ${namenode}/share/lib/hive/json-serde-1.3.7-jar-with-dependencies.jar;

--Drop temporary objects
DROP TABLE IF EXISTS tweets_tmp;
DROP VIEW IF EXISTS tweet_sentences_tmp;
DROP TABLE IF EXISTS !{OUTPUT_DATABASE_tweets}.!{OUTPUT_TABLE_tweets};
DROP TABLE IF EXISTS !{OUTPUT_DATABASE_words}.!{OUTPUT_TABLE_words};

--Orignal data
CREATE EXTERNAL TABLE tweets_tmp (
  id BIGINT,
  text STRING,
  `user` STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION '${INPUT_PATH}';

--Get the four fields we will use
CREATE TABLE !{OUTPUT_DATABASE_tweets}.!{OUTPUT_TABLE_tweets} AS
SELECT distinct id AS id,
        get_json_object(`user`,'$.name’) AS username,
        get_json_object(`user`,'$.location’) AS area,
        text AS content
FROM tweets_tmp;

DROP TABLE tweets_tmp;

--Create a table with a word per row
create view tweet_sentences_tmp as select id, words
FROM !{OUTPUT_DATABASE_tweets}.!{OUTPUT_TABLE_tweets}
lateral view explode(sentences(lower(content))) dummy as words;

create table !{OUTPUT_DATABASE_words}.!{OUTPUT_TABLE_words} as select id, word
FROM tweet_sentences_tmp
lateral view explode( words ) dummy as word;

DROP VIEW tweet_sentences_tmp;

  20. Click OK

Note:

  • In the tweets_tmp table, the location field inside the user JSON object is sometimes missing. Therefore the user column can’t be typed as a STRUCT; it is read as a STRING and parsed with get_json_object instead.

Create the Dictionary Source

As we are using a multi-lingual environment, we will need to load the dictionary and remove any duplicates.

  1. Drag & Drop an HCatalog Source
  2. Double Click on it
  3. Name it lexicon
  4. Click OK
  5. Choose the lexicon table; for us it is /utils/nrc_lexicon
  6. Click OK

Ensure Uniqueness of Emotions per Word

We are assuming here that a word used in different languages has only one meaning, so we will remove duplicates; a rough SQL equivalent of this step is sketched after the list below.

  1. Drag & Drop a JDBC Aggregator
  2. Link lexicon with it
  3. Double click on it
  4. Name it words
  5. Click OK
  6. Select word
  7. Click Next
  8. Choose copy in the menu and click Generate
  9. Choose max in the menu and click Generate
  10. Click on the magnifying glass icon
  11. Replace all “_max” with nothing
  12. Click Close on the replace modal
  13. Click Next
  14. Click Next
  15. Click OK
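In SQL terms, this aggregator keeps one row per word and takes the maximum of each emotion flag across languages. This is not the exact SQL Red Sqirl generates, just the logical equivalent, assuming the lexicon columns are named after the emotions as in the output tables later in this guide:

SELECT word,
  MAX(positive) AS positive, MAX(negative) AS negative,
  MAX(anger) AS anger, MAX(anticipation) AS anticipation,
  MAX(disgust) AS disgust, MAX(fear) AS fear,
  MAX(joy) AS joy, MAX(sadness) AS sadness,
  MAX(surprise) AS surprise, MAX(trust) AS trust
FROM utils.nrc_lexicon
GROUP BY word;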

Step 3 Calculate the sentiment per tweet

Merge tweets with sentiment

For every tweet, we will look up the meaning of each word. A word can appear in several categories and can be repeated. A rough SQL equivalent of this join is sketched after the list below.

  1. Drag & Drop a Jdbc Join
  2. Link twitter_extract to it, words output
  3. Link words to it
  4. Double click on it
  5. Call it “word_emotions”
  6. Click Next
  7. Select copy and OK
  8. Remove the rows “twitter_extract_words.word” and “words.word”
  9. Go into the replace modal and replace “twitter_extract_words_” with nothing
  10. Go into the replace modal and replace “words_” with nothing
  11. Click on the Close button
  12. Click Next
  13. On the twitter_extract_words line, add “twitter_extract_words.word” and on the words line “words.word”
  14. Click Next
  15. Click Next
  16. Click OK
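Logically, this join attaches the lexicon flags to every (tweet id, word) pair and drops the word columns afterwards. Using the node names as table names purely for illustration, it is equivalent to something like:

SELECT w.id,
  l.positive, l.negative, l.anger, l.anticipation, l.disgust,
  l.fear, l.joy, l.sadness, l.surprise, l.trust
FROM twitter_extract_words w   -- (id, word), one row per word per tweet
JOIN words l                   -- deduplicated lexicon
  ON (w.word = l.word);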

Aggregate the Data per tweet

Now we can calculate the sentiment communicated in a tweet.

  1. Drag & Drop an aggregator icon
  2. Link “word_emotions” to it
  3. Double Click on it
  4. Name it “tweet_emotions”
  5. Select id
  6. Select Copy and OK
  7. Select SUM and OK
  8. Click on the magnifying glass icon and replace “_sum” with nothing
  9. Click Next
  10. Click Next
  11. Click OK
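This node is, in essence, a GROUP BY on the tweet id that sums the flags of every matched word. A trimmed sketch (only a few of the ten emotion columns are shown; the others follow the same pattern):

SELECT id,
  SUM(positive) AS positive, SUM(negative) AS negative,
  SUM(anger) AS anger, SUM(joy) AS joy
  -- ...the remaining emotion columns are summed the same way
FROM word_emotions
GROUP BY id;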

Step 4 Create your Output

Low Level Output

We will now join the emotion totals back onto our input data; a rough SQL equivalent follows the steps below.

  1. Drag & Drop a Jdbc Join
  2. Link “twitter_extract” to it, tweets output
  3. Link “tweet_emotions” to it
  4. Double Click on it
  5. Name it “twitter_emotion”
  6. Click Next
  7. Select Copy and Click OK
  8. Drop line “tweet_emotions.id”
  9. Click on the magnifying glass icon
  10. Replace “twitter_extract_tweets_” with nothing
  11. Replace “tweet_emotions_” with nothing
  12. Click on Close
  13. Click next
  14. Select “Left Outer Join”
  15. Add “twitter_extract_tweets.id” to the twitter_extract line
  16. Add “tweet_emotions.id” to the tweet_emotions line
  17. Make sure that the twitter_extract line is first
  18. Click Next
  19. Click Next
  20. Click OK
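The left outer join keeps every tweet, including those in which no lexicon word was found (their emotion columns are simply NULL); this is also why the twitter_extract line has to come first. Using the node names as table names for illustration, the logic is roughly:

SELECT t.id, t.username, t.area, t.content,
  e.positive, e.negative, e.anger, e.anticipation, e.disgust,
  e.fear, e.joy, e.sadness, e.surprise, e.trust
FROM twitter_extract_tweets t
LEFT OUTER JOIN tweet_emotions e ON (t.id = e.id);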

Record tweet level output

Let’s record twitter_emotion

  1. Drag & Drop a Synchronous Sink
  2. Link “twitter_emotion” to it
  3. Double click on it
  4. Name it “hourly_tweets”
  5. In the template, write the database and output table in which to write the result; we will write it into ‘/twitter/tweets/dt=${YEAR}${MONTH}${DAY};hr=${HOUR}’
  6. Click Apply
  7. Click OK on the error message
  8. You will see a green question mark; hover over it and you will see an SQL statement. Copy the entire statement
  9. On the bottom right of your screen, in the HCatalog View, click on the Query Executor.
  10. Paste your query
  11. You can add a data format such as “STORED AS ORC” if you wish
  12. At the end your query would look like

CREATE TABLE twitter.tweets (id BIGINT,username STRING,area STRING,content STRING,positive
INT,negative INT,anger INT,anticipation INT,disgust INT,fear INT,joy INT,sadness INT,surprise INT,trust INT )
PARTITIONED BY (dt STRING, hr STRING ) STORED AS ORC

  13. Click the execute button
  14. In the HCatalog View, go into the twitter database
  15. Click on the refresh button, so that you can see the new table
  16. Click OK in the Synchronous Sink window

Summary Output

We will now summarize our output. For every area and a given set of keywords, we will calculate the number of tweets and the emotion totals; rough SQL equivalents of these summary nodes are sketched after the lists below.


Firstly let’s calculate an overall view for every area.

  1. Drag & Drop a Jdbc Aggregator
  2. Link “twitter_emotion” to it
  3. Double Click on it
  4. Name it “twitter_sum”
  5. Click OK
  6. Select area
  7. Click Next
  8. Add a line, with the following fields: '',content,CATEGORY
  9. Select Copy and Click OK
  10. Click on the Configure Button
  11. Click on count
  12. Select total_cnt
  13. Click Add
  14. Click Generate
  15. Select SUM and click OK
  16. Remove id_sum row
  17. Click Next
  18. Click Next
  19. Click OK
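Logically, twitter_sum groups the tweet-level output by area, counts the tweets and sums each emotion column, with an empty content label marking the overall view. A rough equivalent, again using node names as table names for illustration:

SELECT '' AS content, area,
  COUNT(*) AS total_cnt,
  SUM(positive) AS positive_sum, SUM(negative) AS negative_sum,
  SUM(anger) AS anger_sum, SUM(anticipation) AS anticipation_sum,
  SUM(disgust) AS disgust_sum, SUM(fear) AS fear_sum,
  SUM(joy) AS joy_sum, SUM(sadness) AS sadness_sum,
  SUM(surprise) AS surprise_sum, SUM(trust) AS trust_sum
FROM twitter_emotion
GROUP BY area;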

We will now add one node for every expression we want to report on.

  1. Copy & Paste twitter_sum
  2. Link “twitter_emotion” to the new icon
  3. Go into options and rename the object. We can name it, for example, “twitter_bd_sum”.
  4. Confirm the rename
  5. Double Click on the object
  6. Click next
  7. Change the content operation to be ‘big data’
  8. Click Next
  9. Click Next
  10. In the Condition write: LOWER(content) LIKE '%big data%'
  11. Click OK
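The copied node does the same aggregation, but labels the rows with the keyword and only counts the tweets whose text matches it; a trimmed sketch:

SELECT 'big data' AS content, area,
  COUNT(*) AS total_cnt,
  SUM(positive) AS positive_sum, SUM(negative) AS negative_sum
  -- ...the remaining emotion sums as in twitter_sum
FROM twitter_emotion
WHERE LOWER(content) LIKE '%big data%'
GROUP BY area;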

We need to bring all our reports together in one dataset.

  1. Drag & Drop a Jdbc Union
  2. Link it with all your summaries
  3. Name it “twitter_report”
  4. Click OK
  5. Click Next
  6. Select Copy and click OK
  7. Click Next
  8. Click Next
  9. Click OK
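The union node simply stacks the overall and per-keyword summaries into one dataset; in SQL it amounts to something like:

SELECT * FROM (
  SELECT * FROM twitter_sum
  UNION ALL
  SELECT * FROM twitter_bd_sum
) report;

Every additional keyword node you create is just one more branch in this union.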

Record area level output

Let’s record our hourly report now.

  1. Drag & Drop a Synchronous Sink
  2. Link “twitter_report” to it
  3. Double click on it
  4. Name it “hourly_report”
  5. In the template, write the database and output table in which to write the result; we will write it into ‘/twitter/twitter_report_hr/dt=${YEAR}-${MONTH}-${DAY};hr=${HOUR}’
  6. Click Apply
  7. Click OK on the error message
  8. You will see a green question mark; hover over it and you will see an SQL statement. Copy the entire statement
  9. On the bottom right of your screen, in the HCatalog View, click on the Query Executor.
  10. Paste your query
  11. You can add a data format such as “STORED AS ORC” if you wish
  12. At the end your query will look like

CREATE TABLE twitter.twitter_report_hr (content STRING,area STRING,total_cnt
INT,positive_sum INT,negative_sum INT,anger_sum INT,anticipation_sum INT,disgust_sum INT,fear_sum INT,joy_sum
INT,sadness_sum INT,surprise_sum INT,trust_sum INT ) PARTITIONED BY (dt STRING, hr STRING ) STORED AS ORC

  13. Click the execute button
  14. In the HCatalog View, go into the twitter database
  15. Click on the refresh button, so that you can see the new table
  16. Click OK in the Synchronous Sink window

Step 5 Schedule The Hourly Job

We will now configure the computation time of our job.

  1. Double Click on the coordinator label, on the top left of the canvas
  2. Change the name to “twitter_hourly”
  3. Click on Execute
  4. For the execution time, choose today at 12:00 and repeat the job every hour; this means the job will execute at the start of each hour.

Step 6 Create a daily Job

Let’s create a daily summary. We will take the 24 hourly reports of each day and produce one daily report.

Our first step is to set up the input data, which comes from our hourly job.

  1. Drag & Drop a Synchronous Sink Filter
  2. Link ‘hourly_report’ to it
  3. Double Click on it
  4. Name it ‘all_day_reports’
  5. Click OK
  6. Set the number of instances to 24
  7. Click OK

We will now summarize the report; a rough SQL equivalent of this aggregation follows the steps below.

  1. Drag & Drop a Jdbc Aggregator
  2. Link ‘all_day_reports’ to it
  3. Double Click on it
  4. Name it ‘report_d_sum’
  5. Click OK
  6. Select area and content
  7. Click Next
  8. Select copy and click OK
  9. Select sum and click OK
  10. Click on the magnifying glass icon and replace all “_sum_sum” with “_sum”
  11. Rename “total_cnt_sum” to “total_cnt”
  12. Click Next
  13. Click Next
  14. Click OK
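The daily summary is a further aggregation over the 24 hourly reports: the hourly sums are summed again, and the renames restore the original column names. As a sketch (with all_day_reports standing for the 24 hourly partitions, and only a few emotion columns shown):

SELECT content, area,
  SUM(total_cnt) AS total_cnt,
  SUM(positive_sum) AS positive_sum, SUM(negative_sum) AS negative_sum
  -- ...the remaining *_sum columns follow the same pattern
FROM all_day_reports
GROUP BY content, area;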

Let’s record our summary.

  1. Drag & Drop a Synchronous Sink
  2. Link ‘report_d_sum’ to it
  3. Double Click on it
  4. Name it ‘daily_report’
  5. Click OK
  6. Set the template path to ‘/twitter/twitter_report_daily/dt=${YEAR}-${MONTH}-${DAY}’
  7. Apply
  8. Copy the create statement to the execution window
  9. You can add your favourite storage method. The script to execute should look like:

CREATE TABLE twitter.twitter_report_daily (content STRING,area STRING,total_cnt
INT,positive_sum INT,negative_sum INT,anger_sum INT,anticipation_sum INT,disgust_sum INT,fear_sum INT,joy_sum
INT,sadness_sum INT,surprise_sum INT,trust_sum INT ) PARTITIONED BY (dt STRING ) STORED AS ORC

  10. Click Execute
  11. Refresh the tables in the twitter database
  12. Click OK in the Sink configuration window

Step 7 Schedule the daily job

We will now configure the computation time of our daily job.

  1. Double click on the coordinator label, corresponding to the new coloured area
  2. Change the name to “twitter_daily”
  3. Click on Execute
  4. For the execution time, choose today at 00:00 and repeat the job every day.

Step 8 Save & Run
  1. Go in the File menu, click Save
  2. Name the new workflow twitter_emotions
  3. Go to Project, click “Save & Run”
  4. Give it an end date far in the future to run it in data pipeline mode, or an end date in the past to run it in batch mode. When running in batch mode, you may be limited by the default value of the Oozie “Workflow definition length” property.

Conclusion

In this use case we have shown how to:

  • Stream tweets from the Twitter streaming API into HDFS with Apache Flume
  • Load the JSON tweets into Hive and extract the fields we need
  • Score each tweet against the NRC Emotion Lexicon
  • Build and schedule hourly and daily summary jobs with Red Sqirl

You can now create a dashboard using your favourite visualisation tool.

Simple Tableau Dashboard showing daily usage
