Java Client API

The Cask Data Application Platform (CDAP) Java Client API provides methods for interacting with CDAP from Java applications.

Maven Dependency

To use the Java Client API in your project, add this Maven dependency:

<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-client</artifactId>
  <version>${cdap.version}</version>
</dependency>

Components

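The Java Client API allows you to interact with these CDAP components:

- ApplicationClient: applications
- DatasetClient: datasets
- DatasetModuleClient: dataset modules
- DatasetTypeClient: dataset types
- MetricsClient: metrics
- MonitorClient: system services
- PreferencesClient: preferences
- ProgramClient: programs (flows, flowlets, and services)
- QueryClient: ad-hoc queries
- ServiceClient: services
- StreamClient: streams

Each client in the list above has a matching example section below.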

Configuring your *Client

Every *Client constructor requires a ClientConfig instance, which specifies the hostname and port of the CDAP instance that you wish to interact with.

In a non-secure (default) CDAP instance, instantiate as follows:

// Interact with the CDAP instance located at example.com, port 11015
ClientConfig clientConfig = ClientConfig.builder()
  .setConnectionConfig(new ConnectionConfig("example.com", 11015, false))
  .build();
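
To make the three ConnectionConfig arguments concrete, the sketch below shows one plausible way a hostname, a port, and an SSL flag combine into a base URL. It is an illustration only: BaseUriSketch and baseUri are hypothetical names, not part of the CDAP client library, and the assumption that the SSL flag simply selects between http and https is ours.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of the CDAP client library. */
class BaseUriSketch {

  /** Builds a base URI from the same three values that are passed to ConnectionConfig. */
  static URI baseUri(String hostname, int port, boolean sslEnabled) {
    // Assumption: the SSL flag only switches the URI scheme
    String scheme = sslEnabled ? "https" : "http";
    return URI.create(scheme + "://" + hostname + ":" + port);
  }

  public static void main(String[] args) {
    System.out.println(baseUri("example.com", 11015, false)); // prints http://example.com:11015
    System.out.println(baseUri("example.com", 11015, true));  // prints https://example.com:11015
  }
}
```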

In a secure CDAP instance, first pull in the cdap-authentication-client Maven dependency:

<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-authentication-client</artifactId>
  <version>${cdap.client.version}</version>
</dependency>

Then, instantiate as follows:

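// Obtain AccessToken
// sslEnabled is a boolean that you define: true if the CDAP instance is served over SSL
boolean sslEnabled = true;
AuthenticationClient authenticationClient = new BasicAuthenticationClient();
authenticationClient.setConnectionInfo("example.com", 11015, sslEnabled);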
// Configure the AuthenticationClient as documented in
// https://github.com/caskdata/cdap-clients/blob/develop/cdap-authentication-clients/java
AccessToken accessToken = authenticationClient.getAccessToken();

// Interact with the secure CDAP instance located at example.com, port 11015, with the provided accessToken
ClientConfig clientConfig = ClientConfig.builder()
  .setConnectionConfig(new ConnectionConfig("example.com", 11015, sslEnabled))
  .setAccessToken(accessToken)
  .build();

ApplicationClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
ApplicationClient appClient = new ApplicationClient(clientConfig);

// Fetch the list of applications
List<ApplicationRecord> apps = appClient.list(NamespaceId.DEFAULT);

// Deploy an application
File appJarFile = new File("your-app.jar");
appClient.deploy(NamespaceId.DEFAULT, appJarFile);

// Delete an application
appClient.delete(NamespaceId.DEFAULT.app("Purchase"));

// List programs belonging to an application
appClient.listPrograms(NamespaceId.DEFAULT.app("Purchase"));

DatasetClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
DatasetClient datasetClient = new DatasetClient(clientConfig);

// Fetch the list of datasets
List<DatasetSpecificationSummary> datasets = datasetClient.list(NamespaceId.DEFAULT);

// Create a dataset
DatasetId datasetId = NamespaceId.DEFAULT.dataset("someDataset");
datasetClient.create(datasetId, "someDatasetType");

// Truncate a dataset
datasetClient.truncate(datasetId);

// Delete a dataset
datasetClient.delete(datasetId);

DatasetModuleClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
DatasetModuleClient datasetModuleClient = new DatasetModuleClient(clientConfig);

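// Add a dataset module
// (createAppJarFile is not part of the client API; it stands for your own code
// that packages the module class into a JAR file)
File moduleJarFile = createAppJarFile(SomeDatasetModule.class);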
DatasetModuleId datasetModuleId = NamespaceId.DEFAULT.datasetModule("someDatasetModule");
datasetModuleClient.add(datasetModuleId, SomeDatasetModule.class.getName(), moduleJarFile);

// Fetch the dataset module information
DatasetModuleMeta datasetModuleMeta = datasetModuleClient.get(datasetModuleId);

// Delete all dataset modules
datasetModuleClient.deleteAll(NamespaceId.DEFAULT);

DatasetTypeClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
DatasetTypeClient datasetTypeClient = new DatasetTypeClient(clientConfig);

// Fetch the dataset type information using the type name
DatasetTypeMeta datasetTypeMeta = datasetTypeClient.get(NamespaceId.DEFAULT.datasetType("someDatasetType"));

// Fetch the dataset type information using the classname
datasetTypeMeta = datasetTypeClient.get(NamespaceId.DEFAULT.datasetType(SomeDataset.class.getName()));

MetricsClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
MetricsClient metricsClient = new MetricsClient(clientConfig);

// Fetch the total number of events that have been processed by a flowlet
RuntimeMetrics metric = metricsClient.getFlowletMetrics(NamespaceId.DEFAULT.app("HelloWorld").flow("someFlow").flowlet("someFlowlet"));
long processed = metric.getProcessed();

MonitorClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
MonitorClient monitorClient = new MonitorClient(clientConfig);

// Fetch the list of system services
List<SystemServiceMeta> services = monitorClient.listSystemServices();

// Fetch status of system transaction service
String serviceStatus = monitorClient.getSystemServiceStatus("transaction");

// Fetch the number of instances of the system transaction service
int systemServiceInstances = monitorClient.getSystemServiceInstances("transaction");

// Set the number of instances of the system transaction service
monitorClient.setSystemServiceInstances("transaction", 1);

PreferencesClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
PreferencesClient preferencesClient = new PreferencesClient(clientConfig);

Map<String, String> propMap = Maps.newHashMap();
propMap.put("k1", "v1");

// Set preferences at the Instance level
preferencesClient.setInstancePreferences(propMap);

// Get preferences at the Instance level
Map<String, String> currentPropMap = preferencesClient.getInstancePreferences();

// Delete preferences at the Instance level
preferencesClient.deleteInstancePreferences();

// Set preferences of MyApp application which is deployed in the Dev namespace
preferencesClient.setApplicationPreferences(new NamespaceId("Dev").app("MyApp"), propMap);

// Get only the preferences of MyApp application which is deployed in the Dev namespace
Map<String, String> appPrefs = preferencesClient.getApplicationPreferences(new NamespaceId("Dev").app("MyApp"), false);

// Get the resolved preferences (collapsed with higher level(s) of preferences)
Map<String, String> resolvedAppPrefs = preferencesClient.getApplicationPreferences(new NamespaceId("Dev").app("MyApp"), true);
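
The difference between the two getApplicationPreferences calls above can be pictured as a map merge. The following is a minimal sketch, assuming that "resolved" means the maps of each level are layered from the instance level down to the application level, with the more specific level winning on key conflicts. PreferenceResolutionSketch and resolve are hypothetical names used for illustration; the client performs the actual resolution on the server side.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of preference resolution; not part of the CDAP client library. */
class PreferenceResolutionSketch {

  /** Merges preference maps ordered from the highest level to the most specific level. */
  @SafeVarargs
  static Map<String, String> resolve(Map<String, String>... levelsHighToLow) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (Map<String, String> level : levelsHighToLow) {
      // Assumption: a later, more specific level overwrites keys set at a higher level
      resolved.putAll(level);
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> instancePrefs = Map.of("k1", "v1", "k2", "instance-value");
    Map<String, String> appPrefs = Map.of("k2", "app-value");

    Map<String, String> resolved = resolve(instancePrefs, appPrefs);
    System.out.println(resolved.get("k1")); // prints v1 (inherited from the instance level)
    System.out.println(resolved.get("k2")); // prints app-value (overridden by the application)
  }
}
```

Under this reading, passing false returns only the application's own map (here, just k2), while passing true returns the merged view.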

ProgramClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
ProgramClient programClient = new ProgramClient(clientConfig);

// Start a service in the WordCount example
programClient.start(NamespaceId.DEFAULT.app("WordCount").service("RetrieveCounts"));

// Fetch live information for a service in the HelloWorld example, formatted in JSON
programClient.getLiveInfo(NamespaceId.DEFAULT.app("HelloWorld").service("greet"));

// Fetch program logs in the WordCount example
programClient.getProgramLogs(NamespaceId.DEFAULT.app("WordCount").service("RetrieveCounts"), 0, Long.MAX_VALUE);

// Scale a service in the HelloWorld example
programClient.setServiceInstances(NamespaceId.DEFAULT.app("HelloWorld").service("greet"), 3);

// Stop a service in the HelloWorld example
programClient.stop(NamespaceId.DEFAULT.app("HelloWorld").service("greet"));

// Start a flow in the WordCount example
programClient.start(NamespaceId.DEFAULT.app("WordCount").flow("WordCountFlow"));

// Fetch the last 10 flow runs in the WordCount example
programClient.getAllProgramRuns(NamespaceId.DEFAULT.app("WordCount").flow("WordCountFlow"), 0, Long.MAX_VALUE, 10);

// Scale a flowlet in the WordCount example
programClient.setFlowletInstances(NamespaceId.DEFAULT.app("WordCount").flow("WordCountFlow").flowlet("Tokenizer"), 3);

// Stop a flow in the WordCount example
programClient.stop(NamespaceId.DEFAULT.app("WordCount").flow("WordCountFlow"));

QueryClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
QueryClient queryClient = new QueryClient(clientConfig);

// Perform an ad-hoc query using the Purchase example
ListenableFuture<ExploreExecutionResult> resultFuture = queryClient.execute(NamespaceId.DEFAULT, "SELECT * FROM dataset_history WHERE customer IN ('Alice','Bob')");
ExploreExecutionResult results = resultFuture.get();

// Fetch schema
List<ColumnDesc> schema = results.getResultSchema();
String[] header = new String[schema.size()];
for (int i = 0; i < header.length; i++) {
  ColumnDesc column = schema.get(i);
  // Hive columns start at 1
  int index = column.getPosition() - 1;
  header[index] = column.getName() + ": " + column.getType();
}
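
The header loop above places each column by its 1-based position rather than by its order in the returned list. The sketch below isolates that step so it can be run without a CDAP instance. Column is a hypothetical stand-in that carries only the three values read from ColumnDesc above (name, type, and position); it is not the real class.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-alone illustration of the header-building loop; Column is a hypothetical stand-in. */
class SchemaHeaderSketch {

  /** Carries the three values that the loop above reads from each ColumnDesc. */
  static final class Column {
    final String name;
    final String type;
    final int position; // 1-based, as in Hive

    Column(String name, String type, int position) {
      this.name = name;
      this.type = type;
      this.position = position;
    }
  }

  /** Builds "name: type" labels, ordered by column position instead of list order. */
  static String[] header(List<Column> schema) {
    String[] header = new String[schema.size()];
    for (Column column : schema) {
      // Convert the 1-based column position to a 0-based array index
      header[column.position - 1] = column.name + ": " + column.type;
    }
    return header;
  }

  public static void main(String[] args) {
    // Columns deliberately listed out of order
    List<Column> schema = Arrays.asList(
        new Column("price", "INT", 2),
        new Column("customer", "STRING", 1));
    System.out.println(Arrays.toString(header(schema))); // prints [customer: STRING, price: INT]
  }
}
```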

ServiceClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
ServiceClient serviceClient = new ServiceClient(clientConfig);

// Fetch service information using the service in the PurchaseApp example
ServiceSpecification serviceSpec = serviceClient.get(NamespaceId.DEFAULT.app("PurchaseApp").service("CatalogLookup"));

StreamClient

ClientConfig clientConfig;

// Construct the client used to interact with CDAP
StreamClient streamClient = new StreamClient(clientConfig);

// Fetch the stream list
List<?> streams = streamClient.list(NamespaceId.DEFAULT);

// Create a stream, using the Purchase example
StreamId streamId = NamespaceId.DEFAULT.stream("purchases");
streamClient.create(streamId);

// Fetch a stream's properties
StreamProperties config = streamClient.getConfig(streamId);

// Send events to a stream
streamClient.sendEvent(streamId, "Tom bought 5 apples for $10");

// Read all events from a stream (results are added to the events list)
List<StreamEvent> events = Lists.newArrayList();
streamClient.getEvents(streamId, 0, Long.MAX_VALUE, Integer.MAX_VALUE, events);

// Read the first 5 events from a stream (results are added to the events list)
events = Lists.newArrayList();
streamClient.getEvents(streamId, 0, Long.MAX_VALUE, 5, events);

// Read 2nd and 3rd events from a stream, after first calling getEvents
long startTime = events.get(1).getTimestamp();
long endTime = events.get(2).getTimestamp() + 1;
events.clear();
streamClient.getEvents(streamId, startTime, endTime, Integer.MAX_VALUE, events);

// Write asynchronously to a stream
streamId = NamespaceId.DEFAULT.stream("testAsync");
events = Lists.newArrayList();

streamClient.create(streamId);

// Send 10 async writes
int msgCount = 10;
for (int i = 0; i < msgCount; i++) {
  streamClient.asyncSendEvent(streamId, "Testing " + i);
}

// Read them back; need to read it multiple times as the writes happen asynchronously
while (events.size() != msgCount) {
  events.clear();
  streamClient.getEvents(streamId, 0, Long.MAX_VALUE, msgCount, events);
}

// Check that there are no more events: read again without a limit and compare the count
events.clear();
streamClient.getEvents(streamId, 0, Long.MAX_VALUE, Integer.MAX_VALUE, events);
boolean noExtraEvents = events.size() == msgCount;
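
The read-back loop above retries without pausing and never gives up, so it spins indefinitely if an asynchronous write is lost. One possible alternative is a bounded poll with a short pause between attempts. The sketch below is a generic helper under that assumption; PollingSketch and pollUntil are hypothetical names, and the Supplier stands in for a call that clears the list and invokes streamClient.getEvents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical polling helper for illustration; not part of the CDAP client library. */
class PollingSketch {

  /** Calls fetch until it returns at least expectedCount items or the timeout elapses. */
  static <T> List<T> pollUntil(Supplier<List<T>> fetch, int expectedCount,
                               long timeoutMillis, long pauseMillis)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (true) {
      List<T> items = fetch.get();
      if (items.size() >= expectedCount) {
        return items;
      }
      if (deadline - System.nanoTime() <= 0) {
        throw new TimeoutException("Saw " + items.size() + " of " + expectedCount + " items");
      }
      // Pause between attempts instead of spinning
      Thread.sleep(pauseMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    // Simulated source that exposes one more item on every poll
    List<String> visible = new ArrayList<>();
    Supplier<List<String>> fetch = () -> {
      visible.add("Testing " + visible.size());
      return new ArrayList<>(visible);
    };

    List<String> events = pollUntil(fetch, 3, 1000, 10);
    System.out.println(events.size()); // prints 3
  }
}
```

With a real client, the supplier would wrap the getEvents call and translate its checked exceptions, since Supplier.get cannot throw them.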