Creating a Plugin

Action Plugin

An Action plugin runs arbitrary logic at the start or end of a batch data pipeline.

To implement an Action plugin, extend the Action class. Only one method must be implemented:

run()

Methods

  • run(): Used to implement the functionality of the plugin.
  • configurePipeline(): Used to perform any validation of the application configuration that this plugin requires, or to create any streams or datasets whose name is not specified as a macro.

Example:

/**
 * Action that moves files from one fileset into another, optionally filtering files that match a regex.
 */
@Plugin(type = Action.PLUGIN_TYPE)
@Name(FilesetMoveAction.NAME)
@Description("Action that moves files from one fileset into another, optionally filtering files that match a regex.")
public class FilesetMoveAction extends Action {
  public static final String NAME = "FilesetMove";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String SOURCE_FILESET = "sourceFileset";
    public static final String DEST_FILESET = "destinationFileset";
    public static final String FILTER_REGEX = "filterRegex";

    @Name(SOURCE_FILESET)
    @Description("The fileset to move files from.")
    private String sourceFileset;

    @Name(DEST_FILESET)
    @Description("The fileset to move files to.")
    private String destinationFileset;

    @Nullable
    @Name(FILTER_REGEX)
    @Description("Filter any files whose name matches this regex. Defaults to '^\\.', which will filter any files " +
      "that begin with a period.")
    private String filterRegex;

    // set defaults for properties in a no-argument constructor.
    public Conf() {
      filterRegex = "^\\.";
    }
  }

  public FilesetMoveAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    Pattern.compile(config.filterRegex);
  }

  @Override
  public void run(ActionContext context) throws Exception {
    context.execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        FileSet sourceFileSet = context.getDataset(config.sourceFileset);
        FileSet destinationFileSet = context.getDataset(config.destinationFileset);

        Pattern pattern = Pattern.compile(config.filterRegex);

        for (Location sourceFile : sourceFileSet.getBaseLocation().list()) {
          if (pattern.matcher(sourceFile.getName()).find()) {
            continue;
          }
          Location destFile = destinationFileSet.getBaseLocation().append(sourceFile.getName());
          sourceFile.renameTo(destFile);
        }
      }
    });
  }
}
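
The filter check in run() relies on Matcher.find(), so the default regex '^\\.' skips any file whose name starts with a period. The stand-alone sketch below reproduces only that check with java.util.regex so it can be tried outside CDAP; the class FilterRegexSketch and the method namesToMove are hypothetical names used for illustration and are not part of the CDAP API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not part of CDAP) that mirrors the skip logic in FilesetMoveAction.run().
final class FilterRegexSketch {

  // Returns the file names that would be moved: those in which the regex finds no match.
  static List<String> namesToMove(List<String> fileNames, String filterRegex) {
    Pattern pattern = Pattern.compile(filterRegex);
    List<String> toMove = new ArrayList<>();
    for (String name : fileNames) {
      if (pattern.matcher(name).find()) {
        continue; // filtered out, like the 'continue' in the plugin
      }
      toMove.add(name);
    }
    return toMove;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(".staging", "part-0000", "data.txt");
    // prints [part-0000, data.txt]
    System.out.println(namesToMove(names, "^\\."));
  }
}
```

Because find() is used rather than matches(), a regex without anchors (for example "tmp") filters every file whose name contains that text anywhere.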

Post-run Action Plugin

A PostAction plugin runs arbitrary logic after the end of a pipeline run. It can be set to execute when the run completes successfully, when it fails, or in either case.

The difference between a PostAction and an Action placed at the end of a pipeline is that a PostAction is always executed, even if the pipeline run fails, while an Action is executed only if every stage preceding it runs successfully.

To implement a Post-run Action plugin, extend the PostAction class. Only one method must be implemented:

run()

Methods

  • run(): Used to implement the functionality of the plugin.
  • configurePipeline(): Used to perform any validation of the application configuration that this plugin requires, or to create any streams or datasets whose name is not specified as a macro.

Example:

/**
 * Post run action that deletes files in a FileSet that match a configurable regex.
 */
@Plugin(type = PostAction.PLUGIN_TYPE)
@Name(FilesetDeletePostAction.NAME)
@Description("Post run action that deletes files in a FileSet that match a configurable regex if the run succeeded.")
public class FilesetDeletePostAction extends PostAction {
  public static final String NAME = "FilesetDelete";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "filesetName";
    public static final String DELETE_REGEX = "deleteRegex";
    public static final String DIRECTORY = "directory";

    @Name(FILESET_NAME)
    @Description("The fileset to delete files from.")
    private String filesetName;

    @Name(DELETE_REGEX)
    @Description("Delete files that match this regex.")
    private String deleteRegex;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(DIRECTORY)
    @Description("The fileset directory to delete files from.")
    private String directory;
  }

  public FilesetDeletePostAction(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    Pattern.compile(config.deleteRegex);
  }

  @Override
  public void run(BatchActionContext context) throws Exception {
    if (!context.isSuccessful()) {
      return;
    }

    FileSet fileSet = context.getDataset(config.filesetName);
    Pattern pattern = Pattern.compile(config.deleteRegex);
    for (Location fileLocation : fileSet.getBaseLocation().append(config.directory).list()) {
      if (pattern.matcher(fileLocation.getName()).find()) {
        fileLocation.delete();
      }
    }
  }
}
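
The comments on the directory property describe macro substitution: the property holds a placeholder such as ${key} at deployment time and receives its value at runtime. The sketch below illustrates that idea with a plain regex replacement. It is only an illustration under simplifying assumptions: CDAP performs the real substitution itself and may support more syntax than a bare ${key}, and the names MacroSketch and substitute are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: a simplified stand-in for macro substitution, not CDAP's macro evaluator.
final class MacroSketch {
  // Matches ${key} and captures the key.
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^}]+)\\}");

  // Replaces every ${key} in 'value' with the matching runtime argument.
  static String substitute(String value, Map<String, String> runtimeArguments) {
    Matcher matcher = MACRO.matcher(value);
    StringBuffer result = new StringBuffer();
    while (matcher.find()) {
      String key = matcher.group(1);
      String replacement = runtimeArguments.get(key);
      if (replacement == null) {
        throw new IllegalArgumentException("No runtime value for macro key '" + key + "'");
      }
      // quoteReplacement keeps '$' and '\' in the value from being treated as group references
      matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(result);
    return result.toString();
  }

  public static void main(String[] args) {
    // prints logs/2017
    System.out.println(substitute("${dir}/2017", Collections.singletonMap("dir", "logs")));
  }
}
```

This also shows why the example validates deleteRegex in configurePipeline() but not directory: a macro-enabled value is not known until the run starts.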

Batch Source Plugin

A BatchSource plugin is used as a source of a batch data pipeline. It is used to prepare and configure the input of a pipeline run.

In order to implement a Batch Source, you extend the BatchSource class. You need to define the types of the KEY and VALUE that the Batch Source will receive and the type of object that the Batch Source will emit to the subsequent stage (which could be either a Transformation or a Batch Sink). After defining the types, only one method is required to be implemented:

prepareRun()

Methods

  • prepareRun(): Used to configure the input for each run of the pipeline. If the name of a stream or dataset is specified as a macro, it is created during this stage. This is called by the client that will submit the job for the pipeline run.
  • onRunFinish(): Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.
  • configurePipeline(): Used to perform any validation of the application configuration that this plugin requires, or to create any streams or datasets whose name is not specified as a macro.
  • initialize(): Initialize the Batch Source. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • transform(): This method will be called for every input key-value pair generated by the batch job. By default, the value is emitted to the subsequent stage.

Example:

/**
 * Batch Source that reads from a FileSet that has its data formatted as text.
 *
 * LongWritable is the first parameter because that is the key used by Hadoop's {@link TextInputFormat}.
 * Similarly, Text is the second parameter because that is the value used by Hadoop's {@link TextInputFormat}.
 * {@link StructuredRecord} is the third parameter because that is what the source will output.
 * All the plugins included with Hydrator operate on StructuredRecord.
 */
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(TextFileSetSource.NAME)
@Description("Reads from a FileSet that has its data formatted as text.")
public class TextFileSetSource extends BatchSource<LongWritable, Text, StructuredRecord> {
  public static final String NAME = "TextFileSet";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "textRecord",
    Schema.Field.of("position", Schema.of(Schema.Type.LONG)),
    Schema.Field.of("text", Schema.of(Schema.Type.STRING))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String FILES = "files";
    public static final String CREATE_IF_NOT_EXISTS = "createIfNotExists";
    public static final String DELETE_INPUT_ON_SUCCESS = "deleteInputOnSuccess";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported.
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to read from.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(FILES)
    @Description("A comma separated list of files in the FileSet to read.")
    private String files;

    // A nullable field tells CDAP that this is an optional field.
    @Nullable
    @Name(CREATE_IF_NOT_EXISTS)
    @Description("Whether to create the FileSet if it doesn't already exist. Defaults to false.")
    private Boolean createIfNotExists;

    @Nullable
    @Name(DELETE_INPUT_ON_SUCCESS)
    @Description("Whether to delete the data read by the source after the run succeeds. Defaults to false.")
    private Boolean deleteInputOnSuccess;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      createIfNotExists = false;
      deleteInputOnSuccess = false;
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSource(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // if the user has set createIfNotExists to true, create the FileSet here.
    if (config.createIfNotExists) {
      pipelineConfigurer.createDataset(config.fileSetName,
                                       FileSet.class,
                                       FileSetProperties.builder()
                                         .setInputFormat(TextInputFormat.class)
                                         .setOutputFormat(TextOutputFormat.class)
                                         .setEnableExploreOnCreate(true)
                                         .setExploreFormat("text")
                                         .setExploreSchema("text string")
                                         .build()
      );
    }
    // set the output schema of this stage so that stages further down the pipeline will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  // prepareRun is called before every pipeline run, and is used to configure what the input should be,
  // as well as any arguments the input should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSourceContext context) throws IOException {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setInputPaths(arguments, config.files);
    context.setInput(Input.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSourceContext context) {
    // perform any actions that should happen at the end of the run.
    // in our case, we want to delete the data read during this run if the run succeeded.
    if (succeeded && config.deleteInputOnSuccess) {
      Map<String, String> arguments = new HashMap<>();
      FileSetArguments.setInputPaths(arguments, config.files);
      FileSet fileSet = context.getDataset(config.fileSetName, arguments);
      for (Location inputLocation : fileSet.getInputLocations()) {
        try {
          inputLocation.delete(true);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  // transform is used to transform the key-value pair output by the input into objects output by this source.
  // The output should be a StructuredRecord if you want the source to be compatible with the plugins included
  // with Hydrator.
  @Override
  public void transform(KeyValue<LongWritable, Text> input, Emitter<StructuredRecord> emitter) throws Exception {
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA)
                   .set("position", input.getKey().get())
                   .set("text", input.getValue().toString())
                   .build()
    );
  }
}

Lineage

For plugins that fetch data from non-CDAP sources, the lineage is registered using the inputName provided when setInput() is invoked on BatchSourceContext in prepareRun(). Note that the inputName should be a valid DatasetId. For example:

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  ...
  context.setInput(Input.of("myExternalSource", myInputFormatProvider));
}

Lineage will be tracked using myExternalSource.

Batch Sink Plugin

A BatchSink plugin is used to write data in either batch or real-time data pipelines. It is used to prepare and configure the output of a batch of data from a pipeline run.

In order to implement a Batch Sink, you extend the BatchSink class. Similar to a Batch Source, you need to define the types of the KEY and VALUE that the Batch Sink will write in the Batch job and the type of object that it will accept from the previous stage (which could be either a Transformation or a Batch Source).

After defining the types, only one method is required to be implemented:

prepareRun()

Methods

  • prepareRun(): Used to configure the output for each run of the pipeline. This is called by the client that will submit the job for the pipeline run.
  • onRunFinish(): Used to run any required logic at the end of a pipeline run. This is called by the client that submitted the job for the pipeline run.
  • configurePipeline(): Used to perform any validation of the application configuration that this plugin requires, or to create any streams or datasets whose name is not specified as a macro.
  • initialize(): Initialize the Batch Sink. Guaranteed to be executed before any call to the plugin’s transform method. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s transform method have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • transform(): This method will be called for every object that is received from the previous stage. The logic inside the method will transform the object to the key-value pair expected by the Batch Sink's output format. If you don't override this method, the incoming object is set as the key and the value is set to null.

Example:

/**
 * Batch Sink that writes to a FileSet in text format.
 * Each record will be written as a single line, with record fields separated by a configurable separator.
 *
 * StructuredRecord is the first parameter because that is the input to the sink.
 * The second and third parameters are the key and value expected by Hadoop's {@link TextOutputFormat}.
 */
@Plugin(type = BatchSink.PLUGIN_TYPE)
@Name(TextFileSetSink.NAME)
@Description("Writes to a FileSet in text format.")
public class TextFileSetSink extends BatchSink<StructuredRecord, NullWritable, Text> {
  public static final String NAME = "TextFileSet";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String FILESET_NAME = "fileSetName";
    public static final String OUTPUT_DIR = "outputDir";
    public static final String FIELD_SEPARATOR = "fieldSeparator";

    // The name annotation tells CDAP what the property name is. It is optional, and defaults to the variable name.
    // Note: only primitives (including boxed types) and String are supported.
    @Name(FILESET_NAME)
    @Description("The name of the FileSet to write to.")
    private String fileSetName;

    // Macro enabled properties can be set to a placeholder value ${key} when the pipeline is deployed.
    // At runtime, the value for 'key' can be given and substituted in.
    @Macro
    @Name(OUTPUT_DIR)
    @Description("The FileSet directory to write to.")
    private String outputDir;

    @Nullable
    @Name(FIELD_SEPARATOR)
    @Description("The separator to use to join input record fields together. Defaults to ','.")
    private String fieldSeparator;

    // Use a no-args constructor to set field defaults.
    public Conf() {
      fileSetName = "";
      fieldSeparator = ",";
    }
  }

  // CDAP will pass in a config with its fields populated based on the configuration given when creating the pipeline.
  public TextFileSetSink(Conf config) {
    this.config = config;
  }

  // configurePipeline is called exactly once when the pipeline is being created.
  // Any static configuration should be performed here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // create the FileSet here.
    pipelineConfigurer.createDataset(config.fileSetName,
                                     FileSet.class,
                                     FileSetProperties.builder()
                                       .setInputFormat(TextInputFormat.class)
                                       .setOutputFormat(TextOutputFormat.class)
                                       .setEnableExploreOnCreate(true)
                                       .setExploreFormat("text")
                                       .setExploreSchema("text string")
                                       .build()
    );
  }

  // prepareRun is called before every pipeline run, and is used to configure what the output should be,
  // as well as any arguments the output should use. It is called by the client that is submitting the batch job.
  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    Map<String, String> arguments = new HashMap<>();
    FileSetArguments.setOutputPath(arguments, config.outputDir);
    context.addOutput(Output.ofDataset(config.fileSetName, arguments));
  }

  // onRunFinish is called at the end of the pipeline run by the client that submitted the batch job.
  @Override
  public void onRunFinish(boolean succeeded, BatchSinkContext context) {
    // perform any actions that should happen at the end of the run.
  }

  // initialize is called by each job executor before any call to transform is made.
  // This occurs at the start of the batch job run, after the job has been successfully submitted.
  // For example, if mapreduce is the execution engine, each mapper will call initialize at the start of the program.
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    super.initialize(context);
    // create any resources required by transform()
  }

  // destroy is called by each job executor at the end of its life.
  // For example, if mapreduce is the execution engine, each mapper will call destroy at the end of the program.
  @Override
  public void destroy() {
    // clean up any resources created by initialize
  }

  @Override
  public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, Text>> emitter) throws Exception {
    StringBuilder joinedFields = new StringBuilder();
    Iterator<Schema.Field> fieldIter = input.getSchema().getFields().iterator();
    if (!fieldIter.hasNext()) {
      // shouldn't happen
      return;
    }

    Object val = input.get(fieldIter.next().getName());
    if (val != null) {
      joinedFields.append(val);
    }
    while (fieldIter.hasNext()) {
      String fieldName = fieldIter.next().getName();
      joinedFields.append(config.fieldSeparator);
      val = input.get(fieldName);
      if (val != null) {
        joinedFields.append(val);
      }
    }
    emitter.emit(new KeyValue<>(NullWritable.get(), new Text(joinedFields.toString())));
  }

}
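
The joining logic in transform() can be tried outside CDAP. The sketch below reproduces it over a plain list of values, which is a hypothetical stand-in for the fields of a StructuredRecord; FieldJoinSketch and join are illustrative names only. As in the plugin, a null value is written as an empty string, so every line keeps the same number of separators.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper (not part of CDAP): the list of values stands in for a record's fields.
final class FieldJoinSketch {

  // Joins the values with the separator, writing nothing for null values.
  // Unlike the plugin, which emits nothing for a record without fields, this returns "".
  static String join(List<Object> values, String separator) {
    StringBuilder joined = new StringBuilder();
    Iterator<Object> iter = values.iterator();
    if (!iter.hasNext()) {
      return "";
    }
    Object val = iter.next();
    if (val != null) {
      joined.append(val);
    }
    while (iter.hasNext()) {
      joined.append(separator);
      val = iter.next();
      if (val != null) {
        joined.append(val);
      }
    }
    return joined.toString();
  }

  public static void main(String[] args) {
    // prints 1,,abc
    System.out.println(join(Arrays.<Object>asList(1L, null, "abc"), ","));
  }
}
```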

Lineage

For plugins that write data to non-CDAP sinks, the lineage is registered using the outputName provided when addOutput() is invoked on BatchSinkContext in prepareRun(). Note that the outputName should be a valid DatasetId. For example:

@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  ...
  context.addOutput(Output.of("myExternalSink", myOutputFormatProvider));
}

Lineage will be tracked using myExternalSink.

Transformation Plugin

A Transform plugin is used to convert one input record into zero or more output records. It can be used in both batch and real-time data pipelines.

The only method that needs to be implemented is:

transform()

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the Transform. It is guaranteed that this method will be invoked before the transform method.
  • transform(): This method contains the logic that will be applied on each incoming data object. An emitter can be used to pass the results to the subsequent stage.
  • destroy(): Used to perform any cleanup before the plugin shuts down.

Below is an example of a StringCase transform that converts specific fields to lowercase or uppercase.

/**
 * Transform that can transform specific fields to lowercase or uppercase.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name(StringCaseTransform.NAME)
@Description("Transforms configured fields to lowercase or uppercase.")
public class StringCaseTransform extends Transform<StructuredRecord, StructuredRecord> {
  public static final String NAME = "StringCase";
  private final Conf config;
  private Set<String> upperFields;
  private Set<String> lowerFields;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    public static final String UPPER_FIELDS = "upperFields";
    public static final String LOWER_FIELDS = "lowerFields";
    private static final Pattern SPLIT_ON = Pattern.compile("\\s*,\\s*");

    // nullable means this property is optional
    @Nullable
    @Name(UPPER_FIELDS)
    @Description("A comma separated list of fields to uppercase. Each field must be of type String.")
    private String upperFields;

    @Nullable
    @Name(LOWER_FIELDS)
    @Description("A comma separated list of fields to lowercase. Each field must be of type String.")
    private String lowerFields;

    private Set<String> getUpperFields() {
      return parseToSet(upperFields);
    }

    private Set<String> getLowerFields() {
      return parseToSet(lowerFields);
    }

    private Set<String> parseToSet(String str) {
      Set<String> set = new HashSet<>();
      if (str == null || str.isEmpty()) {
        return set;
      }
      for (String element : SPLIT_ON.split(str)) {
        set.add(element);
      }
      return set;
    }
  }

  public StringCaseTransform(Conf config) {
    this.config = config;
  }

  // configurePipeline is called only once, when the pipeline is deployed. Static validation should be done here.
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    // the output schema is always the same as the input schema
    Schema inputSchema = stageConfigurer.getInputSchema();

    // if schema is null, that means it is either not known until runtime, or it is variable
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that all configured fields are strings
      for (String fieldName : config.getUpperFields()) {
        validateFieldIsString(inputSchema, fieldName);
      }
      for (String fieldName : config.getLowerFields()) {
        validateFieldIsString(inputSchema, fieldName);
      }
    }

    stageConfigurer.setOutputSchema(inputSchema);
  }

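  // initialize is called once at the start of each pipeline run
  @Override
  public void initialize(TransformContext context) throws Exception {
    // let the base class keep the context so that getContext() can be used later, for example to emit metrics
    super.initialize(context);
    upperFields = config.getUpperFields();
    lowerFields = config.getLowerFields();
  }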

  // transform is called once for each record that goes into this stage
  @Override
  public void transform(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(record.getSchema());
    for (Schema.Field field : record.getSchema().getFields()) {
      String fieldName = field.getName();
      Object value = record.get(fieldName);
      // nullable string fields pass validation, so null values are copied through unchanged
      if (value != null && upperFields.contains(fieldName)) {
        builder.set(fieldName, value.toString().toUpperCase());
      } else if (value != null && lowerFields.contains(fieldName)) {
        builder.set(fieldName, value.toString().toLowerCase());
      } else {
        builder.set(fieldName, value);
      }
    }
    emitter.emit(builder.build());
  }

  private void validateFieldIsString(Schema schema, String fieldName) {
    Schema.Field inputField = schema.getField(fieldName);
    if (inputField == null) {
      throw new IllegalArgumentException(
        String.format("Field '%s' does not exist in input schema %s.", fieldName, schema));
    }
    Schema fieldSchema = inputField.getSchema();
    Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
    if (fieldType != Schema.Type.STRING) {
      throw new IllegalArgumentException(
        String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                      fieldName, fieldType, Schema.Type.STRING));
    }
  }
}
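
The way the comma-separated field lists are parsed can be checked in isolation. The sketch below copies the parseToSet logic from Conf into a stand-alone class (FieldListSketch is a hypothetical name) so that the effect of the "\\s*,\\s*" split pattern is easy to try out.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Stand-alone copy of the parsing logic in StringCaseTransform.Conf, for experimentation only.
final class FieldListSketch {
  // A comma with any surrounding whitespace separates two field names.
  private static final Pattern SPLIT_ON = Pattern.compile("\\s*,\\s*");

  // Returns the distinct field names in the list, or an empty set for null or empty input.
  static Set<String> parseToSet(String str) {
    Set<String> set = new HashSet<>();
    if (str == null || str.isEmpty()) {
      return set;
    }
    for (String element : SPLIT_ON.split(str)) {
      set.add(element);
    }
    return set;
  }

  public static void main(String[] args) {
    // contains "name" and "city"; whitespace around the comma is dropped
    System.out.println(parseToSet("name , city"));
  }
}
```

Note that the pattern only removes whitespace next to a comma. Whitespace at the very start or end of the whole string is kept, so " name, city " yields " name" and "city ", which would not match any field during validation.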

You could also add a user metric to the transform method that records the number of fields changed. User metrics can be queried using the CDAP Metrics HTTP RESTful API:

public void transform(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
  int fieldsChanged = 0;
  . . .
    builder.set(fieldName, record.get(fieldName). . .
    fieldsChanged += 1;
  . . .
  getContext().getMetrics().count("fieldsChanged", fieldsChanged);
}

Error Transformation Plugin

An ErrorTransform plugin is a special type of Transform that consumes error records emitted from the previous stages instead of output records. It is used to transform an ErrorRecord to zero or more output records. In addition to the actual error object, an ErrorRecord exposes the stage the error was emitted from, an error code, and an error message. Errors can be emitted by BatchSource, Transform, and BatchAggregator plugins using the Emitter they receive. An ErrorTransform can be used in both batch and real-time data pipelines.

The only method that needs to be implemented is:

transform()

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the ErrorTransform. It is guaranteed that this method will be invoked before the transform method.
  • transform(): This method contains the logic that will be applied on each incoming ErrorRecord object. An emitter can be used to pass the results to the subsequent stage.
  • destroy(): Used to perform any cleanup before the plugin shuts down.

Below is an example of an ErrorCollector that adds the error stage, code, and message to each record it receives.

/**
 * Adds the error stage, code, and message to each record, then emits it.
 */
@Plugin(type = ErrorTransform.PLUGIN_TYPE)
@Name("ErrorCollector")
public class ErrorCollector extends ErrorTransform<StructuredRecord, StructuredRecord> {
  private final Config config;

  public ErrorCollector(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      if (inputSchema.getField(config.messageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains message field %s. Please set messageField to a different value.",
          config.messageField));
      }
      if (inputSchema.getField(config.codeField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains code field %s. Please set codeField to a different value.",
          config.codeField));
      }
      if (inputSchema.getField(config.stageField) != null) {
        throw new IllegalArgumentException(String.format(
          "Input schema already contains stage field %s. Please set stageField to a different value.",
          config.stageField));
      }
      Schema outputSchema = getOutputSchema(config, inputSchema);
      pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
    }
  }

  @Override
  public void transform(ErrorRecord<StructuredRecord> input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord invalidRecord = input.getRecord();
    StructuredRecord.Builder output = StructuredRecord.builder(getOutputSchema(config, invalidRecord.getSchema()));
    for (Schema.Field field : invalidRecord.getSchema().getFields()) {
      output.set(field.getName(), invalidRecord.get(field.getName()));
    }
    if (config.messageField != null) {
      output.set(config.messageField, input.getErrorMessage());
    }
    if (config.codeField != null) {
      output.set(config.codeField, input.getErrorCode());
    }
    if (config.stageField != null) {
      output.set(config.stageField, input.getStageName());
    }
    emitter.emit(output.build());
  }

  private static Schema getOutputSchema(Config config, Schema inputSchema) {
    List<Schema.Field> fields = new ArrayList<>();
    fields.addAll(inputSchema.getFields());
    if (config.messageField != null) {
      fields.add(Schema.Field.of(config.messageField, Schema.of(Schema.Type.STRING)));
    }
    if (config.codeField != null) {
      fields.add(Schema.Field.of(config.codeField, Schema.of(Schema.Type.INT)));
    }
    if (config.stageField != null) {
      fields.add(Schema.Field.of(config.stageField, Schema.of(Schema.Type.STRING)));
    }
    return Schema.recordOf("error" + inputSchema.getRecordName(), fields);
  }

  /**
   * The plugin config
   */
  public static class Config extends PluginConfig {
    @Nullable
    @Description("The name of the error message field to use in the output schema. " +
      "If this is not specified, the error message will be dropped.")
    private String messageField;

    @Nullable
    @Description("The name of the error code field to use in the output schema. " +
      "If this is not specified, the error code will be dropped.")
    private String codeField;

    @Nullable
    @Description("The name of the error stage field to use in the output schema. " +
      "If this is not specified, the error stage will be dropped.")
    private String stageField;

  }
}

Alert Publisher Plugin

An AlertPublisher plugin is a special type of plugin that consumes alerts emitted by previous stages instead of output records. Alerts are meant to be uncommon events that need to be acted on by some other program. Each alert contains a payload, which is a map of strings holding any relevant data. An alert publisher is responsible for writing alerts to some system, where they can be read and acted upon by an external program. For example, a plugin may write alerts to Kafka. Alerts are not necessarily published immediately after they are emitted; the processing engine decides when to publish them.

The only method that needs to be implemented is:

publish(Iterator<Alert> alerts)

Methods

  • initialize(): Used to perform any initialization step that might be required during the runtime of the AlertPublisher. It is guaranteed that this method will be invoked before the publish method.
  • publish(): This method contains the logic that will publish each incoming Alert.
  • destroy(): Used to perform any cleanup before the plugin shuts down.
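
The lifecycle above can be sketched in plain Java. This is only an illustration of the call order: SimpleAlert and InMemoryPublisher are hypothetical stand-ins written for this example, not the CDAP Alert and AlertPublisher classes, and the in-memory list stands in for an external system such as Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustration only: every type here is a simplified stand-in, not a CDAP class.
class AlertPublisherSketch {

  // Hypothetical alert: the stage that emitted it plus a payload of strings.
  static final class SimpleAlert {
    final String stageName;
    final Map<String, String> payload;

    SimpleAlert(String stageName, Map<String, String> payload) {
      this.stageName = stageName;
      this.payload = payload;
    }
  }

  // Hypothetical publisher that collects alerts in memory instead of writing to an external system.
  static final class InMemoryPublisher {
    final List<String> written = new ArrayList<>();
    private boolean initialized;

    // Runs before any publish() call, for example to open a connection.
    void initialize() {
      initialized = true;
    }

    // Receives the alerts that were collected and writes each one out.
    void publish(Iterator<SimpleAlert> alerts) {
      if (!initialized) {
        throw new IllegalStateException("publish() called before initialize()");
      }
      while (alerts.hasNext()) {
        SimpleAlert alert = alerts.next();
        written.add(alert.stageName + ":" + alert.payload);
      }
    }

    // Runs before shutdown, for example to close the connection.
    void destroy() {
      initialized = false;
    }
  }

  public static void main(String[] args) {
    InMemoryPublisher publisher = new InMemoryPublisher();
    publisher.initialize();
    publisher.publish(Arrays.asList(
      new SimpleAlert("parser", Collections.singletonMap("reason", "bad date"))).iterator());
    publisher.destroy();
    System.out.println(publisher.written);
  }
}
```

Because publish() receives an iterator rather than a single alert, a publisher can batch its writes to the target system if that system supports it.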

Script Transformations

In the script transformations (JavaScriptTransform, PythonEvaluator, and the ValidatorTransform), a ScriptContext object is passed to the transform() method:

function transform(input, context);

The different Transforms that are passed this context object have similar signatures:

Transform             Signature
JavaScriptTransform   function transform(input, emitter, context)
PythonEvaluator       function transform(input, emitter, context)
ValidatorTransform    function isValid(input, context)

The ScriptContext has these methods:

public Logger getLogger();
public StageMetrics getMetrics();
public ScriptLookup getLookup(String table);

The context passed by the ValidatorTransform has an additional method that returns a validator:

public Object getValidator(String validatorName);

These methods allow access within the script to CDAP loggers, metrics, lookup tables, and the validator object.

Logger

Logger is an org.slf4j.Logger.

For example, a JavaScript transform step can access and write to the debug log with:

context.getLogger().debug('Received record with id ' + input.id);

StageMetrics

StageMetrics has these methods:

  • count(String metricName, int delta): Increases the value of the specified metric by delta. The metric name is prefixed with the stage ID, so the metric is aggregated for the current stage.
  • gauge(String metricName, long value): Sets the specified metric to the provided value. The metric name is prefixed with the stage ID, so the metric is aggregated for the current stage.
  • pipelineCount(String metricName, int delta): Increases the value of the specified metric by delta. The metric is aggregated for the entire pipeline.
  • pipelineGauge(String metricName, long value): Sets the specified metric to the provided value. The metric is aggregated for the entire pipeline.
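
The difference between counting and gauging, and between stage-level and pipeline-level metrics, can be modelled with a small in-memory sketch. StageMetricsSketch is a hypothetical class written for this illustration, not the CDAP implementation, and the "stageId." prefix format is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a hypothetical in-memory model, not the CDAP StageMetrics implementation.
class StageMetricsSketch {
  private final String stageId;
  private final Map<String, Long> store; // shared by all stages of one pipeline

  StageMetricsSketch(String stageId, Map<String, Long> store) {
    this.stageId = stageId;
    this.store = store;
  }

  // Stage-level counter: the stage ID prefix keeps stages separate (prefix format is an assumption).
  void count(String metricName, int delta) {
    store.merge(stageId + "." + metricName, (long) delta, Long::sum);
  }

  // Stage-level gauge: overwrites the previous value instead of adding to it.
  void gauge(String metricName, long value) {
    store.put(stageId + "." + metricName, value);
  }

  // Pipeline-level counter: no prefix, so every stage adds to the same metric.
  void pipelineCount(String metricName, int delta) {
    store.merge(metricName, (long) delta, Long::sum);
  }

  // Pipeline-level gauge: no prefix, so the most recent value from any stage is kept.
  void pipelineGauge(String metricName, long value) {
    store.put(metricName, value);
  }

  public static void main(String[] args) {
    Map<String, Long> store = new HashMap<>();
    StageMetricsSketch parser = new StageMetricsSketch("parser", store);
    StageMetricsSketch writer = new StageMetricsSketch("writer", store);
    parser.count("records", 2);
    writer.count("records", 1);
    parser.pipelineCount("errors", 1);
    writer.pipelineCount("errors", 4);
    System.out.println(store.get("parser.records") + " " + store.get("writer.records") + " " + store.get("errors"));
  }
}
```

In this model, two stages that call count("records", ...) produce two separate values, while two stages that call pipelineCount("errors", ...) add to a single value.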

ScriptLookup

Currently, ScriptContext.getLookup(String table) only supports key-value tables.

For example, if a lookup table purchases is configured, then you will be able to perform operations with that lookup table in your script: context.getLookup('purchases').lookup('key')

Validator Object

For example, in a validator transform, you can retrieve the validator object and call its functions as part of your JavaScript:

var coreValidator = context.getValidator("coreValidator");
if (!coreValidator.isDate(input.date)) {
. . .

Batch Aggregator Plugin

A BatchAggregator plugin is used to compute aggregates over a batch of data. It is used in both batch and real-time data pipelines. An aggregation takes place in two steps: groupBy and then aggregate.

  • In the groupBy step, the aggregator creates zero or more group keys for each input record. Before the aggregate step occurs, the CDAP pipeline will take all records that have the same group key, and collect them into a group. If a record does not have any of the group keys, it is filtered out. If a record has multiple group keys, it will belong to multiple groups.
  • The aggregate step is then called. In this step, the plugin receives group keys and all records that had that group key. It is then left to the plugin to decide what to do with each of the groups.
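
The two steps can be sketched in plain Java without any CDAP classes. In this minimal sketch, records are simplified to strings, and the run() method is a hypothetical stand-in for the work the CDAP pipeline does between the two steps (collecting records by group key); all names are chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain-Java model of the groupBy and aggregate steps.
class GroupAggregateSketch {

  // groupBy step: return zero or more group keys for one input record (here, one key per word).
  static List<String> groupBy(String record) {
    List<String> keys = new ArrayList<>();
    for (String word : record.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        keys.add(word);
      }
    }
    return keys;
  }

  // aggregate step: called once per group key with every record in that group.
  static long aggregate(String groupKey, Iterator<String> groupValues) {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    return count;
  }

  // Stand-in for the pipeline: collect records by group key, then aggregate each group.
  static Map<String, Long> run(List<String> records) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (String record : records) {
      // a record with no group keys is added to no group, so it is filtered out
      for (String key : groupBy(record)) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
      }
    }
    Map<String, Long> output = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> group : groups.entrySet()) {
      output.put(group.getKey(), aggregate(group.getKey(), group.getValue().iterator()));
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("a b", "b c", "   ")));
  }
}
```

The record "a b" belongs to two groups, and the blank record belongs to none, so it never reaches the aggregate step.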

In order to implement a Batch Aggregator, you extend the BatchAggregator class. Unlike a Transform, which operates on a single record at a time, a BatchAggregator operates on a collection of records.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
  • initialize(): Initialize the Batch Aggregator. Guaranteed to be executed before any call to the plugin’s groupBy or aggregate methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • destroy(): Destroy any resources created by initialize. Guaranteed to be executed after all calls to the plugin’s groupBy or aggregate methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • groupBy(): This method will be called for every object that is received from the previous stage. This method returns zero or more group keys for each object it receives. Objects with the same group key will be grouped together for the aggregate method.
  • aggregate(): This method is called after every object has been assigned its group keys. It is called once for each group key emitted by the groupBy method. The method receives a group key as well as an iterator over all objects that had that group key. Objects emitted in this method are the output for this stage.

Example:

/**
 * Aggregator that counts how many times each word appears in records input to the aggregator.
 */
@Plugin(type = BatchAggregator.PLUGIN_TYPE)
@Name(WordCountAggregator.NAME)
@Description("Counts how many times each word appears in all records input to the aggregator.")
public class WordCountAggregator extends BatchAggregator<String, StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  // split on runs of whitespace so repeated spaces do not produce empty words
  private static final Pattern WHITESPACE = Pattern.compile("\\s+");
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountAggregator(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    // a null input schema means it's unknown until runtime, or it's not constant
    if (inputSchema != null) {
      // if the input schema is constant and known at configure time, check that the input field exists and is a string.
      Schema.Field inputField = inputSchema.getField(config.field);
      if (inputField == null) {
        throw new IllegalArgumentException(
          String.format("Field '%s' does not exist in input schema %s.", config.field, inputSchema));
      }
      Schema fieldSchema = inputField.getSchema();
      Schema.Type fieldType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getType() : fieldSchema.getType();
      if (fieldType != Schema.Type.STRING) {
        throw new IllegalArgumentException(
          String.format("Field '%s' is of illegal type %s. Must be of type %s.",
                        config.field, fieldType, Schema.Type.STRING));
      }
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public void groupBy(StructuredRecord input, Emitter<String> groupKeyEmitter) throws Exception {
    String val = input.get(config.field);
    if (val == null) {
      return;
    }

    for (String word : WHITESPACE.split(val)) {
      // skip empty tokens produced by leading or repeated whitespace
      if (!word.isEmpty()) {
        groupKeyEmitter.emit(word);
      }
    }
  }

  @Override
  public void aggregate(String groupKey, Iterator<StructuredRecord> groupValues,
                        Emitter<StructuredRecord> emitter) throws Exception {
    long count = 0;
    while (groupValues.hasNext()) {
      groupValues.next();
      count++;
    }
    emitter.emit(StructuredRecord.builder(OUTPUT_SCHEMA).set("word", groupKey).set("count", count).build());
  }
}

Batch Joiner Plugin

A BatchJoiner plugin is used to join records over a batch of data. It can be used in both batch and real-time data pipelines. A join takes place in two steps: a joinOn step followed by a merge step.

  1. In the joinOn step, the joiner creates a join key for each input record. The CDAP pipeline will then take all records that have the same join key and collect them into a group.
  2. The merge step is then called. In this step, the plugin receives a list of all the records with the same join key, based on the type of join (either an inner or outer join). It is then up to the plugin to decide what to emit, in what becomes the final output of the stage.
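
The two steps can be sketched in plain Java. This is a simplified model rather than the BatchJoiner API: StageRecord is a hypothetical record type that remembers which stage emitted it, run() is a stand-in for the grouping that the CDAP pipeline performs, and filtering by join type is left out, so every join key reaches merge().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain-Java model of the joinOn and merge steps.
class JoinStepsSketch {

  // Hypothetical record type: the emitting stage, an id used as the join key, and a value.
  static final class StageRecord {
    final String stage;
    final String id;
    final String value;

    StageRecord(String stage, String id, String value) {
      this.stage = stage;
      this.id = id;
      this.value = value;
    }
  }

  // joinOn step: produce exactly one join key for each input record.
  static String joinOn(StageRecord record) {
    return record.id;
  }

  // merge step: decide what to emit for all records that share a join key.
  static String merge(String joinKey, List<StageRecord> group) {
    StringBuilder merged = new StringBuilder(joinKey);
    for (StageRecord record : group) {
      merged.append(' ').append(record.stage).append('=').append(record.value);
    }
    return merged.toString();
  }

  // Stand-in for the pipeline: collect records by join key, then merge each group.
  static List<String> run(List<StageRecord> input) {
    Map<String, List<StageRecord>> groups = new LinkedHashMap<>();
    for (StageRecord record : input) {
      groups.computeIfAbsent(joinOn(record), k -> new ArrayList<>()).add(record);
    }
    List<String> output = new ArrayList<>();
    for (Map.Entry<String, List<StageRecord>> group : groups.entrySet()) {
      output.add(merge(group.getKey(), group.getValue()));
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList(
      new StageRecord("customers", "1", "alice"),
      new StageRecord("purchases", "1", "book"),
      new StageRecord("customers", "2", "bob"))));
  }
}
```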

To implement a Batch Joiner, you extend the BatchJoiner class. Unlike a Transform, which operates on a single record at a time, a BatchJoiner operates on a collection of records.

Methods

  • configurePipeline(): Used to create any streams or datasets, or perform any validation on the application configuration that is required by this plugin.
  • initialize(): Initialize the Batch Joiner. Guaranteed to be executed before any call to the plugin’s joinOn or merge methods. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • prepareRun(): Prepares a pipeline run. This is run every time before a pipeline runs to help set up the run. Here you can set properties such as the number of partitions to use when joining and the join key class, if it is not known at compile time.
  • destroy(): Destroy any resources created by the initialize method. Guaranteed to be executed after all calls to the plugin’s joinOn or merge methods have been made. This is called by each executor of the job. For example, if the MapReduce engine is being used, each mapper will call this method.
  • joinOn(): This method will be called for every object that is received from the previous stage. This method returns a join key for each object it receives. Objects with the same join key will be grouped together for the merge method.
  • getJoinConfig(): This method will be called by the CDAP Pipeline to find out the type of join to be performed. The config specifies which input stages are requiredInputs. Records from a required input will always be present in the merge() method. Records from a non-required input will only be present in the merge() method if they meet the join criteria. In other words, if there are no required inputs, a full outer join is performed. If all inputs are required inputs, an inner join is performed.
  • merge(): This method is called after each object has been assigned a join key. The method receives a join key, an iterator over all objects with that join key, and the stage that emitted each object. Objects emitted by this method are the output for this stage.
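
The effect of required inputs described for getJoinConfig() can be modelled with a short sketch. It assumes one reading of the rule that is consistent with both stated outcomes: the group for a join key reaches merge() only if every required input contributed at least one record to it. The class, its method names, and the "partial outer" label are hypothetical and are not part of the CDAP API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustration only: models how required inputs could determine the join type.
class RequiredInputsSketch {

  // Assumed rule: keep the group for a join key only if every required input has a record in it.
  static boolean keepGroup(Set<String> requiredInputs, Set<String> stagesWithRecords) {
    return stagesWithRecords.containsAll(requiredInputs);
  }

  // Name the resulting join; "partial outer" is a label made up for the mixed case.
  static String joinType(Set<String> allInputs, Set<String> requiredInputs) {
    if (requiredInputs.isEmpty()) {
      return "full outer";
    }
    if (requiredInputs.containsAll(allInputs)) {
      return "inner";
    }
    return "partial outer";
  }

  public static void main(String[] args) {
    Set<String> all = new HashSet<>(Arrays.asList("customers", "purchases"));
    Set<String> onlyCustomers = Collections.singleton("customers");
    System.out.println(joinType(all, Collections.<String>emptySet()));
    System.out.println(joinType(all, all));
    System.out.println(joinType(all, onlyCustomers));
    // a key seen only in the non-required input is dropped when customers is required
    System.out.println(keepGroup(onlyCustomers, Collections.singleton("purchases")));
  }
}
```

With customers as the only required input, this model keeps every join key seen in customers and drops keys that appear only in purchases, which corresponds to a left outer join from the point of view of customers.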

Spark Compute Plugin

A SparkCompute plugin is used to transform a collection of input records into a collection of output records. It can be used in both batch and real-time data pipelines. It is similar to a Transform, except instead of transforming its input record by record, it transforms an entire collection. In a SparkCompute plugin, you are given access to anything you would be able to do in a Spark program.

In order to implement a Spark Compute Plugin, you extend the SparkCompute class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
  • transform(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input to transform it into an output RDD that will be sent to the next stage.

Example:

/**
 * SparkCompute plugin that counts how many times each word appears in records input to the compute stage.
 */
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name(WordCountCompute.NAME)
@Description("Counts how many times each word appears in all records input to the compute stage.")
public class WordCountCompute extends SparkCompute<StructuredRecord, StructuredRecord> {
  public static final String NAME = "WordCount";
  public static final Schema OUTPUT_SCHEMA = Schema.recordOf(
    "wordCount",
    Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("count", Schema.of(Schema.Type.LONG))
  );
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;
  }

  public WordCountCompute(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    // set the output schema so downstream stages will know their input schema.
    pipelineConfigurer.getStageConfigurer().setOutputSchema(OUTPUT_SCHEMA);
  }

  @Override
  public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext sparkExecutionPluginContext,
                                             JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
    return wordCount.countWords(javaRDD)
      .flatMap(new FlatMapFunction<Tuple2<String, Long>, StructuredRecord>() {
        @Override
        public Iterable<StructuredRecord> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          List<StructuredRecord> output = new ArrayList<>();
          output.add(StructuredRecord.builder(OUTPUT_SCHEMA)
                       .set("word", stringLongTuple2._1())
                       .set("count", stringLongTuple2._2())
                       .build());
          return output;
        }
      });
  }
}

Spark Sink Plugin

A SparkSink plugin is used to perform computations on a collection of input records and optionally write output data. It can only be used in batch data pipelines. A SparkSink is similar to a SparkCompute plugin except that it has no output. In a SparkSink, you are given access to anything you would be able to do in a Spark program. For example, one common use case is to train a machine-learning model in this plugin.

In order to implement a Spark Sink Plugin, you extend the SparkSink class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
  • run(): This method is given a Spark RDD (Resilient Distributed Dataset) containing every object that is received from the previous stage. This method then performs Spark operations on the input, and usually saves the result to a dataset.

Example:

/**
 * SparkSink plugin that counts how many times each word appears in records input to it and stores the result in
 * a KeyValueTable.
 */
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name(WordCountSink.NAME)
@Description("Counts how many times each word appears in all records input to the sink.")
public class WordCountSink extends SparkSink<StructuredRecord> {
  public static final String NAME = "WordCount";
  private final Conf config;

  /**
   * Config properties for the plugin.
   */
  public static class Conf extends PluginConfig {
    @Description("The field from the input records containing the words to count.")
    private String field;

    @Description("The name of the KeyValueTable to write to.")
    private String tableName;
  }

  public WordCountSink(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // any static configuration validation should happen here.
    // We will check that the field is in the input schema and is of type string.
    Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
    if (inputSchema != null) {
      WordCount wordCount = new WordCount(config.field);
      wordCount.validateSchema(inputSchema);
    }
    pipelineConfigurer.createDataset(config.tableName, KeyValueTable.class, DatasetProperties.EMPTY);
  }

  @Override
  public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
    // no-op
  }

  @Override
  public void run(SparkExecutionPluginContext sparkExecutionPluginContext,
                  JavaRDD<StructuredRecord> javaRDD) throws Exception {
    WordCount wordCount = new WordCount(config.field);
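    // keys and values are converted to bytes because they are written to a KeyValueTable
    JavaPairRDD<byte[], byte[]> outputRDD = wordCount.countWords(javaRDD)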
      .mapToPair(new PairFunction<Tuple2<String, Long>, byte[], byte[]>() {
        @Override
        public Tuple2<byte[], byte[]> call(Tuple2<String, Long> stringLongTuple2) throws Exception {
          return new Tuple2<>(Bytes.toBytes(stringLongTuple2._1()), Bytes.toBytes(stringLongTuple2._2()));
        }
      });
    sparkExecutionPluginContext.saveAsDataset(outputRDD, config.tableName);
  }
}

Streaming Source Plugin

A Streaming Source plugin is used as a source in real-time data pipelines. It is used to fetch a Spark DStream, which is an object that represents a collection of Spark RDDs and that produces a new RDD every batch interval of the pipeline.

In order to implement a Streaming Source Plugin, you extend the StreamingSource class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
  • getStream(): Returns the JavaDStream that will be used as a source in the pipeline.

Example:

@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name("Twitter")
@Description("Twitter streaming source.")
public class TwitterStreamingSource extends StreamingSource<StructuredRecord> {
  private final TwitterStreamingConfig config;

  /**
   * Config class for TwitterStreamingSource.
   */
  public static class TwitterStreamingConfig extends PluginConfig implements Serializable {
    private static final long serialVersionUID = 4218063781909515444L;

    private String consumerKey;

    private String consumerSecret;

    private String accessToken;

    private String accessTokenSecret;

    private String referenceName;
  }

  public TwitterStreamingSource(TwitterStreamingConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    pipelineConfigurer.getStageConfigurer().setOutputSchema(TwitterConstants.SCHEMA);
  }

  @Override
  public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws Exception {
    JavaStreamingContext javaStreamingContext = context.getSparkStreamingContext();
    // lineage for this source will be tracked with this reference name
    context.registerLineage(config.referenceName);

    // Create authorization from user-provided properties
    ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
    configurationBuilder.setDebugEnabled(false)
      .setOAuthConsumerKey(config.consumerKey)
      .setOAuthConsumerSecret(config.consumerSecret)
      .setOAuthAccessToken(config.accessToken)
      .setOAuthAccessTokenSecret(config.accessTokenSecret);
    Authorization authorization = new OAuthAuthorization(configurationBuilder.build());

    return TwitterUtils.createStream(javaStreamingContext, authorization).map(
      new Function<Status, StructuredRecord>() {
        public StructuredRecord call(Status status) {
          return convertTweet(status);
        }
      }
    );
  }

  private StructuredRecord convertTweet(Status tweet) {
    // logic to convert a Twitter Status into a CDAP StructuredRecord
  }

}

Lineage

The lineage is registered using the referenceName provided when invoking registerLineage() on StreamingContext in getStream(). Note that the referenceName should be a valid DatasetId.

Windower Plugin

A Windower plugin is used in real-time data pipelines to create sliding windows over the data. It does this by combining multiple micro batches into larger batches.

A window is defined by its size and its slide interval. Both are defined in seconds and must be multiples of the batchInterval of the pipeline. The size defines how much data is contained in the window. The slide interval defines how often a window is created.

For example, consider a pipeline with a batchInterval of 10 seconds. The pipeline uses a windower that has a size of 60 and a slide interval of 30. The input into the windower will be micro batches containing 10 seconds of data. Every 30 seconds, the windower will output a batch of data containing the past 60 seconds of data, meaning the previous six micro batches that it received as input.

Because the window size in this example is larger than the slide interval, each window output will overlap (repeat) some of the data from the previous window. This is useful when calculating aggregates over several time ranges, such as how many "404" responses a website sent out in the past ten seconds, the past minute, and the past five minutes.
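
The arithmetic in the example above can be checked with a few lines of Java. WindowMathSketch and its method names are hypothetical helpers written for this illustration; they are not part of the Windower API.

```java
// Illustration only: arithmetic for window size, slide interval, and batch interval (all in seconds).
class WindowMathSketch {

  // Convert a duration into a number of micro batches; it must be a positive multiple of the batch interval.
  static long toMicroBatches(long seconds, long batchInterval) {
    if (batchInterval <= 0 || seconds <= 0 || seconds % batchInterval != 0) {
      throw new IllegalArgumentException(
        seconds + " is not a positive multiple of the batch interval " + batchInterval);
    }
    return seconds / batchInterval;
  }

  // Seconds of data that two consecutive windows have in common.
  static long overlapSeconds(long width, long slideInterval) {
    return Math.max(0L, width - slideInterval);
  }

  public static void main(String[] args) {
    long batchInterval = 10;
    long width = 60;
    long slideInterval = 30;
    System.out.println("micro batches per window: " + toMicroBatches(width, batchInterval));
    System.out.println("micro batches between outputs: " + toMicroBatches(slideInterval, batchInterval));
    System.out.println("seconds shared by consecutive windows: " + overlapSeconds(width, slideInterval));
  }
}
```

For the values in the example, each window contains six micro batches, a new window is produced every three micro batches, and consecutive windows share 30 seconds of data.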

In order to implement a Windower Plugin, you extend the Windower class.

Methods

  • configurePipeline(): Used to perform any validation on the application configuration that is required by this plugin or to create any streams or datasets if the fieldName for a stream or dataset is not a macro.
  • getWidth(): Return the width in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.
  • getSlideInterval(): Get the slide interval in seconds of windows created by this plugin. Must be a multiple of the batchInterval of the pipeline.

Example:

@Plugin(type = Windower.PLUGIN_TYPE)
@Name("Window")
@Description("Creates a sliding window over the data")
public class Window extends Windower {
  private final Conf conf;

  /**
   * Config for window plugin.
   */
  public static class Conf extends PluginConfig {
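    // width of the window in seconds; must be a multiple of the pipeline's batchInterval
    long width;
    // slide interval in seconds; must be a multiple of the pipeline's batchInterval
    long slideInterval;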
  }

  public Window(Conf conf) {
    this.conf = conf;
  }

  @Override
  public long getWidth() {
    return conf.width;
  }

  @Override
  public long getSlideInterval() {
    return conf.slideInterval;
  }
}