Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 91 additions & 33 deletions app/(dashboard)/stream/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,19 @@ export default function StreamPage() {
return;
}

console.log("🚀 Starting streaming process...");
state.setStreaming(true);
const settings = streamManager.getRecorderSettings();
console.log("📹 MediaRecorder settings:", settings);

const protocol = window.location.protocol.replace("http", "ws");
const wsUrl = new URL(`${protocol}//${window.location.host}/rtmp`);
wsUrl.searchParams.set("format", settings.format);
wsUrl.searchParams.set("video", settings.video);
wsUrl.searchParams.set("audio", settings.audio);
wsUrl.searchParams.set("key", state.streamKey);

console.log("Connecting to streaming server with settings:", settings);
console.log("🔌 Connecting to streaming server:", wsUrl.toString().replace(state.streamKey, "***"));
wsRef.current = new WebSocket(wsUrl.toString());

wsRef.current.addEventListener("open", () => {
Expand Down Expand Up @@ -208,49 +211,88 @@ export default function StreamPage() {
mediaRecorderRef.current.addEventListener("dataavailable", (e) => {
if (e.data && e.data.size > 0) {
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
wsRef.current.send(e.data);

bytesSentRef.current += e.data.size;
const now = Date.now();
const timeDiff = (now - lastUpdateTimeRef.current) / 1000;

if (timeDiff >= 1) {
const rate = bytesSentRef.current / timeDiff / 1024;
const roundedRate = Math.round(rate);
state.setDataRate(roundedRate);

const currentTime = new Date().toLocaleTimeString("en-US", {
hour12: false,
minute: "2-digit",
second: "2-digit",
});
state.setDataHistory((prev) => {
const newData = [
...prev,
{ time: currentTime, rate: roundedRate },
];
return newData.slice(-30);
});

bytesSentRef.current = 0;
lastUpdateTimeRef.current = now;
try {
wsRef.current.send(e.data);

bytesSentRef.current += e.data.size;
const now = Date.now();
const timeDiff = (now - lastUpdateTimeRef.current) / 1000;

if (timeDiff >= 1) {
const rate = bytesSentRef.current / timeDiff / 1024;
const roundedRate = Math.round(rate);
state.setDataRate(roundedRate);

const currentTime = new Date().toLocaleTimeString("en-US", {
hour12: false,
minute: "2-digit",
second: "2-digit",
});
state.setDataHistory((prev) => {
const newData = [
...prev,
{ time: currentTime, rate: roundedRate },
];
return newData.slice(-30);
});

bytesSentRef.current = 0;
lastUpdateTimeRef.current = now;
}
} catch (error) {
console.error("Error sending data to WebSocket:", error);
// Stop streaming if we can't send data
if (mediaRecorderRef.current?.state === "recording") {
mediaRecorderRef.current.stop();
}
}
} else {
console.warn("WebSocket not ready, dropping chunk");
console.warn("WebSocket not ready, dropping chunk", {
exists: !!wsRef.current,
readyState: wsRef.current?.readyState,
});
}
} else {
console.warn("Received empty data chunk from MediaRecorder");
}
});

mediaRecorderRef.current.addEventListener("stop", () => {
console.log("MediaRecorder stopped");
console.log("MediaRecorder stopped", {
state: mediaRecorderRef.current?.state,
wsState: wsRef.current?.readyState,
streaming: state.streaming,
});
stopStreaming();
if (wsRef.current) {
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
wsRef.current.close();
}
});

mediaRecorderRef.current.addEventListener("error", (e) => {
console.error("MediaRecorder error:", e);
console.error("MediaRecorder error details:", {
error: e,
state: mediaRecorderRef.current?.state,
mimeType: mediaRecorderRef.current?.mimeType,
});
});

mediaRecorderRef.current.addEventListener("start", () => {
console.log("MediaRecorder started event", {
state: mediaRecorderRef.current?.state,
mimeType: mediaRecorderRef.current?.mimeType,
videoBitsPerSecond: mediaRecorderRef.current?.videoBitsPerSecond,
audioBitsPerSecond: mediaRecorderRef.current?.audioBitsPerSecond,
});
});

mediaRecorderRef.current.addEventListener("pause", () => {
console.log("MediaRecorder paused");
});

mediaRecorderRef.current.addEventListener("resume", () => {
console.log("MediaRecorder resumed");
});

console.log("✓ Starting MediaRecorder in 100ms...");
Expand All @@ -259,14 +301,20 @@ export default function StreamPage() {
mediaRecorderRef.current &&
mediaRecorderRef.current.state === "inactive"
) {
mediaRecorderRef.current.start(100);
// Request data chunks every 250ms instead of 100ms
// This reduces overhead and improves stability while maintaining low latency
mediaRecorderRef.current.start(250);
console.log("✓ MediaRecorder started!");
}
}, 100);
});

