Spam Classifier

A Cask Data Application Platform (CDAP) example demonstrating Spark Streaming.

Overview

This example demonstrates a Spark Streaming application that classifies Kafka messages as either "spam" or "ham" (not spam) based on a trained Spark MLlib NaiveBayes model.

Training data from a sample file is sent to the trainingDataStream stream with a CDAP CLI command. The data comes from the SMS Spam Collection Dataset; each line consists of a label (spam or ham) followed by the message.
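As a rough illustration of the training-data layout, the sketch below parses one line into a label and a message. It assumes the label and the message are separated by a tab, as in the original SMS Spam Collection file; the class name TrainingLine and its fields are hypothetical and are not part of the example's source.

```java
/** Hypothetical holder for one training line; not part of the SpamClassifier source. */
final class TrainingLine {
  public final boolean isSpam;
  public final String text;

  private TrainingLine(boolean isSpam, String text) {
    this.isSpam = isSpam;
    this.text = text;
  }

  /** Parses "<label><TAB><message>", where the label is "spam" or "ham" (assumed layout). */
  public static TrainingLine parse(String line) {
    int tab = line.indexOf('\t');
    if (tab < 0) {
      throw new IllegalArgumentException("Expected <label><TAB><message>: " + line);
    }
    String label = line.substring(0, tab).trim();
    if (!label.equals("spam") && !label.equals("ham")) {
      throw new IllegalArgumentException("Unknown label: " + label);
    }
    return new TrainingLine(label.equals("spam"), line.substring(tab + 1).trim());
  }

  public static void main(String[] args) {
    TrainingLine line = TrainingLine.parse("ham\tI will call you later");
    System.out.println(line.isSpam + " | " + line.text);
  }
}
```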

The SpamClassifierProgram uses this training data to train a Spark MLlib NaiveBayes model, which it then uses to classify real-time messages arriving through Kafka.
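To give a feel for what a Naive Bayes classifier does with such data, here is a toy multinomial Naive Bayes with add-one smoothing in plain Java. It is only a sketch of the technique: it is not Spark MLlib, it is not how SpamClassifierProgram is implemented, and the class ToyNaiveBayes and the sample messages are made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy multinomial Naive Bayes with add-one smoothing; an illustration, not Spark MLlib. */
final class ToyNaiveBayes {
  private final Map<String, Map<String, Integer>> wordCounts = new HashMap<>();
  private final Map<String, Integer> totalWords = new HashMap<>();
  private final Map<String, Integer> messageCounts = new HashMap<>();
  private final Set<String> vocabulary = new HashSet<>();
  private int totalMessages = 0;

  // Lowercase the text and split on whitespace; a deliberately simple tokenizer.
  private static String[] tokenize(String text) {
    return text.toLowerCase().trim().split("\\s+");
  }

  /** Adds one labeled message to the per-label word statistics. */
  public void train(String label, String message) {
    messageCounts.merge(label, 1, Integer::sum);
    totalMessages++;
    Map<String, Integer> counts = wordCounts.computeIfAbsent(label, k -> new HashMap<>());
    for (String word : tokenize(message)) {
      counts.merge(word, 1, Integer::sum);
      totalWords.merge(label, 1, Integer::sum);
      vocabulary.add(word);
    }
  }

  /** Returns the label with the highest log prior plus summed log word likelihoods. */
  public String classify(String message) {
    String best = null;
    double bestScore = Double.NEGATIVE_INFINITY;
    for (String label : messageCounts.keySet()) {
      double score = Math.log((double) messageCounts.get(label) / totalMessages);
      Map<String, Integer> counts = wordCounts.get(label);
      int denominator = totalWords.getOrDefault(label, 0) + vocabulary.size();
      for (String word : tokenize(message)) {
        // Add-one smoothing keeps unseen words from zeroing out the probability.
        score += Math.log((counts.getOrDefault(word, 0) + 1.0) / denominator);
      }
      if (score > bestScore) {
        bestScore = score;
        best = label;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    ToyNaiveBayes model = new ToyNaiveBayes();
    model.train("spam", "win a free prize now");
    model.train("spam", "free cash claim now");
    model.train("ham", "I will call you later");
    model.train("ham", "see you at lunch");
    System.out.println(model.classify("claim your free prize"));
    System.out.println(model.classify("call you at lunch"));
  }
}
```

With the four sample messages above, the first query shares words only with the spam messages and the second only with the ham messages, so they are labeled spam and ham respectively.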

Once the application completes, you can query the messageClassificationStore dataset by using the status/<message-id> endpoint of the MessageClassification service. It responds with Spam or Ham, depending on the classification of the message.

Let's look at some of these components, and then run the application and see the results.

The SpamClassifier Application

As in the other examples, the components of the application are tied together by the class SpamClassifier:

public class SpamClassifier extends AbstractApplication {

  static final String SERVICE_HANDLER = "MessageClassification";
  public static final String STREAM = "trainingDataStream";
  public static final String DATASET = "messageClassificationStore";

  @Override
  public void configure() {
    setName("SpamClassifier");
    setDescription("A Spark Streaming Example for Kafka Message Classification");
    addStream(new Stream(STREAM));
    addSpark(new SpamClassifierProgram());
    addService(SERVICE_HANDLER, new SpamClassifierServiceHandler());

    // Store for message classification status
    try {
      ObjectStores.createObjectStore(getConfigurer(), DATASET, Double.class,
                                     DatasetProperties.builder().setDescription("Kafka Message Spam " +
                                                                                  "Classification").build());
    } catch (UnsupportedTypeException e) {
      // This exception is thrown by ObjectStore if its parameter type cannot be
      // (de)serialized (for example, if it is an interface and not a class, then there is
      // no auto-magic way to deserialize an object). In this case that will not happen
      // because Double is an actual class.
      throw new RuntimeException(e);
    }
  }
. . .

The messageClassificationStore ObjectStore Data Storage

The classified messages are stored in an ObjectStore dataset, messageClassificationStore, keyed by the message-id.

The MessageClassification Service

This service has a status/<message-id> endpoint that returns the classification of a message, given its message-id.
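The lookup that the service performs can be pictured with the sketch below, which uses an in-memory map as a stand-in for the ObjectStore of Double values. Two things here are assumptions rather than facts from the example: that 1.0 encodes spam and any other value encodes ham, and that an unknown message-id yields no result. The class ClassificationLookup is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the service lookup; a HashMap replaces the ObjectStore. */
final class ClassificationLookup {
  // Assumption: 1.0 marks spam and any other value marks ham.
  public static final double SPAM_LABEL = 1.0;

  private final Map<String, Double> store = new HashMap<>();

  /** Records the numeric classification for a message-id. */
  public void write(String messageId, double label) {
    store.put(messageId, label);
  }

  /** Returns "Spam" or "Ham" for a known message-id, or empty if the id is unknown. */
  public Optional<String> status(String messageId) {
    Double label = store.get(messageId);
    if (label == null) {
      return Optional.empty();
    }
    return Optional.of(label.doubleValue() == SPAM_LABEL ? "Spam" : "Ham");
  }

  public static void main(String[] args) {
    ClassificationLookup lookup = new ClassificationLookup();
    lookup.write("1", 0.0);
    System.out.println(lookup.status("1").orElse("not found"));
  }
}
```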

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the CDAP Local Sandbox home directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/SpamClassifier/target/SpamClassifier-4.3.4.jar
    
    Successfully added artifact with name 'SpamClassifier'
    
    $ cdap cli create app SpamClassifier SpamClassifier 4.3.4 user
    
    Successfully created application
    
    > cdap cli load artifact examples\SpamClassifier\target\SpamClassifier-4.3.4.jar
    
    Successfully added artifact with name 'SpamClassifier'
    
    > cdap cli create app SpamClassifier SpamClassifier 4.3.4 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application and detailed under Running the Example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Service

  • Using the CDAP UI, go to the SpamClassifier application overview page, programs tab, click MessageClassification to get to the service detail page, then click the Start button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli start service SpamClassifier.MessageClassification
    
    Successfully started service 'MessageClassification' of application 'SpamClassifier' with stored runtime arguments '{}'
    
    > cdap cli start service SpamClassifier.MessageClassification
    
    Successfully started service 'MessageClassification' of application 'SpamClassifier' with stored runtime arguments '{}'
    

Injecting Training Data

Inject a file of training data into the stream trainingDataStream by running this command from the CDAP Local Sandbox home directory, using the Command Line Interface:

$ cdap cli load stream trainingDataStream examples/SpamClassifier/src/test/resources/trainingData.txt

Successfully loaded file to stream 'trainingDataStream'

> cdap cli load stream trainingDataStream examples\SpamClassifier\src\test\resources\trainingData.txt

Successfully loaded file to stream 'trainingDataStream'

Running the Spark Program

There are three ways to start the Spark program:

  1. Go to the SpamClassifier application overview page, programs tab, click SpamClassifierProgram to get to the Spark program detail page, and add these runtime arguments/preferences:

    kafka.brokers:broker1-host:port
    kafka.topics:topic1,topic2
    

    then click the Start button;

  2. Use the Command Line Interface:

    $ cdap cli start spark SpamClassifier.SpamClassifierProgram "kafka.brokers=broker1-host:port kafka.topics=topic1,topic2"
    
    > cdap cli start spark SpamClassifier.SpamClassifierProgram "kafka.brokers=broker1-host:port kafka.topics=topic1,topic2"
    
  3. Send an HTTP request using a curl command:

    $ curl -w"\n" -X POST -d '{"kafka.brokers":"broker1-host:port", "kafka.topics":"topic1,topic2"}' \
    "http://localhost:11015/v3/namespaces/default/apps/SpamClassifier/spark/SpamClassifierProgram/start"
    
    > curl -X POST -d "{\"kafka.brokers\":\"broker1-host:port\", \"kafka.topics\":\"topic1,topic2\"}" ^
    "http://localhost:11015/v3/namespaces/default/apps/SpamClassifier/spark/SpamClassifierProgram/start"
    

Injecting Prediction Data

Publish the messages that you want classified as spam or ham to the Kafka topic configured for the Spark program above. You can use the Kafka command line tools to create the topic and send messages. Each message must be in the format:

message-id:message

For example:

2:I will call you later
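A minimal sketch of reading this format is shown below. It splits on the first colon only, so that a message body containing colons stays intact; whether the example's Spark program does the same is an assumption. The class PredictionMessage is hypothetical.

```java
/** Hypothetical parser for the "message-id:message" format; not the example's own code. */
final class PredictionMessage {
  public final String id;
  public final String text;

  private PredictionMessage(String id, String text) {
    this.id = id;
    this.text = text;
  }

  /** Splits at the first colon; everything after it is treated as the message text. */
  public static PredictionMessage parse(String raw) {
    int separator = raw.indexOf(':');
    if (separator <= 0) {
      throw new IllegalArgumentException("Expected message-id:message but got: " + raw);
    }
    return new PredictionMessage(raw.substring(0, separator), raw.substring(separator + 1));
  }

  public static void main(String[] args) {
    PredictionMessage message = PredictionMessage.parse("2:I will call you later");
    System.out.println(message.id + " -> " + message.text);
  }
}
```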

Querying the Results

To query the messageClassificationStore ObjectStore using the MessageClassification service, you can either:

  • Use the Command Line Interface:

    $ cdap cli call service SpamClassifier.MessageClassification GET status/1
    
    Ham
    
    > cdap cli call service SpamClassifier.MessageClassification GET status/1
    
    Ham
    
  • Send a query via an HTTP request using a curl command. For example:

    $ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/SpamClassifier/services/MessageClassification/methods/status/1"
    
    Ham
    
    > curl -X GET "http://localhost:11015/v3/namespaces/default/apps/SpamClassifier/services/MessageClassification/methods/status/1"
    
    Ham
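The same query can be issued from Java. The sketch below builds the request with the java.net.http client (Java 11 or later) against the URL used in the curl command above. The class StatusQuery is hypothetical, and the code assumes the message-id is URL-safe and that CDAP is listening on localhost:11015; only fetch contacts the server, so main prints the request URI without sending anything.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper that queries the status endpoint shown in the curl command. */
final class StatusQuery {
  static final String BASE =
      "http://localhost:11015/v3/namespaces/default/apps/SpamClassifier"
          + "/services/MessageClassification/methods/status/";

  /** Builds a GET request for one message-id (assumed to need no URL escaping). */
  public static HttpRequest buildRequest(String messageId) {
    return HttpRequest.newBuilder(URI.create(BASE + messageId)).GET().build();
  }

  /** Sends the request and returns the response body; needs a running CDAP instance. */
  public static String fetch(String messageId) throws IOException, InterruptedException {
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(buildRequest(messageId), HttpResponse.BodyHandlers.ofString());
    return response.body();
  }

  public static void main(String[] args) {
    // Prints the URI only; call fetch("1") once the service is running.
    System.out.println(buildRequest("1").uri());
  }
}
```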
    

Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Spark Program

  • Using the CDAP UI, go to the SpamClassifier application overview page, programs tab, click SpamClassifierProgram to get to the Spark program detail page, then click the Stop button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli stop spark SpamClassifier.SpamClassifierProgram
    
    Successfully stopped Spark 'SpamClassifierProgram' of application 'SpamClassifier'
    
    > cdap cli stop spark SpamClassifier.SpamClassifierProgram
    
    Successfully stopped Spark 'SpamClassifierProgram' of application 'SpamClassifier'
    

Stopping the Service

  • Using the CDAP UI, go to the SpamClassifier application overview page, programs tab, click MessageClassification to get to the service detail page, then click the Stop button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli stop service SpamClassifier.MessageClassification
    
    Successfully stopped service 'MessageClassification' of application 'SpamClassifier'
    
    > cdap cli stop service SpamClassifier.MessageClassification
    
    Successfully stopped service 'MessageClassification' of application 'SpamClassifier'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the SpamClassifier application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli delete app SpamClassifier
    
    > cdap cli delete app SpamClassifier