🔗Sport Results

A Cask Data Application Platform (CDAP) example demonstrating partitioned file sets through sport results analytics.

🔗Overview

This application demonstrates the use of PartitionedFileSet datasets, MapReduce with runtime arguments, and ad-hoc queries over file sets:

  • Game results are stored in the PartitionedFileSet results. It is partitioned by league and season, and each partition is a CSV (comma-separated values) file containing the results of one league for one season; for example, the 2014 season of the NFL (National Football League).
  • Results are uploaded into the file set using the UploadService.
  • The ScoreCounter MapReduce program reads the game results for a given league, aggregates total counts across all seasons, such as games won and lost or points scored and conceded, and writes these totals to the PartitionedFileSet totals, which is partitioned by league.
  • Both the original game results and the aggregated totals can be explored using ad-hoc SQL queries.

Let's look at some of these components, and then run the application and see the results.

🔗The SportResults Application

As in the other examples, the components of the application are tied together by the class SportResults:

public class SportResults extends AbstractApplication {

  @Override
  public void configure() {
    addService(new UploadService());
    addMapReduce(new ScoreCounter());

    // Create the "results" partitioned file set, configure it to work with MapReduce and with Explore
    createDataset("results", PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // Properties for partitioning
      .setPartitioning(Partitioning.builder().addStringField("league").addIntField("season").build())
      // Properties for file set
      .setInputFormat(TextInputFormat.class)
      .setOutputFormat(TextOutputFormat.class)
      .setOutputProperty(TextOutputFormat.SEPERATOR, ",")
      // Properties for Explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setExploreFormat("csv")
      .setExploreSchema("`date` STRING, winner STRING, loser STRING, winnerpoints INT, loserpoints INT")
      .setExploreTableName("results")
      .setDescription("FileSet dataset of game results for a sport league and season")
      .build());

    // Create the "totals" partitioned file set, configure it to work with MapReduce and with Explore
    createDataset("totals", PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // Properties for partitioning
      .setPartitioning(Partitioning.builder().addStringField("league").build())
      // Properties for file set
      .setInputFormat(TextInputFormat.class)
      .setOutputFormat(TextOutputFormat.class)
      .setOutputProperty(TextOutputFormat.SEPERATOR, ",")
      // Properties for Explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setExploreTableName("totals")
      .setExploreFormat("csv")
      .setExploreSchema("team STRING, wins INT, ties INT, losses INT, scored INT, conceded INT")
      .setDescription("FileSet dataset of aggregated results for each sport league")
      .build());
  }
}

The configure() method creates the two PartitionedFileSet datasets used in this example.

  • Both datasets use CSV as the format: for MapReduce, they use the TextInputFormat and TextOutputFormat with a comma (",") as the field separator; for Explore, they use the csv format.

  • The first dataset (results) is partitioned by league and season. Each record represents a single game with a date, a winning and a losing team, and the winner's and the loser's points, for example:

    2011/9/5,Dallas Cowboys,New York Giants,24,17
    2011/9/9,Philadelphia Eagles,Cleveland Browns,17,16
    2011/9/9,New England Patriots,Tennessee Titans,34,13
    

    We have included some sample data in the resources directory. Note that because the column name date is a Hive reserved keyword, it has been enclosed in single back-ticks in the Explore schema declaration.

  • The totals dataset stores aggregates across all seasons and thus has the league as its single partitioning field. Each record has, for an individual team, the total number of games won, tied, and lost, and the total number of points scored and conceded.
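
    For example, a record in a totals file might look like this (hypothetical values, following the Explore schema above; the league is the partition key and is not stored inside the file):

    New England Patriots,24,0,8,954,713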

We will use the UploadService to upload the sample data files into the results dataset, then compute the totals aggregates using MapReduce, and we will explore both datasets using SQL.

🔗UploadService

This service has two handler methods: one to upload and another to download a partition of the results dataset as a file; the upload method is shown below, and a sketch of a possible download method follows it. It declares its use of the dataset using a @UseDataSet annotation:

@UseDataSet("results")
private PartitionedFileSet results;

Let's take a closer look at the upload method:

  • It first creates a partition key from the league and season received as path parameters in the request URL.
  • Then it obtains a PartitionOutput for that partition key from the results dataset.
  • It then uses the getLocation method of the PartitionOutput to obtain the location for writing the file, and opens an output stream for that location to write the file contents. Location is a file system abstraction from Apache™ Twill®; you can read more about its interface in the Apache Twill Javadocs.
  • It then returns an HttpContentConsumer to consume the incoming request body.
    • In the onReceived method, it appends each newly received chunk of bytes to the output stream.
    • In the onFinish method, it registers the written file as a new partition in the dataset by calling the addPartition method of the PartitionOutput.
    • In the onError method, it cleans up by removing the partially written file and responds with an error status.

@PUT
@Path("leagues/{league}/seasons/{season}")
@TransactionPolicy(TransactionControl.EXPLICIT)
public HttpContentConsumer write(HttpServiceRequest request, HttpServiceResponder responder,
                                 @PathParam("league") String league, @PathParam("season") int season)
  throws TransactionFailureException {

  final PartitionKey key = PartitionKey.builder()
    .addStringField("league", league)
    .addIntField("season", season)
    .build();
  final AtomicReference<PartitionDetail> partitionDetail = new AtomicReference<>();

  getContext().execute(new TxRunnable() {
    @Override
    public void run(DatasetContext context) throws Exception {
      partitionDetail.set(results.getPartition(key));
    }
  });

  if (partitionDetail.get() != null) {
    responder.sendString(409, "Partition exists.", Charsets.UTF_8);
    return null;
  }

  final PartitionOutput output = results.getPartitionOutput(key);
  try {
    final Location partitionDir = output.getLocation();
    if (!partitionDir.mkdirs()) {
      responder.sendString(409, "Partition exists.", Charsets.UTF_8);
      return null;
    }

    final Location location = partitionDir.append("file");
    final WritableByteChannel channel = Channels.newChannel(location.getOutputStream());
    return new HttpContentConsumer() {
      @Override
      public void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception {
        channel.write(chunk);
      }

      @Override
      public void onFinish(HttpServiceResponder responder) throws Exception {
        channel.close();
        output.addPartition();
        responder.sendStatus(200);
      }

      @Override
      public void onError(HttpServiceResponder responder, Throwable failureCause) {
        Closeables.closeQuietly(channel);
        try {
          partitionDir.delete(true);
        } catch (IOException e) {
          LOG.warn("Failed to delete partition directory '{}'", partitionDir, e);
        }
        LOG.debug("Unable to write path {}", location, failureCause);
        responder.sendError(400, String.format("Unable to write path '%s'. Reason: '%s'",
                                               location, failureCause.getMessage()));
      }
    };
  } catch (IOException e) {
    responder.sendError(400, String.format("Unable to write path '%s'. Reason: '%s'",
                                           output.getRelativePath(), e.getMessage()));
    return null;
  }
}
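
The download counterpart mentioned above is essentially the reverse: look up the partition for the requested league and season, and stream its file back to the client. The actual read method is not reproduced here; a minimal sketch, assuming the file was written under the name "file" as in the write method, and using the default implicit transaction, might look like this:

@GET
@Path("leagues/{league}/seasons/{season}")
public void read(HttpServiceRequest request, HttpServiceResponder responder,
                 @PathParam("league") String league, @PathParam("season") int season) {
  // Look up the partition for this league and season
  PartitionDetail partition = results.getPartition(PartitionKey.builder()
    .addStringField("league", league)
    .addIntField("season", season)
    .build());
  if (partition == null) {
    responder.sendString(404, "Partition not found.", Charsets.UTF_8);
    return;
  }
  try {
    // Stream the partition's single file back to the client
    responder.send(200, partition.getLocation().append("file"), "text/csv");
  } catch (IOException e) {
    responder.sendError(500, String.format("Unable to read partition '%s'", partition.getRelativePath()));
  }
}

With such a method in place, a season could be retrieved with, for example: curl -w"\n" localhost:11015/v3/namespaces/default/apps/SportResults/services/UploadService/methods/leagues/nfl/seasons/2012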

🔗MapReduce over File Partitions

ScoreCounter is a simple MapReduce program that reads from the results PartitionedFileSet and writes to the totals PartitionedFileSet. The initialize method prepares the MapReduce program for this:

  • It reads the league that it is supposed to process from the runtime arguments.
  • It constructs a partition filter for the input using the league as the only condition, and instantiates the results dataset with arguments that contain this filter.
  • It constructs an output partition key for the new partition, and instantiates the totals dataset with arguments specifying that partition key.

public class ScoreCounter extends AbstractMapReduce {

  private static final Logger LOG = LoggerFactory.getLogger(ScoreCounter.class);

  @Override
  public void configure() {
    setDescription("reads game results and counts statistics per team");
    setMapperResources(new Resources(512));
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    Job job = context.getHadoopJob();
    job.setMapperClass(ResultsMapper.class);
    job.setReducerClass(TeamCounter.class);
    job.setNumReduceTasks(1);

    String league = context.getRuntimeArguments().get("league");
    Preconditions.checkNotNull(league);

    // Configure the input to read all seasons for the league
    Map<String, String> inputArgs = Maps.newHashMap();
    PartitionedFileSetArguments.setInputPartitionFilter(
      inputArgs, PartitionFilter.builder().addValueCondition("league", league).build());
    context.addInput(Input.ofDataset("results", inputArgs));

    // Each run writes its output to a partition for the league
    Map<String, String> outputArgs = Maps.newHashMap();
    PartitionKey outputKey = PartitionKey.builder().addStringField("league", league).build();
    PartitionedFileSetArguments.setOutputPartitionKey(outputArgs, outputKey);
    context.addOutput(Output.ofDataset("totals", outputArgs));

    // used only for logging:
    PartitionedFileSet input = context.getDataset("results", inputArgs);
    PartitionedFileSet outputFileSet = context.getDataset("totals", outputArgs);
    String outputPath = FileSetArguments.getOutputPath(outputFileSet.getEmbeddedFileSet().getRuntimeArguments());
    LOG.info("input: {}, output: {}", input.getEmbeddedFileSet().getInputLocations(), outputPath);
  }
...

It is worth mentioning that nothing else in ScoreCounter is specifically programmed to use file partitions. Instead of results and totals, it could use any other dataset as long as the key and value types match.
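
The mapper and reducer themselves (elided above) are ordinary Hadoop classes; nothing in them is aware of partitions. A condensed sketch of what the two inner classes might look like, with the class names taken from the initialize method above but the parsing and aggregation details being assumptions:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

...

// Note: the job's map output and output key/value classes must also be
// configured as Text, for example in the elided part of initialize().

/**
 * Emits, for each game, one record per team: wins, ties, losses, scored, conceded.
 */
public static class ResultsMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable position, Text value, Context context)
    throws IOException, InterruptedException {
    // Each input line is: date,winner,loser,winnerpoints,loserpoints
    String[] fields = value.toString().split(",");
    if (fields.length < 5) {
      return; // skip malformed lines
    }
    String winner = fields[1];
    String loser = fields[2];
    int winnerPoints = Integer.parseInt(fields[3]);
    int loserPoints = Integer.parseInt(fields[4]);
    boolean tie = winnerPoints == loserPoints;
    context.write(new Text(winner),
                  new Text((tie ? "0,1,0," : "1,0,0,") + winnerPoints + "," + loserPoints));
    context.write(new Text(loser),
                  new Text((tie ? "0,1,0," : "0,0,1,") + loserPoints + "," + winnerPoints));
  }
}

/**
 * Sums the per-game statistics for each team.
 */
public static class TeamCounter extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text team, Iterable<Text> stats, Context context)
    throws IOException, InterruptedException {
    int[] totals = new int[5]; // wins, ties, losses, scored, conceded
    for (Text stat : stats) {
      String[] parts = stat.toString().split(",");
      for (int i = 0; i < 5; i++) {
        totals[i] += Integer.parseInt(parts[i]);
      }
    }
    // With TextOutputFormat and a "," separator this is written as:
    // team,wins,ties,losses,scored,conceded
    context.write(team, new Text(totals[0] + "," + totals[1] + "," + totals[2]
                                 + "," + totals[3] + "," + totals[4]));
  }
}

Because the job uses TextOutputFormat with a comma as the separator, the reducer's key and value are joined with a comma, which yields exactly the team STRING, wins INT, ... layout declared in the totals Explore schema.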

🔗Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    Linux:

    $ cdap cli load artifact examples/SportResults/target/SportResults-4.1.1.jar

    Successfully added artifact with name 'SportResults'

    $ cdap cli create app SportResults SportResults 4.1.1 user

    Successfully created application

    Windows:

    > cdap cli load artifact examples\SportResults\target\SportResults-4.1.1.jar

    Successfully added artifact with name 'SportResults'

    > cdap cli create app SportResults SportResults 4.1.1 user

    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application and as detailed at the start of Running the Example below.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

🔗Running the Example

🔗Starting the Service

  • Using the CDAP UI, go to the SportResults application overview page, programs tab, click UploadService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli start service SportResults.UploadService

    Successfully started service 'UploadService' of application 'SportResults' with stored runtime arguments '{}'

    Windows:

    > cdap cli start service SportResults.UploadService

    Successfully started service 'UploadService' of application 'SportResults' with stored runtime arguments '{}'
    

🔗Uploading Game Results

Begin by uploading some CSV files into the results dataset. For example, to upload the results for the 2012 season of the NFL (National Football League):

Linux:

$ cdap cli call service SportResults.UploadService PUT leagues/nfl/seasons/2012 body:file examples/SportResults/resources/nfl-2012.csv

Windows:

> cdap cli call service SportResults.UploadService PUT leagues/nfl/seasons/2012 body:file examples\SportResults\resources\nfl-2012.csv

Using a curl command:

Linux:

$ curl -w"\n" -X PUT localhost:11015/v3/namespaces/default/apps/SportResults/services/UploadService/methods/leagues/nfl/seasons/2012 \
--data-binary @examples/SportResults/resources/nfl-2012.csv

Windows:

> curl -X PUT localhost:11015/v3/namespaces/default/apps/SportResults/services/UploadService/methods/leagues/nfl/seasons/2012 ^
--data-binary @examples\SportResults\resources\nfl-2012.csv

Feel free to add more seasons—and sport leagues:

Linux:

$ cdap cli call service SportResults.UploadService PUT leagues/nfl/seasons/2013 body:file examples/SportResults/resources/nfl-2013.csv

$ cdap cli call service SportResults.UploadService PUT leagues/nba/seasons/2012 body:file examples/SportResults/resources/nba-2012.csv

$ cdap cli call service SportResults.UploadService PUT leagues/nba/seasons/2013 body:file examples/SportResults/resources/nba-2013.csv

Windows:

> cdap cli call service SportResults.UploadService PUT leagues/nfl/seasons/2013 body:file examples\SportResults\resources\nfl-2013.csv

> cdap cli call service SportResults.UploadService PUT leagues/nba/seasons/2012 body:file examples\SportResults\resources\nba-2012.csv

> cdap cli call service SportResults.UploadService PUT leagues/nba/seasons/2013 body:file examples\SportResults\resources\nba-2013.csv

Note that the files can only be uploaded once for each season and league. A subsequent upload would fail because the file partition already exists.
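
For example, repeating the first upload with curl (here printing the HTTP status code on its own line; output illustrative) would be rejected with the 409 response sent by the write method shown earlier:

$ curl -w"\n%{http_code}\n" -X PUT localhost:11015/v3/namespaces/default/apps/SportResults/services/UploadService/methods/leagues/nfl/seasons/2012 \
--data-binary @examples/SportResults/resources/nfl-2012.csv
Partition exists.
409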

🔗Starting the MapReduce

To run the ScoreCounter over all seasons of the NFL:

Linux:

$ cdap cli start mapreduce SportResults.ScoreCounter league=nfl

Windows:

> cdap cli start mapreduce SportResults.ScoreCounter league=nfl

Note that the MapReduce can only be run once for each league. A subsequent run would fail because the output already exists.

🔗Exploring with Ad-hoc SQL

Both of the partitioned file sets are registered as external tables in Hive and can be explored with SQL. To see the existing partitions of a dataset, use the show partitions query:

Linux:

$ cdap cli execute "\"show partitions results\""

Windows:

> cdap cli execute "\"show partitions results\""

For example, the following three queries find the three games with the highest point difference: first in the 2012 NFL season, then over all uploaded NFL seasons, and finally across all seasons of all sport leagues:

Linux:

$ cdap cli execute "\"select * from results where league='nfl' and season=2012 order by winnerpoints-loserpoints desc limit 3\""

$ cdap cli execute "\"select * from results where league='nfl' order by winnerpoints-loserpoints desc limit 3\""

$ cdap cli execute "\"select * from results order by winnerpoints-loserpoints desc limit 3\""

Windows:

> cdap cli execute "\"select * from results where league='nfl' and season=2012 order by winnerpoints-loserpoints desc limit 3\""

> cdap cli execute "\"select * from results where league='nfl' order by winnerpoints-loserpoints desc limit 3\""

> cdap cli execute "\"select * from results order by winnerpoints-loserpoints desc limit 3\""

You can also explore the totals dataset. For example, to find the three NFL teams that, over their history, have conceded the most points relative to the points they scored:

Linux:

$ cdap cli execute "\"select * from totals where league = 'nfl' order by conceded - scored desc limit 3\""

Windows:

> cdap cli execute "\"select * from totals where league = 'nfl' order by conceded - scored desc limit 3\""

The last command would produce results (your results may vary, depending on the datasets you load) such as:

Successfully connected CDAP instance at http://localhost:11015/default
+=====================================================================================================================================================+
| totals.team: STRING  | totals.wins: INT | totals.ties: INT | totals.losses: INT | totals.scored: INT | totals.conceded: INT | totals.league: STRING |
+=====================================================================================================================================================+
| Jacksonville Jaguars | 6                | 0                | 26                 | 502                | 893                  | nfl                   |
|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| Oakland Raiders      | 8                | 0                | 24                 | 612                | 896                  | nfl                   |
|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| New York Jets        | 14               | 0                | 18                 | 571                | 762                  | nfl                   |
+=====================================================================================================================================================+
Fetched 3 rows

🔗Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

🔗Stopping the Service

  • Using the CDAP UI, go to the SportResults application overview page, programs tab, click UploadService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli stop service SportResults.UploadService

    Successfully stopped service 'UploadService' of application 'SportResults'

    Windows:

    > cdap cli stop service SportResults.UploadService

    Successfully stopped service 'UploadService' of application 'SportResults'
    

🔗Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the SportResults application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli delete app SportResults

    Windows:

    > cdap cli delete app SportResults