CDAP Stream Client for Python

The Stream Client Python API is for managing Streams from Python applications.

Supported Actions

  • Create a Stream
  • Update TTL (time-to-live, in seconds) for an existing Stream
  • Retrieve the current Stream TTL (in seconds)
  • Truncate an existing Stream (the deletion of all events that were written to the Stream)
  • Write an event to an existing Stream


To install the CDAP Stream Client, run:

$ pip install cdap-stream-client

To install the development version, clone the repository:

$ git clone
$ cd cdap-ingest/cdap-stream-clients/python/
$ python install

Supported Python versions: 2.6, 2.7


To use the Stream Client Python API, include these imports in your Python script:

from cdap_stream_client import Config
from cdap_stream_client import StreamClient


Creating a StreamClient

Create a StreamClient instance with default parameters:

stream_client = StreamClient()

Optional configurations that can be set (and their default values):

  • host: localhost
  • port: 11015
  • namespace: default
  • ssl: False (set to True to use HTTPS protocol)
  • ssl_cert_check: True (set to False to suspend certificate checks; this allows self-signed certificates when SSL is True)
  • authClient: null (CDAP Authentication Client to interact with a secure CDAP instance)


config = Config() = 'localhost'
config.port = 11015
config.namespace = example
config.ssl = True

stream_client = StreamClient(config)

Creating a Stream

Create a new Stream with the <stream-id> newStreamName:



  • The <stream-id> should only contain ASCII letters, digits and hyphens.
  • If the Stream already exists, no error is returned, and the existing Stream remains in place.

Creating a StreamWriter

Create a StreamWriter instance for writing events to the Stream streamName:

stream_writer = stream_client.create_writer("streamName")

Writing Stream Events

To write new events to the Stream, use the write method of the StreamWriter class:

def write(self, message, charset=None, headers=None)


stream_promise = stream_writer.write("New stream event")

Truncating a Stream

To delete all events that were written to the Stream streamName, use:


Updating Stream Time-to-Live (TTL)

Update TTL (in seconds) for the Stream streamName:

stream_client.set_ttl("streamName", newTTL)

Getting Stream Time-to-Live (TTL)

Get the current TTL value (in seconds) for the Stream streamName:

ttl = stream_client.get_ttl("streamName")


StreamPromise's goal is to implement deferred code execution.

For error handling, create a handler for each case and set it using the onResponse method. The error handling callback function is optional.


def on_ok_response(response):
    parse response

def on_error_response(response):
    parse response

stream_promise = stream_writer.write("New stream event")
stream_promise.on_response(on_ok_response, on_error_response)

Source Code Repository

Source code (and other resources) for this page are available at the CDAP Ingest Project GitHub repository.