User Profiles

A Cask Data Application Platform (CDAP) example that demonstrates column-level conflict detection in datasets by managing user profiles in a Table.

Overview

This application demonstrates column-level conflict detection in a dataset by managing user profiles in a Table. The fields of a user profile are updated in different ways:

  • Attributes such as name and email address are changed through a RESTful call when the user updates their profile.
  • The time of the last login is updated by a sign-on service every time the user logs in, also through a RESTful call.
  • The time of the last activity is updated by a flow that processes events whenever it encounters an event from that user.

This application illustrates both row-level and column-level conflict detection for a Table.

Let's look at some of these components, and then run the application and see the results.

Introducing Column-Level Conflict Detection

As in the other examples, the components of the application are tied together by a class UserProfiles:

public class UserProfiles extends AbstractApplication {

  @Override
  public void configure() {
    setName("UserProfiles");
    setDescription("Demonstrates the use of column-level conflict detection");
    addStream(new Stream("events"));
    addFlow(new ActivityFlow());
    addService(new UserProfileService());
    createDataset("counters", KeyValueTable.class,
                  DatasetProperties.builder().setDescription("Counters key-value table").build());

    // create the profiles table with a schema so that it can be explored via Hive
    Schema profileSchema = Schema.recordOf(
      "profile",
      // id, name, and email are never null and are set when a user profile is created
      Schema.Field.of("id", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
      Schema.Field.of("email", Schema.of(Schema.Type.STRING)),
      // login and active are never set when a profile is created but are set later, so they are nullable.
      Schema.Field.of("login", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
      Schema.Field.of("active", Schema.nullableOf(Schema.of(Schema.Type.LONG)))
    );
    createDataset("profiles", Table.class.getName(), TableProperties.builder()
      // create the profiles table with column-level conflict detection
      .setConflictDetection(ConflictDetection.COLUMN)
      .setSchema(profileSchema)
      // to indicate that the id field should come from the row key and not a row column
      .setRowFieldName("id")
      .setDescription("Profiles table with column-level conflict detection")
      .build());
  }
}

This application uses a Table with conflict detection either at the row level or at the column level.

A conflict occurs if two transactions that overlap in time modify the same data in a table. For example, a flowlet's process method might overlap with a service handler. Such a conflict is detected at the time that the transactions are committed, and the transaction that attempts to commit last is rolled back.

By default, conflicts are detected at the row level. Two overlapping transactions that write to the same row of a table therefore conflict, even if they write to different columns.

Specifying a conflict detection level of COLUMN means that a conflict is only detected if both transactions modify the same column of the same row. This is more precise, but it requires more bookkeeping in the transaction system and thus can impact performance.

Column-level conflict detection should be enabled if it is known that different transactions frequently modify different columns of the same row concurrently.
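
The difference between the two levels can be illustrated with a small, self-contained sketch. It is a toy model, not CDAP or Tephra code: the class ConflictGranularityDemo, its Write type, and the "row/column" key format are all hypothetical names chosen for this illustration. It assumes only what is described above, namely that two overlapping transactions conflict when the changes they report intersect, and that the level decides whether a change is identified by its row alone or by its row and column.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of conflict detection granularity (hypothetical; not CDAP or Tephra code).
 * It only shows how the choice of change key decides what counts as a conflict.
 */
public class ConflictGranularityDemo {

  enum Level { ROW, COLUMN }

  /** One write made by a transaction: a single column of a single row. */
  static final class Write {
    final String row;
    final String column;

    Write(String row, String column) {
      this.row = row;
      this.column = column;
    }
  }

  /** Builds the set of change keys a transaction would report at the given level. */
  static Set<String> changeSet(Level level, Write... writes) {
    Set<String> keys = new HashSet<>();
    for (Write w : writes) {
      // ROW: the key is the row alone. COLUMN: the key is the row plus the column.
      // The "/" separator is an assumption of this sketch and would be ambiguous
      // for row keys that themselves contain "/".
      keys.add(level == Level.ROW ? w.row : w.row + "/" + w.column);
    }
    return keys;
  }

  /** Two overlapping transactions conflict if their change sets intersect. */
  static boolean conflicts(Set<String> first, Set<String> second) {
    Set<String> common = new HashSet<>(first);
    common.retainAll(second);
    return !common.isEmpty();
  }

  public static void main(String[] args) {
    // The service handler updates "login" while the flow updates "active" for the same user.
    Write serviceWrite = new Write("user-1", "login");
    Write flowWrite = new Write("user-1", "active");

    for (Level level : Level.values()) {
      boolean conflict = conflicts(changeSet(level, serviceWrite), changeSet(level, flowWrite));
      System.out.println(level + " -> conflict: " + conflict);
    }
    // prints:
    // ROW -> conflict: true
    // COLUMN -> conflict: false
  }
}
```

In this model the two writes collide at the ROW level because both change sets contain the key user-1, while at the COLUMN level the keys user-1/login and user-1/active are distinct. Two writes to the same column of the same row would conflict at either level.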

UserProfiles Application

This application uses:

  • a stream events to receive events of user activity;
  • a dataset profiles to store user profiles with conflict detection at either the row or column level;
  • a dataset counters to count events by URL (this is not essential for the purpose of the example);
  • a service UserProfileService to create, delete, and update profiles; and
  • a flow ActivityFlow to count events and record the time of last activity for the users.

The UserProfileService is a service for creating and modifying user profiles. It has handlers to create, update, and retrieve user profiles.

A script (add-users.sh) is used to populate the profiles dataset. Two additional scripts (update-login.sh and send-events.sh) are used to create a conflict by attempting to write to two different columns of the same row at the same time.

Building and Starting

  • Build the example (as described in Building an Example Application).

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/UserProfiles/target/UserProfiles-4.1.1.jar
    
    Successfully added artifact with name 'UserProfiles'
    
    $ cdap cli create app UserProfiles UserProfiles 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\UserProfiles\target\UserProfiles-4.1.1.jar
    
    Successfully added artifact with name 'UserProfiles'
    
    > cdap cli create app UserProfiles UserProfiles 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, start its components, as described in Starting an Application and detailed for this example under Starting the Flow and Starting the Service.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Observing Conflict Detection

To observe conflict detection at both the row-level and column-level, you will need to modify and build this example twice:

  • The first time, you will use row-level conflict detection, and see errors appearing in a log;
  • The second time, you will use column-level conflict detection and see the scripts complete successfully without errors.
  1. Build the Application with Row-level Conflict Detection

    Before building the application, set the ConflictDetection appropriately in the class UserProfiles:

    createDataset("profiles", Table.class.getName(), TableProperties.builder()
      // create the profiles table with column-level conflict detection
      .setConflictDetection(ConflictDetection.COLUMN)
      .setSchema(profileSchema)
      // to indicate that the id field should come from the row key and not a row column
      .setRowFieldName("id")
      .setDescription("Profiles table with column-level conflict detection")
      .build());
    
    • The first time you build the application, change the argument of setConflictDetection (which sets the Table.PROPERTY_CONFLICT_LEVEL property) to ConflictDetection.ROW.
    • Build the example (as described in Building an Example Application).
    • Start CDAP, then deploy the application and start its components. Make sure you start both the flow and the service, as described below.
    • Once the flow and service are running, run the example: populate the profiles table and run the two scripts concurrently, as described below.
    • You should observe errors, as described below, in <CDAP-SDK-HOME>/logs/cdap-debug.log.
  2. Re-build the Application with Column-level Conflict Detection

    • Stop the application's flow and service (as described below).
    • Delete the existing dataset profiles, either through the CDAP Command Line Interface or by making a curl call.
    • Now, rebuild the application, setting the argument of setConflictDetection back to its original value, ConflictDetection.COLUMN.
    • Re-deploy and re-run the application. You should not see any errors in the log.

Starting the Flow

  • Using the CDAP UI, go to the UserProfiles application overview page, programs tab, click ActivityFlow to get to the flow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start flow UserProfiles.ActivityFlow
    
    Successfully started flow 'ActivityFlow' of application 'UserProfiles' with stored runtime arguments '{}'
    
    > cdap cli start flow UserProfiles.ActivityFlow
    
    Successfully started flow 'ActivityFlow' of application 'UserProfiles' with stored runtime arguments '{}'
    

Starting the Service

  • Using the CDAP UI, go to the UserProfiles application overview page, programs tab, click UserProfileService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli start service UserProfiles.UserProfileService
    
    Successfully started service 'UserProfileService' of application 'UserProfiles' with stored runtime arguments '{}'
    
    > cdap cli start service UserProfiles.UserProfileService
    
    Successfully started service 'UserProfileService' of application 'UserProfiles' with stored runtime arguments '{}'
    

Populate the profiles Table

Populate the profiles table with users using a script. From the Standalone CDAP SDK directory, use:

$ ./examples/UserProfiles/bin/add-users.sh
> .\examples\UserProfiles\bin\add-users.bat

Create a Conflict

Now, from two different terminals, run the following commands concurrently (they are set to run, by default, for 100 seconds):

  • To randomly update the time of last login for users:

    $ ./examples/UserProfiles/bin/update-login.sh
    
    > .\examples\UserProfiles\bin\update-login.bat 100 1
    
  • To generate random user activity events and send them to the stream:

    $ ./examples/UserProfiles/bin/send-events.sh
    
    > .\examples\UserProfiles\bin\send-events.bat 100 1
    

If both scripts are running at the same time, then some user profiles will be updated at the same time by the service and by the flow. With row-level conflict detection, you would see transaction conflicts in the logs. But when the profiles table uses column-level conflict detection, these conflicts are avoided.

To see the behavior with row-level conflict detection, change the dataset creation statement at the bottom of UserProfiles.java to use ConflictDetection.ROW and repeat the steps above. You should see transaction conflicts in the logs. (One of the scripts will stop when a conflict occurs. You can stop the other one at that time.)

For example, such a conflict appears in the log as follows (reformatted to fit):

2015-XX-XX 13:22:30,520 - ERROR [executor-
7:c.c.c.e.p.UserProfileService$UserProfileServiceHandlera910e557f239fd6b95a3ded5c922df3a@-1] - Transaction Failure:
co.cask.tephra.TransactionConflictException: Conflict detected for transaction 1432066950514000002. at
co.cask.tephra.TransactionContext.checkForConflicts(TransactionContext.java:174) ~[co.cask.tephra.tephra-core-0.4.1.jar:na] at
co.cask.tephra.TransactionContext.finish(TransactionContext.java:79) ~[co.cask.tephra.tephra-core-0.4.1.jar:na] at
. . .

(The log file is located at <CDAP-SDK-HOME>/logs/cdap-debug.log. You should also see an error in the CDAP UI, in the UserProfileService error log.)
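
To check a run without reading the log by hand, the log can be scanned for the exception name shown in the excerpt above. The following is a minimal sketch, not part of the example application: the class ConflictLogScan and its method countConflictLines are hypothetical names, and the sketch assumes only that each conflict produces at least one log line containing TransactionConflictException. The log path is passed as an argument; without one, it falls back to two sample lines, the first taken from the excerpt above and the second a placeholder.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that counts log lines mentioning a transaction conflict. */
public class ConflictLogScan {

  /** Exception name as it appears in the log excerpt above. */
  static final String MARKER = "TransactionConflictException";

  /**
   * Counts the lines that contain the marker. This is a count of lines, not of
   * distinct conflicts: a single stack trace may mention the exception more than once.
   */
  static long countConflictLines(List<String> lines) {
    long count = 0;
    for (String line : lines) {
      if (line.contains(MARKER)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws IOException {
    List<String> lines;
    if (args.length > 0) {
      // For example: <CDAP-SDK-HOME>/logs/cdap-debug.log
      lines = Files.readAllLines(Paths.get(args[0]));
    } else {
      // Sample input: one line from the excerpt above and one placeholder line.
      lines = Arrays.asList(
          "co.cask.tephra.TransactionConflictException: Conflict detected for transaction 1432066950514000002.",
          "placeholder for an unrelated log line");
    }
    System.out.println("Lines mentioning a conflict: " + countConflictLines(lines));
  }
}
```

With row-level conflict detection the count should be greater than zero after both scripts have run concurrently; with column-level conflict detection it should stay at zero for a fresh log.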

Note that to see this happen, and whenever you switch between row-level and column-level conflict detection, you need to delete the existing profiles dataset before redeploying the application. This forces the dataset to be recreated with the new properties.

Running the example with ConflictDetection.COLUMN will result in the two scripts running concurrently without transaction conflicts.

Deleting any Existing profiles Dataset

If the profiles dataset already exists from an earlier deployment and run of the example, it must be removed before the next deployment so that it is created with the correct properties.

To delete the profiles dataset, either use the CDAP Command Line Interface:

$ cdap cli delete dataset instance profiles
> cdap cli delete dataset instance profiles

or make a curl call:

$ curl -w"\n" -X DELETE "http://localhost:11015/v3/namespaces/default/data/datasets/profiles"
> curl -X DELETE "http://localhost:11015/v3/namespaces/default/data/datasets/profiles"

Then re-deploy the application.
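
The same DELETE request can also be issued from Java, for instance from a test harness that resets the dataset between runs. This is a minimal sketch using only the JDK; the class DeleteProfilesDataset and its methods are hypothetical names, and the host, port, namespace, and dataset name are simply the values from the curl call above. The sketch prints whatever status code the server returns and makes no assumption about its value.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

/** Hypothetical Java equivalent of the curl call above, using only the JDK. */
public class DeleteProfilesDataset {

  /** Builds the dataset URL in the same form as the curl call above. */
  static String datasetUrl(String host, int port, String namespace, String dataset) {
    return "http://" + host + ":" + port
        + "/v3/namespaces/" + namespace + "/data/datasets/" + dataset;
  }

  /** Prepares a DELETE request; nothing is sent until the response is read. */
  static HttpURLConnection prepareDelete(String url) throws IOException {
    HttpURLConnection connection = (HttpURLConnection) URI.create(url).toURL().openConnection();
    connection.setRequestMethod("DELETE");
    return connection;
  }

  public static void main(String[] args) throws IOException {
    // Values taken from the curl call above; requires a running Standalone CDAP.
    String url = datasetUrl("localhost", 11015, "default", "profiles");
    HttpURLConnection connection = prepareDelete(url);
    try {
      // Reading the response code sends the request.
      System.out.println("DELETE " + url + " -> HTTP " + connection.getResponseCode());
    } finally {
      connection.disconnect();
    }
  }
}
```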

Stopping and Removing the Application

Once done, you can stop the application, if it hasn't stopped already, as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Flow

  • Using the CDAP UI, go to the UserProfiles application overview page, programs tab, click ActivityFlow to get to the flow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop flow UserProfiles.ActivityFlow
    
    Successfully stopped flow 'ActivityFlow' of application 'UserProfiles'
    
    > cdap cli stop flow UserProfiles.ActivityFlow
    
    Successfully stopped flow 'ActivityFlow' of application 'UserProfiles'
    

Stopping the Service

  • Using the CDAP UI, go to the UserProfiles application overview page, programs tab, click UserProfileService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli stop service UserProfiles.UserProfileService
    
    Successfully stopped service 'UserProfileService' of application 'UserProfiles'
    
    > cdap cli stop service UserProfiles.UserProfileService
    
    Successfully stopped service 'UserProfileService' of application 'UserProfiles'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the UserProfiles application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app UserProfiles
    
    > cdap cli delete app UserProfiles