Streams

Streams are the primary means of bringing data from external systems into CDAP in real time. They are ordered, time-partitioned sequences of data that support real-time collection and consumption.

You can create them programmatically within your application, or by using the Stream HTTP RESTful API, the StreamClient of the Java Client API, or the CDAP Command Line Interface.

Data written to a stream can be consumed in real time by flows or in batch by MapReduce programs.

Creating a Stream

You specify a stream in your application specification:

addStream(new Stream("myStream"));

This specifies a new stream named myStream.

Streams are uniquely identified by a combination of the namespace and the stream name and are explicitly created before being used. Names used for streams need to be unique within a namespace, as streams are shared between applications. Names that start with an underscore (_) will not be visible in the home page of the CDAP UI, though they will be visible elsewhere in the CDAP UI.

Writing To a Stream

You can write to streams either one operation at a time or in batches, using either the Stream HTTP RESTful API or the Command Line Interface.
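As an illustration of writing a single event over HTTP, here is a minimal sketch using only the JDK. The class StreamWriteSketch and its methods are hypothetical names. The path layout (/v3/namespaces/<namespace>/streams/<stream>) and the base URL used in the usage note are assumptions that you should check against the Stream HTTP RESTful API reference for your CDAP version.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the CDAP client libraries.
final class StreamWriteSketch {

  // Builds the endpoint for sending events to a stream.
  // The path layout is an assumption; verify it for your CDAP version.
  static URI eventUri(String baseUrl, String namespace, String stream) {
    return URI.create(baseUrl + "/v3/namespaces/" + namespace + "/streams/" + stream);
  }

  // Sends one event whose body is the UTF-8 encoding of the given text.
  // Returns the HTTP status code reported by the server.
  static int sendEvent(String baseUrl, String namespace, String stream, String body)
      throws IOException {
    HttpURLConnection conn =
        (HttpURLConnection) eventUri(baseUrl, namespace, stream).toURL().openConnection();
    try {
      conn.setRequestMethod("POST");
      conn.setDoOutput(true);
      try (OutputStream out = conn.getOutputStream()) {
        out.write(body.getBytes(StandardCharsets.UTF_8));
      }
      return conn.getResponseCode();
    } finally {
      conn.disconnect();
    }
  }
}
```

A call such as StreamWriteSketch.sendEvent("http://localhost:11015", "default", "myStream", "Hello") would post one event to myStream, assuming a CDAP instance is reachable at that host and port.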

Each individual signal sent to a stream is stored as a StreamEvent, which consists of a header (a map of strings for metadata) and a body (a blob of binary data).
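To make the header-plus-body structure concrete, here is a minimal sketch of such an event in plain Java. EventSketch is a hypothetical stand-in written for this illustration; it is not the CDAP StreamEvent class and makes no claim about how CDAP stores events.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for an event: a map of string headers plus a binary body.
final class EventSketch {
  private final Map<String, String> headers;
  private final ByteBuffer body;

  EventSketch(Map<String, String> headers, byte[] body) {
    // Copy both inputs so later changes by the caller do not affect the event.
    this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    this.body = ByteBuffer.wrap(body.clone()).asReadOnlyBuffer();
  }

  Map<String, String> getHeaders() {
    return headers;
  }

  // Returns a duplicate so that each caller reads the body from the start.
  ByteBuffer getBody() {
    return body.duplicate();
  }
}
```

The body is kept as raw bytes on purpose: the event itself carries no character encoding, so the reader has to know how the writer encoded the data.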

Reading From a Stream

To convert the binary data of an event body into a string, you need to take into account the character encoding of the data, as shown in this code fragment:

@ProcessInput
public void process(StreamEvent myStreamEvent) {
  // The body is a buffer of raw bytes; decode it with the encoding the writer used (UTF-8 here).
  String event = Charsets.UTF_8.decode(myStreamEvent.getBody()).toString();
  ...
}
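Why the encoding matters can be seen in a small standalone sketch that uses only the JDK (StandardCharsets rather than the Charsets class in the fragment above). BodyDecodingSketch is a hypothetical name, and the sample text is made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration.
final class BodyDecodingSketch {

  // Decodes a body with the given charset. duplicate() keeps the caller's
  // buffer position unchanged, so the same body can be decoded again.
  static String decode(ByteBuffer body, Charset charset) {
    return charset.decode(body.duplicate()).toString();
  }

  public static void main(String[] args) {
    // "caf\u00e9" is four characters but five bytes in UTF-8.
    ByteBuffer body = ByteBuffer.wrap("caf\u00e9".getBytes(StandardCharsets.UTF_8));

    // Decoding with the writer's encoding recovers the four original characters.
    System.out.println(decode(body, StandardCharsets.UTF_8).length());      // prints 4

    // Decoding with the wrong encoding turns each byte into its own character.
    System.out.println(decode(body, StandardCharsets.ISO_8859_1).length()); // prints 5
  }
}
```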

Stream Time-To-Live (TTL)

Streams are persisted by CDAP, and by default an event sent to a stream never expires. The Time-To-Live (TTL) property, specified in seconds, governs how long an event remains available for consumption after it is written to the stream. The default TTL for all streams is infinite. You can change the TTL of a stream using the Stream HTTP RESTful API, the StreamClient of the Java Client API, or the Command Line Interface.
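The TTL rule can be sketched as a simple age check. This is an illustrative model only, not CDAP's implementation: TtlSketch, the use of Long.MAX_VALUE to represent an infinite TTL, and the choice to expire an event only once its age strictly exceeds the TTL are all assumptions.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of a TTL check for illustration.
final class TtlSketch {

  // Assumption: Long.MAX_VALUE stands in for an infinite TTL in this sketch.
  static final long INFINITE_TTL_SECONDS = Long.MAX_VALUE;

  // Returns true if an event written at writeTimeMillis is no longer
  // available for consumption at nowMillis under the given TTL.
  static boolean isExpired(long writeTimeMillis, long nowMillis, long ttlSeconds) {
    if (ttlSeconds == INFINITE_TTL_SECONDS) {
      return false; // events never expire
    }
    long ageMillis = nowMillis - writeTimeMillis;
    return ageMillis > TimeUnit.SECONDS.toMillis(ttlSeconds);
  }
}
```

With a TTL of 60 seconds, an event written at time 0 is still available at 59,000 ms and expired at 61,000 ms under this model.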

Truncating and Deleting a Stream

Streams can be truncated, which means deleting all events that were ever written to the stream. This is permanent and cannot be undone. They can be truncated using the Stream HTTP RESTful API, the StreamClient of the Java Client API, or by using the Command Line Interface.

Deleting a stream means deleting the endpoint so that events can no longer be written to it. This is permanent and cannot be undone. If another stream is created with the same name, it will not return any of the previous stream's events.

Stream Notifications

Streams publish notifications internally to CDAP when they ingest data. The amount of newly ingested data required to trigger a notification is set by the stream's notification.threshold.mb configuration, which can be changed using the Stream HTTP RESTful API, the StreamClient of the Java Client API, or the Command Line Interface. When a stream is created, the threshold defaults to the value of stream.notification.threshold in cdap-site.xml.

The notifications report the total size of all events ever ingested by a stream, so the reported size only ever increases. In particular, it does not reset when the stream is truncated, and it does not decrease when part of the data ingested by the stream has reached its TTL.
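The behavior described above can be modeled with a running total and a threshold. The following is a toy sketch, not CDAP's internal implementation: StreamSizeNotifierSketch is a hypothetical class, the threshold is expressed in bytes rather than megabytes for simplicity, and the exact moment at which CDAP publishes a notification is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of stream-size notifications for illustration.
final class StreamSizeNotifierSketch {
  private final long thresholdBytes;
  private long totalIngestedBytes; // total ever ingested; never decreases
  private long lastNotifiedBytes;  // total at the time of the last notification
  private final List<Long> published = new ArrayList<>();

  StreamSizeNotifierSketch(long thresholdBytes) {
    if (thresholdBytes <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.thresholdBytes = thresholdBytes;
  }

  // Records newly ingested data and publishes the absolute total once at
  // least thresholdBytes have arrived since the previous notification.
  void ingest(long eventBytes) {
    totalIngestedBytes += eventBytes;
    if (totalIngestedBytes - lastNotifiedBytes >= thresholdBytes) {
      published.add(totalIngestedBytes);
      lastNotifiedBytes = totalIngestedBytes;
    }
  }

  // Truncation removes stored events but deliberately leaves the running
  // total untouched, so later notifications keep increasing.
  void truncate() {
    // intentionally empty in this model
  }

  List<Long> published() {
    return Collections.unmodifiableList(published);
  }
}
```

With a threshold of 100 bytes, ingesting 60 and then 50 bytes publishes a notification of 110; after a truncation, ingesting another 100 bytes publishes 210 rather than starting again from zero.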

Stream-size notifications are used by stream-size schedules.

Stream Examples

Streams are included in just about every CDAP application, tutorial, guide or example.

  • The simplest example, Hello World, demonstrates using a stream to ingest a name into a dataset.
  • For an example of pushing events to a stream, see the Purchase example and its CDAP CLI command that injects data into a stream.
  • For an example of reading events from a stream, see the Purchase example, where the class PurchaseStreamReader reads events from a stream.
  • For an example of reading from a stream with a MapReduce program, see Batch Data Processing with CDAP, where the class TopClientsMapReduce reads events from a stream using the method Input.ofStream().