Clicks and Views

A Cask Data Application Platform (CDAP) example demonstrating a reduce-side join across two streams using a MapReduce program.

Overview

This application has a MapReduce program that processes records from two streams and writes to a PartitionedFileSet. The ClicksAndViewsMapReduce reads the records of a views stream and a clicks stream, joins them on the viewId of the records, and writes the joined records to a joined PartitionedFileSet.

Let's look at some of these components, and then run the application and see the results.

The Clicks and Views Application

As in the other examples, the components of the application are tied together by the class ClicksAndViews:

public class ClicksAndViews extends AbstractApplication {
  static final String NAME = "ClicksAndViews";
  static final String VIEWS = "views";
  static final String CLICKS = "clicks";
  static final String JOINED = "joined";

  @Override
  public void configure() {
    setName(NAME);
    setDescription("Example Clicks and Views Application");

    // Process the "clicks" and "views" streams using MapReduce
    addMapReduce(new ClicksAndViewsMapReduce());

    addStream(new Stream(VIEWS, "Stores the views that happen for each ad"));
    addStream(new Stream(CLICKS, "Stores the clicks that happen for each view"));

    createDataset(JOINED, PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // partition on "runtime", represents the MapReduce's logical runtime
      .setPartitioning(Partitioning.builder().addLongField("runtime").build())
      // Property for file set to be used as output of MapReduce
      .setOutputFormat(TextOutputFormat.class)
      // Properties for Explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setExploreFormat("text")
      .setExploreFormatProperty("delimiter", "\t")
      // viewId, requestBeginTime, adId, referrer, userCookie, ip, numClicks
      .setExploreSchema("viewId BIGINT, requestBeginTime BIGINT, adId BIGINT, referrer STRING, " +
                          "userCookie STRING, ip STRING, numClicks INT")
      .setDescription("Stores the joined views, which additionally keeps track of how many clicks each view had")
      .build());
  }
}

Data Storage

  • views input Stream contains ad views, representing an advertisement displayed on a viewer's screen.
  • clicks input Stream contains ad clicks, representing when a viewer clicks on an advertisement.
  • joined output PartitionedFileSet contains the joined ad views, each including the count of clicks that the ad view received. This Dataset is partitioned on the logical start time of the MapReduce.

MapReduce over Multiple Inputs

ClicksAndViewsMapReduce is a MapReduce that reads from the clicks and views streams and writes to the joined PartitionedFileSet. The initialize() method prepares the MapReduce program. It sets up the two streams as input and sets up the PartitionedFileSet as output, with the appropriate PartitionKey:

@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();

  // Read from both the "clicks" and "views" streams
  context.addInput(Input.ofStream(ClicksAndViews.CLICKS));
  context.addInput(Input.ofStream(ClicksAndViews.VIEWS));

  // Use the output partition key from the runtime arguments, if one was provided
  PartitionedFileSet joinedPFS = context.getDataset(ClicksAndViews.JOINED);
  PartitionKey outputPartitionKey =
    PartitionedFileSetArguments.getOutputPartitionKey(context.getRuntimeArguments(), joinedPFS.getPartitioning());

  // Otherwise, default to a partition keyed by the logical start time of this run
  if (outputPartitionKey == null) {
    outputPartitionKey = PartitionKey.builder().addLongField("runtime", context.getLogicalStartTime()).build();
  }

  // Direct the output to that partition of the "joined" PartitionedFileSet
  Map<String, String> outputArgs = new HashMap<>();
  PartitionedFileSetArguments.setOutputPartitionKey(outputArgs, outputPartitionKey);
  context.addOutput(Output.ofDataset(ClicksAndViews.JOINED, outputArgs));

  // Set the Mapper and Reducer on the underlying Hadoop job
  Job job = context.getHadoopJob();
  job.setMapperClass(ImpressionKeyingMapper.class);
  job.setReducerClass(JoiningReducer.class);
}
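
If a specific output partition is wanted instead of the default keyed by the logical start time, the caller can set the output partition key in the runtime arguments when starting the MapReduce; initialize() will then pick it up through PartitionedFileSetArguments.getOutputPartitionKey() and skip the default. A minimal sketch, in which the helper name buildFixedPartitionArguments and the runtime value are purely illustrative:

// Sketch only: builds runtime arguments that direct the output to a fixed "runtime" partition.
// The returned map would be passed as runtime arguments when starting ClicksAndViewsMapReduce.
private static Map<String, String> buildFixedPartitionArguments(long runtime) {
  Map<String, String> runtimeArgs = new HashMap<>();
  PartitionedFileSetArguments.setOutputPartitionKey(
    runtimeArgs, PartitionKey.builder().addLongField("runtime", runtime).build());
  return runtimeArgs;
}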

The Mapper class then keys each record by its viewId. As a result, all clicks and views for a particular viewId go to a single Reducer for joining. To perform the join correctly, the Reducer needs to know which source each record came from. This is possible by calling the getInputName method on the InputContext of the MapReduceTaskContext that is passed to initialize. Note that the Mapper needs to implement ProgramLifecycle:

public static class ImpressionKeyingMapper extends Mapper<LongWritable, Text, LongWritable, Text>
  implements ProgramLifecycle<MapReduceTaskContext<LongWritable, Text>> {

  private String inputName;

  @Override
  public void initialize(MapReduceTaskContext<LongWritable, Text> context) throws Exception {
    inputName = context.getInputContext().getInputName();
    Preconditions.checkNotNull(inputName);
    Preconditions.checkArgument(ClicksAndViews.CLICKS.equals(inputName) || ClicksAndViews.VIEWS.equals(inputName));
  }
...
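
The map() method and the JoiningReducer are elided above. The sketch below shows one way they could be written; the tab-separated record layout with the viewId as the first field, the "CLICK"/"VIEW" tags, and the reducer output types are assumptions for illustration, not the example's actual implementation (it also presumes the usual Hadoop imports and that the job is configured with matching map-output and output key/value classes):

  // Continuing inside ImpressionKeyingMapper: key every record by its viewId and
  // tag it with the stream it came from, so the Reducer can tell clicks from views
  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String record = value.toString();
    // Assumes tab-separated records whose first field is the viewId
    long viewId = Long.parseLong(record.split("\t", 2)[0]);
    String tag = ClicksAndViews.CLICKS.equals(inputName) ? "CLICK" : "VIEW";
    context.write(new LongWritable(viewId), new Text(tag + "\t" + record));
  }

  @Override
  public void destroy() {
    // no-op
  }
}

public static class JoiningReducer extends Reducer<LongWritable, Text, NullWritable, Text> {

  @Override
  public void reduce(LongWritable viewId, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
    String viewRecord = null;
    int numClicks = 0;
    for (Text value : values) {
      String[] parts = value.toString().split("\t", 2);
      if ("CLICK".equals(parts[0])) {
        numClicks++;            // count the clicks that reference this view
      } else {
        viewRecord = parts[1];  // keep the original view record
      }
    }
    if (viewRecord != null) {
      // Emit the view record with its click count appended, matching the Explore schema
      context.write(NullWritable.get(), new Text(viewRecord + "\t" + numClicks));
    }
  }
}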

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the CDAP Local Sandbox home directory, use the Command Line Interface (CLI):

    Linux:

    $ cdap cli load artifact examples/ClicksAndViews/target/ClicksAndViews-5.0.0.jar

    Successfully added artifact with name 'ClicksAndViews'

    $ cdap cli create app ClicksAndViews ClicksAndViews 5.0.0 user

    Successfully created application

    Windows:

    > cdap cli load artifact examples\ClicksAndViews\target\ClicksAndViews-5.0.0.jar

    Successfully added artifact with name 'ClicksAndViews'

    > cdap cli create app ClicksAndViews ClicksAndViews 5.0.0 user

    Successfully created application
    
  • Once the application has been deployed, run the example.

  • When finished, you can remove the application.

Running the Example

Ingesting Records

Begin by uploading a file containing newline-separated records representing view events into the views stream:

Linux:

$ cdap cli load stream views examples/ClicksAndViews/resources/views.txt

Successfully loaded file to stream 'views'

Windows:

> cdap cli load stream views examples\ClicksAndViews\resources\views.txt

Successfully loaded file to stream 'views'

Then, upload records representing click events into the clicks stream:

Linux:

$ cdap cli load stream clicks examples/ClicksAndViews/resources/clicks.txt

Successfully loaded file to stream 'clicks'

Windows:

> cdap cli load stream clicks examples\ClicksAndViews\resources\clicks.txt

Successfully loaded file to stream 'clicks'

Starting the MapReduce

When it is run, the MapReduce will write to a partition determined by its logical start time.

  • Using the CDAP UI, go to the ClicksAndViews application overview page, programs tab, click ClicksAndViewsMapReduce to get to the MapReduce detail page, then click the Start button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    Linux:

    $ cdap cli start mapreduce ClicksAndViews.ClicksAndViewsMapReduce

    Successfully started mapreduce 'ClicksAndViewsMapReduce' of application 'ClicksAndViews'
    with stored runtime arguments '{}'

    Windows:

    > cdap cli start mapreduce ClicksAndViews.ClicksAndViewsMapReduce

    Successfully started mapreduce 'ClicksAndViewsMapReduce' of application 'ClicksAndViews'
    with stored runtime arguments '{}'
    

Querying the Results

Once the MapReduce job has completed, you can sample the joined PartitionedFileSet by executing an Explore query using the CDAP CLI:

Linux:

$ cdap cli execute "\"SELECT * FROM dataset_joined\""

Windows:

> cdap cli execute "\"SELECT * FROM dataset_joined\""

The view records along with their click count will be displayed:

+======================================================================================================================================================================================================================+
| dataset_joined.viewid:  | dataset_joined.requestb | dataset_joined.adid: BI | dataset_joined.referrer | dataset_joined.usercook | dataset_joined.ip: STRI | dataset_joined.numclick | dataset_joined.runtime: BIGINT |
| BIGINT                  | egintime: BIGINT        | GINT                    | : STRING                | ie: STRING              | NG                      | s: INT                  |                                |
+======================================================================================================================================================================================================================+
| 0                       | 1461219010              | 2157                    | http://www.google.com   | lu=fQ9qHjLjFg3qi3bZiuz  | 62.128.93.36            | 0                       | 1461284201475                  |
| 1                       | 1461265001              | 2157                    | http://www.google.co.uk | lu=8fsdggknea@ASJHlz    | 21.612.39.63            | 1                       | 1461284201475                  |
| 2                       | 1461281958              | 2157                    | http://www.yahoo.com    | name=Mike               | 212.193.252.52          | 1                       | 1461284201475                  |
| 3                       | 1461331879              | 2157                    | http://www.amazon.com   | name=Matt               | 1.116.135.146           | 0                       | 1461284201475                  |
| 4                       | 1461348738              | 2157                    | http://www.t.co         | name=Nicholas; Httponly | 89.141.94.158           | 0                       | 1461284201475                  |
| 5                       | 1461349158              | 2157                    | http://www.linkedin.com | lo=Npa0jbIHGloMnx75     | 69.75.87.114            | 1                       | 1461284201475                  |
+======================================================================================================================================================================================================================+
Fetched 6 rows
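
Because the dataset is partitioned on the runtime of the MapReduce, you can also limit a query to the output of a single run by filtering on the runtime partition column; for example, using the runtime value shown in the sample output above:

$ cdap cli execute "\"SELECT * FROM dataset_joined WHERE runtime = 1461284201475\""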

To calculate a click-through rate from this data, you could divide the number of clicks by the number of total views:

Linux:

$ cdap cli execute "\"SELECT SUM(numclicks)/COUNT(*) AS CTR FROM dataset_joined\""

Windows:

> cdap cli execute "\"SELECT SUM(numclicks)/COUNT(*) AS CTR FROM dataset_joined\""

With our sample data, the click-through rate is 0.5:

+=============+
| ctr: DOUBLE |
+=============+
| 0.5         |
+=============+
Fetched 1 rows

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the ClicksAndViews application overview page, programs tab; click the Actions menu on the right side and select Manage to go to the Management pane for the application; then click the Actions menu on the right side and select Delete to delete the application; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    Linux:

    $ cdap cli delete app ClicksAndViews

    Windows:

    > cdap cli delete app ClicksAndViews