diff --git a/app/(dashboard)/stream/page.tsx b/app/(dashboard)/stream/page.tsx index 7a07c4d..b3c4713 100644 --- a/app/(dashboard)/stream/page.tsx +++ b/app/(dashboard)/stream/page.tsx @@ -169,8 +169,11 @@ export default function StreamPage() { return; } + console.log("🚀 Starting streaming process..."); state.setStreaming(true); const settings = streamManager.getRecorderSettings(); + console.log("📹 MediaRecorder settings:", settings); + const protocol = window.location.protocol.replace("http", "ws"); const wsUrl = new URL(`${protocol}//${window.location.host}/rtmp`); wsUrl.searchParams.set("format", settings.format); @@ -178,7 +181,7 @@ export default function StreamPage() { wsUrl.searchParams.set("audio", settings.audio); wsUrl.searchParams.set("key", state.streamKey); - console.log("Connecting to streaming server with settings:", settings); + console.log("🔌 Connecting to streaming server:", wsUrl.toString().replace(state.streamKey, "***")); wsRef.current = new WebSocket(wsUrl.toString()); wsRef.current.addEventListener("open", () => { @@ -208,49 +211,88 @@ export default function StreamPage() { mediaRecorderRef.current.addEventListener("dataavailable", (e) => { if (e.data && e.data.size > 0) { if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - wsRef.current.send(e.data); - - bytesSentRef.current += e.data.size; - const now = Date.now(); - const timeDiff = (now - lastUpdateTimeRef.current) / 1000; - - if (timeDiff >= 1) { - const rate = bytesSentRef.current / timeDiff / 1024; - const roundedRate = Math.round(rate); - state.setDataRate(roundedRate); - - const currentTime = new Date().toLocaleTimeString("en-US", { - hour12: false, - minute: "2-digit", - second: "2-digit", - }); - state.setDataHistory((prev) => { - const newData = [ - ...prev, - { time: currentTime, rate: roundedRate }, - ]; - return newData.slice(-30); - }); - - bytesSentRef.current = 0; - lastUpdateTimeRef.current = now; + try { + wsRef.current.send(e.data); + + bytesSentRef.current += e.data.size; + const now = Date.now(); + const timeDiff = (now - lastUpdateTimeRef.current) / 1000; + + if (timeDiff >= 1) { + const rate = bytesSentRef.current / timeDiff / 1024; + const roundedRate = Math.round(rate); + state.setDataRate(roundedRate); + + const currentTime = new Date().toLocaleTimeString("en-US", { + hour12: false, + minute: "2-digit", + second: "2-digit", + }); + state.setDataHistory((prev) => { + const newData = [ + ...prev, + { time: currentTime, rate: roundedRate }, + ]; + return newData.slice(-30); + }); + + bytesSentRef.current = 0; + lastUpdateTimeRef.current = now; + } + } catch (error) { + console.error("Error sending data to WebSocket:", error); + // Stop streaming if we can't send data + if (mediaRecorderRef.current?.state === "recording") { + mediaRecorderRef.current.stop(); + } } } else { - console.warn("WebSocket not ready, dropping chunk"); + console.warn("WebSocket not ready, dropping chunk", { + exists: !!wsRef.current, + readyState: wsRef.current?.readyState, + }); } + } else { + console.warn("Received empty data chunk from MediaRecorder"); } }); mediaRecorderRef.current.addEventListener("stop", () => { - console.log("MediaRecorder stopped"); + console.log("MediaRecorder stopped", { + state: mediaRecorderRef.current?.state, + wsState: wsRef.current?.readyState, + streaming: state.streaming, + }); stopStreaming(); - if (wsRef.current) { + if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { wsRef.current.close(); } }); mediaRecorderRef.current.addEventListener("error", (e) => { console.error("MediaRecorder error:", e); + console.error("MediaRecorder error details:", { + error: e, + state: mediaRecorderRef.current?.state, + mimeType: mediaRecorderRef.current?.mimeType, + }); + }); + + mediaRecorderRef.current.addEventListener("start", () => { + console.log("MediaRecorder started event", { + state: mediaRecorderRef.current?.state, + mimeType: mediaRecorderRef.current?.mimeType, + videoBitsPerSecond: mediaRecorderRef.current?.videoBitsPerSecond, + audioBitsPerSecond: mediaRecorderRef.current?.audioBitsPerSecond, + }); + }); + + mediaRecorderRef.current.addEventListener("pause", () => { + console.log("MediaRecorder paused"); + }); + + mediaRecorderRef.current.addEventListener("resume", () => { + console.log("MediaRecorder resumed"); }); console.log("✓ Starting MediaRecorder in 100ms..."); @@ -259,14 +301,20 @@ export default function StreamPage() { mediaRecorderRef.current && mediaRecorderRef.current.state === "inactive" ) { - mediaRecorderRef.current.start(100); + // Request data chunks every 250ms instead of 100ms + // This reduces overhead and improves stability while maintaining low latency + mediaRecorderRef.current.start(250); console.log("✓ MediaRecorder started!"); } }, 100); }); - wsRef.current.addEventListener("close", () => { - console.log("WebSocket disconnected"); + wsRef.current.addEventListener("close", (event) => { + console.log("WebSocket disconnected", { + code: event.code, + reason: event.reason, + wasClean: event.wasClean, + }); state.setConnected(false); stopStreaming(); }); @@ -285,10 +333,20 @@ export default function StreamPage() { // Cleanup on unmount useEffect(() => { return () => { + console.log("StreamPage unmounting - cleaning up"); if (requestAnimationRef.current) { cancelAnimationFrame(requestAnimationRef.current); } + if (mediaRecorderRef.current && mediaRecorderRef.current.state === "recording") { + console.log("Stopping MediaRecorder on unmount"); + mediaRecorderRef.current.stop(); + } + if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { + console.log("Closing WebSocket on unmount"); + wsRef.current.close(); + } if (inputStreamRef.current) { + console.log("Stopping camera tracks on unmount"); inputStreamRef.current.getTracks().forEach((track) => track.stop()); } }; diff --git a/components/ai-elements/report.tsx b/components/ai-elements/report.tsx index ed8287a..0dfd63a 100644 --- a/components/ai-elements/report.tsx +++ b/components/ai-elements/report.tsx @@ -16,7 +16,7 @@ export function Report({ data }: { data: ReportData }) { }; return ( -
+
diff --git a/hooks/use-stream-manager.ts b/hooks/use-stream-manager.ts index 0da2461..13f68c9 100644 --- a/hooks/use-stream-manager.ts +++ b/hooks/use-stream-manager.ts @@ -11,16 +11,27 @@ const getRecorderSettings = () => { video: "", audio: "", }; - if (MediaRecorder.isTypeSupported("video/mp4")) { + + // Prefer WebM for streaming - it handles continuous streams better than MP4 + // MP4 chunks have complete container metadata that can confuse streaming parsers + if (MediaRecorder.isTypeSupported("video/webm;codecs=h264,opus")) { + settings.format = "webm"; + settings.video = "h264"; + settings.audio = "opus"; + } else if (MediaRecorder.isTypeSupported("video/webm;codecs=vp8,opus")) { + settings.format = "webm"; + settings.video = "vp8"; + settings.audio = "opus"; + } else if (MediaRecorder.isTypeSupported("video/mp4")) { + // Fallback to MP4 if WebM not supported settings.format = "mp4"; settings.video = "h264"; settings.audio = "aac"; } else { + // Last resort settings.format = "webm"; settings.audio = "opus"; - settings.video = MediaRecorder.isTypeSupported("video/webm;codecs=h264") - ? "h264" - : "vp8"; + settings.video = "vp8"; } return settings; }; diff --git a/server.js b/server.js index 4ef5095..ceefd6d 100644 --- a/server.js +++ b/server.js @@ -54,6 +54,9 @@ app.prepare().then(() => { // Track active connections let connectionCount = 0; const activeConnections = new Map(); + + // WebSocket send queue configuration + const WS_BUFFER_THRESHOLD = 1024 * 1024; // 1MB - pause if buffer exceeds this // Handle WebSocket upgrade requests server.on("upgrade", (request, socket, head) => { @@ -128,8 +131,12 @@ app.prepare().then(() => { inputFormat, // Re-timestamp and sync - input flags + // +genpts: Generate presentation timestamps + // +igndts: Ignore DTS (decode timestamps) - regenerate them + // +ignidx: Ignore index (for broken/streaming files) + // +discardcorrupt: Keep processing even with corrupt frames "-fflags", - "+genpts+igndts", + "+genpts+igndts+ignidx+discardcorrupt", "-avoid_negative_ts", "make_zero", @@ -148,14 +155,22 @@ app.prepare().then(() => { ...audioCodec, // RTMP specific flags for stable streaming + // Increased buffer size from 3000k to 6000k for better stability "-bufsize", - "3000k", + "6000k", "-maxrate", "3000k", "-g", "60", // keyframe every 2 seconds at 30fps "-sc_threshold", "0", + + // Additional buffering for input stability + "-probesize", + "10M", + "-analyzeduration", + "5M", + "-f", "flv", "-flvflags", @@ -165,7 +180,16 @@ app.prepare().then(() => { ]; console.log(`[Stream #${connectionId}] Starting FFmpeg...`); - const ffmpeg = child_process.spawn("ffmpeg", ffmpegArgs); + const ffmpeg = child_process.spawn("ffmpeg", ffmpegArgs, { + // Set high water mark for stdin pipe to buffer more data (default is 16KB) + // This helps prevent data loss during brief network hiccups + stdio: ["pipe", "pipe", "pipe"], + }); + + // Increase stdin buffer size to handle bursty data better + if (ffmpeg.stdin && ffmpeg.stdin.setDefaultEncoding) { + ffmpeg.stdin.setMaxListeners(0); + } let ffmpegReady = false; let bytesReceived = 0; @@ -316,7 +340,17 @@ app.prepare().then(() => { } try { - ffmpeg.stdin.write(msg); + // Check if ffmpeg stdin buffer is getting full + // If it is, we might want to apply backpressure + const canWrite = ffmpeg.stdin.write(msg); + + if (!canWrite) { + // Buffer is full, wait for drain event + // This implements backpressure to prevent memory issues + ffmpeg.stdin.once("drain", () => { + // Buffer has drained, ready for more data + }); + } } catch (err) { console.error( `[Stream #${connectionId}] Error writing to FFmpeg:`, diff --git a/worker/ffmpeg-transmux.ts b/worker/ffmpeg-transmux.ts index b4a67c8..e091573 100644 --- a/worker/ffmpeg-transmux.ts +++ b/worker/ffmpeg-transmux.ts @@ -24,13 +24,19 @@ export async function fetchAndTransmuxSegment(hlsUrl: string): Promise { "-protocol_whitelist", "file,http,https,tcp,tls", - // Increase HLS timeout and retries + // Increase HLS timeout and retries for better stability "-reconnect", "1", "-reconnect_streamed", "1", "-reconnect_delay_max", "5", + "-reconnect_at_eof", + "1", + "-multiple_requests", + "1", + "-timeout", + "10000000", // 10 second timeout in microseconds // HLS specific options "-live_start_index",