Implement a client for Livestream data
Use the following information to create a client that can consume Analytics Livestream data:
Install the required software:
- Java 11
- Maven
Update the sample code below:
- Add your stream ID where specified in the code.
- Add your generated access token where specified in the code. For information on generating this token, see the User Authentication Guide.
Build a runnable JAR:
Copied to your clipboard* Use the following: ```$ mvn package``` while running the application$ java target/adobe-livestream-consumer-1.0-SNAPSHOT.jar
.Copied to your clipboard* It is highly recommended to have separate threads, one for reading Livestream and another to perform work on the data.* The schema can be found at the following URL: https://livestream.adobe.net/api/1/schema.v
XML
resources/java/livestream/pom.xml
Copied to your clipboard<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.adobe.analytics</groupId><artifactId>adobe-livestream-consumer</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>adobe-livestream-consumer</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target></properties><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><addClasspath>true</addClasspath><mainClass>com.adobe.analytics.LivestreamConsumer</mainClass></manifest></archive></configuration></plugin></plugins></build></project>
JAVA
resources/java/livestream/src/main/java/com/adobe/analytics/LivestreamConsumer.java
Copied to your clipboardpackage com.adobe.analytics;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.net.HttpURLConnection;import java.net.URL;import java.time.Duration;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.zip.GZIPInputStream;public class LivestreamConsumer implements Runnable{private final BlockingQueue<String> dataQueue = new ArrayBlockingQueue<>(1000);public static void main( String[] args ) throws IOException, InterruptedException {LivestreamConsumer consumer = new LivestreamConsumer();consumer.readFromLivestream();}public void readFromLivestream() throws IOException, InterruptedException {String accessToken = "PUT ACCESS TOKEN HERE";String streamId = "PUT STREAM ID HERE";String streamUrl = "https://livestream.adobe.net/api/1/stream/" + streamId;HttpURLConnection connection = getConnection(streamUrl, accessToken);int responseCode = connection.getResponseCode();while(responseCode != HttpURLConnection.HTTP_OK) {if(responseCode == HttpURLConnection.HTTP_MOVED_TEMP || responseCode == HttpURLConnection.HTTP_MOVED_PERM) {// Follow the redirect// Pass Authorization Header alongString redirectLocation = connection.getHeaderField("location");System.out.println("Redirecting to: " + redirectLocation);connection.disconnect();connection = getConnection(redirectLocation, accessToken);responseCode = connection.getResponseCode();} else if (responseCode == HttpURLConnection.HTTP_CONFLICT) {connection.disconnect();throw new RuntimeException("Connection refused due to conflict. Check that the maxConnections query string value is identical across connections.");} else {connection.disconnect();throw new RuntimeException("Unexpected response code: " + responseCode);}}// Spin up a new thread to handle data processingExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(this);// The stream is always compressed using GZIPInputStream inputStream = new GZIPInputStream(connection.getInputStream());BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));String line = reader.readLine();while (line != null) {// In order to ensure we keep up with incoming Livestream data, delegate work to another thread// This could be replaced with writing to a data queue like Apache KafkadataQueue.put(line);line = reader.readLine();}reader.close();connection.disconnect();executorService.shutdownNow();}private HttpURLConnection getConnection(final String streamUrl, final String accessToken) throws IOException {URL url = new URL(streamUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setReadTimeout((int) Duration.ofSeconds(60).toMillis());connection.setConnectTimeout((int) Duration.ofSeconds(10).toMillis());connection.setRequestProperty("Accept-Encoding", "gzip");connection.setRequestProperty("Authorization", "Bearer " + accessToken);// Many client libraries remove the Authorization header on a redirect// This application handles redirects manually to avoid thatconnection.setInstanceFollowRedirects(false);return connection;}@Overridepublic void run() {try {while(true) {String data = dataQueue.take();// Do data processing here, in a separate threadSystem.out.println(data);}} catch (InterruptedException e) {throw new RuntimeException(e);}}}