Spark Page Rank Example

A Cask Data Application Platform (CDAP) example demonstrating Spark and MapReduce in a Workflow to compute page ranks.

Overview

This example demonstrates Spark and MapReduce performing streaming log analysis, computing page ranks from information about backlink URLs.

A CDAP CLI command sends data from a sample file to the stream backlinkURLStream, which stores each URL pair event in its entirety.

After these events are streamed, the SparkPageRankProgram processes the entries, calculates the page ranks, and stores the results in an ObjectStore dataset, ranks.
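The Spark program's source is not shown on this page. To illustrate what the iterations compute, here is a plain-Java sketch of the classic PageRank update used in Spark's bundled PageRank example: rank = 0.15 + 0.85 × (sum of contributions). It is not the SparkPageRankProgram itself and does not use Spark. It assumes each event is a whitespace-separated pair `sourceUrl targetUrl`, meaning the source links to the target. The class name `PageRankSketch` is hypothetical. The sketch does not cover how the real program turns ranks into the Integer values stored in ranks.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified PageRank over URL pairs; not the CDAP example's code.
class PageRankSketch {

  /** Each line is assumed to be "sourceUrl targetUrl"; returns URL -> rank. */
  static Map<String, Double> pageRank(List<String> lines, int iterations) {
    // Build the adjacency list: source URL -> distinct target URLs
    Map<String, Set<String>> links = new HashMap<>();
    for (String line : lines) {
      String[] parts = line.trim().split("\\s+");
      if (parts.length < 2) {
        continue; // skip malformed lines
      }
      links.computeIfAbsent(parts[0], k -> new LinkedHashSet<>()).add(parts[1]);
    }

    // Every source page starts with rank 1.0
    Map<String, Double> ranks = new HashMap<>();
    for (String source : links.keySet()) {
      ranks.put(source, 1.0);
    }

    for (int i = 0; i < iterations; i++) {
      // Each page splits its current rank evenly among the pages it links to
      Map<String, Double> contributions = new HashMap<>();
      for (Map.Entry<String, Set<String>> entry : links.entrySet()) {
        Double rank = ranks.get(entry.getKey());
        if (rank == null) {
          continue; // no rank yet (no incoming links): drops out, like an inner join
        }
        double share = rank / entry.getValue().size();
        for (String target : entry.getValue()) {
          contributions.merge(target, share, Double::sum);
        }
      }
      // Damped update with factor 0.85
      Map<String, Double> next = new HashMap<>();
      for (Map.Entry<String, Double> entry : contributions.entrySet()) {
        next.put(entry.getKey(), 0.15 + 0.85 * entry.getValue());
      }
      ranks = next;
    }
    return ranks;
  }
}
```

For example, with the links a→b, a→c, b→a, and c→a, one iteration gives a the rank 0.15 + 0.85 × 2 = 1.85, and gives b and c 0.575 each.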

A MapReduce program, RanksCounter, reads the Spark program's output from the ranks dataset, computes the total number of pages for each unique page rank, and stores the results in another ObjectStore dataset, rankscount.
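Conceptually, counting pages per rank is a group-and-count: a map step emits (rank, 1) for every page, and a reduce step sums the ones for each rank. The in-memory Java sketch below shows only that logic. It assumes URL keys mapped to Integer ranks. The class `RanksCountSketch` is hypothetical and is not the RanksCounter MapReduce program.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory equivalent of counting pages per rank; not the RanksCounter code.
class RanksCountSketch {

  /** Maps each distinct rank to the number of pages that have that rank. */
  static Map<Integer, Integer> countByRank(Map<String, Integer> ranks) {
    Map<Integer, Integer> counts = new TreeMap<>();
    for (int rank : ranks.values()) {
      // "map": emit (rank, 1); "reduce": sum the ones for each rank
      counts.merge(rank, 1, Integer::sum);
    }
    return counts;
  }
}
```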

The PageRankWorkflow ties the Spark and MapReduce programs together so that they run sequentially.

Once the workflow completes, you can query the ranks dataset through the rank endpoint of the SparkPageRankService. It returns, as a string, the page rank of the URL given in the url field of the request body. You can also query the rankscount dataset through the total endpoint, which returns, as a string, the total number of pages with the queried page rank.

Let's look at some of these components, and then run the application and see the results.

The SparkPageRank Application

As in the other examples, the components of the application are tied together by the class SparkPageRankApp:

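import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.ObjectStores;

/**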
 * An Application that calculates page rank of URLs from an input stream.
 */
public class SparkPageRankApp extends AbstractApplication {

  public static final String SERVICE_HANDLERS = "SparkPageRankService";
  public static final String BACKLINK_URL_STREAM = "backlinkURLStream";

  @Override
  public void configure() {
    setName("SparkPageRank");
    setDescription("Spark page rank application.");

    // Ingest data into the Application via a Stream
    addStream(new Stream(BACKLINK_URL_STREAM));

    // Run a Spark program on the acquired data
    addSpark(new PageRankSpark());

    // Run a MapReduce program on the data emitted by the Spark program
    addMapReduce(new RanksCounter());

    // Run the Spark program followed by the MapReduce program in a Workflow
    addWorkflow(new PageRankWorkflow());

    // Service to retrieve the processed data
    addService(SERVICE_HANDLERS, new SparkPageRankServiceHandler());

    // Store input and processed data in ObjectStore Datasets
    try {
      ObjectStores.createObjectStore(getConfigurer(), "ranks", Integer.class,
                                     DatasetProperties.builder().setDescription("Ranks Dataset").build());
      ObjectStores.createObjectStore(getConfigurer(), "rankscount", Integer.class,
                                     DatasetProperties.builder().setDescription("Ranks Count Dataset").build());
    } catch (UnsupportedTypeException e) {
      // This exception is thrown by ObjectStore if its parameter type cannot be
      // (de)serialized (for example, if it is an interface and not a class, there is
      // no automatic way to deserialize an object). That will not happen here
      // because Integer is an actual class.
      throw new RuntimeException(e);
    }
  }
...

The ranks and rankscount ObjectStore Data Storage

The calculated page rank data is stored in an ObjectStore dataset, ranks, with the total number of pages for a page rank stored in an additional ObjectStore dataset, rankscount.

The SparkPageRankService Service

The SparkPageRankService service has a rank endpoint that returns the page rank of a given URL, and a total endpoint that returns the total number of pages with a given page rank.

Memory Requirements

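When a Spark program runs inside a workflow, the memory configured for it may need to be increased beyond the defaults, for example to 1024 MB for the driver and for each executor when configuring the Spark program:

// Memory, in megabytes, for the Spark driver and for each executor
setDriverResources(new Resources(1024));
setExecutorResources(new Resources(1024));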

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/SparkPageRank/target/SparkPageRank-4.1.1.jar
    
    Successfully added artifact with name 'SparkPageRank'
    
    $ cdap cli create app SparkPageRank SparkPageRank 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\SparkPageRank\target\SparkPageRank-4.1.1.jar
    
    Successfully added artifact with name 'SparkPageRank'
    
    > cdap cli create app SparkPageRank SparkPageRank 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components as described in Starting an Application, and as detailed at the start of Running the Example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Service

  • Using the CDAP UI, go to the SparkPageRank application overview page, programs tab, click SparkPageRankService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service SparkPageRank.SparkPageRankService
    
    Successfully started service 'SparkPageRankService' of application 'SparkPageRank' with stored runtime arguments '{}'
    
    > cdap cli start service SparkPageRank.SparkPageRankService
    
    Successfully started service 'SparkPageRankService' of application 'SparkPageRank' with stored runtime arguments '{}'
    

Injecting URL Pairs

Inject a file of URL pairs into the stream backlinkURLStream by running this command from the Standalone CDAP SDK directory, using the Command Line Interface:

$ cdap cli load stream backlinkURLStream examples/SparkPageRank/resources/urlpairs.txt

Successfully loaded file to stream 'backlinkURLStream'
> cdap cli load stream backlinkURLStream examples\SparkPageRank\resources\urlpairs.txt

Successfully loaded file to stream 'backlinkURLStream'
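The CLI command uploads the file for you. The Java sketch below shows the same idea over HTTP, building one request per non-blank line so that each line becomes one stream event. It assumes the CDAP Stream RESTful endpoint `POST /v3/namespaces/default/streams/<stream>`, which accepts a single event per request. It uses only the JDK's `java.net.http` API (Java 11+). The class `StreamIngestSketch` is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one POST per line of the URL-pairs file, one stream event each.
// Assumes the CDAP Stream REST endpoint POST /v3/namespaces/<namespace>/streams/<stream>.
class StreamIngestSketch {

  static final String BASE = "http://localhost:11015/v3/namespaces/default/streams/";

  static List<HttpRequest> buildRequests(String stream, List<String> lines) {
    URI uri = URI.create(BASE + stream);
    List<HttpRequest> requests = new ArrayList<>();
    for (String line : lines) {
      if (line.isBlank()) {
        continue; // blank lines are not sent as events
      }
      requests.add(HttpRequest.newBuilder(uri)
          .POST(HttpRequest.BodyPublishers.ofString(line))
          .build());
    }
    return requests;
  }
}
```

To actually send the requests, read the file with `Files.readAllLines(Path.of("examples/SparkPageRank/resources/urlpairs.txt"))`. Then pass each request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())`.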

Starting the Workflow

The number of iterations is passed to the Spark program through the runtime argument spark.SparkPageRankProgram.args. If it is not set, the default is 10 iterations; in this example, we use 3.

  • Using the CDAP UI, go to the SparkPageRank application overview page, programs tab, click PageRankWorkflow to get to the workflow detail page, set the runtime arguments using spark.SparkPageRankProgram.args as the key and 3 as the value, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start workflow SparkPageRank.PageRankWorkflow "spark.SparkPageRankProgram.args='3'"
    
    Successfully started workflow 'PageRankWorkflow' of application 'SparkPageRank'
    with provided runtime arguments 'spark.SparkPageRankProgram.args=3'
    
    > cdap cli start workflow SparkPageRank.PageRankWorkflow "spark.SparkPageRankProgram.args='3'"
    
    Successfully started workflow 'PageRankWorkflow' of application 'SparkPageRank'
    with provided runtime arguments 'spark.SparkPageRankProgram.args=3'
    
  • Or, send an HTTP request using the curl command:

    $ curl -w"\n" -X POST -d '{"spark.SparkPageRankProgram.args":"3"}' \
    "http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/workflows/PageRankWorkflow/start"
    
    > curl -X POST -d "{\"spark.SparkPageRankProgram.args\":\"3\"}" ^
    "http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/workflows/PageRankWorkflow/start"
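The prefix in spark.SparkPageRankProgram.args scopes the argument to one program in the workflow. The Java sketch below is a simplified model of that scoping, not CDAP's implementation. A key of the form `<programType>.<programName>.<name>` is also delivered to the matching program as plain `<name>`, while every program still receives the original keys. The class `ScopedArgsSketch` and the program name used in the test are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of program-scoped runtime arguments in a workflow.
class ScopedArgsSketch {

  /** Returns the arguments a given program would see under this model. */
  static Map<String, String> argumentsFor(String programType, String programName,
                                          Map<String, String> workflowArgs) {
    String prefix = programType + "." + programName + ".";
    // Every program receives the original arguments unchanged
    Map<String, String> result = new HashMap<>(workflowArgs);
    // Arguments scoped to this program are added again without the prefix
    for (Map.Entry<String, String> e : workflowArgs.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(prefix) && key.length() > prefix.length()) {
        result.put(key.substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }
}
```

Under this model, the Spark program sees `args=3`, and the MapReduce program in the same workflow does not.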
    

Querying the Results

To query the ranks ObjectStore through the SparkPageRankService, you can use the Command Line Interface:

$ cdap cli call service SparkPageRank.SparkPageRankService POST "rank" body "{'url':'http://example.com/page1'}"

10
> cdap cli call service SparkPageRank.SparkPageRankService POST "rank" body "{'url':'http://example.com/page1'}"

10

You can also send a query via an HTTP request using the curl command. For example:

$ curl -w"\n" -X POST -d '{"url":"http://example.com/page1"}' \
"http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/services/SparkPageRankService/methods/rank"

10
> curl -X POST -d "{\"url\":\"http://example.com/page1\"}" ^
"http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/services/SparkPageRankService/methods/rank"

10

Similarly, to query the rankscount ObjectStore using the SparkPageRankService and get the total number of pages with a page rank of 10, you can do the following:

Using the Command Line Interface:

$ cdap cli call service SparkPageRank.SparkPageRankService GET 'total/10'

48
> cdap cli call service SparkPageRank.SparkPageRankService GET "total/10"

48

Using curl:

$ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/services/SparkPageRankService/methods/total/10"

48
> curl -X GET "http://localhost:11015/v3/namespaces/default/apps/SparkPageRank/services/SparkPageRankService/methods/total/10"

48
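The same two service calls can be made from Java. This sketch uses the JDK's `java.net.http` API (Java 11+) to build requests whose paths mirror the curl examples for the rank and total endpoints. It also parses the plain-string responses as integers. The class `PageRankServiceClientSketch` and its method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client sketch for the SparkPageRankService endpoints.
class PageRankServiceClientSketch {

  static final String SERVICE_BASE = "http://localhost:11015/v3/namespaces/default/apps/"
      + "SparkPageRank/services/SparkPageRankService/methods/";

  /** POST to "rank" with a JSON body naming the URL whose rank is wanted. */
  static HttpRequest rankRequest(String url) {
    // Minimal JSON string escaping for backslashes and double quotes
    String escaped = url.replace("\\", "\\\\").replace("\"", "\\\"");
    String body = "{\"url\":\"" + escaped + "\"}";
    return HttpRequest.newBuilder(URI.create(SERVICE_BASE + "rank"))
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  /** GET "total/<rank>" to count the pages that have the given rank. */
  static HttpRequest totalRequest(int rank) {
    return HttpRequest.newBuilder(URI.create(SERVICE_BASE + "total/" + rank))
        .GET()
        .build();
  }

  /** Both endpoints answer with a plain string such as "10" or "48". */
  static int parseIntResponse(String responseBody) {
    return Integer.parseInt(responseBody.trim());
  }
}
```

To run a query, pass a request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Then hand the response's `body()` to `parseIntResponse`.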

Stopping and Removing the Application

Once done, you can stop the application (if it hasn't stopped already) as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Workflow

  • Using the CDAP UI, go to the SparkPageRank application overview page, programs tab, click PageRankWorkflow to get to the workflow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop workflow SparkPageRank.PageRankWorkflow
    
    Successfully stopped workflow 'PageRankWorkflow' of application 'SparkPageRank'
    
    > cdap cli stop workflow SparkPageRank.PageRankWorkflow
    
    Successfully stopped workflow 'PageRankWorkflow' of application 'SparkPageRank'
    

Stopping the Service

  • Using the CDAP UI, go to the SparkPageRank application overview page, programs tab, click SparkPageRankService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service SparkPageRank.SparkPageRankService
    
    Successfully stopped service 'SparkPageRankService' of application 'SparkPageRank'
    
    > cdap cli stop service SparkPageRank.SparkPageRankService
    
    Successfully stopped service 'SparkPageRankService' of application 'SparkPageRank'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the SparkPageRank application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app SparkPageRank
    
    > cdap cli delete app SparkPageRank