CDAP Flume

The CDAP Sink is a Flume Sink implementation using the CDAP RESTful Stream interface to write events received from a source.


To use the sink, place the CDAP Sink JAR file on the Flume classpath (for example, in the Flume lib directory). The JAR can be obtained from Maven Central.

Specify the fully-qualified name of your CDAP Sink class in the Flume configuration properties:

a1.sinks.sink1.type = co.cask.cdap.flume.StreamSink

Enter the host name or host IP address used by the Stream Client:

= <hostname | host-ip>

Set the target Stream name:

a1.sinks.sink1.streamName = <stream-name>

Optional parameters that can be specified are listed below, with their default values.

  • Enter the host port that is used by the Stream Client:

    a1.sinks.sink1.port = 11015
  • Secure Sockets Layer (SSL) mode [true | false]:

    a1.sinks.sink1.sslEnabled = false
  • Verify SSL Certificate [true | false]:

    a1.sinks.sink1.verifySSLCert = true
    # Set to false to skip certificate verification;
    # this allows self-signed certificates when sslEnabled is true
  • Number of threads the Stream Client uses to send events:

    a1.sinks.sink1.writerPoolSize = 10
  • CDAP Router server version:

    a1.sinks.sink1.version = v3
  • CDAP Namespace:

    a1.sinks.sink1.namespace = default
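
As a sketch of how the optional parameters above combine, the fragment below configures the sink for a CDAP instance that serves SSL with a self-signed certificate. It uses only the property names listed above. The port value is an assumption for illustration and should be replaced with the port your CDAP Router actually listens on for SSL.

```properties
# Sketch only: property names come from the list above, values are illustrative.
# Send events over SSL
a1.sinks.sink1.sslEnabled = true
# Assumed SSL port; replace with the port used by your installation
a1.sinks.sink1.port = 10443
# Skip certificate verification so that a self-signed certificate is accepted
a1.sinks.sink1.verifySSLCert = false
```

Disabling certificate verification removes protection against impersonated servers, so it is best limited to development or test setups; with a certificate signed by a trusted authority, verifySSLCert can stay at its default of true.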

Authentication Client

To use authentication, add these authentication client configuration parameters to the sink configuration file:

Fully qualified class name of the client class:

a1.sinks.sink1.authClientClass =

Path to the authentication client properties file:

a1.sinks.sink1.authClientProperties = /usr/local/apache-flume/conf/auth_client.conf

Authentication Client Example Configuration

# User name
# User password


Example configuration of a Flume agent that reads data from a log file and writes it to CDAP using the CDAP Sink:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/log
a1.sources.r1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = co.cask.cdap.flume.StreamSink
a1.sinks.k1.channel = c1
a1.sinks.k1.port = 11015
a1.sinks.k1.streamName = logEventStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Source Code Repository

Source code (and other resources) for this page is available at the CDAP Ingest Project GitHub repository.