wsRef.current.addEventListener("close", () => {
console.log("WebSocket disconnected");
wsRef.current.addEventListener("close", (event) => {
console.log("WebSocket disconnected", {
code: event.code,
reason: event.reason,
wasClean: event.wasClean,
});
state.setConnected(false);
stopStreaming();
});
Expand All @@ -285,10 +333,20 @@ export default function StreamPage() {
// Cleanup on unmount
useEffect(() => {
return () => {
console.log("StreamPage unmounting - cleaning up");
if (requestAnimationRef.current) {
cancelAnimationFrame(requestAnimationRef.current);
}
if (mediaRecorderRef.current && mediaRecorderRef.current.state === "recording") {
console.log("Stopping MediaRecorder on unmount");
mediaRecorderRef.current.stop();
}
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
console.log("Closing WebSocket on unmount");
wsRef.current.close();
}
if (inputStreamRef.current) {
console.log("Stopping camera tracks on unmount");
inputStreamRef.current.getTracks().forEach((track) => track.stop());
}
};
Expand Down
2 changes: 1 addition & 1 deletion components/ai-elements/report.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export function Report({ data }: { data: ReportData }) {
};

return (
<div className="my-4 overflow-hidden rounded-lg border bg-card">
<div className="my-4 w-full max-w-md overflow-hidden rounded-lg border bg-card">
<div className="border-b bg-muted/50 p-3">
<div className="flex items-center gap-2">
<IconFileText className="h-5 w-5 text-muted-foreground" />
Expand Down
19 changes: 15 additions & 4 deletions hooks/use-stream-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@ const getRecorderSettings = () => {
video: "",
audio: "",
};
if (MediaRecorder.isTypeSupported("video/mp4")) {

// Prefer WebM for streaming - it handles continuous streams better than MP4
// MP4 chunks have complete container metadata that can confuse streaming parsers
if (MediaRecorder.isTypeSupported("video/webm;codecs=h264,opus")) {
settings.format = "webm";
settings.video = "h264";
settings.audio = "opus";
} else if (MediaRecorder.isTypeSupported("video/webm;codecs=vp8,opus")) {
settings.format = "webm";
settings.video = "vp8";
settings.audio = "opus";
} else if (MediaRecorder.isTypeSupported("video/mp4")) {
// Fallback to MP4 if WebM not supported
settings.format = "mp4";
settings.video = "h264";
settings.audio = "aac";
} else {
// Last resort
settings.format = "webm";
settings.audio = "opus";
settings.video = MediaRecorder.isTypeSupported("video/webm;codecs=h264")
? "h264"
: "vp8";
settings.video = "vp8";
}
return settings;
};
Expand Down
42 changes: 38 additions & 4 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ app.prepare().then(() => {
// Track active connections
let connectionCount = 0;
const activeConnections = new Map();

// WebSocket send queue configuration
const WS_BUFFER_THRESHOLD = 1024 * 1024; // 1MB - pause if buffer exceeds this

// Handle WebSocket upgrade requests
server.on("upgrade", (request, socket, head) => {
Expand Down Expand Up @@ -128,8 +131,12 @@ app.prepare().then(() => {
inputFormat,

// Re-timestamp and sync - input flags
// +genpts: Generate presentation timestamps
// +igndts: Ignore DTS (decode timestamps) - regenerate them
// +ignidx: Ignore index (for broken/streaming files)
// +discardcorrupt: Keep processing even with corrupt frames
"-fflags",
"+genpts+igndts",
"+genpts+igndts+ignidx+discardcorrupt",
"-avoid_negative_ts",
"make_zero",

Expand All @@ -148,14 +155,22 @@ app.prepare().then(() => {
...audioCodec,

// RTMP specific flags for stable streaming
// Increased buffer size from 3000k to 6000k for better stability
"-bufsize",
"3000k",
"6000k",
"-maxrate",
"3000k",
"-g",
"60", // keyframe every 2 seconds at 30fps
"-sc_threshold",
"0",

// Additional buffering for input stability
"-probesize",
"10M",
"-analyzeduration",
"5M",

"-f",
"flv",
"-flvflags",
Expand All @@ -165,7 +180,16 @@ app.prepare().then(() => {
];

console.log(`[Stream #${connectionId}] Starting FFmpeg...`);
const ffmpeg = child_process.spawn("ffmpeg", ffmpegArgs);
const ffmpeg = child_process.spawn("ffmpeg", ffmpegArgs, {
// Set high water mark for stdin pipe to buffer more data (default is 16KB)
// This helps prevent data loss during brief network hiccups
stdio: ["pipe", "pipe", "pipe"],
});

// Increase stdin buffer size to handle bursty data better
if (ffmpeg.stdin && ffmpeg.stdin.setDefaultEncoding) {
ffmpeg.stdin.setMaxListeners(0);
}

let ffmpegReady = false;
let bytesReceived = 0;
Expand Down Expand Up @@ -316,7 +340,17 @@ app.prepare().then(() => {
}

try {
ffmpeg.stdin.write(msg);
// Check if ffmpeg stdin buffer is getting full
// If it is, we might want to apply backpressure
const canWrite = ffmpeg.stdin.write(msg);

if (!canWrite) {
// Buffer is full, wait for drain event
// This implements backpressure to prevent memory issues
ffmpeg.stdin.once("drain", () => {
// Buffer has drained, ready for more data
});
}
} catch (err) {
console.error(
`[Stream #${connectionId}] Error writing to FFmpeg:`,
Expand Down
8 changes: 7 additions & 1 deletion worker/ffmpeg-transmux.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ export async function fetchAndTransmuxSegment(hlsUrl: string): Promise<Buffer> {
"-protocol_whitelist",
"file,http,https,tcp,tls",

// Increase HLS timeout and retries
// Increase HLS timeout and retries for better stability
"-reconnect",
"1",
"-reconnect_streamed",
"1",
"-reconnect_delay_max",
"5",
"-reconnect_at_eof",
"1",
"-multiple_requests",
"1",
"-timeout",
"10000000", // 10 second timeout in microseconds

// HLS specific options
"-live_start_index",
Expand Down