Spark K-Means

A Cask Data Application Platform (CDAP) example demonstrating Spark.

Overview

This example demonstrates a Spark application performing streaming analysis, computing the centers of points from an input stream using the K-Means Clustering method.

Data from a sample file is sent to the pointsStream stream using a CDAP CLI command. This data is processed by the PointsFlow, which stores each point's coordinates, as received in the stream event, in points, an ObjectStore dataset.

As these entries are created, they are taken up by the SparkKMeansProgram, which goes through the entries, calculates centers and tabulates results in another ObjectStore dataset, centers.
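To make the center calculation concrete, here is a minimal plain-Java sketch of Lloyd's iteration, the usual K-Means procedure: assign each point to its nearest center, then move each center to the mean of its assigned points. It is an illustration only and not the code of SparkKMeansProgram; the class name KMeansSketch, its method names, the sample points, and the fixed iteration count are all assumptions made for this example.

```java
import java.util.Arrays;

public class KMeansSketch {

  // Squared Euclidean distance between two points of equal dimension.
  public static double squaredDistance(double[] a, double[] b) {
    double sum = 0.0;
    for (int i = 0; i < a.length; i++) {
      double d = a[i] - b[i];
      sum += d * d;
    }
    return sum;
  }

  // Index of the center closest to the given point.
  public static int nearest(double[] point, double[][] centers) {
    int best = 0;
    for (int c = 1; c < centers.length; c++) {
      if (squaredDistance(point, centers[c]) < squaredDistance(point, centers[best])) {
        best = c;
      }
    }
    return best;
  }

  // Runs a fixed number of Lloyd iterations, starting from the given centers.
  public static double[][] kMeans(double[][] points, double[][] initialCenters, int iterations) {
    int k = initialCenters.length;
    int dim = initialCenters[0].length;
    double[][] centers = new double[k][];
    for (int c = 0; c < k; c++) {
      centers[c] = initialCenters[c].clone();
    }
    for (int iter = 0; iter < iterations; iter++) {
      double[][] sums = new double[k][dim];
      int[] counts = new int[k];
      // Assignment step: add each point to the running sum of its nearest center.
      for (double[] p : points) {
        int c = nearest(p, centers);
        counts[c]++;
        for (int d = 0; d < dim; d++) {
          sums[c][d] += p[d];
        }
      }
      // Update step: move each non-empty center to the mean of its points.
      for (int c = 0; c < k; c++) {
        if (counts[c] > 0) {
          for (int d = 0; d < dim; d++) {
            centers[c][d] = sums[c][d] / counts[c];
          }
        }
      }
    }
    return centers;
  }

  public static void main(String[] args) {
    // Hypothetical sample data: two obvious groups of three-dimensional points.
    double[][] points = {
      {1.0, 1.0, 1.0}, {2.0, 2.0, 2.0},
      {9.0, 9.0, 9.0}, {10.0, 10.0, 10.0}
    };
    double[][] initial = {{0.0, 0.0, 0.0}, {10.0, 10.0, 10.0}};
    for (double[] center : kMeans(points, initial, 5)) {
      System.out.println(Arrays.toString(center));
    }
  }
}
```

With this sample data the two centers settle at (1.5, 1.5, 1.5) and (9.5, 9.5, 9.5) after the first iteration and do not move afterwards.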

Once the application completes, you can query the centers dataset by using the centers/{index} endpoint of the CentersService. It will respond with the center's coordinates based on the index parameter (e.g. "9.1,9.1,9.1").

Let's look at some of these components, and then run the application and see the results.

The SparkKMeans Application

As in the other examples, the components of the application are tied together by the class SparkKMeansApp:

public class SparkKMeansApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("SparkKMeans");
    setDescription("Spark KMeans app");

    // Ingest data into the Application via a Stream
    addStream(new Stream("pointsStream"));

    // Process points data in real-time using a Flow
    addFlow(new PointsFlow());

    // Run a Spark program on the acquired data
    addSpark(new SparkKMeansSpecification());

    // Retrieve the processed data using a Service
    addService(new CentersService());

    // Store input and processed data in ObjectStore Datasets
    try {
      ObjectStores.createObjectStore(getConfigurer(), "points", Point.class,
                                     DatasetProperties.builder().setDescription("Store points data").build());
      ObjectStores.createObjectStore(getConfigurer(), "centers", String.class,
                                     DatasetProperties.builder().setDescription("Store centers data").build());
    } catch (UnsupportedTypeException e) {
      // This exception is thrown by ObjectStore if its parameter type cannot be
      // (de)serialized (for example, if it is an interface and not a class, then there is
      // no auto-magic way to deserialize an object). In this case that will not happen
      // because String is an actual class.
      throw new RuntimeException(e);
    }
  }
. . .

The points and centers ObjectStore Data Storage

The raw points data is stored in an ObjectStore dataset, points. The calculated centers data is stored in a second ObjectStore dataset, centers.

The CentersService Service

This service has a centers/{index} endpoint that returns the coordinates of the center at a given index.
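The endpoint answers with the coordinates as a comma-separated string (for example "9.1,9.1,9.1"). A client that needs numeric values could split that string as in the following sketch. The class name CenterParser and the method parseCenter are hypothetical and are not part of the example application.

```java
import java.util.Arrays;

public class CenterParser {

  // Splits a comma-separated coordinate string into its numeric components.
  public static double[] parseCenter(String response) {
    String[] parts = response.trim().split(",");
    double[] coordinates = new double[parts.length];
    for (int i = 0; i < parts.length; i++) {
      coordinates[i] = Double.parseDouble(parts[i].trim());
    }
    return coordinates;
  }

  public static void main(String[] args) {
    // Sample response taken from the overview of this example.
    System.out.println(Arrays.toString(parseCenter("9.1,9.1,9.1")));
  }
}
```

Double.parseDouble throws a NumberFormatException for a malformed component, so a caller may want to catch it if the response could be an error message instead of coordinates.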

Building and Starting

  • Build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/SparkKMeans/target/SparkKMeans-4.1.1.jar
    
    Successfully added artifact with name 'SparkKMeans'
    
    $ cdap cli create app SparkKMeans SparkKMeans 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\SparkKMeans\target\SparkKMeans-4.1.1.jar
    
    Successfully added artifact with name 'SparkKMeans'
    
    > cdap cli create app SparkKMeans SparkKMeans 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application and detailed at the start of Running the Example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Flow

  • Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click PointsFlow to get to the flow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start flow SparkKMeans.PointsFlow
    
    Successfully started flow 'PointsFlow' of application 'SparkKMeans' with stored runtime arguments '{}'
    
    > cdap cli start flow SparkKMeans.PointsFlow
    
    Successfully started flow 'PointsFlow' of application 'SparkKMeans' with stored runtime arguments '{}'
    

Starting the Service

  • Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click CentersService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service SparkKMeans.CentersService
    
    Successfully started service 'CentersService' of application 'SparkKMeans' with stored runtime arguments '{}'
    
    > cdap cli start service SparkKMeans.CentersService
    
    Successfully started service 'CentersService' of application 'SparkKMeans' with stored runtime arguments '{}'
    

Injecting Points Data

Inject a file of points data to the stream pointsStream by running this command from the Standalone CDAP SDK directory, using the Command Line Interface:

$ cdap cli load stream pointsStream examples/SparkKMeans/resources/points.txt

Successfully loaded file to stream 'pointsStream'

> cdap cli load stream pointsStream examples\SparkKMeans\resources\points.txt

Successfully loaded file to stream 'pointsStream'

Running the Spark Program

There are three ways to start the Spark program:

  1. Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click SparkKMeansProgram to get to the Spark program detail page, then click the Start button; or

  2. Use the Command Line Interface:

    $ cdap cli start spark SparkKMeans.SparkKMeansProgram "args='3'"
    
    > cdap cli start spark SparkKMeans.SparkKMeansProgram "args='3'"
    
  3. Send a query via an HTTP request using the curl command:

    $ curl -w"\n" -X POST -d '{args="3"}' \
    "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans/spark/SparkKMeansProgram/start"
    
    > curl -X POST -d "{args=\"3\"}" ^
    "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans/spark/SparkKMeansProgram/start"
    
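The same start request can also be issued from Java. The sketch below uses the java.net.http client that ships with Java 11 and later, and copies the URL and the request body from the curl command in this section. The class name StartSparkProgram and the method buildStartRequest are hypothetical, and the sketch assumes a Standalone CDAP instance listening on localhost:11015.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StartSparkProgram {

  // Application URL copied from the curl example; adjust host and port as needed.
  static final String APP_URL =
      "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans";

  // Builds the POST request that asks CDAP to start SparkKMeansProgram.
  // The body mirrors the curl example: {args="3"}
  public static HttpRequest buildStartRequest(String args) {
    String body = "{args=\"" + args + "\"}";
    return HttpRequest.newBuilder(URI.create(APP_URL + "/spark/SparkKMeansProgram/start"))
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] unused) throws Exception {
    // Sending the request requires a running CDAP instance.
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(buildStartRequest("3"), HttpResponse.BodyHandlers.ofString());
    System.out.println("HTTP status: " + response.statusCode());
  }
}
```

Building the request is kept separate from sending it so that the URL and body can be inspected without a server.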

Querying the Results

To query the centers ObjectStore using the CentersService, you can:

  • Use the Command Line Interface:

    $ cdap cli call service SparkKMeans.CentersService GET centers/1
    
    306.52261306532665,306.52261306532665,793.7956448911223
    
    > cdap cli call service SparkKMeans.CentersService GET centers/1
    
    306.52261306532665,306.52261306532665,793.7956448911223
    
  • Send a query via an HTTP request using the curl command. For example:

    $ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans/services/CentersService/methods/centers/1"
    
    306.52261306532665,306.52261306532665,793.7956448911223
    
    > curl -X GET "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans/services/CentersService/methods/centers/1"
    
    306.52261306532665,306.52261306532665,793.7956448911223
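
The HTTP query can be scripted from Java as well. This sketch uses the java.net.http client available in Java 11 and later, with the URL taken from the curl command above. The class name QueryCenter and the method buildCenterRequest are hypothetical, and the sketch assumes that CentersService is running on a Standalone CDAP instance at localhost:11015.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class QueryCenter {

  // Service methods URL copied from the curl example; adjust host and port as needed.
  static final String METHODS_URL =
      "http://localhost:11015/v3/namespaces/default/apps/SparkKMeans"
          + "/services/CentersService/methods";

  // Builds the GET request for the center stored under the given index.
  public static HttpRequest buildCenterRequest(int index) {
    return HttpRequest.newBuilder(URI.create(METHODS_URL + "/centers/" + index))
        .GET()
        .build();
  }

  public static void main(String[] args) throws Exception {
    // Sending the request requires a running CDAP instance and service.
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(buildCenterRequest(1), HttpResponse.BodyHandlers.ofString());
    // On success the body is a comma-separated coordinate string.
    System.out.println(response.body());
  }
}
```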
    

Stopping and Removing the Application

Once done, you can stop the application (if it hasn't stopped already) as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Flow

  • Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click PointsFlow to get to the flow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop flow SparkKMeans.PointsFlow
    
    Successfully stopped flow 'PointsFlow' of application 'SparkKMeans'
    
    > cdap cli stop flow SparkKMeans.PointsFlow
    
    Successfully stopped flow 'PointsFlow' of application 'SparkKMeans'
    

Stopping the Service

  • Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click CentersService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service SparkKMeans.CentersService
    
    Successfully stopped service 'CentersService' of application 'SparkKMeans'
    
    > cdap cli stop service SparkKMeans.CentersService
    
    Successfully stopped service 'CentersService' of application 'SparkKMeans'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the SparkKMeans application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app SparkKMeans
    
    > cdap cli delete app SparkKMeans