🔗Stream Conversion

A Cask Data Application Platform (CDAP) example demonstrating Time-Partitioned FileSets.

🔗Overview

This application receives simple events through a stream, and periodically converts these events into partitions of a time-partitioned file set. These partitions can be queried with SQL. These are the components of the application:

  • The events stream receives simple events, where each event body is a number.
  • The converted dataset is a time-partitioned file set in Avro format.
  • The StreamConversionMapReduce reads the last five minutes of events from the stream and writes them to a new partition in the converted dataset.
  • The StreamConversionWorkflow is scheduled every five minutes and only runs the StreamConversionMapReduce.

Let's look at some of these components, and then run the application and see the results.

🔗The Stream Conversion Application

As in the other examples, the components of the application are tied together by the class StreamConversionApp:

public class StreamConversionApp extends AbstractApplication {

  static final String SCHEMA_STRING = Schema.recordOf(
    "streamEvent",
    Schema.Field.of("time", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("body", Schema.of(Schema.Type.STRING))).toString();

  @Override
  public void configure() {
    addStream(new Stream("events"));
    addMapReduce(new StreamConversionMapReduce());
    addWorkflow(new StreamConversionWorkflow());
    scheduleWorkflow(Schedules.builder("every5min")
                       .setDescription("runs every 5 minutes")
                       .createTimeSchedule("*/5 * * * *"),
                     "StreamConversionWorkflow");

    // create the time-partitioned file set, configure it to work with MapReduce and with Explore
    createDataset("converted", TimePartitionedFileSet.class, FileSetProperties.builder()
      // properties for file set
      .setBasePath("converted")
      .setInputFormat(AvroKeyInputFormat.class)
      .setOutputFormat(AvroKeyOutputFormat.class)
      .setOutputProperty("schema", SCHEMA_STRING)
      // properties for explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
      .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
      .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
      .setTableProperty("avro.schema.literal", SCHEMA_STRING)
      .setDescription("Converted stream events dataset")
      .build());
  }
}

The interesting part is the creation of the dataset named converted:

  • It is a TimePartitionedFileSet. This dataset manages the files in a FileSet by associating each file with a timestamp.
  • The properties are divided into two sections:
    • The first set of properties configures the underlying FileSet, as documented in the FileSet section.
    • The second set of properties configures how the dataset can be queried with SQL. Here we enable the dataset for Explore when it is created and, because the data is stored in Avro format, specify the Hive-specific properties that format requires: the Avro SerDe, an input and an output format, and an additional table property, namely the schema for the Avro SerDe.
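
For reference, the schema string built with Schema.recordOf() above serializes to JSON that is also a valid Avro record schema, which is why the same string can serve both as the 'schema' output property of the Avro output format and as the avro.schema.literal table property for Hive. A minimal sketch of how the SCHEMA constant used by the MapReduce program below could be derived from it (an assumption about the example source, shown only for illustration):

// Sketch only: parse the application's schema string into an Avro schema object.
// The JSON is approximately:
//   {"type":"record","name":"streamEvent","fields":[
//     {"name":"time","type":"long"},{"name":"body","type":"string"}]}
static final org.apache.avro.Schema SCHEMA =
  new org.apache.avro.Schema.Parser().parse(StreamConversionApp.SCHEMA_STRING);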

🔗The MapReduce Program

In its initialize() method, the StreamConversionMapReduce determines its logical start time, and it configures the events stream as its input and the converted dataset as its output:

  • This is a map-only MapReduce program; in other words, it has no reducers, and the mappers write directly to the output in Avro format:

    Job job = context.getHadoopJob();
    job.setMapperClass(StreamConversionMapper.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(NullWritable.class);
    AvroJob.setOutputKeySchema(job, SCHEMA);
    
  • Based on the logical start time, the MapReduce determines the range of events to read from the stream:

    // read 5 minutes of events from the stream, ending at the logical start time of this run
    long logicalTime = context.getLogicalStartTime();
    context.addInput(Input.ofStream("events", logicalTime - TimeUnit.MINUTES.toMillis(5), logicalTime));
    
  • Each MapReduce run writes its output to a partition with the logical start time:

    TimePartitionedFileSetArguments.setOutputPartitionTime(dsArguments, logicalTime);
    context.addOutput(Output.ofDataset("converted", dsArguments));
    
  • Note that the output file path is derived from the output partition time by the dataset itself:

    TimePartitionedFileSet partitionedFileSet = context.getDataset("converted", dsArguments);
    LOG.info("Output location for new partition is: {}",
             partitionedFileSet.getEmbeddedFileSet().getOutputLocation());
    
  • The Mapper itself is straightforward: for each event, it emits an Avro record:

    public static class StreamConversionMapper extends
      Mapper<LongWritable, StreamEvent, AvroKey<GenericRecord>, NullWritable> {
    
      @Override
      public void map(LongWritable timestamp, StreamEvent streamEvent, Context context)
        throws IOException, InterruptedException {
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(SCHEMA)
          .set("time", streamEvent.getTimestamp())
          .set("body", Bytes.toString(streamEvent.getBody()));
        GenericRecord record = recordBuilder.build();
        context.write(new AvroKey<>(record), NullWritable.get());
      }
    }
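
Taken together, the configuration snippets above (everything except the Mapper itself) correspond to an initialize() method along these lines (a consolidated sketch assembled from the snippets, not a verbatim copy of the example source; the getContext() call, the dsArguments map declaration, and the omitted imports are assumptions about the surrounding boilerplate):

@Override
public void initialize() throws Exception {
  MapReduceContext context = getContext();

  // a map-only job that writes Avro records directly
  Job job = context.getHadoopJob();
  job.setMapperClass(StreamConversionMapper.class);
  job.setNumReduceTasks(0);
  job.setMapOutputKeyClass(AvroKey.class);
  job.setMapOutputValueClass(NullWritable.class);
  AvroJob.setOutputKeySchema(job, SCHEMA);

  // read 5 minutes of events from the stream, ending at the logical start time of this run
  long logicalTime = context.getLogicalStartTime();
  context.addInput(Input.ofStream("events", logicalTime - TimeUnit.MINUTES.toMillis(5), logicalTime));

  // write to a new partition keyed by the logical start time
  Map<String, String> dsArguments = new HashMap<>();
  TimePartitionedFileSetArguments.setOutputPartitionTime(dsArguments, logicalTime);
  context.addOutput(Output.ofDataset("converted", dsArguments));

  // the dataset derives the output file path from the output partition time
  TimePartitionedFileSet partitionedFileSet = context.getDataset("converted", dsArguments);
  LOG.info("Output location for new partition is: {}",
           partitionedFileSet.getEmbeddedFileSet().getOutputLocation());
}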
    

🔗Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI). In the command listings that follow, the $ prompt indicates Linux and the > prompt indicates Windows:

    $ cdap cli load artifact examples/StreamConversion/target/StreamConversion-4.1.1.jar
    
    Successfully added artifact with name 'StreamConversion'
    
    $ cdap cli create app StreamConversionApp StreamConversion 4.1.1 user
    
    Successfully created application
    
    > cdap cli load artifact examples\StreamConversion\target\StreamConversion-4.1.1.jar
    
    Successfully added artifact with name 'StreamConversion'
    
    > cdap cli create app StreamConversionApp StreamConversion 4.1.1 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application, and detailed at the start of Running the Example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.
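
At any point after deployment, you can confirm what has been loaded by listing the deployed artifacts and applications with the CLI (a sketch, assuming the default namespace and that the list commands are available in your CLI version):

$ cdap cli list artifacts
$ cdap cli list apps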

🔗Running the Example

🔗Resuming the Schedule

The only thing you need to do to start the application is resume the schedule, as the schedule is in a suspended state when the application is deployed.

  • Using the CDAP UI, go to the StreamConversionApp application overview page, programs tab:
    • click StreamConversionWorkflow to get to the workflow detail page,
    • click on the More button in the upper-right to display an additional menu,
    • click on the Schedules menu-item to show the schedule, and then
    • click the Play button (>) so that the status shows as scheduled; or
  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli resume schedule StreamConversionApp.every5min
    
    Successfully resumed schedule 'every5min' in app 'StreamConversionApp'
    
    > cdap cli resume schedule StreamConversionApp.every5min
    
    Successfully resumed schedule 'every5min' in app 'StreamConversionApp'
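
    To verify, you can check the schedule's status (a sketch, assuming the get schedule status command is available in your CLI version):

    $ cdap cli get schedule status StreamConversionApp.every5min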
    

🔗Running the Workflow

The StreamConversionWorkflow will run automatically every five minutes based on its schedule. To give it some data, you can use the provided script to send events to the stream; for example, to send 10,000 events at a rate of roughly two per second (one per second in the case of Windows):

$ examples/StreamConversion/bin/send-events.sh --events 10000 --delay 0.5
> examples\StreamConversion\bin\send-events.bat 10000 1
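
If you want to send just a few events by hand instead of using the script, the CLI can also write to the stream directly (a sketch, assuming the default namespace; remember that each event body should be a number):

$ cdap cli send stream events "42"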

You can now wait for the workflow to run, after which you can query the partitions in the converted dataset:

$ cdap cli execute "\"show partitions dataset_converted\""

+============================================+
| partition: STRING                          |
+============================================+
| year=2015/month=1/day=28/hour=17/minute=30 |
| year=2015/month=1/day=28/hour=17/minute=35 |
| year=2015/month=1/day=28/hour=17/minute=40 |
+============================================+
> cdap cli execute "\"show partitions dataset_converted\""

+============================================+
| partition: STRING                          |
+============================================+
| year=2015/month=1/day=28/hour=17/minute=30 |
| year=2015/month=1/day=28/hour=17/minute=35 |
| year=2015/month=1/day=28/hour=17/minute=40 |
+============================================+

Note that in the Hive metastore, the partitions are registered using multiple dimensions rather than the time since the Epoch: the year, month, and day of the month, plus the hour of the day and the minute of the hour. For example, a partition written by a run whose logical start time is 1422466200000 milliseconds since the Epoch (2015-01-28 17:30:00, assuming the sample output above was produced in UTC) is registered as year=2015/month=1/day=28/hour=17/minute=30.

You can also query the data in the dataset. For example, to find the five most frequent body texts, issue:

$ cdap cli execute "\"select count(*) as count, body from dataset_converted group by body order by count desc limit 5\""

+==============================+
| count: BIGINT | body: STRING |
+==============================+
| 86            | 53           |
| 81            | 92           |
| 75            | 45           |
| 73            | 24           |
| 70            | 63           |
+==============================+
> cdap cli execute "\"select count(*) as count, body from dataset_converted group by body order by count desc limit 5\""

+==============================+
| count: BIGINT | body: STRING |
+==============================+
| 86            | 53           |
| 81            | 92           |
| 75            | 45           |
| 73            | 24           |
| 70            | 63           |
+==============================+

Because this dataset is time-partitioned, you can use the partitioning keys to restrict the scope of the query. For example, to run the same query for only the month of January, use the query:

select count(*) as count, body from dataset_converted where month=1 group by body order by count desc limit 5
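
Several partition columns can be combined to narrow the scope further; for example, to restrict the same query to a single hour of a single day (using the partition values from the sample output above):

select count(*) as count, body from dataset_converted where year=2015 and month=1 and day=28 and hour=17 group by body order by count desc limit 5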

🔗Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Suspending the Schedule

The only thing you need to do to stop the application is suspend the schedule.

  • Using the CDAP UI, go to the StreamConversionApp application overview page, programs tab:
    • click StreamConversionWorkflow to get to the workflow detail page,
    • click on the More button in the upper-right to display an additional menu,
    • click on the Schedules menu-item to show the schedule, and then
    • click the Pause button (| |) so that the status shows as suspended; or
  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli suspend schedule StreamConversionApp.every5min
    
    Successfully suspended schedule 'every5min' in app 'StreamConversionApp'
    
    > cdap cli suspend schedule StreamConversionApp.every5min
    
    Successfully suspended schedule 'every5min' in app 'StreamConversionApp'
    

Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the StreamConversionApp application overview page, programs tab:
    • click the Actions menu on the right side and select Manage to go to the Management pane for the application, then
    • click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    $ cdap cli delete app StreamConversionApp
    
    > cdap cli delete app StreamConversionApp