Implementing a Twitter Firehose in CDH5

While trying to implement the tutorial from the series How-to: Analyze Twitter Data with Apache Hadoop, I stumbled upon two issues:

  • CDH was installed using parcels, the recommended method, whereas the tutorial assumes an installation performed using packages. As a consequence, most of the libraries and programs are installed in different locations.
  • Because of the major library upgrades in CDH5, the Maven configuration file has to be updated accordingly before the Flume Twitter source can be compiled.

Fortunately, the CDH5 release notes mention:

Apache Flume
New Features:

  • FLUME-2190 – Includes a new Twitter Source that feeds off the Twitter firehose
  • FLUME-2109 – HTTP Source now supports HTTPS
  • Flume now auto-detects Cloudera Search dependencies.

The Twitter source now works out of the box.

Let’s set up Flume as follows:

The Flume agent will source from the Twitter API and sink the tweets into HDFS.

The approach is to:

  • Create a Twitter application. To establish a connection to Twitter, register an application and generate the related OAuth security keys that will authenticate your Twitter account.
  • Create a Flume agent. Configure the Flume agent with one source, TwitterSource, and one sink (target), HDFS.
  • Prepare HDFS. Create and configure the HDFS user and directory where the tweets will be stored.

Create a Twitter Application

Sign in to the Twitter developer site.

Mouse-over your portrait (on the top right side) and select ‘My Applications’ from the menu.

Click on the ‘Create a new application’ button.

Create the application by going through all the steps. Retrieve the 4 keys required to set up the source of the Flume agent.

The keys are named:

  • consumerKey
  • consumerSecret
  • accessToken
  • accessTokenSecret

These keys can be found once the application is created:

The 4 keys required for setting up the Twitter firehose.

These keys won’t change unless you decide to regenerate them. You can always come back to this application screen and copy the keys again.

Create a Flume Agent

In its simplest form, a Flume agent is composed of a source, a sink (the destination) and a channel. Data is collected by the source and transmitted through the channel to the sink:

The composition of a Flume agent.

In our case, TwitterSource (the source) collects the tweets. These tweets are sent to HDFS (the sink) through memoryChannel (a memory channel).
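In Flume’s configuration grammar, the wiring looks like this (a generic, illustrative sketch; the concrete names used in this tutorial follow below):

# A source can feed several channels; a sink drains exactly one channel.
agent.sources = src
agent.channels = ch
agent.sinks = snk
agent.sources.src.channels = ch
agent.sinks.snk.channel = ch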

The Flume service is now installed, but no instance of it has been created yet. To create an instance:

  • Go to the main screen of Cloudera Manager.
  • From the Cluster drop-down button, select “Add a service”.
From the Cloudera Manager Cluster menu, select “Add a service”.

From the list of available services, select Flume:

Select the Flume service from the list of services.

From the top Clusters menu, select the Flume service:

Select the Flume service from the Clusters menu.

To modify the Flume service agent configuration,

  • Select the menu Configuration -> View/Edit.
  • Select the Agent Base Group.
  • Change the agent name to ‘TwitterAgent’.
  • Copy the configuration code provided below, replacing each [xxx] with your Twitter key values.
Modify the Flume agent configuration.

The configuration of the Flume agent is as follows. Note that Flume reads this as a Java properties file, which does not support trailing inline comments, so leave nothing after the key values:

TwitterAgent.sources = twitter
TwitterAgent.channels = memoryChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.twitter.consumerKey = [xxx]
TwitterAgent.sources.twitter.consumerSecret = [xxx]
TwitterAgent.sources.twitter.accessToken = [xxx]
TwitterAgent.sources.twitter.accessTokenSecret = [xxx]
TwitterAgent.sources.twitter.maxBatchDurationMillis = 200 
TwitterAgent.sources.twitter.channels = memoryChannel

TwitterAgent.channels.memoryChannel.type = memory
TwitterAgent.channels.memoryChannel.capacity = 10000
TwitterAgent.channels.memoryChannel.transactionCapacity = 1000

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = memoryChannel
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1.example.com:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true

Note that the hdfs.useLocalTimeStamp option is set to true so that the HDFS sink can resolve the time-based escape sequences in hdfs.path (%Y/%m/%d/%H) from the agent’s local clock instead of requiring a timestamp header on each event; this also avoids name conflicts while saving files in the HDFS directory.
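As an alternative sketch (not required for this tutorial), Flume’s built-in timestamp interceptor can stamp each event with a timestamp header on the source side, which the HDFS sink then uses to resolve the escape sequences:

TwitterAgent.sources.twitter.interceptors = ts
TwitterAgent.sources.twitter.interceptors.ts.type = timestamp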

Once these modifications are done, click on the “Save Changes” button.

The agent is created, but it needs to be assigned to a cluster node to be able to run.

  • Select the “Instances” tab, and
  • Click on the “Add” button.
  • Select one node and validate.
Select the node on which the Flume agent will execute.

Set Up HDFS

The last step is to create the flume user in HDFS. In Hue, go to the Admin -> Manage Users menu. Add a flume user (leave the “create home directory” option selected) and a flume group.

From a command prompt, type the following commands:

$> su - hdfs
$> hadoop fs -chmod -R 770 /user/flume
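
If you prefer to stay on the command line instead of using Hue, here is a minimal sketch for preparing the destination directory (assuming the /user/flume/tweets path from the configuration above, and that the flume user and group already exist):

$> su - hdfs
$> hadoop fs -mkdir -p /user/flume/tweets
$> hadoop fs -chown -R flume:flume /user/flume
$> hadoop fs -chmod -R 770 /user/flume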

Execution

Start the Flume Agent from the Cloudera Manager home screen.
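
For reference, outside of Cloudera Manager the same agent can be launched by hand with the flume-ng wrapper (a sketch, assuming a hypothetical twitter.conf file holding the configuration above; /etc/flume-ng/conf is the usual CDH configuration directory):

$> flume-ng agent --conf /etc/flume-ng/conf --conf-file twitter.conf --name TwitterAgent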

You can monitor the progress and operations of the Flume agent:

Monitor the status of the Flume agent from the Flume Service Dashboard.

In Hue, verify that the tweets are now being written to the HDFS directory:

The tweets are now getting collected and saved in the HDFS directories.
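
The same check can be done from the command line (assuming the path configured in the HDFS sink):

$> hdfs dfs -ls -R /user/flume/tweets | head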

A Firehose or Garden Hose?

After 24 hours, there is 1.6 GB worth of tweets in HDFS:

$> hdfs dfs -du -h /user/flume
1.6 G  /user/flume/tweets

TwitterSource processed only 4,374,100 tweets, compared to the 500+ million tweets that Twitter claims to generate daily. It looks like my firehose is in fact a garden hose. The reasons are somewhat explained on Twitter’s developer site: there are vendors reselling the Twitter stream API who are not subject to the rate-limiting restrictions. The 3 certified providers are DataSift, Gnip and NTT Data. Topsy was also a data reseller, but it was recently acquired by Apple.
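
As a quick sanity check: 4,374,100 / 500,000,000 ≈ 0.9%, which is consistent with the roughly 1% sample that the public streaming endpoint delivers, rather than the full firehose.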

Conclusion

A Twitter firehose implemented without writing a single line of code; that’s quite impressive. It is now possible to analyze all these tweets with other tools such as Hive, Solr, etc.
All comments and remarks are welcome.

Comments

  • JP

    Right, it’s impressive that just sticking a few components together makes this work.

    A personal note to add here: this configuration is not recommended for production usage, especially when adopting the Firehose API (not the sample stream) without rate limits. Why?

    Small file problem! HDFS is a block-based filesystem, built for huge files: 64MB, 128MB, 256MB or even more. With your Flume configuration you will end up with many, many small files on HDFS (~60KB each). You can avoid this by changing the following parameters accordingly:

    TwitterAgent.sinks.HDFS.hdfs.batchSize = 10000
    TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
    TwitterAgent.sinks.HDFS.hdfs.rollSize = 67108864
    TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0

    TwitterAgent.channels.memoryChannel.type = memory
    TwitterAgent.channels.memoryChannel.capacity = 10000
    TwitterAgent.channels.memoryChannel.transactionCapacity = 10000

    Now you have files ~64MB.

  • JP

    Sorry, the channel transaction capacity needs to be smaller than the channel capacity, so:

    TwitterAgent.channels.memoryChannel.transactionCapacity = 1000

  • Chris

    Hello JP,

    I haven’t yet looked at the fine-tuning techniques for Flume and HDFS, and your comments are definitely very good tips to get me started!

    Thanks for this useful comment!

  • Ben

    Hey, is it possible to pre-filter the tweets with:

    TwitterAgent.sources.twitter.keywords = wc2014

    Because when I add it to the config, nothing happens and I still get all the tweets instead of the chosen ones.

  • Chris

    Hello Ben,

    The way you are defining the filtering is correct. If you change the configuration file, you have to apply the configuration and restart the Flume agent.

  • Ben

    Hey Chris,
    Thanks for the reply. Actually, that’s what I did, and it didn’t work. Do you know a reason why that could be? Don’t I have to define an interceptor or something like that?

  • Chris

    Hi Ben,

    I looked at the code of the Flume TwitterSource and noticed that “keywords” is not supported anymore in the new versions. This Flume agent samples the 1% Twitter firehose (or garden hose). If you want to perform filtering, you will have to implement it with Twitter4J.

  • Ben

    Hey Chris,

    Thanks for checking. Do you maybe have a tutorial or something so we can implement that? Thanks!

    Ben

  • Chris

    Hi Ben, unfortunately I haven’t really investigated this yet.

  • Ben

    Hey Chris,

    Thanks for answering. Do you know if that keyword filtering is possible with CDH 4.7?

    Thanks!

  • Chris

    As a matter of fact, it was. The following article on the Cloudera blog (http://j.mp/1pV4Kqo) describes the process in two parts. I hope that helps.

  • James

    Hi Chris,

    thank you for this great tutorial.

    Unfortunately, I’m encountering an error. When I try to use Hive, I get the following error:

    “Could not connect to localhost.localdomain:9083”

    I’m using MySQL to run the Hive metastore server.

    How can I resolve this issue?

    Thank you in advance.

    Kind regards,
    James

  • Chris

    Hi James, is it a similar kind of error as described in the following article http://javet.org/?p=216 ?

  • James

    Hi,

    Thank you for your answer. Now I’m encountering another problem with my Flume Twitter data!

    The Flume data is full of unsupported characters:

    http://i.imgur.com/kM5Cqir.png

    Also, when I try to edit the file, I get the following error message:

    http://i.imgur.com/aOmQ6cN.png

    File is not encoded in utf; cannot be edited: /user/flume/tweets/2014/07/08/12/FlumeData.1404847995652.

    And when I try to import the Flume data into the Hive tables using the Metastore Manager, I get the following errors:

    java.io.IOException: org.apache.hadoop.hive.serde2.SerDeException: org.codehaus.jackson.JsonParseException: Unexpected character (‘O’ (code 79)): expected a valid value (number, String, array, object, ‘true’, ‘false’ or ‘null’)
    at [Source: java.io.StringReader@4f7f1d92; line: 1, column: 2]

    can you help me solve this problem with my twitter data?

  • Chris

    I think I had a similar issue. I posted the question on the Cloudera support forum, and it was suggested that the files were corrupted. I will see if I can find more information on this issue (but unfortunately I do not have much time lately). If you find an answer before me, please let me know.

  • Tarek Abouzeid

    I am having the exact same problem as James. If anyone has found a solution to this problem, it would be great to post it here. Thanks so much in advance.

  • saeed

    I followed the exact same steps in this blog but I do not get anything in my Flume HDFS tweets directory. I updated my configuration. I restarted Flume. I added one of my nodes to act as a Flume agent. Nothing worked. Any idea?
