From 73017b75c91e420937f507c894ab54fed4de60ec Mon Sep 17 00:00:00 2001 From: Greg Holmes Date: Wed, 1 Apr 2026 10:24:18 +0100 Subject: [PATCH] feat: integrate Deepgram Java SDK for live text-to-speech WebSocket --- pom.xml | 23 +- src/main/java/com/deepgram/starter/App.java | 451 ++++++++++---------- 2 files changed, 224 insertions(+), 250 deletions(-) diff --git a/pom.xml b/pom.xml index cfa346f..93f1450 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,13 @@ + + + com.deepgram + deepgram-java-sdk + 0.2.0 + + io.javalin @@ -37,14 +44,7 @@ com.fasterxml.jackson.dataformat jackson-dataformat-toml - 2.18.2 - - - - - com.fasterxml.jackson.core - jackson-databind - 2.18.2 + 2.18.6 @@ -67,13 +67,6 @@ slf4j-simple 2.0.16 - - - - org.eclipse.jetty.websocket - websocket-jetty-client - 11.0.24 - diff --git a/src/main/java/com/deepgram/starter/App.java b/src/main/java/com/deepgram/starter/App.java index f58804d..1d9b0aa 100644 --- a/src/main/java/com/deepgram/starter/App.java +++ b/src/main/java/com/deepgram/starter/App.java @@ -1,9 +1,9 @@ /** - * Java Live Text-to-Speech Starter - Javalin Backend Server + * Java Live Text-to-Speech Starter - Backend Server * - * Simple WebSocket proxy to Deepgram's Live TTS API. - * Forwards all messages (JSON text and binary audio) bidirectionally - * between client and Deepgram. + * Simple WebSocket proxy to Deepgram's Live TTS API using the Deepgram Java SDK. + * Forwards text messages from the browser to the SDK's Speak V1 WebSocket client, + * and forwards binary audio chunks back to the browser. * * Routes: * GET /api/session - Issue JWT session token @@ -13,45 +13,55 @@ */ package com.deepgram.starter; +// ============================================================================ +// SECTION 1: IMPORTS +// ============================================================================ + import com.auth0.jwt.JWT; import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.exceptions.JWTVerificationException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.toml.TomlMapper; import io.github.cdimascio.dotenv.Dotenv; import io.javalin.Javalin; +import io.javalin.websocket.WsConfig; import io.javalin.websocket.WsContext; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.annotations.*; -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.eclipse.jetty.websocket.client.WebSocketClient; - +import com.deepgram.DeepgramClient; +import com.deepgram.resources.speak.v1.websocket.V1WebSocketClient; +import com.deepgram.resources.speak.v1.websocket.V1ConnectOptions; +import com.deepgram.resources.speak.v1.types.SpeakV1Text; +import com.deepgram.resources.speak.v1.types.SpeakV1Flush; +import com.deepgram.resources.speak.v1.types.SpeakV1Clear; +import com.deepgram.resources.speak.v1.types.SpeakV1Close; +import com.deepgram.types.SpeakV1Encoding; +import com.deepgram.types.SpeakV1Model; +import com.deepgram.types.SpeakV1SampleRate; + +import java.io.File; import java.io.InputStream; -import java.net.URI; import java.nio.ByteBuffer; import java.security.SecureRandom; import java.time.Instant; -import java.util.*; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; // ============================================================================ -// MAIN APPLICATION +// SECTION 2: MAIN APPLICATION // ============================================================================ public class App { // ======================================================================== - // CONFIGURATION + // SECTION 3: CONFIGURATION // ======================================================================== private static final Dotenv dotenv = Dotenv.configure().ignoreIfMissing().load(); private static final String DEEPGRAM_API_KEY = getRequiredEnv("DEEPGRAM_API_KEY"); - private static final String DEEPGRAM_TTS_URL = "wss://api.deepgram.com/v1/speak"; private static final int PORT = Integer.parseInt(getEnv("PORT", "8081")); private static final String HOST = getEnv("HOST", "0.0.0.0"); @@ -64,21 +74,12 @@ public class App { private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); private static final TomlMapper TOML_MAPPER = new TomlMapper(); - /** Track active client WebSocket contexts for graceful shutdown */ - private static final Set activeConnections = ConcurrentHashMap.newKeySet(); - - /** - * Map each client WsContext to its upstream Deepgram session. - * WsContext is used as the key per Javalin's recommended pattern -- - * each connection gets its own unique WsContext instance. - */ - private static final Map deepgramSessions = new ConcurrentHashMap<>(); - - /** Shared Jetty WebSocket client for outbound Deepgram connections */ - private static final WebSocketClient wsClient = new WebSocketClient(); + /** Map of browser WsContext -> SDK V1WebSocketClient for cleanup */ + private static final ConcurrentHashMap activeConnections = + new ConcurrentHashMap<>(); // ======================================================================== - // ENVIRONMENT HELPERS + // SECTION 4: ENVIRONMENT HELPERS // ======================================================================== /** @@ -119,7 +120,7 @@ private static String generateRandomSecret() { } // ======================================================================== - // SESSION AUTH - JWT tokens for production security + // SECTION 5: SESSION AUTH - JWT tokens for production security // ======================================================================== /** @@ -156,7 +157,7 @@ private static String validateWsToken(String protocols) { } // ======================================================================== - // METADATA - Read deepgram.toml [meta] section + // SECTION 6: METADATA - Read deepgram.toml [meta] section // ======================================================================== /** @@ -167,7 +168,7 @@ private static Map readMetadata() throws Exception { try (InputStream is = App.class.getClassLoader().getResourceAsStream("deepgram.toml")) { if (is == null) { // Fall back to filesystem for local development - java.io.File file = new java.io.File("deepgram.toml"); + File file = new File("deepgram.toml"); if (!file.exists()) { throw new RuntimeException("deepgram.toml not found"); } @@ -188,99 +189,213 @@ private static Map readMetadata() throws Exception { } // ======================================================================== - // DEEPGRAM WEBSOCKET HANDLER (upstream connection) + // SECTION 7: WEBSOCKET ROUTE - Live Text-to-Speech Proxy // ======================================================================== /** - * Jetty WebSocket endpoint that receives messages from Deepgram and - * forwards them to the corresponding client WebSocket. + * Configures the WebSocket endpoint for live text-to-speech. + * Acts as a bidirectional proxy: browser <-> Javalin <-> Deepgram SDK Speak V1 WebSocket. + * + * The browser sends JSON text messages (e.g., { type: "Speak", text: "..." }) + * and receives binary audio chunks back. The SDK handles the outbound connection + * to Deepgram, authentication, and WebSocket lifecycle. + * + * @param ws Javalin WebSocket config */ - @WebSocket - public static class DeepgramSocket { - private final WsContext clientCtx; + private static void handleLiveTextToSpeech(WsConfig ws) { + + ws.onConnect(ctx -> { + // Validate JWT from subprotocol header + String protocols = ctx.header("Sec-WebSocket-Protocol"); + String validProto = validateWsToken(protocols); + if (validProto == null) { + System.out.println("WebSocket auth failed: invalid or missing token"); + ctx.closeSession(4401, "Unauthorized"); + return; + } - public DeepgramSocket(WsContext clientCtx) { - this.clientCtx = clientCtx; - } + System.out.println("Client connected to /api/live-text-to-speech"); - @OnWebSocketConnect - public void onOpen(Session session) { - System.out.println("Connected to Deepgram TTS API"); - deepgramSessions.put(clientCtx, session); - } + // Parse query parameters from the WebSocket URL + String model = ctx.queryParam("model") != null ? ctx.queryParam("model") : "aura-asteria-en"; + String encoding = ctx.queryParam("encoding") != null ? ctx.queryParam("encoding") : "linear16"; + String sampleRate = ctx.queryParam("sample_rate") != null ? ctx.queryParam("sample_rate") : "48000"; - @OnWebSocketMessage - public void onTextMessage(Session session, String message) { - // Forward JSON text messages from Deepgram to client - if (clientCtx.session.isOpen()) { - clientCtx.send(message); - } - } + System.out.println("Connecting to Deepgram TTS: model=" + model + + ", encoding=" + encoding + ", sample_rate=" + sampleRate); - @OnWebSocketMessage - public void onBinaryMessage(byte[] payload, int offset, int len) { - // Forward binary audio data from Deepgram to client - if (clientCtx.session.isOpen()) { - byte[] data = new byte[len]; - System.arraycopy(payload, offset, data, 0, len); - clientCtx.send(ByteBuffer.wrap(data)); - } - } + // Create SDK client for this connection + DeepgramClient dgClient = DeepgramClient.builder() + .apiKey(DEEPGRAM_API_KEY) + .build(); + + V1WebSocketClient dgWs = dgClient.speak().v1().v1WebSocket(); + + // Forward binary audio chunks from Deepgram to browser + dgWs.onSpeakV1Audio(audioBytes -> { + try { + if (ctx.session.isOpen()) { + ctx.send(ByteBuffer.wrap(audioBytes.toByteArray())); + } + } catch (Exception e) { + System.err.println("Error forwarding audio to browser: " + e.getMessage()); + } + }); + + // Forward JSON text messages (metadata, flushed, warning, etc.) from Deepgram to browser + dgWs.onMessage(json -> { + try { + // Only forward JSON text messages, not binary (audio handled by onSpeakV1Audio) + if (ctx.session.isOpen()) { + ctx.send(json); + } + } catch (Exception e) { + System.err.println("Error forwarding message to browser: " + e.getMessage()); + } + }); + + dgWs.onError(e -> { + System.err.println("Deepgram WebSocket error: " + e.getMessage()); + try { + if (ctx.session.isOpen()) { + String errorJson = JSON_MAPPER.writeValueAsString(Map.of( + "type", "Error", + "description", e.getMessage() != null ? e.getMessage() : "Deepgram connection error", + "code", "PROVIDER_ERROR" + )); + ctx.send(errorJson); + } + } catch (Exception ex) { + System.err.println("Failed to send error to client: " + ex.getMessage()); + } + }); + + dgWs.onDisconnected(reason -> { + System.out.println("Deepgram TTS connection closed: " + reason); + try { + if (ctx.session.isOpen()) { + ctx.closeSession(1000, "Deepgram disconnected"); + } + } catch (Exception ignored) {} + activeConnections.remove(ctx); + }); + + // Build connection options using SDK builder + V1ConnectOptions.Builder optionsBuilder = V1ConnectOptions.builder(); + optionsBuilder + .model(SpeakV1Model.valueOf(model)) + .encoding(SpeakV1Encoding.valueOf(encoding)) + .sampleRate(SpeakV1SampleRate.valueOf(sampleRate)); + V1ConnectOptions options = optionsBuilder.build(); + + // Connect to Deepgram via SDK + dgWs.connect(options).thenRun(() -> { + activeConnections.put(ctx, dgWs); + System.out.println("Live TTS session started (model=" + model + ")"); + }).exceptionally(e -> { + System.err.println("Failed to connect to Deepgram TTS: " + e.getMessage()); + try { + if (ctx.session.isOpen()) { + ctx.closeSession(1011, "Failed to connect to Deepgram"); + } + } catch (Exception ignored) {} + return null; + }); + }); - @OnWebSocketError - public void onError(Session session, Throwable error) { - System.err.println("Deepgram WebSocket error: " + error.getMessage()); - if (clientCtx.session.isOpen()) { + // Forward text messages from client to Deepgram via SDK + // The client sends JSON messages like { type: "Speak", text: "..." } + ws.onMessage(ctx -> { + V1WebSocketClient dgWs = activeConnections.get(ctx); + if (dgWs != null) { try { - String errorJson = JSON_MAPPER.writeValueAsString(Map.of( - "type", "Error", - "description", error.getMessage() != null ? error.getMessage() : "Deepgram connection error", - "code", "PROVIDER_ERROR" - )); - clientCtx.send(errorJson); + String message = ctx.message(); + JsonNode node = JSON_MAPPER.readTree(message); + String type = node.has("type") ? node.get("type").asText() : ""; + + switch (type) { + case "Speak": + String text = node.has("text") ? node.get("text").asText() : ""; + dgWs.sendText(SpeakV1Text.builder().text(text).build()); + break; + case "Flush": + dgWs.sendFlush(SpeakV1Flush.builder() + .type(com.deepgram.resources.speak.v1.types.SpeakV1FlushType.FLUSH).build()); + break; + case "Clear": + dgWs.sendClear(SpeakV1Clear.builder() + .type(com.deepgram.resources.speak.v1.types.SpeakV1ClearType.CLEAR).build()); + break; + case "Close": + dgWs.sendClose(SpeakV1Close.builder() + .type(com.deepgram.resources.speak.v1.types.SpeakV1CloseType.CLOSE).build()); + break; + default: + // Default: try sending as Speak text + String defaultText = node.has("text") ? node.get("text").asText() : message; + dgWs.sendText(SpeakV1Text.builder().text(defaultText).build()); + break; + } } catch (Exception e) { - System.err.println("Failed to send error to client: " + e.getMessage()); + System.err.println("Error forwarding message to Deepgram: " + e.getMessage()); } } - } + }); - @OnWebSocketClose - public void onClose(int statusCode, String reason) { - System.out.println("Deepgram connection closed: " + statusCode + " " + (reason != null ? reason : "")); - deepgramSessions.remove(clientCtx); - if (clientCtx.session.isOpen()) { - int closeCode = getSafeCloseCode(statusCode); - clientCtx.closeSession(closeCode, reason != null ? reason : ""); + // Forward binary messages from client to Deepgram (if needed) + ws.onBinaryMessage(ctx -> { + // Binary messages from client are not typical for TTS but forward if present + V1WebSocketClient dgWs = activeConnections.get(ctx); + if (dgWs != null) { + byte[] data = ctx.data(); + int offset = ctx.offset(); + int length = ctx.length(); + byte[] bytes = new byte[length]; + System.arraycopy(data, offset, bytes, 0, length); + dgWs.sendText(SpeakV1Text.builder().text(new String(bytes)).build()); } - } + }); - /** - * Returns a safe WebSocket close code, avoiding reserved codes. - */ - private int getSafeCloseCode(int code) { - int[] reserved = {1004, 1005, 1006, 1015}; - if (code >= 1000 && code <= 4999) { - for (int r : reserved) { - if (code == r) return 1000; - } - return code; + // Handle client disconnect - clean up Deepgram connection + ws.onClose(ctx -> { + System.out.println("Client disconnected: " + ctx.status() + " " + ctx.reason()); + V1WebSocketClient dgWs = activeConnections.remove(ctx); + if (dgWs != null) { + try { + dgWs.disconnect(); + } catch (Exception ignored) {} } - return 1000; - } + }); + + // Handle client errors + ws.onError(ctx -> { + System.err.println("Client WebSocket error: " + (ctx.error() != null ? ctx.error().getMessage() : "unknown")); + V1WebSocketClient dgWs = activeConnections.remove(ctx); + if (dgWs != null) { + try { + dgWs.disconnect(); + } catch (Exception ignored) {} + } + }); } // ======================================================================== - // MAIN - Server setup and startup + // SECTION 8: MAIN - Server setup and startup // ======================================================================== + /** + * Application entry point. Loads configuration, validates the API key, + * and starts the Javalin HTTP server with WebSocket support. + * + * @param args Command-line arguments (unused) + */ public static void main(String[] args) throws Exception { - // Start the shared WebSocket client for outbound Deepgram connections - wsClient.start(); Javalin app = Javalin.create(config -> { - // Configure Jetty server for WebSocket upgrade handling - config.jetty.modifyServer(server -> { - // Server-level configuration if needed + config.bundledPlugins.enableCors(cors -> { + cors.addRule(rule -> { + rule.anyHost(); + }); }); }); @@ -323,140 +438,7 @@ public static void main(String[] args) throws Exception { // WEBSOCKET PROXY - /api/live-text-to-speech // ==================================================================== - app.ws("/api/live-text-to-speech", ws -> { - - ws.onConnect(ctx -> { - // Validate JWT from subprotocol header - String protocols = ctx.header("Sec-WebSocket-Protocol"); - String validProto = validateWsToken(protocols); - if (validProto == null) { - System.out.println("WebSocket auth failed: invalid or missing token"); - ctx.closeSession(4401, "Unauthorized"); - return; - } - - System.out.println("Client connected to /api/live-text-to-speech"); - activeConnections.add(ctx); - - // Parse query parameters from the WebSocket URL - String model = ctx.queryParam("model") != null ? ctx.queryParam("model") : "aura-asteria-en"; - String encoding = ctx.queryParam("encoding") != null ? ctx.queryParam("encoding") : "linear16"; - String sampleRate = ctx.queryParam("sample_rate") != null ? ctx.queryParam("sample_rate") : "48000"; - String container = ctx.queryParam("container") != null ? ctx.queryParam("container") : "none"; - - // Build Deepgram WebSocket URL with query parameters - String deepgramUrl = DEEPGRAM_TTS_URL - + "?model=" + model - + "&encoding=" + encoding - + "&sample_rate=" + sampleRate - + "&container=" + container; - - System.out.println("Connecting to Deepgram TTS: model=" + model - + ", encoding=" + encoding + ", sample_rate=" + sampleRate); - - try { - // Create outbound WebSocket connection to Deepgram - DeepgramSocket deepgramSocket = new DeepgramSocket(ctx); - ClientUpgradeRequest request = new ClientUpgradeRequest(); - request.setHeader("Authorization", "Token " + DEEPGRAM_API_KEY); - - wsClient.connect(deepgramSocket, new URI(deepgramUrl), request); - } catch (Exception e) { - System.err.println("Error connecting to Deepgram: " + e.getMessage()); - if (ctx.session.isOpen()) { - String errorJson = JSON_MAPPER.writeValueAsString(Map.of( - "type", "Error", - "description", "Failed to establish proxy connection", - "code", "CONNECTION_FAILED" - )); - ctx.send(errorJson); - ctx.closeSession(1011, "Failed to connect to Deepgram"); - } - } - }); - - ws.onMessage(ctx -> { - // Forward text messages from client to Deepgram - Session deepgramSession = deepgramSessions.get(ctx); - if (deepgramSession != null && deepgramSession.isOpen()) { - deepgramSession.getRemote().sendString(ctx.message()); - } - }); - - ws.onBinaryMessage(ctx -> { - // Forward binary messages from client to Deepgram - Session deepgramSession = deepgramSessions.get(ctx); - if (deepgramSession != null && deepgramSession.isOpen()) { - byte[] data = ctx.data(); - int offset = ctx.offset(); - int length = ctx.length(); - byte[] bytes = new byte[length]; - System.arraycopy(data, offset, bytes, 0, length); - deepgramSession.getRemote().sendBytes(ByteBuffer.wrap(bytes)); - } - }); - - ws.onClose(ctx -> { - System.out.println("Client disconnected: " + ctx.status() + " " + ctx.reason()); - activeConnections.remove(ctx); - - // Close the upstream Deepgram connection - Session deepgramSession = deepgramSessions.remove(ctx); - if (deepgramSession != null && deepgramSession.isOpen()) { - deepgramSession.close(StatusCode.NORMAL, "Client disconnected"); - } - }); - - ws.onError(ctx -> { - System.err.println("Client WebSocket error: " + (ctx.error() != null ? ctx.error().getMessage() : "unknown")); - - // Close the upstream Deepgram connection on client error - Session deepgramSession = deepgramSessions.remove(ctx); - if (deepgramSession != null && deepgramSession.isOpen()) { - deepgramSession.close(StatusCode.NORMAL, "Client error"); - } - }); - }); - - // ==================================================================== - // GRACEFUL SHUTDOWN - // ==================================================================== - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("\nShutdown signal received: starting graceful shutdown..."); - - // Close all active client WebSocket connections - System.out.println("Closing " + activeConnections.size() + " active WebSocket connection(s)..."); - for (WsContext wsCtx : activeConnections) { - try { - if (wsCtx.session.isOpen()) { - wsCtx.closeSession(1001, "Server shutting down"); - } - } catch (Exception e) { - System.err.println("Error closing WebSocket: " + e.getMessage()); - } - } - - // Close all upstream Deepgram connections - for (Session session : deepgramSessions.values()) { - try { - if (session.isOpen()) { - session.close(StatusCode.NORMAL, "Server shutting down"); - } - } catch (Exception e) { - System.err.println("Error closing Deepgram session: " + e.getMessage()); - } - } - - // Stop the WebSocket client - try { - wsClient.stop(); - } catch (Exception e) { - System.err.println("Error stopping WebSocket client: " + e.getMessage()); - } - - System.out.println("Shutdown complete"); - })); + app.ws("/api/live-text-to-speech", App::handleLiveTextToSpeech); // ==================================================================== // START SERVER @@ -466,8 +448,7 @@ public static void main(String[] args) throws Exception { System.out.println(); System.out.println("=".repeat(70)); - System.out.println("Backend API Server running at http://localhost:" + PORT); - System.out.println(); + System.out.println(" Backend API running at http://localhost:" + PORT); System.out.println(" GET /api/session"); System.out.println(" WS /api/live-text-to-speech (auth required)"); System.out.println(" GET /api/metadata");