πŸ”—Consuming Twitter Data in Real Time

Source Code Repository: Source code (and other resources) for this guide are available at the CDAP Guides GitHub repository.

Consuming a live tweets stream in real time is one of the common tasks of big data applications that power the social analytics. In this guide, you will learn how to accomplish it with the Cask Data Application Platform (CDAP).

πŸ”—What You Will Build

You will build a CDAP application that consumes data from the public Twitter feed and computes the average tweet size. You will:

  • Build a real-time Flow to process tweets in real time;
  • Use a Flowlet from the cdap-pack-twitter library that uses the Twitter4j library to connect the Flow and Twitter stream;
  • Use a Dataset to persist the results of the analysis; and
  • Build a Service to serve the analysis results via a RESTful endpoint.

πŸ”—Let’s Build It!

Following sections will guide you through building an application from scratch. If you are interested in deploying and running the application right away, you can clone its source code from this GitHub repository. In that case, feel free to skip the next two sections and jump right to the Configuring TweetCollectorFlowlet section.

πŸ”—Application Design

Real-time processing capability within CDAP is supported by a Flow. The application we are building in this guide uses a Flow for processing the tweets consumed from Twitter feed. The processing results are persisted in a Dataset and are made available via RESTful endpoint using a Service.

../_images/app-design8.png

The Flow consists of two processing nodes called Flowlets:

  • A collector Flowlet that consumes data from Twitter feed and output a synthesized Tweet object; and
  • An analyzer Flowlet that consumes the tweet emitted by the collector to update the basic statistics of Tweets: total tweets size and count.

πŸ”—Application Implementation

The recommended way to build a CDAP application from scratch is to use a Maven project. Use the following directory structure (you’ll find contents of these files described below):

./pom.xml
./src/main/java/co/cask/cdap/guides/twitter/TwitterAnalysisApp.java
./src/main/java/co/cask/cdap/guides/twitter/TweetAnalysisFlow.java
./src/main/java/co/cask/cdap/guides/twitter/StatsRecorderFlowlet.java
./src/main/java/co/cask/cdap/guides/twitter/TweetStatsHandler.java
./src/main/resources/twitter4j.properties

The application will use the cdap-packs-twitter library which includes an implementation of TweetCollectorFlowlet. You'll need to add this library as a dependency to your project's pom.xml:

...
<dependencies>
  ...
  <dependency>
    <groupId>co.cask.cdap.packs</groupId>
    <artifactId>cdap-twitter-pack</artifactId>
    <version>0.1.0</version>
  </dependency>
</dependencies>

Create the TwitterAnalysisApp class which declares that the application has a Flow, a Service, and creates a Dataset:

public class TwitterAnalysisApp extends AbstractApplication {
  static final String NAME = "TwitterAnalysis";
  static final String TABLE_NAME = "tweetStats";
  static final String SERVICE_NAME = "TweetStatsService";

  @Override
  public void configure() {
    setName(NAME);
    createDataset(TABLE_NAME, KeyValueTable.class);
    addFlow(new TweetAnalysisFlow());
    addService(SERVICE_NAME, new TweetStatsHandler());
  }
}

The TweetAnalysisFlow makes use of the TweetCollectorFlowlet that is available in the cdap-packs-twitter library:

public class TweetAnalysisFlow extends AbstractFlow {
  static final String NAME = "TweetAnalysisFlow";
  @Override
  public void configure() {
    setName(NAME);
    setDescription("Collects simple tweet stats");
    addFlowlet("collect", new TweetCollectorFlowlet());
    addFlowlet("recordStats", new StatsRecorderFlowlet());
    connect("collect", "recordStats");
  }
}

Tweets pulled by the TweetCollectorFlowlet are consumed by the StatsRecorderFlowlet that updates the total number of tweets and their total body size in a Dataset:

public class StatsRecorderFlowlet extends AbstractFlowlet {
  @UseDataSet(TwitterAnalysisApp.TABLE_NAME)
  private KeyValueTable statsTable;

  @ProcessInput
  public void process(Tweet tweet) {
    statsTable.increment(Bytes.toBytes("totalCount"), 1);
    statsTable.increment(Bytes.toBytes("totalSize"), tweet.getText().length());
  }
}

In a real-world scenario, the Flowlet could perform more sophisticated processing on tweets.

Finally, the TweetStatsHandler uses the tweetStats Dataset to compute the average tweet size and serve it over HTTP:

@Path("/v1")
public class TweetStatsHandler extends AbstractHttpServiceHandler {

  @UseDataSet(TwitterAnalysisApp.TABLE_NAME)
  private KeyValueTable statsTable;

  @Path("avgSize")
  @GET
  public void sentimentAggregates(HttpServiceRequest request, HttpServiceResponder responder) throws Exception {
    long totalCount = statsTable.incrementAndGet(Bytes.toBytes("totalCount"), 0);
    long totalSize = statsTable.incrementAndGet(Bytes.toBytes("totalSize"), 0);
    responder.sendJson(totalCount > 0 ? totalSize / totalCount : 0);
  }
}

πŸ”—Configuring TweetCollectorFlowlet

In order to utilize the TweetCollectorFlowlet, a Twitter API key and Access token must be obtained and configured. Follow the steps provided by Twitter to obtain OAuth access tokens. You can provide these to the TweetCollectorFlowlet as runtime arguments of the Flow or put them in twitter4j.properties in the src/main/resources/ directory and package it with the application. The format of the twitter4j.properties file:

oauth.consumerKey=***************************
oauth.consumerSecret=***************************
oauth.accessToken=***************************
oauth.accessTokenSecret=***************************

πŸ”—Build and Run Application

The TwitterAnalysisApp application can be built and packaged using the Apache Maven command:

$ mvn clean package

Note that the remaining commands assume that the cdap-cli.sh script is available on your PATH. If this is not the case, please add it:

$ export PATH=$PATH:<CDAP home>/bin

If you haven't already started a standalone CDAP installation, start it with the command:

$ cdap.sh start

We can then deploy the application to a standalone CDAP installation and start its components:

$ cdap-cli.sh load artifact target/cdap-twitter-ingest-guide-<version>.jar
$ cdap-cli.sh create app TwitterAnalysis cdap-twitter-ingest-guide <version> user
$ cdap-cli.sh start flow TwitterAnalysis.TweetAnalysisFlow
$ cdap-cli.sh start service TwitterAnalysis.TweetStatsService

Once Flow is started, tweets are pulled and processed. You can query for the average tweet size:

$ curl -w'\n' http://localhost:10000/v3/namespaces/default/apps/TwitterAnalysis/services/TweetStatsService/methods/v1/avgSize

or using the CDAP CLI:

$ cdap-cli.sh call service TwitterAnalysis.TweetStatsService GET 'v1/avgSize'

Example output:

132

πŸ”—Share and Discuss!

Have a question? Discuss at the CDAP User Mailing List.

πŸ”—License

Copyright Β© 2014-2015 Cask Data, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.