diff --git a/tools/local-streamer/go.mod b/tools/local-streamer/go.mod new file mode 100644 index 000000000..4363e8fc3 --- /dev/null +++ b/tools/local-streamer/go.mod @@ -0,0 +1,5 @@ +module github.com/livepeer/ai-runner/tools/local-streamer + +go 1.25.0 + +require github.com/livepeer/go-livepeer v0.8.9-0.20251225002406-8a37cb268722 diff --git a/tools/local-streamer/go.sum b/tools/local-streamer/go.sum new file mode 100644 index 000000000..108689c0e --- /dev/null +++ b/tools/local-streamer/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/livepeer/go-livepeer v0.8.9-0.20251225002406-8a37cb268722 h1:nySq7gNP2Z8VdoZJ7nkNKKP2rOotAjliPTVjSH5T8VY= +github.com/livepeer/go-livepeer v0.8.9-0.20251225002406-8a37cb268722/go.mod h1:WEpT6YDvXwwH/Wdhqjria+2OSMqV+24TKGuhemjw5DY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/local-streamer/local-streamer b/tools/local-streamer/local-streamer new file mode 100755 index 000000000..94d1294d2 Binary files /dev/null and b/tools/local-streamer/local-streamer differ diff --git a/tools/local-streamer/main.go b/tools/local-streamer/main.go new file mode 100644 index 000000000..ba63a7b47 --- /dev/null +++ b/tools/local-streamer/main.go @@ -0,0 +1,278 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +// Config holds the CLI configuration +type Config struct { + InputFile string + OutputFile string + RunnerURL string + Port int + Params string + SegmentDur float64 + Realtime bool + NoTranscode bool +} + +// StartStreamRequest is the request body for starting a stream +type StartStreamRequest struct { + SubscribeURL string `json:"subscribe_url"` + PublishURL string `json:"publish_url"` + ControlURL string `json:"control_url"` + EventsURL string `json:"events_url"` + Params map[string]interface{} `json:"params"` +} + +func main() { + config := parseFlags() + + // Set up logging + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + }))) + + slog.Info("Starting local streamer", + "input", config.InputFile, + "output", config.OutputFile, + "runner", config.RunnerURL, + "port", config.Port, + ) + + // Set up context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle shutdown signals + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigCh + slog.Info("Received signal, shutting down", "signal", sig) + cancel() + }() + + // Run the streamer + if err := run(ctx, config); err != nil { + slog.Error("Streamer failed", "err", err) + os.Exit(1) + } + + slog.Info("Local streamer finished") +} + +func parseFlags() Config { + config := Config{} + + flag.StringVar(&config.InputFile, "input", "", "Input video file path (required)") + flag.StringVar(&config.OutputFile, "output", "output.ts", "Output file path") + flag.StringVar(&config.RunnerURL, "runner-url", "http://localhost:8000", "AI Runner URL") + flag.IntVar(&config.Port, "port", 9935, "Trickle server port") + flag.StringVar(&config.Params, "params", "{}", "Pipeline parameters as JSON") + flag.Float64Var(&config.SegmentDur, "segment-dur", 0.5, "Segment duration in seconds") + flag.BoolVar(&config.Realtime, "realtime", true, "Stream at realtime speed (use -realtime=false to stream as fast as possible)") + flag.BoolVar(&config.NoTranscode, "no-transcode", false, "Skip transcoding, just copy streams (input must be MPEG-TS compatible)") + + flag.Parse() + + if config.InputFile == "" { + fmt.Fprintln(os.Stderr, "Error: --input is required") + flag.Usage() + os.Exit(1) + } + + // Check if input file exists + if _, err := os.Stat(config.InputFile); os.IsNotExist(err) { + fmt.Fprintf(os.Stderr, "Error: input file does not exist: %s\n", config.InputFile) + os.Exit(1) + } + + return config +} + +func run(parentCtx context.Context, config Config) error { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + // 1. Start Trickle server + server := NewTrickleServer(config.Port) + if err := server.Start(); err != nil { + return fmt.Errorf("failed to start trickle server: %w", err) + } + defer server.Stop() + + // Give the server a moment to start + time.Sleep(100 * time.Millisecond) + + // 2. Parse pipeline params + var params map[string]interface{} + if err := json.Unmarshal([]byte(config.Params), ¶ms); err != nil { + return fmt.Errorf("failed to parse params JSON: %w", err) + } + + inputURL := server.GetInputURL() + outputURL := server.GetOutputURL() + controlURL := server.GetControlURL() + eventsURL := server.GetEventsURL() + + // 3. Pre-create the input channel before calling runner API + // This is critical - the runner will try to subscribe immediately + slog.Info("Pre-creating trickle channels") + if err := createTrickleChannel(inputURL); err != nil { + slog.Warn("Failed to pre-create input channel", "err", err) + } + if err := createTrickleChannel(controlURL); err != nil { + slog.Warn("Failed to pre-create control channel", "err", err) + } + + // 4. Start output handler FIRST (subscribes to processed output) + // Start before the runner so we're ready to receive output + outputHandler := NewOutputHandler(outputURL, config.OutputFile) + var wg sync.WaitGroup + errCh := make(chan error, 2) + + wg.Add(1) + go func() { + defer wg.Done() + if err := outputHandler.Start(ctx); err != nil && ctx.Err() == nil { + errCh <- fmt.Errorf("output handler error: %w", err) + } + }() + + // 5. Start segmenter in a goroutine (publishes input segments) + // Start before calling runner API so input is available when runner connects + segmenter := NewSegmenter(config.InputFile, inputURL, config.SegmentDur, config.Realtime) + segmenterDone := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + defer close(segmenterDone) + if err := segmenter.Start(ctx); err != nil && ctx.Err() == nil { + errCh <- fmt.Errorf("segmenter error: %w", err) + } + }() + + // Give segmenter time to start publishing first segment + time.Sleep(500 * time.Millisecond) + + // 6. Call AI Runner API to start the stream + slog.Info("Starting stream on AI Runner", + "inputURL", inputURL, + "outputURL", outputURL, + "runnerURL", config.RunnerURL, + ) + + if err := startStream(ctx, config.RunnerURL, inputURL, outputURL, controlURL, eventsURL, params); err != nil { + return fmt.Errorf("failed to start stream on runner: %w", err) + } + + // Wait for segmenter to finish, then give output handler time to receive remaining output + go func() { + <-segmenterDone + slog.Info("Input finished, waiting for output to complete...") + // Wait up to 30 seconds for output after input is done + select { + case <-time.After(30 * time.Second): + slog.Info("Timeout waiting for output, stopping") + cancel() + case <-ctx.Done(): + } + }() + + // Wait for completion or error + doneCh := make(chan struct{}) + go func() { + wg.Wait() + close(doneCh) + }() + + select { + case err := <-errCh: + return err + case <-doneCh: + return nil + case <-ctx.Done(): + // Wait a bit for goroutines to clean up + time.Sleep(500 * time.Millisecond) + return nil + } +} + +// createTrickleChannel creates a channel on the trickle server +func createTrickleChannel(url string) error { + req, err := http.NewRequest("POST", url, nil) + if err != nil { + return err + } + req.Header.Set("Expect-Content", "video/MP2T") + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // 200 or 404 (already exists) are both fine + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body)) + } + + slog.Info("Created trickle channel", "url", url) + return nil +} + +func startStream(ctx context.Context, runnerURL, subscribeURL, publishURL, controlURL, eventsURL string, params map[string]interface{}) error { + reqBody := StartStreamRequest{ + SubscribeURL: subscribeURL, + PublishURL: publishURL, + ControlURL: controlURL, + EventsURL: eventsURL, + Params: params, + } + + jsonBody, err := json.Marshal(reqBody) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + url := fmt.Sprintf("%s/live-video-to-video", runnerURL) + slog.Info("Calling AI Runner API", "url", url, "body", string(jsonBody)) + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("runner returned status %d: %s", resp.StatusCode, string(body)) + } + + slog.Info("Stream started successfully", "response", string(body)) + return nil +} diff --git a/tools/local-streamer/output.go b/tools/local-streamer/output.go new file mode 100644 index 000000000..729aa16f1 --- /dev/null +++ b/tools/local-streamer/output.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "time" + + "github.com/livepeer/go-livepeer/trickle" +) + +// OutputHandler handles subscribing to trickle output and writing to file +type OutputHandler struct { + subscribeURL string + outputFile string +} + +// NewOutputHandler creates a new output handler +func NewOutputHandler(subscribeURL, outputFile string) *OutputHandler { + return &OutputHandler{ + subscribeURL: subscribeURL, + outputFile: outputFile, + } +} + +// Start begins subscribing and writing output +func (o *OutputHandler) Start(ctx context.Context) error { + slog.Info("Starting output handler", "subscribeURL", o.subscribeURL, "outputFile", o.outputFile) + + // Create the output file + file, err := os.Create(o.outputFile) + if err != nil { + return fmt.Errorf("failed to create output file: %w", err) + } + defer file.Close() + + // Create trickle subscriber + subscriber, err := trickle.NewTrickleSubscriber(trickle.TrickleSubscriberConfig{ + URL: o.subscribeURL, + Ctx: ctx, + }) + if err != nil { + return fmt.Errorf("failed to create trickle subscriber: %w", err) + } + + var totalBytes int64 + segmentCount := 0 + retries := 0 + const maxRetries = 60 // Wait up to 30 seconds for first segment + const retryPause = 500 * time.Millisecond + + for { + select { + case <-ctx.Done(): + slog.Info("Output handler cancelled", "totalBytes", totalBytes, "segments", segmentCount) + return nil + default: + } + + // Read from trickle + segment, err := subscriber.Read() + if err != nil { + if errors.Is(err, trickle.EOS) { + slog.Info("End of stream reached", "totalBytes", totalBytes, "segments", segmentCount) + return nil + } + if errors.Is(err, trickle.StreamNotFoundErr) { + // Stream might not exist yet, retry + if retries < maxRetries { + retries++ + slog.Debug("Stream not found, retrying", "retry", retries) + time.Sleep(retryPause) + continue + } + return fmt.Errorf("stream not found after %d retries", maxRetries) + } + + // Handle sequence nonexistent error + var seqErr *trickle.SequenceNonexistent + if errors.As(err, &seqErr) { + // Skip to leading edge + slog.Info("Sequence not found, skipping to latest", "requested", seqErr.Seq, "latest", seqErr.Latest) + subscriber.SetSeq(seqErr.Latest) + continue + } + + // Other errors - retry with backoff + if retries < maxRetries { + retries++ + slog.Warn("Error reading segment, retrying", "err", err, "retry", retries) + time.Sleep(retryPause) + continue + } + return fmt.Errorf("failed to read segment after %d retries: %w", maxRetries, err) + } + + // Reset retry counter on successful read + retries = 0 + + // Get segment sequence number + seq := trickle.GetSeq(segment) + + // Copy segment data to file + startTime := time.Now() + n, err := io.Copy(file, segment.Body) + segment.Body.Close() + + if err != nil { + slog.Error("Error writing segment to file", "seq", seq, "err", err) + continue + } + + totalBytes += n + segmentCount++ + slog.Info("Wrote segment to file", "seq", seq, "bytes", n, "totalBytes", totalBytes, "took", time.Since(startTime)) + } +} + diff --git a/tools/local-streamer/segmenter.go b/tools/local-streamer/segmenter.go new file mode 100644 index 000000000..5fccf2025 --- /dev/null +++ b/tools/local-streamer/segmenter.go @@ -0,0 +1,235 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/livepeer/go-livepeer/trickle" +) + +// Segmenter handles FFmpeg segmentation and Trickle publishing +type Segmenter struct { + inputFile string + publishURL string + segmentDur float64 // segment duration in seconds + realtime bool // whether to stream in realtime (-re flag) +} + +// NewSegmenter creates a new segmenter +func NewSegmenter(inputFile, publishURL string, segmentDur float64, realtime bool) *Segmenter { + return &Segmenter{ + inputFile: inputFile, + publishURL: publishURL, + segmentDur: segmentDur, + realtime: realtime, + } +} + +// Start begins segmentation and publishing +func (s *Segmenter) Start(ctx context.Context) error { + slog.Info("Starting segmenter", "input", s.inputFile, "publishURL", s.publishURL, "segmentDur", s.segmentDur) + + // Create trickle publisher + publisher, err := trickle.NewTricklePublisher(s.publishURL) + if err != nil { + return fmt.Errorf("failed to create trickle publisher: %w", err) + } + + // Create a temporary directory for segment pipes + tmpDir, err := os.MkdirTemp("", "local-streamer-*") + if err != nil { + return fmt.Errorf("failed to create temp dir: %w", err) + } + defer os.RemoveAll(tmpDir) + + // Create the segment pattern for named pipes + segmentPattern := filepath.Join(tmpDir, "segment-%d.ts") + + // Create first named pipe + firstPipe := fmt.Sprintf(segmentPattern, 0) + if err := syscall.Mkfifo(firstPipe, 0666); err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to create first pipe: %w", err) + } + + // Channel to signal completion + doneCh := make(chan struct{}) + var wg sync.WaitGroup + + // Start the segment processor in a goroutine + wg.Add(1) + go func() { + defer wg.Done() + s.processSegments(ctx, segmentPattern, publisher, doneCh) + }() + + // Build FFmpeg command + args := []string{} + if s.realtime { + args = append(args, "-re") // Read input at native frame rate + } + args = append(args, + "-i", s.inputFile, + "-c:v", "libx264", + "-preset", "ultrafast", + "-tune", "zerolatency", + "-c:a", "aac", + "-f", "segment", + "-segment_time", fmt.Sprintf("%.2f", s.segmentDur), + "-reset_timestamps", "1", + segmentPattern, + ) + + cmd := exec.CommandContext(ctx, "ffmpeg", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Set up graceful termination + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGTERM) + } + cmd.WaitDelay = 5 * time.Second + + slog.Info("Running FFmpeg", "args", args) + err = cmd.Run() + + // Signal completion to the segment processor + close(doneCh) + + // Wait for segment processor to finish + wg.Wait() + + // Close the publisher + if closeErr := publisher.Close(); closeErr != nil { + slog.Error("Error closing publisher", "err", closeErr) + } + + if err != nil && ctx.Err() == nil { + return fmt.Errorf("ffmpeg error: %w", err) + } + + slog.Info("Segmenter finished") + return nil +} + +// processSegments reads segments from named pipes and publishes them +func (s *Segmenter) processSegments(ctx context.Context, pattern string, publisher *trickle.TricklePublisher, doneCh <-chan struct{}) { + segmentNum := 0 + for { + select { + case <-ctx.Done(): + slog.Info("Segment processor cancelled") + return + case <-doneCh: + slog.Info("Segment processor done signal received") + return + default: + } + + pipeName := fmt.Sprintf(pattern, segmentNum) + nextPipeName := fmt.Sprintf(pattern, segmentNum+1) + + // Create next pipe ahead of time + if err := syscall.Mkfifo(nextPipeName, 0666); err != nil && !os.IsExist(err) { + slog.Error("Failed to create next pipe", "pipe", nextPipeName, "err", err) + } + + // Open pipe for reading with timeout + file, err := openPipeWithTimeout(pipeName, 20*time.Second, doneCh) + if err != nil { + slog.Info("Pipe open completed or timed out", "pipe", pipeName, "err", err) + // Clean up pipes + os.Remove(pipeName) + os.Remove(nextPipeName) + return + } + + // Read and publish the segment + if err := s.publishSegment(ctx, file, publisher, segmentNum); err != nil { + slog.Error("Failed to publish segment", "segment", segmentNum, "err", err) + } + + file.Close() + os.Remove(pipeName) + segmentNum++ + } +} + +// openPipeWithTimeout opens a named pipe with a timeout +func openPipeWithTimeout(name string, timeout time.Duration, doneCh <-chan struct{}) (*os.File, error) { + resultCh := make(chan *os.File, 1) + errCh := make(chan error, 1) + + go func() { + // Open in non-blocking mode first + file, err := os.OpenFile(name, os.O_RDONLY, 0) + if err != nil { + errCh <- err + return + } + resultCh <- file + }() + + select { + case file := <-resultCh: + return file, nil + case err := <-errCh: + return nil, err + case <-doneCh: + return nil, fmt.Errorf("done signal received") + case <-time.After(timeout): + return nil, fmt.Errorf("timeout waiting for pipe") + } +} + +// publishSegment reads from a file and publishes to trickle +func (s *Segmenter) publishSegment(ctx context.Context, file *os.File, publisher *trickle.TricklePublisher, segmentNum int) error { + startTime := time.Now() + reader := bufio.NewReader(file) + + // Read all data into a buffer first to get the size + data, err := io.ReadAll(reader) + if err != nil { + return fmt.Errorf("failed to read segment: %w", err) + } + + if len(data) == 0 { + slog.Warn("Empty segment, skipping", "segment", segmentNum) + return nil + } + + // Create a reader from the data + dataReader := &byteReader{data: data} + + // Publish the segment + if err := publisher.Write(dataReader); err != nil { + return fmt.Errorf("failed to publish segment: %w", err) + } + + slog.Info("Published segment", "segment", segmentNum, "bytes", len(data), "took", time.Since(startTime)) + return nil +} + +// byteReader implements io.Reader for a byte slice +type byteReader struct { + data []byte + pos int +} + +func (r *byteReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + n = copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + diff --git a/tools/local-streamer/server.go b/tools/local-streamer/server.go new file mode 100644 index 000000000..32fd649c8 --- /dev/null +++ b/tools/local-streamer/server.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "log/slog" + "net/http" + + "github.com/livepeer/go-livepeer/trickle" +) + +// TrickleServer wraps the trickle server functionality +type TrickleServer struct { + server *trickle.Server + mux *http.ServeMux + httpSrv *http.Server + port int + basePath string + stopFunc func() +} + +// NewTrickleServer creates a new Trickle server on the specified port +func NewTrickleServer(port int) *TrickleServer { + basePath := "/trickle/" + mux := http.NewServeMux() + + config := trickle.TrickleServerConfig{ + BasePath: basePath, + Mux: mux, + Autocreate: true, // Auto-create channels on first publish + } + + server := trickle.ConfigureServer(config) + + return &TrickleServer{ + server: server, + mux: mux, + port: port, + basePath: basePath, + } +} + +// Start starts the Trickle server +func (ts *TrickleServer) Start() error { + // Start the trickle server's internal ticker for idle channel sweeping + ts.stopFunc = ts.server.Start() + + addr := fmt.Sprintf(":%d", ts.port) + ts.httpSrv = &http.Server{ + Addr: addr, + Handler: ts.mux, + } + + slog.Info("Starting Trickle server", "addr", addr, "basePath", ts.basePath) + + go func() { + if err := ts.httpSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("Trickle server error", "err", err) + } + }() + + return nil +} + +// Stop stops the Trickle server +func (ts *TrickleServer) Stop() error { + slog.Info("Stopping Trickle server") + if ts.stopFunc != nil { + ts.stopFunc() + } + if ts.httpSrv != nil { + return ts.httpSrv.Close() + } + return nil +} + +// GetInputURL returns the URL for the input channel +func (ts *TrickleServer) GetInputURL() string { + return fmt.Sprintf("http://localhost:%d%sinput", ts.port, ts.basePath) +} + +// GetOutputURL returns the URL for the output channel +func (ts *TrickleServer) GetOutputURL() string { + return fmt.Sprintf("http://localhost:%d%soutput", ts.port, ts.basePath) +} + +// GetControlURL returns the URL for the control channel +func (ts *TrickleServer) GetControlURL() string { + return fmt.Sprintf("http://localhost:%d%scontrol", ts.port, ts.basePath) +} + +// GetEventsURL returns the URL for the events channel +func (ts *TrickleServer) GetEventsURL() string { + return fmt.Sprintf("http://localhost:%d%sevents", ts.port, ts.basePath) +} +