Count Random

A Cask Data Application Platform (CDAP) example demonstrating the @Tick feature of flows.

Overview

This application does not have a stream; instead, it uses a tick annotation in the source flowlet to generate data:

  • The generate method of the source flowlet has a @Tick annotation that specifies how frequently the method is called.
  • The source flowlet generates a random integer in the range 0 to 9999 and emits it to the next flowlet, splitter.
  • The splitter flowlet emits the number modulo 10000, 1000, 100, and 10 (that is, its last four, three, two, and one digits) to the next stage.
  • The counter flowlet increments the count for each received number in a KeyValueTable.
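
To make the data flow concrete, here is a minimal plain-Java sketch of the three stages. It does not use any CDAP APIs: the flowlets are modeled as ordinary static methods and a TreeMap stands in for the KeyValueTable. The class CountRandomSimulation and its methods are hypothetical, for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Plain-Java simulation of the CountRandom pipeline; NOT CDAP code.
// The three "flowlets" are static methods, and a TreeMap stands in for randomTable.
class CountRandomSimulation {

  // source: one random integer in [0, 9999], as random.nextInt(10000) produces
  static int source(Random random) {
    return random.nextInt(10000);
  }

  // splitter: the number modulo 10000, 1000, 100 and 10 (four values per input)
  static List<Integer> splitter(int number) {
    List<Integer> parts = new ArrayList<>();
    for (int modulus = 10000; modulus >= 10; modulus /= 10) {
      parts.add(number % modulus);
    }
    return parts;
  }

  // counter: increment the count stored under the received value
  static void counter(Map<Integer, Long> counts, int value) {
    counts.merge(value, 1L, Long::sum);
  }

  // Runs the given number of source "ticks" with a fixed seed and returns the counts.
  static Map<Integer, Long> run(int ticks, long seed) {
    Random random = new Random(seed);
    Map<Integer, Long> counts = new TreeMap<>();
    for (int i = 0; i < ticks; i++) {
      for (int part : splitter(source(random))) {
        counter(counts, part);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<Integer, Long> counts = run(100_000, 42L);
    long total = counts.values().stream().mapToLong(Long::longValue).sum();
    System.out.println("total increments: " + total); // four per tick
    System.out.println("distinct keys: " + counts.size());
  }
}
```

Because the splitter always produces four values, the counter stage handles four events for every event the source emits; in this sketch a run of N ticks performs exactly 4N increments.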

Let's look at some of these components, and then run the application and see the results.

The Count Random Application

As in the other examples, the components of the application are tied together by the class CountRandom:

public class CountRandom extends AbstractApplication {

  public static final String TABLE_NAME = "randomTable";

  @Override
  public void configure() {
    setName("CountRandom");
    setDescription("Example random count application");
    createDataset(TABLE_NAME, KeyValueTable.class, DatasetProperties.builder().setDescription("Counts table").build());
    addFlow(new CountRandomFlow());
  }
}

The flow, CountRandomFlow, contains three flowlets, connected in a chain from source to splitter to counter:

@Override
protected void configure() {
  setName("CountRandom");
  setDescription("CountRandom Flow");
  addFlowlet("source", new RandomSource());
  addFlowlet("splitter", new NumberSplitter());
  addFlowlet("counter", new NumberCounter());
  connect("source", "splitter");
  connect("splitter", "counter");
}

The source flowlet's generate method is scheduled to run every millisecond. The flowlet can also be configured through runtime arguments:

  • emit: whether to emit events at all (default true); and
  • delay: an additional delay, in milliseconds, to sleep after each call (default 0).

public class RandomSource extends AbstractFlowlet {
  private OutputEmitter<Integer> randomOutput;

  private final Random random = new Random();
  private long delay = 0;
  private boolean emit = true;

  @Override
  public void initialize(FlowletContext context) throws Exception {
    super.initialize(context);
    String delayStr = context.getRuntimeArguments().get("delay");
    if (delayStr != null) {
      delay = Long.parseLong(delayStr);
    }
    String emitStr = context.getRuntimeArguments().get("emit");
    if (emitStr != null) {
      emit = Boolean.parseBoolean(emitStr);
    }
  }

  @Tick(delay = 1L, unit = TimeUnit.MILLISECONDS)
  public void generate() throws InterruptedException {
    if (emit) {
      randomOutput.emit(random.nextInt(10000));
    }
    if (delay > 0) {
      TimeUnit.MILLISECONDS.sleep(delay);
    }
  }
}
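
The sketch below, again plain Java rather than CDAP code, shows how the two runtime arguments are interpreted by logic mirroring initialize(), and approximates the @Tick schedule with a ScheduledExecutorService. Treating @Tick as a fixed delay between the end of one call and the start of the next is an assumption made for illustration; consult the @Tick Javadoc for the exact semantics. The class TickSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch, NOT CDAP code: runtime-argument parsing as in
// RandomSource.initialize(), plus a fixed-delay schedule standing in for @Tick.
class TickSketch {

  // Missing "delay" keeps the default of 0; a non-numeric value throws NumberFormatException.
  static long parseDelay(Map<String, String> runtimeArgs) {
    String delayStr = runtimeArgs.get("delay");
    return delayStr == null ? 0L : Long.parseLong(delayStr);
  }

  // Missing "emit" keeps the default of true; otherwise only "true" (any case) enables it.
  static boolean parseEmit(Map<String, String> runtimeArgs) {
    String emitStr = runtimeArgs.get("emit");
    return emitStr == null || Boolean.parseBoolean(emitStr);
  }

  // ASSUMPTION: @Tick is approximated as a fixed delay between calls.
  // Runs `tick` until it has completed `ticks` times, then stops the scheduler.
  // Returns false if that takes longer than timeoutMillis.
  static boolean runTicks(Runnable tick, long delayMillis, int ticks, long timeoutMillis) {
    CountDownLatch remaining = new CountDownLatch(ticks);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(() -> {
      tick.run();
      remaining.countDown();
    }, 0L, delayMillis, TimeUnit.MILLISECONDS);
    try {
      return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    Map<String, String> runtimeArgs = new HashMap<>();
    runtimeArgs.put("delay", "5");
    runtimeArgs.put("emit", "yes"); // not "true", so emitting is disabled
    System.out.println("delay=" + parseDelay(runtimeArgs) + ", emit=" + parseEmit(runtimeArgs));

    AtomicInteger calls = new AtomicInteger();
    boolean finished = runTicks(calls::incrementAndGet, 1L, 100, 10_000L);
    System.out.println("finished=" + finished + ", calls=" + calls.get());
  }
}
```

One detail worth noting: Boolean.parseBoolean returns true only for the string "true" (ignoring case), so a runtime argument such as emit=yes disables emitting rather than enabling it. Also, because the sleep runs inside generate() on the same thread, calls never overlap, so a nonzero delay lowers the emission rate.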

The splitter flowlet emits four numbers for every number that it receives: the number modulo 10000, 1000, 100, and 10. For example, 4321 yields 4321, 321, 21, and 1.

public class NumberSplitter extends AbstractFlowlet {
  private OutputEmitter<Integer> output;

  @ProcessInput
  public void process(Integer number)  {
    emit(number % 10000);
    emit(number % 1000);
    emit(number % 100);
    emit(number % 10);
  }

  private void emit(Integer i) {
    // emit i with a hash key named "n" and value i
    output.emit(i, "n", i);
  }
}

Note that this flowlet associates a hash key named n, whose value is the number itself, with every number that it emits. This allows the counter flowlet to use a hash partitioning strategy when consuming its input: every occurrence of a given number is delivered to the same counter instance. As a result, if the counter flowlet is scaled to multiple instances, no two instances increment the same key, and there are no transaction conflicts:

public class NumberCounter extends AbstractFlowlet {

  @UseDataSet(CountRandom.TABLE_NAME)
  KeyValueTable counters;

  @ProcessInput
  @HashPartition("n")
  public void process(Integer number) {
    counters.increment(number.toString().getBytes(), 1L);
  }
}
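
The effect of @HashPartition("n") can be illustrated with a small plain-Java model of several counter instances. Routing each value by the floorMod of its hash code is an assumption for illustration, not CDAP's internal partitioning code; the point is the property it shares with hash partitioning in general: equal key values always go to the same instance. For contrast, the sketch also routes the same values round-robin, which spreads a single key across instances. HashPartitionSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of counter instances; NOT CDAP's internal routing code.
class HashPartitionSketch {

  // ASSUMPTION: an instance is chosen from the hash of the partition key value ("n").
  static int route(int partitionKeyValue, int instances) {
    return Math.floorMod(Integer.hashCode(partitionKeyValue), instances);
  }

  // Hash partitioning: every occurrence of a value lands on the same instance.
  static List<Map<Integer, Long>> distribute(List<Integer> values, int instances) {
    List<Map<Integer, Long>> perInstance = newInstances(instances);
    for (int value : values) {
      perInstance.get(route(value, instances)).merge(value, 1L, Long::sum);
    }
    return perInstance;
  }

  // Round-robin delivery, for contrast: consecutive values go to successive instances.
  static List<Map<Integer, Long>> distributeRoundRobin(List<Integer> values, int instances) {
    List<Map<Integer, Long>> perInstance = newInstances(instances);
    for (int i = 0; i < values.size(); i++) {
      perInstance.get(i % instances).merge(values.get(i), 1L, Long::sum);
    }
    return perInstance;
  }

  // True if no key is counted by more than one instance, i.e. no two
  // instances would ever increment the same row of the table.
  static boolean keysDisjoint(List<Map<Integer, Long>> perInstance) {
    Set<Integer> seen = new HashSet<>();
    for (Map<Integer, Long> counts : perInstance) {
      for (int key : counts.keySet()) {
        if (!seen.add(key)) {
          return false;
        }
      }
    }
    return true;
  }

  private static List<Map<Integer, Long>> newInstances(int instances) {
    List<Map<Integer, Long>> perInstance = new ArrayList<>();
    for (int i = 0; i < instances; i++) {
      perInstance.add(new HashMap<>());
    }
    return perInstance;
  }

  public static void main(String[] args) {
    // The splitter emits the same small values repeatedly (every number ending in 5 produces a 5).
    List<Integer> values = Arrays.asList(5, 5, 5, 12, 12, 7);
    System.out.println("hash partitioned, disjoint: " + keysDisjoint(distribute(values, 3)));         // true
    System.out.println("round-robin, disjoint: " + keysDisjoint(distributeRoundRobin(values, 3)));   // false
  }
}
```

With round-robin delivery, two instances could increment the same row of randomTable in concurrent transactions, which is the conflict that hash partitioning avoids.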

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    On Linux:

    $ cdap cli load artifact examples/CountRandom/target/CountRandom-4.1.1.jar

    Successfully added artifact with name 'CountRandom'

    $ cdap cli create app CountRandom CountRandom 4.1.1 user

    Successfully created application

    On Windows:

    > cdap cli load artifact examples\CountRandom\target\CountRandom-4.1.1.jar

    Successfully added artifact with name 'CountRandom'

    > cdap cli create app CountRandom CountRandom 4.1.1 user

    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application and detailed under Running the Example below.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Flow

  • Using the CDAP UI, go to the CountRandom application overview page, programs tab, click CountRandom to get to the flow detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    On Linux:

    $ cdap cli start flow CountRandom.CountRandom

    Successfully started flow 'CountRandom' of application 'CountRandom' with stored runtime arguments '{}'

    On Windows:

    > cdap cli start flow CountRandom.CountRandom

    Successfully started flow 'CountRandom' of application 'CountRandom' with stored runtime arguments '{}'
    

Once you start the flow, the source flowlet will continuously generate data. You can see this by observing the counters for each flowlet in the flow visualization. Even though you are not injecting any data into the flow, the counters increase steadily.

Querying the Results

You can see the results by executing a SQL query using the CDAP UI. Go to the randomTable dataset overview page, open the explore tab, and click the Execute SQL button. When the query has finished and is highlighted in color, you can view the results by clicking the middle Action button on the right side of the Results table. A pop-up window shows the different keys and their values.
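
When reading the results, it helps to know what counts to expect. Because every tick contributes the number modulo 10, 100, 1000, and 10000, a key k receives N/m expected increments from each modulus m greater than k, assuming a uniform source and N ticks: single-digit keys collect about 0.1111 N, two-digit keys about 0.0111 N, three-digit keys about 0.0011 N, and four-digit keys about N/10000. The sketch below computes this, and also decodes raw key and value bytes, assuming the key is the decimal string of the number (as NumberCounter writes it) and the count is stored as an 8-byte big-endian long, which is how CDAP's Bytes.toBytes(long) encodes longs; verify that encoding against your CDAP version. CountRandomResults and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-Java helpers (NOT CDAP APIs) for interpreting the contents of randomTable.
class CountRandomResults {

  // Expected count for `key` after `ticks` source events, assuming the source is
  // uniform over [0, 9999]: each modulus m in {10, 100, 1000, 10000} with key < m
  // contributes ticks / m.
  static double expectedCount(int key, long ticks) {
    if (key < 0 || key > 9999) {
      return 0.0; // the splitter never emits values outside [0, 9999]
    }
    double expected = 0.0;
    for (int modulus = 10; modulus <= 10000; modulus *= 10) {
      if (key < modulus) {
        expected += (double) ticks / modulus;
      }
    }
    return expected;
  }

  // Keys are written as the bytes of number.toString(); digits are plain ASCII.
  static String decodeKey(byte[] key) {
    return new String(key, StandardCharsets.UTF_8);
  }

  // ASSUMPTION: each counter is stored as an 8-byte big-endian long.
  static long decodeCount(byte[] value) {
    if (value.length != Long.BYTES) {
      throw new IllegalArgumentException("expected 8 bytes, got " + value.length);
    }
    return ByteBuffer.wrap(value).getLong(); // ByteBuffer defaults to big-endian
  }

  public static void main(String[] args) {
    long ticks = 1_000_000L;
    for (int key : new int[] {7, 42, 500, 4321}) {
      System.out.printf("key %d: about %.0f increments%n", key, expectedCount(key, ticks));
    }
  }
}
```

In other words, the single-digit keys should dominate the result set, and their counts should grow roughly ten times faster than those of two-digit keys.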

Stopping and Removing the Application

Once done, you can stop the application, if it hasn't stopped already, as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Flow

  • Using the CDAP UI, go to the CountRandom application overview page, programs tab, click CountRandom to get to the flow detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    On Linux:

    $ cdap cli stop flow CountRandom.CountRandom

    Successfully stopped flow 'CountRandom' of application 'CountRandom'

    On Windows:

    > cdap cli stop flow CountRandom.CountRandom

    Successfully stopped flow 'CountRandom' of application 'CountRandom'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the CountRandom application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    On Linux:

    $ cdap cli delete app CountRandom

    On Windows:

    > cdap cli delete app CountRandom