🔗Purchase

A Cask Data Application Platform (CDAP) example demonstrating many of the CDAP components.

🔗Overview

This example demonstrates use of many of the CDAP components—streams, flows, flowlets, datasets, queries, MapReduce programs, workflows, and services—all in a single application.

The application uses a scheduled MapReduce and workflow to read from an ObjectMappedTable dataset and write to an ObjectStore dataset.

  • Send sentences of the form "Tom bought 5 apples for $10" to the purchaseStream. You can send sentences by using a curl call, the CDAP CLI, or the CDAP UI.
  • The PurchaseFlow reads the purchaseStream and converts every input string into a Purchase object and stores the object in the purchases dataset.
  • The PurchaseStore flowlet demonstrates the setting of memory used by its YARN container.
  • User profile information can be added by using curl calls (or another method); it is then stored in the userProfiles dataset.
  • The CatalogLookup service fetches the catalog id for a given product. The CatalogLookup service is called from the PurchaseStore flowlet. The host and port of the CatalogLookup service are discovered using the service discovery framework.
  • The UserProfileService is responsible for storing and retrieving the user information for a given user ID from the userProfiles dataset. The host and port of the UserProfileService are discovered using the service discovery framework.
  • When scheduled by the PurchaseHistoryWorkflow, the PurchaseHistoryBuilder MapReduce reads the purchases dataset. It fetches the user profile information, if it is available, from the UserProfileService and creates a purchase history, which it stores in the history dataset. The workflow runs every morning at 4:00 A.M. using a time schedule, and also every time 1 MB of data is ingested by the purchaseStream using a data schedule.
  • You can either manually (in the Process screen of the CDAP UI) or programmatically execute the PurchaseHistoryBuilder MapReduce to store customers' purchase history in the history dataset.
  • The PurchaseHistoryBuilder MapReduce demonstrates the setting of memory used by its YARN container, both as default values and as runtime arguments.
  • Use the PurchaseHistoryService to retrieve from the history dataset the purchase history of a user.
  • Execute a SQL query over the history dataset. You can do this using a series of curl calls, or more conveniently using the Command Line Interface.

Note: Because the PurchaseHistoryWorkflow is only scheduled to run at 4:00 A.M., you should not start it manually until after entering the first customers' purchases, or the PurchaseHistoryService will respond with a 204 No Content status code.
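
The conversion of an input sentence into a purchase record can be sketched in plain Java. This is only an illustration of parsing the sentence format shown above; the class name PurchaseSentenceParser, the ParsedPurchase holder, its field names, and the regular expression are assumptions made for this sketch, not the flowlet code that ships with the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses "<customer> bought <quantity> <product> for $<price>".
final class PurchaseSentenceParser {

  // Hypothetical stand-in for the example's Purchase class.
  static final class ParsedPurchase {
    final String customer;
    final int quantity;
    final String product;
    final int price;

    ParsedPurchase(String customer, int quantity, String product, int price) {
      this.customer = customer;
      this.quantity = quantity;
      this.product = product;
      this.price = price;
    }
  }

  // Digits are capped at nine so Integer.parseInt cannot overflow.
  private static final Pattern SENTENCE =
      Pattern.compile("^(\\S+) bought (\\d{1,9}) (\\S+) for \\$(\\d{1,9})$");

  // Returns the parsed purchase, or null if the sentence does not match the format.
  static ParsedPurchase parse(String sentence) {
    Matcher m = SENTENCE.matcher(sentence.trim());
    if (!m.matches()) {
      return null;
    }
    return new ParsedPurchase(
        m.group(1), Integer.parseInt(m.group(2)), m.group(3), Integer.parseInt(m.group(4)));
  }

  public static void main(String[] args) {
    ParsedPurchase p = parse("Tom bought 5 apples for $10");
    System.out.println(p.customer + " " + p.quantity + " " + p.product + " " + p.price);
  }
}
```

Returning null for a sentence that does not match is a choice made for the sketch; a real flowlet might log and drop such events instead.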

Let's look at some of these components, and then run the application and see the results.

🔗The Purchase Application

As in the other examples, the components of the application are tied together by the class PurchaseApp:

public class PurchaseApp extends AbstractApplication {

  public static final String APP_NAME = "PurchaseHistory";

  @Override
  public void configure() {
    setName(APP_NAME);
    setDescription("Purchase history application");

    // Ingest data into the Application via a Stream
    addStream(new Stream("purchaseStream"));

    // Store processed data in a Dataset
    createDataset("frequentCustomers", KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Store frequent customers").build());

    // Store user profiles in a Dataset
    createDataset("userProfiles", KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Store user profiles").build());

    // Process events in realtime using a Flow
    addFlow(new PurchaseFlow());

    // Specify a MapReduce to run on the acquired data
    addMapReduce(new PurchaseHistoryBuilder());

    // Run a Workflow that uses the MapReduce to run on the acquired data
    addWorkflow(new PurchaseHistoryWorkflow());

    // Retrieve the processed data using a Service
    addService(new PurchaseHistoryService());

    // Store and retrieve user profile data using a Service
    addService(UserProfileServiceHandler.SERVICE_NAME, new UserProfileServiceHandler());

    // Provide a Service to Application components
    addService(new CatalogLookupService());

    // Schedule the workflow
    scheduleWorkflow(
      Schedules.builder("DailySchedule")
        .setMaxConcurrentRuns(1)
        .createTimeSchedule("0 4 * * *"),
      "PurchaseHistoryWorkflow");

    // Schedule the workflow based on the data coming in the purchaseStream stream
    scheduleWorkflow(
      Schedules.builder("DataSchedule")
        .setDescription("Schedule execution when 1 MB or more of data is ingested in the purchaseStream")
        .setMaxConcurrentRuns(1)
        .createDataSchedule(Schedules.Source.STREAM, "purchaseStream", 1),
      "PurchaseHistoryWorkflow"
    );

    createDataset("history", PurchaseHistoryStore.class, PurchaseHistoryStore.properties("History dataset"));
    try {
      createDataset("purchases", ObjectMappedTable.class, ObjectMappedTableProperties.builder().setType(Purchase.class)
        .setDescription("Store purchases").build());
    } catch (UnsupportedTypeException e) {
      // This exception is thrown by ObjectMappedTable if its parameter type cannot be
      // (de)serialized (for example, if it is an interface and not a class, then there is
      // no auto-magic way to deserialize an object). In this case that will not happen
      // because PurchaseHistory and Purchase are actual classes.
      throw new RuntimeException(e);
    }
  }
}
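
To make the cron expression in the DailySchedule concrete: "0 4 * * *" specifies minute 0, hour 4, and wildcards for day of month, month, and day of week, so it fires once a day at 4:00 A.M. The following sketch uses only java.time and a hypothetical DailyScheduleSketch class to compute when such a trigger would next fire. It ignores time zones and is not how CDAP evaluates schedules internally.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical illustration of the firing times implied by the cron expression "0 4 * * *".
final class DailyScheduleSketch {

  private static final LocalTime FIRE_TIME = LocalTime.of(4, 0);

  // Returns the first 04:00 strictly after 'now'.
  static LocalDateTime nextRun(LocalDateTime now) {
    LocalDateTime todayAtFour = now.toLocalDate().atTime(FIRE_TIME);
    return now.isBefore(todayAtFour) ? todayAtFour : todayAtFour.plusDays(1);
  }

  public static void main(String[] args) {
    System.out.println(nextRun(LocalDateTime.of(2017, 1, 1, 3, 0)));
    System.out.println(nextRun(LocalDateTime.of(2017, 1, 1, 4, 0)));
  }
}
```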

🔗Storing Purchases with the Purchase ObjectStore Data Storage

The raw purchase data is stored in an ObjectMappedTable dataset, purchases, with this method defined in PurchaseStore.java:

public void process(Purchase purchase) {
  // Discover the CatalogLookup service via discovery service
  // the service name is the same as the one provided in the Application configure method
  URL serviceURL = getContext().getServiceURL(PurchaseApp.APP_NAME, CatalogLookupService.SERVICE_NAME);
  if (serviceURL != null) {
    String catalog = getCatalogId(serviceURL, purchase.getProduct());
    if (catalog != null) {
      purchase.setCatalogId(catalog);
    }
  }
  metrics.count("purchases." + purchase.getCustomer(), 1);

  LOG.info("Purchase info: Customer {}, ProductId {}, CatalogId {}",
           purchase.getCustomer(), purchase.getProduct(), purchase.getCatalogId());
  store.write(Bytes.toBytes(purchase.getPurchaseTime()), purchase);
}

This method is what actually puts data into the purchases dataset, by writing to the dataset with each purchase's timestamp and the Purchase Object.
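
As an illustration of what such a row key looks like, the sketch below encodes a purchase time as eight bytes using java.nio.ByteBuffer. It assumes that Bytes.toBytes(long) yields a big-endian 8-byte array (as HBase-style Bytes utilities do) and that the purchase time is a long in epoch milliseconds. PurchaseRowKeySketch and its method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of a timestamp-based row key.
final class PurchaseRowKeySketch {

  // Encodes the timestamp as 8 bytes; ByteBuffer writes big-endian by default.
  static byte[] toRowKey(long purchaseTime) {
    return ByteBuffer.allocate(Long.BYTES).putLong(purchaseTime).array();
  }

  // Decodes a key produced by toRowKey back into the timestamp.
  static long fromRowKey(byte[] rowKey) {
    return ByteBuffer.wrap(rowKey).getLong();
  }

  public static void main(String[] args) {
    byte[] key = toRowKey(1444411198435L);
    System.out.println(key.length + " bytes, decodes to " + fromRowKey(key));
  }
}
```

Under these assumptions, two purchases with an identical purchase time produce the same key, so they would be written to the same row.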

The purchase history for each customer is compiled by the PurchaseHistoryWorkflow, which uses a MapReduce—PurchaseHistoryBuilder—to aggregate all purchases into a per-customer purchase history. It writes to the history dataset, a custom dataset that embeds an ObjectStore and implements the RecordScannable interface to allow SQL queries over the dataset.
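
The per-customer aggregation can be pictured with plain Java collections. The sketch below is an in-memory analogue of grouping purchases by customer, not the MapReduce program itself; the PurchaseHistorySketch class and its simplified Purchase holder are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of building a per-customer purchase history.
final class PurchaseHistorySketch {

  // Simplified stand-in for the example's Purchase class.
  static final class Purchase {
    final String customer;
    final String product;

    Purchase(String customer, String product) {
      this.customer = customer;
      this.product = product;
    }
  }

  // Groups purchases by customer, keeping the order in which customers first appear.
  static Map<String, List<Purchase>> groupByCustomer(List<Purchase> purchases) {
    Map<String, List<Purchase>> history = new LinkedHashMap<>();
    for (Purchase purchase : purchases) {
      history.computeIfAbsent(purchase.customer, k -> new ArrayList<>()).add(purchase);
    }
    return history;
  }

  public static void main(String[] args) {
    List<Purchase> purchases = Arrays.asList(
        new Purchase("Alice", "coconut"),
        new Purchase("Bob", "coffee"),
        new Purchase("Alice", "grapefruit"));
    groupByCustomer(purchases).forEach(
        (customer, list) -> System.out.println(customer + ": " + list.size() + " purchase(s)"));
  }
}
```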

The memory requirements of the flowlet PurchaseStore are set in its configure method:

@Override
public void configure(FlowletConfigurer configurer) {
  super.configure(configurer);
  setDescription("Store the incoming Purchase objects in the purchases dataset");
  setResources(new Resources(1024));
}

🔗PurchaseHistoryBuilder MapReduce

This MapReduce program demonstrates the setting of the YARN container resources, both as default values used in configuration and as runtime arguments:

public class PurchaseHistoryBuilder extends AbstractMapReduce {
  public static final String MAPPER_MEMORY_MB = "mapper.memory.mb";
  public static final String REDUCER_MEMORY_MB = "reducer.memory.mb";

  @Override
  public void configure() {
    setDescription("Purchase History Builder");
    setDriverResources(new Resources(1024));
    setMapperResources(new Resources(1024));
    setReducerResources(new Resources(1024));
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    Job job = context.getHadoopJob();
    job.setReducerClass(PerUserReducer.class);

    context.addInput(Input.ofDataset("purchases"), PurchaseMapper.class);
    context.addOutput(Output.ofDataset("history"));

    // override default memory usage if the corresponding runtime arguments are set.
    Map<String, String> runtimeArgs = context.getRuntimeArguments();
    String mapperMemoryMBStr = runtimeArgs.get(MAPPER_MEMORY_MB);
    if (mapperMemoryMBStr != null) {
      context.setMapperResources(new Resources(Integer.parseInt(mapperMemoryMBStr)));
    }
    String reducerMemoryMBStr = runtimeArgs.get(REDUCER_MEMORY_MB);
    if (reducerMemoryMBStr != null) {
      context.setReducerResources(new Resources(Integer.parseInt(reducerMemoryMBStr)));
    }
  }
...
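
The override logic in initialize() follows a common pattern: use the configured default unless a runtime argument supplies a different value. A minimal standalone sketch of that pattern is shown below, using a hypothetical MemoryArgsSketch class and no CDAP types. As in the code above, a non-numeric value makes Integer.parseInt throw a NumberFormatException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the "runtime argument overrides default" pattern.
final class MemoryArgsSketch {

  static final String MAPPER_MEMORY_MB = "mapper.memory.mb";
  static final String REDUCER_MEMORY_MB = "reducer.memory.mb";

  // Returns the runtime argument's value in MB if present, otherwise the default.
  static int resolveMemoryMb(Map<String, String> runtimeArgs, String key, int defaultMb) {
    String value = runtimeArgs.get(key);
    return value == null ? defaultMb : Integer.parseInt(value);
  }

  public static void main(String[] args) {
    Map<String, String> runtimeArgs = new HashMap<>();
    runtimeArgs.put(MAPPER_MEMORY_MB, "2048");
    System.out.println("mapper MB: " + resolveMemoryMb(runtimeArgs, MAPPER_MEMORY_MB, 1024));
    System.out.println("reducer MB: " + resolveMemoryMb(runtimeArgs, REDUCER_MEMORY_MB, 1024));
  }
}
```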

🔗PurchaseHistoryService Service

This service has a history/{customer} endpoint to obtain the purchase history of a given customer. It also demonstrates the use of Resources to configure the memory requirements of the service:

@Override
protected void configure() {
  setName(SERVICE_NAME);
  setDescription("A service to retrieve a customer's purchase history");
  addHandler(new PurchaseHistoryServiceHandler());
  setResources(new Resources(1024));
}

🔗UserProfileService Service

This service has two endpoints:

A user endpoint to add a user's profile information to the system:

$ cdap cli call service PurchaseHistory.UserProfileService POST user body \
  '{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}'
> cdap cli call service PurchaseHistory.UserProfileService POST user body ^
  '{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}'

A user/{id} endpoint to obtain profile information for a specified user:

$ cdap cli call service PurchaseHistory.UserProfileService GET user/Alice

< 200 OK
< Content-Length: 79
< Connection: keep-alive
< Content-Type: application/json
{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}
> cdap cli call service PurchaseHistory.UserProfileService GET user/Alice

< 200 OK
< Content-Length: 79
< Connection: keep-alive
< Content-Type: application/json
{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}
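
If you call the service over HTTP instead of the CLI, the request URL has to be assembled from the host, namespace, application, service, and method path. The sketch below assumes that UserProfileService is exposed under the same REST path pattern that this example's curl commands use for PurchaseHistoryService (.../services/<service>/methods/<path>) and that CDAP listens on localhost:11015. ServiceUrlSketch is a hypothetical helper that only builds the URI and sends nothing.

```java
import java.net.URI;

// Hypothetical helper that builds the REST URL of a service method.
final class ServiceUrlSketch {

  // Path pattern assumed from the curl commands used elsewhere in this example.
  static URI methodUrl(String host, int port, String namespace,
                       String app, String service, String methodPath) {
    return URI.create(String.format(
        "http://%s:%d/v3/namespaces/%s/apps/%s/services/%s/methods/%s",
        host, port, namespace, app, service, methodPath));
  }

  public static void main(String[] args) {
    System.out.println(
        methodUrl("localhost", 11015, "default", "PurchaseHistory", "UserProfileService", "user/Alice"));
  }
}
```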

🔗Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/Purchase/target/Purchase-4.1.1.jar
    
    Successfully added artifact with name 'Purchase'
    
    $ cdap cli create app PurchaseHistory Purchase 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\Purchase\target\Purchase-4.1.1.jar
    
    Successfully added artifact with name 'Purchase'
    
    > cdap cli create app PurchaseHistory Purchase 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application, and detailed at the start of running the example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

🔗Running the Example

🔗Starting the Flow

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseFlow to get to the flow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start flow PurchaseHistory.PurchaseFlow
    
    Successfully started flow 'PurchaseFlow' of application 'PurchaseHistory' with stored runtime arguments '{}'
    
    > cdap cli start flow PurchaseHistory.PurchaseFlow
    
    Successfully started flow 'PurchaseFlow' of application 'PurchaseHistory' with stored runtime arguments '{}'
    

🔗Starting the Services

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseHistoryService to get to the service detail page, then click the Start button, and then do the same for the CatalogLookup and UserProfileService services; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service PurchaseHistory.PurchaseHistoryService
    $ cdap cli start service PurchaseHistory.CatalogLookup
    $ cdap cli start service PurchaseHistory.UserProfileService
    
    Successfully started service 'PurchaseHistoryService' of application 'PurchaseHistory' with stored runtime arguments '{}'
    Successfully started service 'CatalogLookup' of application 'PurchaseHistory' with stored runtime arguments '{}'
    Successfully started service 'UserProfileService' of application 'PurchaseHistory' with stored runtime arguments '{}'
    
    > cdap cli start service PurchaseHistory.PurchaseHistoryService
    > cdap cli start service PurchaseHistory.CatalogLookup
    > cdap cli start service PurchaseHistory.UserProfileService
    
    Successfully started service 'PurchaseHistoryService' of application 'PurchaseHistory' with stored runtime arguments '{}'
    Successfully started service 'CatalogLookup' of application 'PurchaseHistory' with stored runtime arguments '{}'
    Successfully started service 'UserProfileService' of application 'PurchaseHistory' with stored runtime arguments '{}'
    
  • Or, you can send curl requests to CDAP:

    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/PurchaseHistoryService/start"
    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/CatalogLookup/start"
    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/UserProfileService/start"
    
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/PurchaseHistoryService/start"
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/CatalogLookup/start"
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/UserProfileService/start"
    

🔗Add A Profile

Add a User Profile for the user Alice, by running this command from the Standalone CDAP SDK directory, using the Command Line Interface:

$ cdap cli call service PurchaseHistory.UserProfileService POST user body \
  '{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}'

Successfully connected to CDAP instance at http://localhost:11015/default
< 200 OK
> cdap cli call service PurchaseHistory.UserProfileService POST user body ^
  '{"id":"Alice","firstName":"Alice","lastName":"Bernard","categories":["fruits"]}'

Successfully connected to CDAP instance at http://localhost:11015/default
< 200 OK

🔗Injecting Sentences

Inject a file of sentences by running this command from the Standalone CDAP SDK directory, using the Command Line Interface:

$ cdap cli load stream purchaseStream examples/Purchase/resources/purchases.txt
Successfully loaded file to stream 'purchaseStream'
> cdap cli load stream purchaseStream examples\Purchase\resources\purchases.txt
Successfully loaded file to stream 'purchaseStream'

🔗Starting the Workflow

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseHistoryWorkflow to get to the workflow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start workflow PurchaseHistory.PurchaseHistoryWorkflow
    
    Successfully started workflow 'PurchaseHistoryWorkflow' of application 'PurchaseHistory' with stored runtime arguments '{}'
    
    > cdap cli start workflow PurchaseHistory.PurchaseHistoryWorkflow
    
    Successfully started workflow 'PurchaseHistoryWorkflow' of application 'PurchaseHistory' with stored runtime arguments '{}'
    
  • Or, send an HTTP request using the curl command:

    $ curl -w"\n" -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/workflows/PurchaseHistoryWorkflow/start"
    
    > curl -X POST "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/workflows/PurchaseHistoryWorkflow/start"
    

🔗Querying the Results

To query the history ObjectStore through the PurchaseHistoryService, the service must be running:

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseHistoryService to get to the service detail page, and click the Start button if the service is not already running. Then query it using one of the methods below.

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli call service PurchaseHistory.PurchaseHistoryService GET history/Alice
    
    > cdap cli call service PurchaseHistory.PurchaseHistoryService GET history/Alice
    
  • Or, send a query via an HTTP request using the curl command:

    $ curl -w"\n" -X GET "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/PurchaseHistoryService/methods/history/Alice"
    
    > curl -X GET "http://localhost:11015/v3/namespaces/default/apps/PurchaseHistory/services/PurchaseHistoryService/methods/history/Alice"
    

The results will be in JSON:

{"customer":"Alice","userProfile":{"id":"Alice","firstName":"Alice","lastName":"Bernard","
categories":["fruits"]},"purchases":[{"customer":"Alice","product":"coconut","quantity":2,
"price":5,"purchaseTime":1444411198435,"catalogId":""},{"customer":"Alice","product":"
grapefruit","quantity":12,"price":10,"purchaseTime":1444411198432,"catalogId":""}]}

🔗Exploring the Results using SQL

You can use SQL to formulate ad-hoc queries over the history and purchases datasets. This is done by a series of curl calls, as described in the RESTful API section of the CDAP Reference Manual. For your convenience, the SDK's Command Line Interface can execute the series of calls.

From within the SDK root directory:

$ cdap cli execute "\"SELECT * FROM dataset_history WHERE customer IN ('Alice','Bob')\""
> cdap cli execute "\"SELECT * FROM dataset_history WHERE customer IN ('Alice','Bob')\""

This will submit the query, using the history table in the cdap_user namespace, wait for its completion and then retrieve and print all results, one by one (example reformatted to fit):

+===========================================================================================================================+
| dataset_history.customer: | dataset_history.userprofile: struct<id:string | dataset_history.purchases: array<struct<custo |
| STRING                    | ,firstname:string,lastname:string,categories: | mer:string,product:string,quantity:int,price: |
|                           | array<string>>                                | int,purchasetime:bigint,catalogid:string>>    |
+===========================================================================================================================+
| Alice                     | {"id":"Alice","firstname":"Alice","lastname": | [{"customer":"Alice","product":"coconut","qua |
|                           | "Bernard","categories":["fruits"]}            | ntity":2,"price":5,"purchasetime":14267198729 |
|                           |                                               | 86,"catalogid":"Catalog-coconut"},{"customer" |
|                           |                                               | :"Alice","product":"grapefruit","quantity":12 |
|                           |                                               | ,"price":10,"purchasetime":1426719872968,"cat |
|                           |                                               | alogid":"Catalog-grapefruit"}]                |
|---------------------------------------------------------------------------------------------------------------------------|
| Bob                       |                                               | [{"customer":"Bob","product":"coffee","quanti |
|                           |                                               | ty":1,"price":1,"purchasetime":1426719873005, |
|                           |                                               | "catalogid":"Catalog-coffee"},{"customer":"Bo |
|                           |                                               | b","product":"orange","quantity":6,"price":12 |
|                           |                                               | ,"purchasetime":1426719872970,"catalogid":"Ca |
|                           |                                               | talog-orange"}]                               |
+===========================================================================================================================+

Note that because we submitted only a single User Profile (for Alice), only Alice's row contains profile information; the userprofile column for Bob is empty.

🔗Explore the Results Using curl and SQL

If you prefer to use curl directly, here is the sequence of steps to execute:

First, submit the query for execution:

$ curl -w"\n" -X POST -d '{"query": "'"SELECT * FROM dataset_history WHERE customer IN ('Alice','Bob')"'"}' \
"http://localhost:11015/v3/namespaces/default/data/explore/queries"
> curl -X POST -d "{\"query\": \"SELECT * FROM dataset_history WHERE customer IN (\'Alice\',\'Bob\')\"}" ^
"http://localhost:11015/v3/namespaces/default/data/explore/queries"

Note that due to the mix and repetition of single and double quotes, it can be tricky to escape all quotes correctly at the shell command prompt. On success, this will return a handle for the query, such as:

{"handle":"07fd9b6a-95b3-4831-992c-7164f11c3754"}

This handle is needed to inquire about the status of the query and to retrieve query results. To get the status, issue a GET to the query's URL using the handle:

$ curl -w"\n" -X GET "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754/status"
> curl -X GET "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754/status"

Because a SQL query can run for several minutes, you may have to repeat the call until it returns a status of FINISHED:

{"status":"FINISHED","hasResults":true}

Once execution has finished, you can retrieve the results of the query using the handle:

$ curl -w"\n" -X POST "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754/next"
> curl -X POST "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754/next"

This returns a batch of results (up to a limited number per call) in JSON format:

[{"columns":["Alice","[{\"customer\":\"Alice\",\"product\":\"grapefruit\",\"quantity\":12,\"price\":10,
  \"purchasetime\":1403737694225}]"]},
{"columns":["Bob","[{\"customer\":\"Bob\",\"product\":\"orange\",\"quantity\":6,\"price\":12,
  \"purchasetime\":1403737694226}]"]}]
. . .

You repeat this step until the curl call returns an empty list. That means you have retrieved all of the results and you can now close the query:

$ curl -w"\n" -X DELETE "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754"
> curl -X DELETE "http://localhost:11015/v3/data/explore/queries/07fd9b6a-95b3-4831-992c-7164f11c3754"
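
The four curl steps above (submit, poll the status, fetch results until an empty list comes back, close) form a loop that is easy to get wrong by hand. The sketch below captures that control flow in Java. The QueryClient interface is a hypothetical abstraction over the four HTTP calls and is not a CDAP API; FakeClient is an in-memory stand-in so that the loop can run without a CDAP instance. A real client would also need to handle failure states, which this sketch does not model beyond a cap on status checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class ExploreQuerySketch {

  // Hypothetical abstraction over the four HTTP calls; not part of any CDAP API.
  interface QueryClient {
    String submit(String sql);         // POST the query, returns the handle
    String status(String handle);      // GET <handle>/status
    List<String> next(String handle);  // POST <handle>/next, empty list when exhausted
    void close(String handle);         // DELETE <handle>
  }

  // Submits the query, polls until FINISHED, drains all result batches, then closes the query.
  static List<String> runQuery(QueryClient client, String sql, int maxPolls, long pollDelayMillis) {
    String handle = client.submit(sql);
    try {
      int polls = 0;
      while (!"FINISHED".equals(client.status(handle))) {
        if (++polls >= maxPolls) {
          throw new IllegalStateException("Query not finished after " + maxPolls + " status checks");
        }
        try {
          Thread.sleep(pollDelayMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for the query", e);
        }
      }
      List<String> rows = new ArrayList<>();
      List<String> batch;
      while (!(batch = client.next(handle)).isEmpty()) {
        rows.addAll(batch);
      }
      return rows;
    } finally {
      client.close(handle); // always release the handle, even on failure
    }
  }

  // In-memory fake: reports RUNNING twice, then FINISHED, and serves two one-row batches.
  static final class FakeClient implements QueryClient {
    private final Iterator<String> statuses =
        Arrays.asList("RUNNING", "RUNNING", "FINISHED").iterator();
    private final Iterator<List<String>> batches =
        Arrays.asList(Arrays.asList("Alice"), Arrays.asList("Bob")).iterator();
    boolean closed;

    public String submit(String sql) { return "fake-handle"; }
    public String status(String handle) { return statuses.hasNext() ? statuses.next() : "FINISHED"; }
    public List<String> next(String handle) {
      return batches.hasNext() ? batches.next() : Collections.<String>emptyList();
    }
    public void close(String handle) { closed = true; }
  }

  public static void main(String[] args) {
    FakeClient fake = new FakeClient();
    System.out.println(runQuery(fake, "SELECT * FROM dataset_history", 10, 0L));
  }
}
```

Closing the handle in a finally block is a design choice for the sketch: it ensures the DELETE step is not skipped when polling or fetching fails.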

🔗Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Flow

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseFlow to get to the flow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop flow PurchaseHistory.PurchaseFlow
    
    Successfully stopped flow 'PurchaseFlow' of application 'PurchaseHistory'
    
    > cdap cli stop flow PurchaseHistory.PurchaseFlow
    
    Successfully stopped flow 'PurchaseFlow' of application 'PurchaseHistory'
    

Stopping the Services

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click PurchaseHistoryService to get to the service detail page, then click the Stop button, and then do the same for the CatalogLookup and UserProfileService services; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service PurchaseHistory.PurchaseHistoryService
    $ cdap cli stop service PurchaseHistory.CatalogLookup
    $ cdap cli stop service PurchaseHistory.UserProfileService
    
    > cdap cli stop service PurchaseHistory.PurchaseHistoryService
    > cdap cli stop service PurchaseHistory.CatalogLookup
    > cdap cli stop service PurchaseHistory.UserProfileService
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the PurchaseHistory application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app PurchaseHistory
    
    > cdap cli delete app PurchaseHistory