Log Analysis Example

A Cask Data Application Platform (CDAP) example demonstrating Spark and MapReduce running in parallel inside a Workflow through a fork.

Overview

This example demonstrates Spark and MapReduce performing log analysis on Apache usage logs, computing the total number of hits for every unique URL, the total number of responses for every unique response code, and the total number of requests made by every unique IP address.

Logs are sent to CDAP and ingested into the logStream, which stores each log event in its entirety.

After these events are streamed, they are taken up by the ResponseCounterSpark, which goes through the entries, calculates the total number of responses for every unique response code, and tabulates the results in a KeyValueTable dataset, responseCount. The Spark program also computes the total number of requests made by every unique IP address and writes it to a TimePartitionedFileSet, reqCount.

In parallel, these events are also taken up by the HitCounterProgram, a MapReduce program which goes through the entries, calculates the total number of hits for every unique URL, and tabulates the results in a KeyValueTable dataset, hitCount.
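
The three tallies described above can be illustrated in plain Java. This is only a sketch of the counting logic, not the code of ResponseCounterSpark or HitCounterProgram: the class name LogCountSketch, the regular expression, and the sample log lines are assumptions made for illustration, and the real programs read from the stream and write to CDAP datasets rather than to in-memory maps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the LogAnalysis example.
public class LogCountSketch {

  // Assumed layout: ip ident user [time] "METHOD url protocol" status ...
  private static final Pattern ACCESS_LOG = Pattern.compile(
      "^(\\S+) \\S+ \\S+ \\[[^\\]]+\\] \"\\S+ (\\S+)[^\"]*\" (\\d{3})(?: .*)?$");

  /** Holds the three tallies: hits per URL, responses per code, requests per IP. */
  public static final class Counts {
    public final Map<String, Long> hitsByUrl = new TreeMap<>();
    public final Map<String, Long> responsesByCode = new TreeMap<>();
    public final Map<String, Long> requestsByIp = new TreeMap<>();
  }

  /** Counts every line that matches the assumed access log layout. */
  public static Counts count(List<String> lines) {
    Counts counts = new Counts();
    for (String line : lines) {
      Matcher m = ACCESS_LOG.matcher(line);
      if (!m.matches()) {
        continue; // skip lines that do not look like access log entries
      }
      counts.requestsByIp.merge(m.group(1), 1L, Long::sum);
      counts.hitsByUrl.merge(m.group(2), 1L, Long::sum);
      counts.responsesByCode.merge(m.group(3), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Made-up sample entries, not taken from apache.accesslog
    List<String> sample = Arrays.asList(
        "255.255.255.109 - - [29/Jul/2015:19:47:01 -0700] \"GET /index.html HTTP/1.1\" 200 512",
        "255.255.255.121 - - [29/Jul/2015:19:47:02 -0700] \"GET /index.html HTTP/1.1\" 404 0",
        "255.255.255.109 - - [29/Jul/2015:19:47:03 -0700] \"POST /login HTTP/1.1\" 200 10");
    Counts counts = count(sample);
    System.out.println("hits by URL: " + counts.hitsByUrl);
    System.out.println("responses by code: " + counts.responsesByCode);
    System.out.println("requests by IP: " + counts.requestsByIp);
  }
}
```

In the example itself, the first two maps correspond to the hitCount and responseCount datasets, and the third to the contents of a reqCount partition.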

The LogAnalysisWorkflow ties the Spark and MapReduce programs together and runs them in parallel as the two branches of a fork.
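
To make the fork idea concrete, here is a small stand-alone Java sketch of "start two branches together, continue only when both have finished". It does not use the CDAP Workflow API; the class ForkSketch and the method runInParallel are hypothetical names, and the two branches are stand-in suppliers rather than the real Spark and MapReduce programs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical illustration of fork-and-join; not CDAP code.
public class ForkSketch {

  /**
   * Starts both branches at the same time and returns only after both have
   * finished, which is the behavior of a two-branch fork followed by a join.
   */
  public static List<String> runInParallel(Supplier<String> sparkBranch,
                                           Supplier<String> mapReduceBranch) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletableFuture<String> spark = CompletableFuture.supplyAsync(sparkBranch, pool);
      CompletableFuture<String> mapReduce = CompletableFuture.supplyAsync(mapReduceBranch, pool);
      // join() blocks until the branch completes and throws CompletionException if it failed
      return Arrays.asList(spark.join(), mapReduce.join());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> finished = runInParallel(
        () -> "ResponseCounterSpark done",
        () -> "HitCounterProgram done");
    System.out.println(finished);
  }
}
```

The point of the sketch is the ordering guarantee: neither branch waits for the other to start, and anything after the join sees the results of both.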

Once the workflow completes, you can query the responseCount dataset by using the rescount endpoint of the ResponseCounterService. It returns the total number of responses for the response code passed to rescount. You can also query the hitCount dataset by using the hitcount endpoint of the HitCounterService. It returns the total number of hits for the queried URL. You can query the reqCount TimePartitionedFileSet by using the reqcount endpoint of the RequestCounterService, which returns the set of all available partitions. Using one of the partitions from that set, you can query for the total number of requests made by every unique IP address in the last 60 minutes: the reqfile endpoint of the RequestCounterService returns a map of IP addresses to the total number of requests made by each of them.

Let's look at some of these components, and then run the application and see the results.

The LogAnalysis Application

As in the other examples, the components of the application are tied together by the class LogAnalysisApp:

public class LogAnalysisApp extends AbstractApplication {

  public static final String LOG_STREAM = "logStream";
  public static final String HIT_COUNTER_SERVICE = "HitCounterService";
  public static final String RESPONSE_COUNTER_SERVICE = "ResponseCounterService";
  public static final String REQUEST_COUNTER_SERVICE = "RequestCounterService";
  public static final String RESPONSE_COUNT_STORE = "responseCount";
  public static final String HIT_COUNT_STORE = "hitCount";
  public static final String REQ_COUNT_STORE = "reqCount";

  @Override
  public void configure() {
    setDescription("CDAP Log Analysis App");

    // A stream to ingest log data
    addStream(new Stream(LOG_STREAM));

    // A Spark program and a MapReduce program for processing log data
    addSpark(new ResponseCounterSpark());
    addMapReduce(new HitCounterProgram());

    addWorkflow(new LogAnalysisWorkflow());

    // Services to query for result
    addService(HIT_COUNTER_SERVICE, new HitCounterServiceHandler());
    addService(RESPONSE_COUNTER_SERVICE, new ResponseCounterHandler());
    addService(REQUEST_COUNTER_SERVICE, new RequestCounterHandler());

    // Datasets to store output after processing
    createDataset(RESPONSE_COUNT_STORE, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Store response counts").build());
    createDataset(HIT_COUNT_STORE, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Store hit counts").build());
    createDataset(REQ_COUNT_STORE, TimePartitionedFileSet.class, FileSetProperties.builder()
      .setOutputFormat(TextOutputFormat.class)
      .setOutputProperty(TextOutputFormat.SEPERATOR, ":")
      .setDescription("Store request counts").build());
  }
. . .

The hitCount and responseCount KeyValueTables and reqCount TimePartitionedFileSet

The calculated hit count for every unique URL is stored in a KeyValueTable dataset, hitCount, and the total number of responses for each response code is stored in another KeyValueTable dataset, responseCount. The total number of requests made by every unique IP address is written to a TimePartitionedFileSet, reqCount.
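
Because reqCount is configured with TextOutputFormat and a ":" separator, the files in a partition are plain text. The sketch below shows how such lines could be read back into a map of the kind the reqfile endpoint returns. It assumes that each line has the form <ip>:<count>, which is inferred from the separator setting and not confirmed by the code shown above; the class RequestCountLines is a hypothetical helper, not part of the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; assumes lines of the form "<ip>:<count>".
public class RequestCountLines {

  /** Parses "<ip>:<count>" lines into a map, skipping lines that do not fit. */
  public static Map<String, Long> parse(List<String> lines) {
    Map<String, Long> counts = new LinkedHashMap<>();
    for (String line : lines) {
      // Split on the last ':' so that the count is always the final field
      int sep = line.lastIndexOf(':');
      if (sep <= 0 || sep == line.length() - 1) {
        continue; // no separator, empty key, or empty value
      }
      String ip = line.substring(0, sep);
      try {
        long n = Long.parseLong(line.substring(sep + 1).trim());
        counts.merge(ip, n, Long::sum); // add up counts if an IP appears twice
      } catch (NumberFormatException e) {
        // value is not a number; ignore the line
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Made-up lines in the assumed format
    List<String> lines = Arrays.asList("255.255.255.109:1", "255.255.255.121:1");
    System.out.println(parse(lines));
  }
}
```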

The HitCounterService, ResponseCounterService, and RequestCounterService

These services provide convenient endpoints:

  • HitCounterService: hitcount endpoint to obtain the total number of hits for a given URL;
  • ResponseCounterService: rescount endpoint to obtain the total number of responses for a given response code;
  • RequestCounterService: reqcount endpoint to obtain a set of all the available partitions in the TimePartitionedFileSet; and
  • RequestCounterService: reqfile endpoint to retrieve data from a particular partition.
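
As a rough illustration of how a Java client might address these endpoints, the sketch below builds (but does not send) one HTTP request per endpoint. The base URL, HTTP methods, and JSON bodies are copied from the curl commands used in this example; the class ServiceRequests and its method names are hypothetical, java.net.http requires Java 11 or later, and host, port, and namespace may differ in your installation.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side helper; only builds requests, never sends them.
public class ServiceRequests {

  // Taken from the curl commands in this example; adjust for your setup
  private static final String BASE =
      "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/";

  /** URI of a service method, e.g. HitCounterService + hitcount. */
  public static URI methodUri(String service, String method) {
    return URI.create(BASE + service + "/methods/" + method);
  }

  /** POST hitcount with a JSON body naming the URL (no JSON escaping is done). */
  public static HttpRequest hitCount(String url) {
    return HttpRequest.newBuilder(methodUri("HitCounterService", "hitcount"))
        .POST(HttpRequest.BodyPublishers.ofString("{\"url\":\"" + url + "\"}"))
        .build();
  }

  /** GET rescount/<code>. */
  public static HttpRequest responseCount(int code) {
    return HttpRequest.newBuilder(methodUri("ResponseCounterService", "rescount/" + code))
        .GET()
        .build();
  }

  /** GET reqcount, which lists the available partitions. */
  public static HttpRequest partitions() {
    return HttpRequest.newBuilder(methodUri("RequestCounterService", "reqcount"))
        .GET()
        .build();
  }

  /** POST reqfile with a JSON body naming one partition time. */
  public static HttpRequest requestFile(String time) {
    return HttpRequest.newBuilder(methodUri("RequestCounterService", "reqfile"))
        .POST(HttpRequest.BodyPublishers.ofString("{\"time\":\"" + time + "\"}"))
        .build();
  }

  public static void main(String[] args) {
    System.out.println(hitCount("/index.html").uri());
    System.out.println(responseCount(200).uri());
    System.out.println(partitions().uri());
    System.out.println(requestFile("7/29/15 7:47 PM").uri());
  }
}
```

With the services running, any of these requests could be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).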

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the CDAP Local Sandbox home directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/LogAnalysis/target/LogAnalysis-5.0.0.jar
    
    Successfully added artifact with name 'LogAnalysis'
    
    $ cdap cli create app LogAnalysis LogAnalysis 5.0.0 user
    
    Successfully created application
    
    > cdap cli load artifact examples\LogAnalysis\target\LogAnalysis-5.0.0.jar
    
    Successfully added artifact with name 'LogAnalysis'
    
    > cdap cli create app LogAnalysis LogAnalysis 5.0.0 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application, and detailed at the start of running the example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Services

  • Using the CDAP UI, go to the LogAnalysis application overview page, programs tab, click HitCounterService to get to the service detail page, then click the Start button, and then do the same for the RequestCounterService and ResponseCounterService services; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli start service LogAnalysis.HitCounterService
    $ cdap cli start service LogAnalysis.RequestCounterService
    $ cdap cli start service LogAnalysis.ResponseCounterService
    
    Successfully started service 'HitCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    Successfully started service 'RequestCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    Successfully started service 'ResponseCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    
    > cdap cli start service LogAnalysis.HitCounterService
    > cdap cli start service LogAnalysis.RequestCounterService
    > cdap cli start service LogAnalysis.ResponseCounterService
    
    Successfully started service 'HitCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    Successfully started service 'RequestCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    Successfully started service 'ResponseCounterService' of application 'LogAnalysis' with stored runtime arguments '{}'
    
  • Or, you can send curl requests to CDAP:

    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/HitCounterService/start"
    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/start"
    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/ResponseCounterService/start"
    
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/HitCounterService/start"
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/start"
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/ResponseCounterService/start"
    

Injecting Access Logs

Inject a file of Apache access logs into the stream logStream by running this command from the CDAP Local Sandbox home directory, using the Command Line Interface:

$ cdap cli load stream logStream examples/LogAnalysis/resources/apache.accesslog "text/plain"
Successfully loaded file to stream 'logStream'
> cdap cli load stream logStream examples\LogAnalysis\resources\apache.accesslog "text/plain"
Successfully loaded file to stream 'logStream'

Starting the Workflow

  • Using the CDAP UI, go to the LogAnalysis application overview page, programs tab, click LogAnalysisWorkflow to get to the workflow detail page, then click the Start button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli start workflow LogAnalysis.LogAnalysisWorkflow
    
    Successfully started workflow 'LogAnalysisWorkflow' of application 'LogAnalysis' with stored runtime arguments '{}'
    
    > cdap cli start workflow LogAnalysis.LogAnalysisWorkflow
    
    Successfully started workflow 'LogAnalysisWorkflow' of application 'LogAnalysis' with stored runtime arguments '{}'
    
  • Or, start the workflow with an HTTP request using the curl command:

    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/workflows/LogAnalysisWorkflow/start"
    
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/workflows/LogAnalysisWorkflow/start"
    

Querying the Results

  • To query the hitCount KeyValueTable through the HitCounterService, send a query using the Command Line Interface. For example:

    $ cdap cli call service LogAnalysis.HitCounterService POST "hitcount" body '{"url":"/index.html"}'
    
    > cdap cli call service LogAnalysis.HitCounterService POST "hitcount" body '{"url":"/index.html"}'
    

    You can also use the curl command and an HTTP request:

    $ curl -w"\n" -X POST -d '{"url":"/index.html"}' "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/HitCounterService/methods/hitcount"
    
    > curl -X POST -d "{\"url\":\"/index.html\"}" "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/HitCounterService/methods/hitcount"
    

    On success, this command will return the hit count for the above URL, such as 4.

  • Similarly, to query the responseCount KeyValueTable through the ResponseCounterService, the reqCount TimePartitionedFileSet through the RequestCounterService, and to retrieve data from a particular partition of the TimePartitionedFileSet, use either the Command Line Interface or curl:

    $ cdap cli call service LogAnalysis.ResponseCounterService GET "rescount/200"
    
    $ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/ResponseCounterService/methods/rescount/200"
    
    > cdap cli call service LogAnalysis.ResponseCounterService GET "rescount/200"
    
    > curl -X GET "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/ResponseCounterService/methods/rescount/200"
    

    On success, this command will return the total number of responses sent with the queried response code, such as 30.

  • To query the set of all the available partitions, use either of these commands:

    $ cdap cli call service LogAnalysis.RequestCounterService GET "reqcount"
    
    $ curl -w"\n" "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/methods/reqcount"
    
    > cdap cli call service LogAnalysis.RequestCounterService GET "reqcount"
    
    > curl "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/methods/reqcount"
    

    A possible successful response:

    ["7/29/15 7:47 PM"...]
    
  • To return a map of all the unique IP addresses with the number of requests made by them, use one of the available partitions, such as the one returned in the previous command, "7/29/15 7:47 PM":

    $ cdap cli call service LogAnalysis.RequestCounterService POST "reqfile" body '{"time":"7/29/15 7:47 PM"}'
    
    $ curl -w"\n" -X POST -d '{"time":"7/29/15 7:47 PM"}' \
    "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/methods/reqfile"
    
    > cdap cli call service LogAnalysis.RequestCounterService POST "reqfile" body '{"time":"7/29/15 7:47 PM"}'
    
    > curl -X POST -d "{\"time\":\"7/29/15 7:47 PM\"}" ^
    "http://localhost:11015/v3/namespaces/default/apps/LogAnalysis/services/RequestCounterService/methods/reqfile"
    

    A possible successful response:

    {"255.255.255.109":1,"255.255.255.121":1,"255.255.255.211":1...}
    

Stopping and Removing the Application

Once done, you can stop the application (if it hasn't stopped already) as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Workflow

  • Using the CDAP UI, go to the LogAnalysis application overview page, programs tab, click LogAnalysisWorkflow to get to the workflow detail page, then click the Stop button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli stop workflow LogAnalysis.LogAnalysisWorkflow
    
    Successfully stopped workflow 'LogAnalysisWorkflow' of application 'LogAnalysis'
    
    > cdap cli stop workflow LogAnalysis.LogAnalysisWorkflow
    
    Successfully stopped workflow 'LogAnalysisWorkflow' of application 'LogAnalysis'
    

Stopping the Services

  • Using the CDAP UI, go to the LogAnalysis application overview page, programs tab, click HitCounterService to get to the service detail page, then click the Stop button, and then do the same for the RequestCounterService and ResponseCounterService services; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli stop service LogAnalysis.HitCounterService
    $ cdap cli stop service LogAnalysis.RequestCounterService
    $ cdap cli stop service LogAnalysis.ResponseCounterService
    
    > cdap cli stop service LogAnalysis.HitCounterService
    > cdap cli stop service LogAnalysis.RequestCounterService
    > cdap cli stop service LogAnalysis.ResponseCounterService
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the LogAnalysis application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli delete app LogAnalysis
    
    > cdap cli delete app LogAnalysis