Word Count

A Cask Data Application Platform (CDAP) example demonstrating flows, datasets, services, and configuring an application at deployment time.

Overview

This application receives words and sentences from a stream and uses flowlets in a flow to process the sentences and store the results and statistics in datasets.

  • The wordStream receives sentences, one event at a time.
  • The splitter flowlet reads sentences from the stream and splits them into words. It writes global statistics for the received words, such as "total words received" and "total length of words received", then emits each word to the counter flowlet and each sentence (as a list of words) to the associator flowlet.
  • The associator flowlet receives the set of words and writes word associations to the wordAssocs dataset. For example, for the sentence "Welcome to CDAP", the word associations are {"Welcome", "to"}, {"Welcome", "CDAP"}, and {"to", "CDAP"}.
  • The counter flowlet receives a word, increments the count for the word (maintained in a key-value table), and forwards the word to the unique flowlet.
  • The unique flowlet receives a word and updates the uniqueCount table if it is seeing the word for the first time.
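
The splitting and association steps described above can be sketched in plain Java. This is an in-memory illustration only, not the CDAP flowlet API: the class name SentenceSketch, splitting on whitespace, and dropping repeated words before pairing are all assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

/** In-memory illustration of the splitter and associator steps (hypothetical, not CDAP code). */
final class SentenceSketch {

  /** Splits a sentence on whitespace (assumed tokenization). */
  static List<String> split(String sentence) {
    String trimmed = sentence.trim();
    if (trimmed.isEmpty()) {
      return new ArrayList<>();
    }
    return new ArrayList<>(Arrays.asList(trimmed.split("\\s+")));
  }

  /** Sum of the word lengths: the "total length of words received" statistic. */
  static long totalLength(List<String> words) {
    long total = 0;
    for (String word : words) {
      total += word.length();
    }
    return total;
  }

  /** Every unordered pair of distinct words in one sentence, in order of appearance. */
  static List<String[]> associations(List<String> words) {
    // Treat the sentence as a set of words, keeping first-seen order.
    List<String> distinct = new ArrayList<>(new LinkedHashSet<>(words));
    List<String[]> pairs = new ArrayList<>();
    for (int i = 0; i < distinct.size(); i++) {
      for (int j = i + 1; j < distinct.size(); j++) {
        pairs.add(new String[] {distinct.get(i), distinct.get(j)});
      }
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<String> words = split("Welcome to CDAP");
    System.out.println(words.size() + " words, total length " + totalLength(words));
    for (String[] pair : associations(words)) {
      System.out.println("{" + pair[0] + ", " + pair[1] + "}");
    }
  }
}
```

For "Welcome to CDAP" the sketch yields three words with a total length of 13 and the same three pairs listed above.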

Let's look at some of these components, and then run the application and see the results.

The Word Count Application

As in the other examples, the components of the application are tied together by the class WordCount:

public class WordCount extends AbstractApplication<WordCount.WordCountConfig> {

  /**
   * Word Count Application's configuration class.
   */
  public static class WordCountConfig extends Config {
    private String stream;
    private String wordStatsTable;
    private String wordCountTable;
    private String uniqueCountTable;
    private String wordAssocTable;

    /**
     * Set default values for the configuration variables.
     */
    public WordCountConfig() {
      this.stream = "wordStream";
      this.wordStatsTable = "wordStats";
      this.wordCountTable = "wordCounts";
      this.uniqueCountTable = "uniqueCount";
      this.wordAssocTable = "wordAssocs";
    }

    /**
     * Used only for unit testing.
     */
    public WordCountConfig(String stream, String wordStatsTable, String wordCountTable, String uniqueCountTable,
                           String wordAssocTable) {
      this.stream = stream;
      this.wordStatsTable = wordStatsTable;
      this.wordCountTable = wordCountTable;
      this.uniqueCountTable = uniqueCountTable;
      this.wordAssocTable = wordAssocTable;
    }

    public String getStream() {
      return stream;
    }

    public String getWordStatsTable() {
      return wordStatsTable;
    }

    public String getWordCountTable() {
      return wordCountTable;
    }

    public String getUniqueCountTable() {
      return uniqueCountTable;
    }

    public String getWordAssocTable() {
      return wordAssocTable;
    }
  }

  @Override
  public void configure() {
    WordCountConfig config = getConfig();
    setName("WordCount");
    setDescription("Example word count application");

    // Ingest data into the Application via Streams
    addStream(new Stream(config.getStream()));

    // Store processed data in Datasets
    createDataset(config.getWordStatsTable(), Table.class,
                  TableProperties.builder()
                    .setReadlessIncrementSupport(true)
                    .setDescription("Stats of total counts and lengths of words")
                    .build());
    createDataset(config.getWordCountTable(), KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Words and corresponding counts").build());
    createDataset(config.getUniqueCountTable(), UniqueCountTable.class,
                  DatasetProperties.builder().setDescription("Total count of unique words").build());
    createDataset(config.getWordAssocTable(), AssociationTable.class,
                  DatasetProperties.builder().setDescription("Word associations table").build());

    // Process events in real-time using Flows
    addFlow(new WordCounter(config));

    // Retrieve the processed data using a Service
    addService(new RetrieveCounts(config));
  }
}

Data Storage

The application uses these datasets by default:

  • wordStatsTable (default name wordStats) stores the global statistics: the total count of words and the total length of words received.
  • wordCountTable (default name wordCounts) stores each word and its count in a key-value table.
  • uniqueCountTable (default name uniqueCount) is a custom dataset that stores the total count of unique words received so far.
  • wordAssocTable (default name wordAssocs) is a custom dataset that stores the counts of word associations.

However, the names of these datasets can be configured to be different from their defaults by providing a configuration at application deployment time. All programs rely on this configuration to instantiate their datasets at runtime.
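
As a rough illustration of how the defaults and a deployment-time configuration combine, the sketch below merges a set of overrides with the default names from the no-argument WordCountConfig constructor. It is plain Java, not CDAP's own configuration mechanism: the class DatasetNameSketch, the use of a map, the rejection of unknown keys, and the name myWordCounts are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: combines overrides with the default names (hypothetical helper). */
final class DatasetNameSketch {

  /** Defaults copied from the no-argument WordCountConfig constructor. */
  static Map<String, String> defaults() {
    Map<String, String> names = new LinkedHashMap<>();
    names.put("stream", "wordStream");
    names.put("wordStatsTable", "wordStats");
    names.put("wordCountTable", "wordCounts");
    names.put("uniqueCountTable", "uniqueCount");
    names.put("wordAssocTable", "wordAssocs");
    return names;
  }

  /** Returns the effective names: each override replaces the default for its key. */
  static Map<String, String> resolve(Map<String, String> overrides) {
    Map<String, String> names = defaults();
    for (Map.Entry<String, String> entry : overrides.entrySet()) {
      // Rejecting unknown keys is a choice made for this sketch.
      if (!names.containsKey(entry.getKey())) {
        throw new IllegalArgumentException("Unknown setting: " + entry.getKey());
      }
      names.put(entry.getKey(), entry.getValue());
    }
    return names;
  }

  public static void main(String[] args) {
    Map<String, String> overrides = new LinkedHashMap<>();
    overrides.put("wordCountTable", "myWordCounts"); // hypothetical dataset name
    System.out.println(resolve(overrides));
  }
}
```

Settings that are not overridden keep their defaults, which is why the flow and the service can both be built from the same configuration object.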

RetrieveCounts Service

The service serves read requests for calculated statistics, word counts, and associations. It exposes these endpoints:

  • /stats returns the total number of words, the number of unique words, and the average word length.
  • /count/{word}?limit={limit} returns the count of the specified word and its word associations, up to the specified limit (ten if no limit is given).
  • /counts returns the counts for all words in the input, with the request body expected to contain a comma-separated list of words.
  • /multicounts returns the counts for all words in the input, with the request body expected to contain a comma-separated list of words. It differs from the /counts endpoint in that it uses a KeyValueTable to perform a batched read.
  • /assoc/{word1}/{word2} returns the count of associations for a specific word pair.
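
The endpoint paths above can be turned into request URLs along the following lines. This is a minimal sketch using the JDK's java.net.http client (Java 11 or later). It assumes a Standalone CDAP on localhost:11015 and the default namespace, as in the curl command used later in this example. The class and method names are hypothetical, and only /count is shown being fetched with GET.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for building RetrieveCounts request URLs. */
final class RetrieveCountsClientSketch {

  // Assumed base URL: Standalone CDAP on localhost, namespace "default".
  static final String BASE = "http://localhost:11015/v3/namespaces/default"
      + "/apps/WordCount/services/RetrieveCounts/methods";

  /** Percent-encodes one path segment; URLEncoder emits '+' for spaces, so map it to %20. */
  static String encode(String segment) {
    return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
  }

  /** URL for /count/{word}?limit={limit}. */
  static URI countUri(String word, int limit) {
    return URI.create(BASE + "/count/" + encode(word) + "?limit=" + limit);
  }

  /** URL for /assoc/{word1}/{word2}. */
  static URI assocUri(String word1, String word2) {
    return URI.create(BASE + "/assoc/" + encode(word1) + "/" + encode(word2));
  }

  /** Sends a GET request and returns the response body; needs a running CDAP instance. */
  static String get(URI uri) throws IOException, InterruptedException {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    return response.body();
  }

  public static void main(String[] args) {
    // Only prints the URLs; call get(countUri("CDAP", 10)) once the service is running.
    System.out.println(countUri("CDAP", 10));
    System.out.println(assocUri("Hello", "CDAP"));
  }
}
```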

Building and Starting

  • Build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/WordCount/target/WordCount-4.1.1.jar
    
    Successfully added artifact with name 'WordCount'
    
    $ cdap cli create app WordCount WordCount 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\WordCount\target\WordCount-4.1.1.jar
    
    Successfully added artifact with name 'WordCount'
    
    > cdap cli create app WordCount WordCount 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, start its components as described in Starting an Application and detailed at the start of Running the Example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Flow

  • Using the CDAP UI, go to the WordCount application overview page, programs tab, click WordCounter to get to the flow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start flow WordCount.WordCounter
    
    Successfully started flow 'WordCounter' of application 'WordCount' with stored runtime arguments '{}'
    
    > cdap cli start flow WordCount.WordCounter
    
    Successfully started flow 'WordCounter' of application 'WordCount' with stored runtime arguments '{}'
    

Starting the Service

  • Using the CDAP UI, go to the WordCount application overview page, programs tab, click RetrieveCounts to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service WordCount.RetrieveCounts
    
    Successfully started service 'RetrieveCounts' of application 'WordCount' with stored runtime arguments '{}'
    
    > cdap cli start service WordCount.RetrieveCounts
    
    Successfully started service 'RetrieveCounts' of application 'WordCount' with stored runtime arguments '{}'
    

Injecting Sentences

In the application's detail page, click on the WordCounter flow. This takes you to the flow details page.

Now double-click on the wordStream stream on the left side of the flow visualization, which brings up a pop-up window. Enter a sentence such as "Hello CDAP" (without the enclosing quotes) and click on the Inject button.

After you close the pop-up window (using the button in the window's upper-right), the counter for the stream increases to 1, the counters for the splitter and associator flowlets increase to 1, and the counters for the counter and unique flowlets increase to 2, one for each word in the sentence.
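
The counter values follow from the shape of the flow: one event per sentence reaches the stream, the splitter, and the associator, while one event per word reaches the counter and unique flowlets. A minimal sketch of that arithmetic follows. The class FlowCounterSketch and whitespace tokenization are assumptions, and the handling of empty sentences by the real flow is not modelled.

```java
import java.util.Arrays;

/** Hypothetical model of the per-flowlet event counters shown in the CDAP UI. */
final class FlowCounterSketch {

  /** Counts whitespace-separated words; a blank sentence has none. */
  static int wordCount(String sentence) {
    String trimmed = sentence.trim();
    return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
  }

  /** Expected events processed, in the order {stream, splitter, associator, counter, unique}. */
  static long[] expectedCounters(String... sentences) {
    long words = 0;
    for (String sentence : sentences) {
      words += wordCount(sentence);
    }
    long events = sentences.length;
    return new long[] {events, events, events, words, words};
  }

  public static void main(String[] args) {
    // Prints [1, 1, 1, 2, 2] for the sentence used above.
    System.out.println(Arrays.toString(expectedCounters("Hello CDAP")));
  }
}
```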

You can repeat these steps to enter additional sentences. The dialog box also has an +Upload button that sends a file to the stream; you can use it to upload a text file if you wish.

Querying the Results

To query the RetrieveCounts service, either:

  • Use the CDAP CLI:

    $ cdap cli call service WordCount.RetrieveCounts GET /count/CDAP
    
    > cdap cli call service WordCount.RetrieveCounts GET /count/CDAP
    
  • Send a query via an HTTP request using the curl command:

    $ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/WordCount/services/RetrieveCounts/methods/count/CDAP"
    
    > curl -X GET "http://localhost:11015/v3/namespaces/default/apps/WordCount/services/RetrieveCounts/methods/count/CDAP"
    

The word count and the top-10 associated words for that word will be displayed in JSON format (example reformatted to fit; results will depend on what you have submitted):

{
  "assocs": {
    "Hello":1,
    "BigData":3,
    "Cask":5
  },
  "count":6,
  "word":"CDAP"
}
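
One way to picture how a limit narrows the associations in a response like this is a top-N selection by count. The sketch below is a plain-Java illustration under that assumption, not the service's actual implementation. The class TopAssociationsSketch and the alphabetical tie-break are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical top-N selection over a word's association counts. */
final class TopAssociationsSketch {

  /** Returns at most {@code limit} associations, highest count first, ties broken by word. */
  static Map<String, Long> top(Map<String, Long> assocs, int limit) {
    List<Map.Entry<String, Long>> entries = new ArrayList<>(assocs.entrySet());
    entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
        .thenComparing(Map.Entry.<String, Long>comparingByKey()));
    Map<String, Long> result = new LinkedHashMap<>(); // preserves the sorted order
    for (Map.Entry<String, Long> entry : entries) {
      if (result.size() >= limit) {
        break;
      }
      result.put(entry.getKey(), entry.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> assocs = new LinkedHashMap<>();
    assocs.put("Hello", 1L);
    assocs.put("BigData", 3L);
    assocs.put("Cask", 5L);
    // With a limit of 2, only Cask and BigData remain.
    System.out.println(top(assocs, 2));
  }
}
```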

You can also make requests to the other endpoints available in this service, as described above.

Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Flow

  • Using the CDAP UI, go to the WordCount application overview page, programs tab, click WordCounter to get to the flow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop flow WordCount.WordCounter
    
    Successfully stopped flow 'WordCounter' of application 'WordCount'
    
    > cdap cli stop flow WordCount.WordCounter
    
    Successfully stopped flow 'WordCounter' of application 'WordCount'
    

Stopping the Service

  • Using the CDAP UI, go to the WordCount application overview page, programs tab, click RetrieveCounts to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service WordCount.RetrieveCounts
    
    Successfully stopped service 'RetrieveCounts' of application 'WordCount'
    
    > cdap cli stop service WordCount.RetrieveCounts
    
    Successfully stopped service 'RetrieveCounts' of application 'WordCount'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the WordCount application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app WordCount
    
    > cdap cli delete app WordCount