🔗Wikipedia Pipeline

A Cask Data Application Platform (CDAP) example demonstrating a typical batch data processing pipeline using CDAP Workflows.

🔗Overview

This example demonstrates a CDAP application performing analysis on Wikipedia data using MapReduce and Spark programs running within a CDAP Workflow: WikipediaPipelineWorkflow.

This example can be run in both online and offline modes.

  • In the online mode, the MapReduce program WikipediaDataDownloader reads the stream pageTitleStream, each event of which is an element from the output of the Facebook "Likes" API. For each event, it downloads Wikipedia data for the page using the MediaWiki Wikipedia API, and stores the downloaded data in the KeyValueTable dataset wikiData.
  • In the offline mode, the workflow expects Wikipedia data, formatted following the output of the MediaWiki API, in the stream wikiStream. The MapReduce program WikiDataToDataset consumes this stream and stores its contents in the same KeyValueTable dataset wikiData. Data can be uploaded to the wikiStream using the CDAP CLI.

Once raw Wikipedia data is available, from either the online or offline mode, the WikipediaPipelineWorkflow runs the MapReduce program WikiContentValidatorAndNormalizer, which filters bad records from the raw data and normalizes it by converting the wikitext-formatted data to plain text. It then stores the output in another KeyValueTable dataset, called normalized.

The WikipediaPipelineWorkflow then contains a fork with two branches (a sketch of this structure follows the list):

  • One branch runs the Apache Spark program SparkWikipediaClustering. This program can be configured to run either Latent Dirichlet Allocation (LDA) or K-Means; the algorithm is chosen by setting the field clusteringAlgorithm in an application config, and defaults to LDA if that field is not specified. The program consumes the normalized data, runs clustering on it using the configured algorithm, and stores its output in the CDAP Table dataset clustering, with one row for each iteration and a column per topic containing the score for that topic.
  • The other branch runs a MapReduce program, TopNMapReduce, that consumes the normalized data and produces the top "N" words in the dataset topn.
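
Below is a minimal sketch of how such a fork can be declared with the CDAP Workflow API. It is an illustration only, not the example's actual source: the program names are taken from the descriptions above, and the condition node discussed later (which gates the pipeline on min.pages.threshold) is elided.

import co.cask.cdap.api.workflow.AbstractWorkflow;

/**
 * Sketch only: a workflow that normalizes the raw data, then forks into
 * the clustering and top-"N" branches described above.
 */
public class SketchedWikipediaWorkflow extends AbstractWorkflow {

  @Override
  public void configure() {
    setName("WikipediaPipelineWorkflow");
    addMapReduce("WikiContentValidatorAndNormalizer"); // filter and normalize
    fork()
      .addSpark("SparkWikipediaClustering-LDA")        // branch 1: clustering
    .also()
      .addMapReduce("TopNMapReduce")                   // branch 2: top-"N" words
    .join();
  }
}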

Let's look at some of these components, and then run the application and see the results.

🔗The WikipediaPipeline Application

As in the other examples, the components of the application are tied together by the class WikipediaPipelineApp:

/**
 * App to demonstrate a data pipeline that processes Wikipedia data using a CDAP Workflow.
 */
public class WikipediaPipelineApp extends AbstractApplication<WikipediaPipelineApp.WikipediaAppConfig> {
  static final String PAGE_TITLES_STREAM = "pageTitleStream";
  static final String RAW_WIKIPEDIA_STREAM = "wikiStream";
  static final String PAGE_TITLES_DATASET = "pages";
  static final String RAW_WIKIPEDIA_DATASET = "wikidata";
  static final String NORMALIZED_WIKIPEDIA_DATASET = "normalized";
  static final String SPARK_CLUSTERING_OUTPUT_DATASET = "clustering";
  static final String MAPREDUCE_TOPN_OUTPUT = "topn";
  static final String LIKES_TO_DATASET_MR_NAME = "LikesToDataset";
  static final String WIKIPEDIA_TO_DATASET_MR_NAME = "WikiDataToDataset";

  @Override
  public void configure() {
    addStream(new Stream(PAGE_TITLES_STREAM));
    addStream(new Stream(RAW_WIKIPEDIA_STREAM));
    addMapReduce(new StreamToDataset(LIKES_TO_DATASET_MR_NAME));
    addMapReduce(new StreamToDataset(WIKIPEDIA_TO_DATASET_MR_NAME));
    addMapReduce(new WikipediaDataDownloader());
    addMapReduce(new WikiContentValidatorAndNormalizer());
    addMapReduce(new TopNMapReduce());
    addSpark(new SparkWikipediaClustering(getConfig()));
    createDataset(PAGE_TITLES_DATASET, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Page titles dataset").build());
    createDataset(RAW_WIKIPEDIA_DATASET, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Raw Wikipedia dataset").build());
    createDataset(NORMALIZED_WIKIPEDIA_DATASET, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Normalized Wikipedia dataset").build());
    createDataset(SPARK_CLUSTERING_OUTPUT_DATASET, Table.class,
                  DatasetProperties.builder().setDescription("Spark clustering output dataset").build());
    createDataset(MAPREDUCE_TOPN_OUTPUT, KeyValueTable.class,
                  DatasetProperties.builder().setDescription("MapReduce top-'N'-words output dataset").build());
    addWorkflow(new WikipediaPipelineWorkflow(getConfig()));
    addService(new WikipediaService());
  }

  /**
   * Config for Wikipedia App.
   */
  public static class WikipediaAppConfig extends Config {

    @Nullable
    public final String clusteringAlgorithm;

    public WikipediaAppConfig() {
      this(null);
    }

    public WikipediaAppConfig(@Nullable String clusteringAlgorithm) {
      this.clusteringAlgorithm = clusteringAlgorithm == null ? "lda" : clusteringAlgorithm;
    }
  }
}

This application demonstrates:

  • The use of unique program names: the same MapReduce program (StreamToDataset) is used twice in the workflow (WikipediaPipelineWorkflow) under two different names. Also, depending on the chosen clusteringAlgorithm, the name of the SparkWikipediaClustering program will be either SparkWikipediaClustering-LDA or SparkWikipediaClustering-KMEANS (see the sketch following this list).
  • The use of Workflow Tokens in:
    • Condition Predicates
    • Setting MapReduce program configuration (setting it based on values in the token)
    • map() and reduce() functions (read-only, no updates)
    • Spark Programs (reading from—and writing to—the workflow token; adding Spark Accumulators to the workflow token)
    • Assertions in application unit tests
  • The use of application configs to create different applications from the same artifact. Depending on the value chosen for the clusteringAlgorithm, there can be two different applications: one using LDA for clustering, the other using K-Means. The application is packaged with the two possible application config JSON files, resources/wikipedia-kmeans.json and resources/wikipedia-lda.json, in the application directory.
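
To illustrate the naming point above: a program's name can be derived from the application config in its configure() method. The following is a sketch only, with the main-class wiring elided; consult the example's source for the actual implementation.

import co.cask.cdap.api.spark.AbstractSpark;

/**
 * Sketch only: derives the Spark program's name from the application config,
 * so that one artifact yields differently-named programs per application.
 */
public class SketchedSparkClustering extends AbstractSpark {

  private final WikipediaPipelineApp.WikipediaAppConfig config;

  public SketchedSparkClustering(WikipediaPipelineApp.WikipediaAppConfig config) {
    this.config = config;
  }

  @Override
  public void configure() {
    // "lda" -> "SparkWikipediaClustering-LDA"; "kmeans" -> "SparkWikipediaClustering-KMEANS"
    setName("SparkWikipediaClustering-" + config.clusteringAlgorithm.toUpperCase());
    // Main class, driver and executor resources, etc. would be configured here (elided).
  }
}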

🔗Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/WikipediaPipeline/target/WikipediaPipeline-4.1.1.jar
    
    Successfully added artifact with name 'WikipediaPipeline'
    
    $ cdap cli create app WikipediaPipeline WikipediaPipeline 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\WikipediaPipeline\target\WikipediaPipeline-4.1.1.jar
    
    Successfully added artifact with name 'WikipediaPipeline'
    
    > cdap cli create app WikipediaPipeline WikipediaPipeline 4.1.1 user
    
    Successfully created application
    
  • Once the application (WikipediaPipeline) has been deployed, you can start its components, as described in general in Starting an Application, and detailed for this example in Deploying the Example.

  • Once all components are deployed, run the example.

  • When finished, you can stop and remove the application.

🔗Deploying the Example

Deploying the WikipediaPipelineApp involves loading its artifact and then creating two applications from it: one configured to use LDA as the clustering algorithm and one to use K-Means (the choice also determines the name of the Apache Spark program SparkWikipediaClustering in each application). Because of these two steps, the preferred method of deploying it is to use the CDAP CLI.

  • Load the artifact:

    $ cdap cli load artifact examples/WikipediaPipeline/target/WikipediaPipeline-4.1.1.jar name WikipediaPipelineApp version 4.1.1
    
    Successfully added artifact with name 'WikipediaPipelineApp'
    
    > cdap cli load artifact examples\WikipediaPipeline\target\WikipediaPipeline-4.1.1.jar name WikipediaPipelineApp version 4.1.1
    
    Successfully added artifact with name 'WikipediaPipelineApp'
    
  • Create an application using LDA as the clustering algorithm:

    $ cdap cli create app wiki-lda WikipediaPipelineApp 4.1.1 user examples/WikipediaPipeline/resources/wikipedia-lda.json
    
    Successfully created application
    
    > cdap cli create app wiki-lda WikipediaPipelineApp 4.1.1 user examples\WikipediaPipeline\resources\wikipedia-lda.json
    
    Successfully created application
    
  • Create an application using K-Means as the clustering algorithm:

    $ cdap cli create app wiki-kmeans WikipediaPipelineApp 4.1.1 user examples/WikipediaPipeline/resources/wikipedia-kmeans.json
    
    Successfully created application
    
    > cdap cli create app wiki-kmeans WikipediaPipelineApp 4.1.1 user examples\WikipediaPipeline\resources\wikipedia-kmeans.json
    
    Successfully created application

🔗Running the Example

🔗Injecting Data

The pageTitleStream consumes events in the format returned by the Facebook "Likes" Graph API.

  • Inject a file of Facebook "Likes" data to the stream pageTitleStream by running this command from the Standalone CDAP SDK directory, using the CDAP Command Line Interface:

    $ cdap cli load stream pageTitleStream examples/WikipediaPipeline/resources/fb-likes-data.txt
    
    Successfully loaded file to stream 'pageTitleStream'
    
    > cdap cli load stream pageTitleStream examples\WikipediaPipeline\resources\fb-likes-data.txt
    
    Successfully loaded file to stream 'pageTitleStream'
    

The wikiStream consumes events in the format returned by the MediaWiki Wikipedia API.

  • Inject a file of "Wikipedia" data to the stream wikiStream by running this command from the Standalone CDAP SDK directory, using the Command Line Interface:

    $ cdap cli load stream wikiStream examples/WikipediaPipeline/resources/wikipedia-data.txt
    
    Successfully loaded file to stream 'wikiStream'
    
    > cdap cli load stream wikiStream examples\WikipediaPipeline\resources\wikipedia-data.txt
    
    Successfully loaded file to stream 'wikiStream'
    

🔗Starting the Workflow

  • Using the CDAP UI, go to the WikipediaPipeline application overview page, programs tab, click WikipediaPipelineWorkflow to get to the workflow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start workflow WikipediaPipeline.WikipediaPipelineWorkflow
    
    Successfully started workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline' with stored runtime arguments '{}'
    
    > cdap cli start workflow WikipediaPipeline.WikipediaPipelineWorkflow
    
    Successfully started workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline' with stored runtime arguments '{}'
    
  • Or, send an HTTP request using the curl command:

    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/workflows/WikipediaPipelineWorkflow/start"
    
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/workflows/WikipediaPipelineWorkflow/start"
    

These runtime arguments can be set for the WikipediaPipelineWorkflow:

  • min.pages.threshold: Threshold for the number of pages that must exist in the pageTitleStream for the workflow to proceed (see the predicate sketch following this list). Defaults to 10.
  • mode: Set this to 'online' when you wish to download Wikipedia data over the Internet. Defaults to 'offline', in which case the workflow expects Wikipedia data to be in the wikiStream.
  • stopwords.file: The path to the file containing stopwords to filter out in the SparkWikipediaClustering program. If unspecified, no words are treated as stopwords.
  • vocab.size: The size of the vocabulary for the SparkWikipediaClustering program. Defaults to 1000.
  • topn.rank: The number of top words to produce in the TopNMapReduce program. Defaults to 10.
  • num.reduce.tasks: The number of reduce tasks to set for the TopNMapReduce program. Defaults to 1.
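
As noted under "Condition Predicates" above, the min.pages.threshold check is made by a condition node in the workflow, using values from the workflow token and the runtime arguments. Below is a hedged sketch of what such a predicate can look like; the token key "custom.num.records" is an assumption for illustration, not the example's actual key.

import java.util.Map;

import co.cask.cdap.api.Predicate;
import co.cask.cdap.api.workflow.Value;
import co.cask.cdap.api.workflow.WorkflowContext;
import co.cask.cdap.api.workflow.WorkflowToken;

/**
 * Sketch only: a condition predicate gating the pipeline on min.pages.threshold.
 */
public class EnoughDataToProceed implements Predicate<WorkflowContext> {

  @Override
  public boolean apply(WorkflowContext context) {
    Map<String, String> args = context.getRuntimeArguments();
    String raw = args.get("min.pages.threshold");
    long threshold = (raw == null) ? 10 : Long.parseLong(raw); // default from the list above
    // An upstream MapReduce program is assumed to have recorded its record
    // count in the workflow token under this (illustrative) key.
    Value count = context.getToken().get("custom.num.records", WorkflowToken.Scope.USER);
    return count != null && count.getAsLong() >= threshold;
  }
}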

If you run with the default arguments, you will find that the pipeline starts but then stops after the first node, as the number of pages is less than the min.pages.threshold:

[Figure: the workflow stops after its first node (wikipedia-data-pipeline-1.png)]

Reduce the minimum number of pages (min.pages.threshold) to zero and change the mode to online, by setting either the runtime arguments (which affect only the next run) or the preferences (which affect all subsequent runs):

[Figure: setting runtime arguments or preferences in the CDAP UI (wikipedia-data-pipeline-2.png)]

You can also do this from a terminal:

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start workflow WikipediaPipeline.WikipediaPipelineWorkflow "\"min.pages.threshold=0 mode=online\""
    
    Successfully started workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline'
    with provided runtime arguments 'min.pages.threshold=0 mode=online'
    
    > cdap cli start workflow WikipediaPipeline.WikipediaPipelineWorkflow "\"min.pages.threshold=0 mode=online\""
    
    Successfully started workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline'
    with provided runtime arguments 'min.pages.threshold=0 mode=online'
    
  • Or, send an HTTP request using the curl command:

    $ curl -w"\n" -X POST -d '{"min.pages.threshold":"0", "mode":"online"}' \
    "http://localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/workflows/WikipediaPipelineWorkflow/start"
    
    > curl -X POST -d "{\"min.pages.threshold\":\"0\", \"mode\":\"online\"}" ^
    "http://localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/workflows/WikipediaPipelineWorkflow/start"
    

Once the pipeline has run through to the end (below), you can start the service and query the results.

[Figure: the completed workflow run (wikipedia-data-pipeline-3.png)]

🔗Starting the Service

  • Using the CDAP UI, go to the WikipediaPipeline application overview page, programs tab, click WikipediaService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service WikipediaPipeline.WikipediaService
    
    Successfully started service 'WikipediaService' of application 'WikipediaPipeline' with stored runtime arguments '{}'
    
    > cdap cli start service WikipediaPipeline.WikipediaService
    
    Successfully started service 'WikipediaService' of application 'WikipediaPipeline' with stored runtime arguments '{}'
    

🔗Retrieving the Results

The WikipediaService can retrieve results from the analysis performed by the WikipediaPipelineWorkflow. The service exposes these REST APIs, which can be accessed either with the CDAP CLI or curl.
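
As a hedged sketch of how such endpoints can be implemented: a CDAP HTTP service handler exposes paths with JAX-RS annotations and reads from the output datasets. The handler and payload below are illustrative, not the example's actual source.

import java.util.Arrays;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;

/**
 * Sketch only: a handler for the topics endpoint; the real WikipediaService
 * derives its responses from the workflow's output datasets.
 */
public class SketchedWikipediaServiceHandler extends AbstractHttpServiceHandler {

  // The clustering results: one row per iteration, one column per topic
  @UseDataSet("clustering")
  private Table clustering;

  // Handles, for example, GET /v1/functions/lda/topics
  @GET
  @Path("v1/functions/{functionName}/topics")
  public void getTopics(HttpServiceRequest request, HttpServiceResponder responder,
                        @PathParam("functionName") String functionName) {
    // The real handler reads the topic list from the 'clustering' Table;
    // a fixed payload is shown here only to illustrate the response shape.
    responder.sendJson(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
  }
}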

  • Retrieve the list of topics generated by the SparkWikipediaClustering program:

    $ cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/lda/topics
    
    $ curl -w"\n" -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/lda/topics"
    
    [0,1,2,3,4,5,6,7,8,9]
    
    > cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/lda/topics
    
    > curl -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/lda/topics"
    
    [0,1,2,3,4,5,6,7,8,9]
    
  • Retrieve the details (terms and term weights) for a given (integer) topic:

    $ cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/lda/topics/0
    
    $ curl -w"\n" -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/lda/topics/0"
    
    [{"name":"and","weight":0.038682279584092004},{"name":"company","weight":0.011716155714206075},
    {"name":"facebook","weight":0.03279816812913312},{"name":"for","weight":0.0236260327332555},
    {"name":"google","weight":0.03240608486488011},{"name":"its","weight":0.01541806996121385},
    {"name":"that","weight":0.032277216101403945},{"name":"the","weight":0.08955250785732792},
    {"name":"users","weight":0.013512787321319556},{"name":"was","weight":0.014201825107197289}]
    
    > cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/lda/topics/0
    
    > curl -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/lda/topics/0"
    
    [{"name":"and","weight":0.038682279584092004},{"name":"company","weight":0.011716155714206075},
    {"name":"facebook","weight":0.03279816812913312},{"name":"for","weight":0.0236260327332555},
    {"name":"google","weight":0.03240608486488011},{"name":"its","weight":0.01541806996121385},
    {"name":"that","weight":0.032277216101403945},{"name":"the","weight":0.08955250785732792},
    {"name":"users","weight":0.013512787321319556},{"name":"was","weight":0.014201825107197289}]
    
  • Retrieve the output of the TopNMapReduce program:

    $ cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/topn/words
    
    $ curl -w"\n" -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/topn/words"
    
    [{"The":627},{"a":1466},{"and":1844},{"in":1415},{"of":2076},{"on":604},{"that":644},{"the":3857},{"to":1620},{"was":740}]
    
    > cdap cli call service WikipediaPipeline.WikipediaService GET /v1/functions/topn/words
    
    > curl -X GET "localhost:11015/v3/namespaces/default/apps/WikipediaPipeline/services/WikipediaService/methods/v1/functions/topn/words"
    
    [{"The":627},{"a":1466},{"and":1844},{"in":1415},{"of":2076},{"on":604},{"that":644},{"the":3857},{"to":1620},{"was":740}]
    

🔗Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Workflow

  • Using the CDAP UI, go to the WikipediaPipeline application overview page, programs tab, click WikipediaPipelineWorkflow to get to the workflow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop workflow WikipediaPipeline.WikipediaPipelineWorkflow
    
    Successfully stopped workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline'
    
    > cdap cli stop workflow WikipediaPipeline.WikipediaPipelineWorkflow
    
    Successfully stopped workflow 'WikipediaPipelineWorkflow' of application 'WikipediaPipeline'
    

Stopping the Service

  • Using the CDAP UI, go to the WikipediaPipeline application overview page, programs tab, click WikipediaService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service WikipediaPipeline.WikipediaService
    
    Successfully stopped service 'WikipediaService' of application 'WikipediaPipeline'
    
    > cdap cli stop service WikipediaPipeline.WikipediaService
    
    Successfully stopped service 'WikipediaService' of application 'WikipediaPipeline'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the WikipediaPipeline application overview page, programs tab; click the Actions menu on the right side and select Manage to go to the Management pane for the application; then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app WikipediaPipeline
    
    > cdap cli delete app WikipediaPipeline