🔗Data Cleansing

A Cask Data Application Platform (CDAP) example demonstrating incrementally consuming partitions of a partitioned file set using MapReduce.

🔗Overview

This application has a MapReduce program that processes records from one partitioned file set into another, filtering out records that do not match a particular schema.

  • The DataCleansingService writes to the rawRecords partitioned file set.
  • The DataCleansingMapReduce processes the records, filtering out 'unclean' (invalid) records.

Let's look at some of these components, and then run the application and see the results.

🔗The Data Cleansing Application

As in the other examples, the components of the application are tied together by the class DataCleansing:

public class DataCleansing extends AbstractApplication {
  protected static final String NAME = "DataCleansing";
  protected static final String RAW_RECORDS = "rawRecords";
  protected static final String CLEAN_RECORDS = "cleanRecords";
  protected static final String INVALID_RECORDS = "invalidRecords";
  protected static final String CONSUMING_STATE = "consumingState";

  @Override
  public void configure() {
    setName(NAME);
    setDescription("Example data cleansing application");

    // Ingest and retrieve the data using a Service
    addService(new DataCleansingService());

    // Process the records from "rawRecords" partitioned file set using MapReduce
    addMapReduce(new DataCleansingMapReduce());

    // Store the state of the incrementally processing MapReduce
    createDataset(CONSUMING_STATE, KeyValueTable.class);

    // Create the "rawRecords" partitioned file set for storing the input records, 
    // configure it to work with MapReduce
    createDataset(RAW_RECORDS, PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // Properties for partitioning
      .setPartitioning(Partitioning.builder().addLongField("time").build())
      // Properties for file set
      .setInputFormat(TextInputFormat.class)
      .setDescription("Store input records")
      .build());

    createDataset(CLEAN_RECORDS, PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // Properties for partitioning
      .setPartitioning(Partitioning.builder().addLongField("time").addIntField("zip").build())
      // Properties for file set
      .setOutputFormat(TextOutputFormat.class)
      // Properties for Explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setExploreFormat("text")
      .setExploreFormatProperty("delimiter", "\n")
      .setExploreSchema("record STRING")
      .setDescription("Store clean records")
      .build());

    createDataset(INVALID_RECORDS, PartitionedFileSet.class, PartitionedFileSetProperties.builder()
      // Properties for partitioning
      .setPartitioning(Partitioning.builder().addLongField("time").build())
      // Properties for file set
      .setOutputFormat(TextOutputFormat.class)
      // Properties for Explore (to create a partitioned Hive table)
      .setEnableExploreOnCreate(true)
      .setExploreFormat("text")
      .setExploreFormatProperty("delimiter", "\n")
      .setExploreSchema("record STRING")
      .setDescription("Store invalid records")
      .build());
  }
}

🔗Data Storage

  • rawRecords: the input PartitionedFileSet of the DataCleansingMapReduce; contains the ingested records.
  • cleanRecords: an output PartitionedFileSet; contains only the records that match the schema.
  • invalidRecords: an output PartitionedFileSet; stores the records that fail the schema check.
  • consumingState: stores the state of the DataCleansingMapReduce so that each run processes only new partitions.
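
Conceptually, the consumingState mechanism behaves like this simplified sketch, where an in-memory set stands in for the KeyValueTable (this is plain Java for illustration, not the actual CDAP API):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IncrementalConsumer {
  // Stands in for the "consumingState" KeyValueTable: remembers which
  // partitions earlier runs have already processed.
  private final Set<Long> consumed = new HashSet<>();

  // Returns only the partitions that no previous run has processed,
  // and records them so that the next run skips them.
  public List<Long> takeNewPartitions(Collection<Long> available) {
    List<Long> fresh = new ArrayList<>();
    for (Long partition : available) {
      if (consumed.add(partition)) {
        fresh.add(partition);
      }
    }
    return fresh;
  }
}
```

A second call with the same partitions returns an empty list, mirroring how a second MapReduce run without any new ingests has nothing to process.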

🔗DataCleansingService

The service allows writing to the rawRecords PartitionedFileSet. It exposes this endpoint:

  • POST /records/raw writes to a partition of the rawRecords dataset.

🔗MapReduce over PartitionedFileSet

DataCleansingMapReduce is a simple MapReduce that reads from the rawRecords PartitionedFileSet and writes to the cleanRecords PartitionedFileSet. The initialize() method prepares the MapReduce program:

  • It uses the PartitionBatchInput to specify the partitions to process as input, in order to only process new partitions since its last run.
  • It specifies the output partition that is written to, based upon the supplied runtime arguments.
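
The per-record filtering itself reduces to checking each JSON line against the expected schema. The real mapper validates against a CDAP Schema object; as a self-contained illustration only, a crude regex-based check for the default "person" schema might look like:

```java
import java.util.regex.Pattern;

public class RecordFilter {
  // Rough per-field checks approximating the default "person" schema:
  // pid (long), name (string), dob (string), zip (int).
  private static final Pattern PID  = Pattern.compile("\"pid\"\\s*:\\s*\\d+");
  private static final Pattern NAME = Pattern.compile("\"name\"\\s*:\\s*\"[^\"]*\"");
  private static final Pattern DOB  = Pattern.compile("\"dob\"\\s*:\\s*\"[^\"]*\"");
  private static final Pattern ZIP  = Pattern.compile("\"zip\"\\s*:\\s*\"?\\d+\"?");

  // A record is "clean" if every schema field is present with a
  // plausible value; otherwise it belongs in invalidRecords.
  public static boolean isClean(String jsonRecord) {
    return PID.matcher(jsonRecord).find()
        && NAME.matcher(jsonRecord).find()
        && DOB.matcher(jsonRecord).find()
        && ZIP.matcher(jsonRecord).find();
  }
}
```

In the actual MapReduce, a record passing this kind of check is written to the cleanRecords output; a failing record goes to invalidRecords.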

🔗Building and Starting

  • You can build the example as described in Building an Example Application

  • Start CDAP (as described in Starting and Stopping CDAP).

  • Deploy the application, as described in Deploying an Application. For example, from the Standalone CDAP SDK directory, use the Command Line Interface (CLI):

    Linux:

    $ cdap cli load artifact examples/DataCleansing/target/DataCleansing-4.1.1.jar

    Successfully added artifact with name 'DataCleansing'

    $ cdap cli create app DataCleansing DataCleansing 4.1.1 user

    Successfully created application

    Windows:

    > cdap.bat cli load artifact examples\DataCleansing\target\DataCleansing-4.1.1.jar

    Successfully added artifact with name 'DataCleansing'

    > cdap.bat cli create app DataCleansing DataCleansing 4.1.1 user

    Successfully created application

    
  • Once the application has been deployed, you can start its components, as described in Starting an Application, and detailed at the start of running the example.

  • Once all components are started, run the example.

  • When finished, you can stop and remove the application.

🔗Running the Example

🔗Starting the Service

  • Using the CDAP UI, go to the DataCleansing application overview page, programs tab, click DataCleansingService to get to the service detail page, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli start service DataCleansing.DataCleansingService

    Successfully started service 'DataCleansingService' of application 'DataCleansing' with stored runtime arguments '{}'

    Windows:

    > cdap.bat cli start service DataCleansing.DataCleansingService

    Successfully started service 'DataCleansingService' of application 'DataCleansing' with stored runtime arguments '{}'
    

🔗Ingesting Records

Begin by uploading a file containing some newline-separated JSON records into the rawRecords dataset:

Linux:

$ cdap cli call service DataCleansing.DataCleansingService POST v1/records/raw body:file examples/DataCleansing/resources/person.json

< 200 OK
< Content-Length: 0
< Connection: keep-alive
< Content-Type: text/plain

Windows:

> cdap.bat cli call service DataCleansing.DataCleansingService POST v1/records/raw body:file examples\DataCleansing\resources\person.json

< 200 OK
< Content-Length: 0
< Connection: keep-alive
< Content-Type: text/plain

🔗Starting the MapReduce

The MapReduce must be started with a runtime argument output.partition.key that specifies the output partition of the cleanRecords dataset to write to. In this example, we'll simply use 1 as the value.
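
Inside the MapReduce, resolving this argument amounts to a lookup in the runtime-arguments map. A simplified sketch (the plain Map stands in for what the CDAP context provides, and the current-time fallback is an illustrative assumption, not necessarily what the example does):

```java
import java.util.Map;

public class OutputPartitionKey {
  // Resolves the output partition key from the runtime arguments,
  // falling back to the current time when none is supplied.
  public static long resolve(Map<String, String> runtimeArgs) {
    String value = runtimeArgs.get("output.partition.key");
    return value != null ? Long.parseLong(value) : System.currentTimeMillis();
  }
}
```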

  • Using the CDAP UI, go to the DataCleansing application overview page, programs tab, click DataCleansingMapReduce to get to the MapReduce detail page, set the runtime arguments using output.partition.key as the key and 1 as the value, then click the Start button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli start mapreduce DataCleansing.DataCleansingMapReduce output.partition.key=1

    Successfully started mapreduce 'DataCleansingMapReduce' of application 'DataCleansing'
    with provided runtime arguments 'output.partition.key=1'

    Windows:

    > cdap.bat cli start mapreduce DataCleansing.DataCleansingMapReduce output.partition.key=1

    Successfully started mapreduce 'DataCleansingMapReduce' of application 'DataCleansing'
    with provided runtime arguments 'output.partition.key=1'
    

Optionally, a custom schema to match records against can be supplied as an additional runtime argument to the MapReduce, with the key 'schema.key' and the JSON representation of the schema as the value. Otherwise, the records are matched against this default schema:

public static final Schema DEFAULT_SCHEMA = Schema.recordOf("person",
                                                            Schema.Field.of("pid", Schema.of(Schema.Type.LONG)),
                                                            Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
                                                            Schema.Field.of("dob", Schema.of(Schema.Type.STRING)),
                                                            Schema.Field.of("zip", Schema.of(Schema.Type.INT)));
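
Passed via the 'schema.key' runtime argument, the schema takes its JSON form. As a hand-written sketch (Avro-style record JSON, written here by analogy rather than captured from CDAP output), the default schema corresponds to roughly:

```json
{
  "type": "record",
  "name": "person",
  "fields": [
    { "name": "pid", "type": "long" },
    { "name": "name", "type": "string" },
    { "name": "dob", "type": "string" },
    { "name": "zip", "type": "int" }
  ]
}
```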

🔗Querying the Results

To sample the cleanRecords PartitionedFileSet, execute an explore query using the CDAP CLI:

Linux:

$ cdap cli execute "\"SELECT record FROM dataset_cleanRecords where TIME = 1 LIMIT 5\""

Windows:

> cdap.bat cli execute "\"SELECT record FROM dataset_cleanRecords where TIME = 1 LIMIT 5\""

The records that are not filtered out (those that adhere to the given schema) will be displayed:

+======================================================================+
| record: STRING                                                       |
+======================================================================+
| {"pid":223986723,"name":"bob","dob":"02-12-1983","zip":"84125"}      |
| {"pid":001058370,"name":"jill","dob":"12-12-1963","zip":"84125"}     |
| {"pid":000150018,"name":"wendy","dob":"06-19-1987","zip":"84125"}    |
| {"pid":013587810,"name":"john","dob":"10-10-1991","zip":"84125"}     |
| {"pid":811638015,"name":"samantha","dob":"04-20-1965","zip":"84125"} |
+======================================================================+
Fetched 5 rows

This process—ingesting records, running the MapReduce job with a different output partition key, and requesting the filtered data—can be repeated. Each time, the MapReduce job will pick up and process only the newly ingested set of records.

🔗Stopping and Removing the Application

Once done, you can stop the application—if it hasn't stopped already—as described in Stopping an Application. Here is an example-specific description of the steps:

🔗Stopping the Service

  • Using the CDAP UI, go to the DataCleansing application overview page, programs tab, click DataCleansingService to get to the service detail page, then click the Stop button; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli stop service DataCleansing.DataCleansingService

    Successfully stopped service 'DataCleansingService' of application 'DataCleansing'

    Windows:

    > cdap.bat cli stop service DataCleansing.DataCleansingService

    Successfully stopped service 'DataCleansingService' of application 'DataCleansing'
    

🔗Removing the Application

You can now remove the application as described in Removing an Application, or:

  • Using the CDAP UI, go to the DataCleansing application overview page, programs tab, click the Actions menu on the right side and select Manage to go to the Management pane for the application, then click the Actions menu on the right side and select Delete to delete the application; or

  • From the Standalone CDAP SDK directory, use the Command Line Interface:

    Linux:

    $ cdap cli delete app DataCleansing

    Windows:

    > cdap.bat cli delete app DataCleansing