FileSet Example

A Cask Data Application Platform (CDAP) example demonstrating the FileSet dataset and its use in services and MapReduce. The application also shows how to perform dataset management operations in programs.

Overview

This application demonstrates the use of the FileSet dataset:

  • The lines FileSet is used as input for the WordCount MapReduce program.
  • The counts FileSet is used as output for the WordCount MapReduce program.
  • The FileSetService allows uploading and downloading files within these two file sets. It also allows creating and managing additional file sets that can be used as input or output.

Let's look at some of these components, and then run the application and see the results.

The FileSetExample Application

As in the other examples, the components of the application are tied together by the class FileSetExample:

public class FileSetExample extends AbstractApplication {

  @Override
  public void configure() {
    setName("FileSetExample");
    setDescription("Application with a MapReduce that uses a FileSet dataset");
    createDataset("lines", FileSet.class, FileSetProperties.builder()
      .setBasePath("example/data/lines")
      .setInputFormat(TextInputFormat.class)
      .setOutputFormat(TextOutputFormat.class)
      .setDescription("Store input lines")
      .build());
    createDataset("counts", FileSet.class, FileSetProperties.builder()
      .setInputFormat(TextInputFormat.class)
      .setOutputFormat(TextOutputFormat.class)
      .setOutputProperty(TextOutputFormat.SEPERATOR, ":")
      .setDescription("Store word counts")
      .build());
    addService(new FileSetService());
    addMapReduce(new WordCount());
  }
}

The configure() method creates the two FileSet datasets used in this example. For the first FileSet—lines—we specify an explicit base path; for the second FileSet—counts—we let CDAP generate a path. Also, we configure the output format to use ":" as the separator for the counts FileSet.

We will use the FileSetService to upload a file into the lines file set, then count the words in that file using MapReduce, and finally download the word counts from the counts file set.

FileSetService

This service has one method to upload and another to download a file. Both of these methods have two arguments: the name of the FileSet and the relative path within that FileSet. For example, the read method returns the contents of the requested file for a GET request:

@GET
@Path("{fileset}")
@TransactionPolicy(TransactionControl.EXPLICIT)
public void read(HttpServiceRequest request, HttpServiceResponder responder,
                 @PathParam("fileset") String set, @QueryParam("path") String filePath) {

  FileSet fileSet;
  try {
    fileSet = getContext().getDataset(set);
  } catch (DatasetInstantiationException e) {
    LOG.warn("Error instantiating file set {}", set, e);
    responder.sendError(400, String.format("Invalid file set name '%s'", set));
    return;
  }

  Location location = fileSet.getLocation(filePath);
  getContext().discardDataset(fileSet);

  try {
    responder.send(200, location, "application/octet-stream");
  } catch (IOException e) {
    responder.sendError(400, String.format("Unable to read path '%s' in file set '%s'", filePath, set));
  }
}

It first instantiates the dataset specified by the first path parameter through its HttpServiceContext. Note that, conceptually, this method is not limited to using only the two datasets of this application (lines and counts)—getDataset() can dynamically instantiate any existing dataset.

The handler method then uses the FileSet's getLocation() method to obtain the Location representing the requested file, and it opens an input stream for that location. Location is a file system abstraction from Apache™ Twill®; you can read more about its interface in the Apache Twill Javadocs.

Note that after obtaining the Location from the FileSet, the handler discards that dataset through its context: it is not needed any more and can be returned to the system. This is not strictly necessary, because all datasets are eventually reclaimed by the system. Explicitly discarding a dataset, however, allows the system to reclaim it either immediately (as in this case) or as soon as the current transaction ends, possibly freeing valuable resources sooner.

The write method uses an HttpContentConsumer to stream the body of the request to the location specified by the path query parameter. See the section on Handling Large Requests and the Sport Results Example for a more detailed explanation:

@PUT
@Path("{fileset}")
@TransactionPolicy(TransactionControl.EXPLICIT)
public HttpContentConsumer write(HttpServiceRequest request, HttpServiceResponder responder,
                                 @PathParam("fileset") final String set,
                                 @QueryParam("path") final String filePath) {
  FileSet fileSet;
  try {
    fileSet = getContext().getDataset(set);
  } catch (DatasetInstantiationException e) {
    LOG.warn("Error instantiating file set {}", set, e);
    responder.sendError(400, String.format("Invalid file set name '%s'", set));
    return null;
  }

  final Location location = fileSet.getLocation(filePath);
  getContext().discardDataset(fileSet);
  try {
    final WritableByteChannel channel = Channels.newChannel(location.getOutputStream());
    return new HttpContentConsumer() {
      @Override
      public void onReceived(ByteBuffer chunk, Transactional transactional) throws Exception {
        channel.write(chunk);
      }

      @Override
      public void onFinish(HttpServiceResponder responder) throws Exception {
        channel.close();
        responder.sendStatus(200);
      }

      @Override
      public void onError(HttpServiceResponder responder, Throwable failureCause) {
        Closeables.closeQuietly(channel);
        try {
          location.delete();
        } catch (IOException e) {
          LOG.warn("Failed to delete {}", location, e);
        }
        LOG.debug("Unable to write path '{}' in file set '{}'", filePath, set, failureCause);
        responder.sendError(400, String.format("Unable to write path '%s' in file set '%s'. Reason: '%s'",
                                               filePath, set, failureCause.getMessage()));
      }
    };
  } catch (IOException e) {
    responder.sendError(400, String.format("Unable to write path '%s' in file set '%s'. Reason: '%s'",
                                           filePath, set, e.getMessage()));
    return null;
  }
}
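
The heart of this handler is the NIO streaming pattern: wrap an output stream in a WritableByteChannel and feed it one ByteBuffer per onReceived() callback. Here is a minimal, self-contained sketch of that pattern outside CDAP (the class ChunkedWriteDemo is ours; a ByteArrayOutputStream stands in for the Location's output stream). Note that write() is not guaranteed to consume the entire buffer in one call, so defensive code loops while the buffer has remaining bytes:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class ChunkedWriteDemo {

  // Writes a sequence of chunks to a channel, mirroring one onReceived()
  // call per chunk, then closes the channel as onFinish() does.
  public static String write(String[] parts) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    WritableByteChannel channel = Channels.newChannel(out);
    for (String part : parts) {
      ByteBuffer chunk = ByteBuffer.wrap(part.getBytes(StandardCharsets.UTF_8));
      while (chunk.hasRemaining()) {  // write() may not drain the buffer in one call
        channel.write(chunk);
      }
    }
    channel.close();
    return out.toString("UTF-8");
  }

  public static void main(String[] args) throws Exception {
    System.out.println(write(new String[] {"hello ", "world"}));
  }
}
```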

In addition to reading and writing individual files, the FileSetService also allows performing dataset management operations, including creating, updating, truncating, and dropping file sets. These operations are available through the program context's getAdmin() interface. For example, the service has an endpoint that creates a new file set, either by cloning an existing file set's dataset properties, or using the properties submitted in the request body:

@POST
@Path("{fileset}/create")
@TransactionPolicy(TransactionControl.EXPLICIT)
public void create(HttpServiceRequest request, HttpServiceResponder responder,
                   @PathParam("fileset") final String set,
                   @Nullable @QueryParam("clone") final String clone) throws DatasetManagementException {
  DatasetProperties properties = DatasetProperties.EMPTY;
  ByteBuffer content = request.getContent();
  if (clone != null) {
    try {
      properties = getContext().getAdmin().getDatasetProperties(clone);
    } catch (InstanceNotFoundException e) {
      responder.sendError(404, "Dataset '" + clone + "' does not exist");
      return;
    }
  } else if (content != null && content.hasRemaining()) {
    try {
      properties = GSON.fromJson(Bytes.toString(content), DatasetProperties.class);
    } catch (Exception e) {
      responder.sendError(400, "Invalid properties: " + e.getMessage());
      return;
    }
  }
  try {
    getContext().getAdmin().createDataset(set, "fileSet", properties);
  } catch (InstanceConflictException e) {
    responder.sendError(409, "Dataset '" + set + "' already exists");
    return;
  }
  responder.sendStatus(200);
}

MapReduce over Files

WordCount is a simple word counting implementation in MapReduce. By default, it reads its input from the lines FileSet and writes its output to the counts FileSet. Alternatively, the names of the input and output datasets can be given as runtime arguments:

/**
 * A simple word counter. It reads inputs from the "lines" FileSet and writes its output to
 * the "counts" FileSet. The input and output path can be configured as runtime arguments:
 * <ul>
 * <li>"dataset.lines.input.paths" for the input. Multiple paths can be given, separated by commas.</li>
 * <li>"dataset.counts.output.path" for the output.</li>
 * </ul>
 */
public class WordCount extends AbstractMapReduce {

  @Override
  public void configure() {
    setMapperResources(new Resources(1024));
    setReducerResources(new Resources(1024));
  }

  @Override
  public void initialize() throws Exception {
    MapReduceContext context = getContext();
    Job job = context.getHadoopJob();
    job.setMapperClass(Tokenizer.class);
    job.setReducerClass(Counter.class);
    job.setNumReduceTasks(1);

    String inputDataset = context.getRuntimeArguments().get("input");
    inputDataset = inputDataset != null ? inputDataset : "lines";

    String outputDataset = context.getRuntimeArguments().get("output");
    outputDataset = outputDataset != null ? outputDataset : "counts";

    context.addInput(Input.ofDataset(inputDataset));
    context.addOutput(Output.ofDataset(outputDataset));
  }
...

It is worth mentioning that nothing in WordCount is specifically programmed to use a FileSet. Instead of lines and counts, it could use any other dataset as long as the key and value types match.
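
Stripped of the Hadoop and CDAP machinery, the Tokenizer/Counter pair boils down to "split lines into words, then sum the occurrences of each word". The following plain-Java sketch (the class name WordCountSketch is ours, not part of the example) shows that core logic, and prints each result line with the ":" separator configured on the counts FileSet:

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

  // Tokenize each line on whitespace (the map step) and sum the
  // occurrences of each word (the reduce step).
  public static Map<String, Integer> count(String[] lines) {
    Map<String, Integer> counts = new TreeMap<>();
    for (String line : lines) {
      for (String word : line.split("\\s+")) {
        if (!word.isEmpty()) {
          counts.merge(word, 1, Integer::sum);
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Each output line mimics the "word:count" format that
    // TextOutputFormat produces with ":" as the separator.
    count(new String[] {"hello world", "hello again"})
        .forEach((word, n) -> System.out.println(word + ":" + n));
  }
}
```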

Building and Starting

  • You can build the example as described in Building an Example Application.

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the CDAP Local Sandbox home directory, use the Command Line Interface (CLI):

    $ cdap cli load artifact examples/FileSetExample/target/FileSetExample-5.0.0.jar
    
    Successfully added artifact with name 'FileSetExample'
    
    $ cdap cli create app FileSetExample FileSetExample 5.0.0 user
    
    Successfully created application
    
  • Once the application has been deployed, you can start its components, as described in Starting an Application and detailed in Running the Example below.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

Running the Example

Starting the Service

  • Using the CDAP UI, go to the FileSetExample application overview page, programs tab, click FileSetService to get to the service detail page, then click the Start button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli start service FileSetExample.FileSetService
    
    Successfully started service 'FileSetService' of application 'FileSetExample' with stored runtime arguments '{}'
    
Uploading and Downloading Files

First, we will upload a text file to use as input for WordCount. This is done by making a RESTful call to the FileSetService. A sample text file (lines.txt) is included in the resources directory of the example; the call below stores its contents in the lines file set under the path some.txt. From within the CDAP CLI:

cdap > call service FileSetExample.FileSetService PUT lines?path=some.txt body:file examples/FileSetExample/resources/lines.txt

< 200 OK
< Content-Length: 0
< Connection: keep-alive
< Content-Type: text/plain

Now, we start the MapReduce program and configure it to use the file some.txt as its input, and to write its output to counts.out:

cdap > start mapreduce FileSetExample.WordCount "dataset.lines.input.paths=some.txt dataset.counts.output.path=counts.out"

Successfully started MapReduce program 'WordCount' of application 'FileSetExample'
with provided runtime arguments 'dataset.lines.input.paths=some.txt dataset.counts.output.path=counts.out'

Check the status of the MapReduce program until it is completed:

cdap > get mapreduce status FileSetExample.WordCount

STOPPED

Once it has completed, you can download the results of the computation:

cdap > call service FileSetExample.FileSetService GET counts?path=counts.out/part-r-00000

< 200 OK
< Content-Length: 60
a:1
five:1
hello:4
is:1
letter:1
say:1
the:1
word:2
world:1

Note that we have to download a part file that is under the output path that was specified for the MapReduce program. This is because in MapReduce, every reducer writes a separate part file into the output directory. In this case, as we have fixed the number of reducers to one, there is only a single part file to download.
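
Because the counts FileSet was configured with ":" as the TextOutputFormat separator, each line of the part file has the form word:count. A consumer of this output can split each line at the last ":" to recover the word and its count; the following sketch (the class name PartFileParser is ours) shows the idea:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartFileParser {

  // Parses reducer output lines of the form "word:count", as produced by
  // TextOutputFormat with ":" configured as the key/value separator.
  public static Map<String, Integer> parse(String[] lines) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String line : lines) {
      int sep = line.lastIndexOf(':');
      counts.put(line.substring(0, sep), Integer.parseInt(line.substring(sep + 1)));
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(parse(new String[] {"hello:4", "word:2"}));
  }
}
```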

Dataset Management Operations

Instead of using the default input and output datasets that are created when the application is deployed, we can also create additional file sets using the FileSetService. For example, we can create an input dataset named myin using the create endpoint:

cdap > call service FileSetExample.FileSetService POST /myin/create body "{ 'properties': { 'input.format' : 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat' } }"

The dataset properties for the new file set are given in the body of the request. Alternatively, we can clone an existing dataset's properties to create an output file set named myout:

cdap > call service FileSetExample.FileSetService POST /myout/create?clone=counts

We can now run the example with these two datasets as input and output. First, upload a file to the input dataset:

cdap > call service FileSetExample.FileSetService PUT myin?path=some.txt body:file examples/FileSetExample/resources/lines.txt

Now start the MapReduce and provide extra runtime arguments to specify the input and output dataset:

cdap > start mapreduce FileSetExample.WordCount "input=myin output=myout dataset.myin.input.paths=some.txt dataset.myout.output.path=counts.out"

Then we can retrieve the output of the MapReduce as previously:

cdap > call service FileSetExample.FileSetService GET myout?path=counts.out/part-r-00000

Finally, we can delete the two datasets that we created:

cdap > call service FileSetExample.FileSetService POST /myin/drop
cdap > call service FileSetExample.FileSetService POST /myout/drop

Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

Stopping the Service

  • Using the CDAP UI, go to the FileSetExample application overview page, programs tab, click FileSetService to get to the service detail page, then click the Stop button; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli stop service FileSetExample.FileSetService
    
    Successfully stopped service 'FileSetService' of application 'FileSetExample'
    
Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the FileSetExample application overview page, programs tab; click the Actions menu on the right side and select Manage to go to the Management pane for the application; then click the Actions menu on the right side and select Delete to delete the application; or

  • From the CDAP Local Sandbox home directory, use the Command Line Interface:

    $ cdap cli delete app FileSetExample
    