From dd0df91c90808032264d9c39b2baa4be355cc4c9 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Fri, 23 Jan 2026 22:07:39 +0100 Subject: [PATCH 1/3] Add support for sending opus to rtmp clients --- lib/shinkai/sink/rtmp.ex | 16 ++++++++-------- lib/shinkai/track.ex | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/shinkai/sink/rtmp.ex b/lib/shinkai/sink/rtmp.ex index d56a9ed..88b9000 100644 --- a/lib/shinkai/sink/rtmp.ex +++ b/lib/shinkai/sink/rtmp.ex @@ -11,12 +11,12 @@ defmodule Shinkai.Sink.RTMP do import Shinkai.Utils - alias ExFLV.Tag.{AudioData, ExVideoData, Serializer, VideoData} + alias ExFLV.Tag.{AudioData, ExAudioData, ExVideoData, Serializer, VideoData} alias ExRTMP.Server.ClientSession alias Phoenix.PubSub @timescale 1000 - @supported_codesc [:h264, :h265, :av1, :aac, :pcma, :pcmu] + @supported_codesc [:h264, :h265, :av1, :aac, :pcma, :pcmu, :opus] def start_link(opts) do name = {:via, Registry, {Source.Registry, {:rtmp_sink, opts[:id]}}} @@ -34,7 +34,7 @@ defmodule Shinkai.Sink.RTMP do :ok = PubSub.subscribe(Shinkai.PubSub, tracks_topic(id)) :ok = PubSub.subscribe(Shinkai.PubSub, state_topic(id)) - {:ok, %{source_id: id, tracks: %{}, init_tags: [], packet_topic: packets_topic(id)}} + {:ok, %{source_id: id, tracks: %{}, init_tags: []}} end @impl true @@ -69,7 +69,7 @@ defmodule Shinkai.Sink.RTMP do end) if supported_tracks != [] do - :ok = PubSub.subscribe(Shinkai.PubSub, state.packet_topic) + :ok = PubSub.subscribe(Shinkai.PubSub, packets_topic(state.source_id)) end {:noreply, %{state | tracks: Map.new(supported_tracks, &{&1.id, &1}), init_tags: init_tags}} @@ -97,9 +97,7 @@ defmodule Shinkai.Sink.RTMP do Logger.warning("[#{state.source_id}] [RTMP Sink] source disconnected") Registry.dispatch(Sink.Registry, {:rtmp, state.source_id}, fn entries -> - for {pid, _} <- entries do - send(pid, :exit) - end + for {pid, _} <- entries, do: send(pid, :exit) end) {:noreply, state} @@ -109,7 +107,6 @@ defmodule Shinkai.Sink.RTMP do tags = Enum.map(packets, &packet_to_tag(track, &1)) for {pid, _} <- entries, {timestamp, data} <- tags do - # credo:disable-for-next-line case track.type do :video -> ClientSession.send_video_data(pid, timestamp, data) :audio -> ClientSession.send_audio_data(pid, timestamp, data) @@ -133,6 +130,9 @@ defmodule Shinkai.Sink.RTMP do |> AudioData.AAC.new(:raw) |> AudioData.new(:aac, 1, 3, :stereo) + :opus -> + %ExAudioData{codec_id: :opus, packet_type: :coded_frames, data: packet.data} + codec when codec in [:h265, :av1] -> packet_type = if codec == :h265 and cts != 0, do: :coded_frames, else: :coded_frames_x diff --git a/lib/shinkai/track.ex b/lib/shinkai/track.ex index d330e40..64caa9c 100644 --- a/lib/shinkai/track.ex +++ b/lib/shinkai/track.ex @@ -3,7 +3,7 @@ defmodule Shinkai.Track do Module describing a media track. """ - alias ExFLV.Tag.{AudioData, ExVideoData, VideoData} + alias ExFLV.Tag.{AudioData, ExAudioData, ExVideoData, VideoData} alias ExMP4.Box alias MediaCodecs.MPEG4 @@ -76,6 +76,20 @@ defmodule Shinkai.Track do |> AudioData.new(:aac, 1, 3, :stereo) end + def to_rtmp_tag(%{codec: :opus} = track) do + # priv_data is the channel count + # No support for more than 2 channels for now + dops = %Box.Dops{ + output_channel_count: track.priv_data || 2, + pre_skip: 0, + input_sample_rate: 48000, + output_gain: 0, + channel_mapping_family: 0 + } + + %ExAudioData{codec_id: :opus, packet_type: :sequence_start, data: Box.serialize(dops)} + end + def to_rtmp_tag(%{codec: codec} = track) when codec in [:h265, :av1] do %ExVideoData{ codec_id: codec, From 3581287547724071ab1cb51bfe801a82b55c1ba2 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Fri, 23 Jan 2026 22:08:51 +0100 Subject: [PATCH 2/3] Fix tests --- test/shinkai/pipeline_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/shinkai/pipeline_test.exs b/test/shinkai/pipeline_test.exs index 5ea7611..9fdd384 100644 --- a/test/shinkai/pipeline_test.exs +++ b/test/shinkai/pipeline_test.exs @@ -188,7 +188,7 @@ defmodule Shinkai.PipelineTest do {video_codec, audio_codec} = case fixture do "test/fixtures/big_buck_avc_aac.mp4" -> {:h264, :aac} - _ -> {:av1, nil} + _ -> {:av1, :opus} end if video_codec do From 4d918101a4e0cb6fc6aac4c59cb41767e2ba3209 Mon Sep 17 00:00:00 2001 From: Billal GHILAS Date: Fri, 23 Jan 2026 22:21:20 +0100 Subject: [PATCH 3/3] Refactor --- lib/shinkai/sink/rtmp.ex | 77 ++++++++++++++++++++++------------------ lib/shinkai/track.ex | 2 +- 2 files changed, 43 insertions(+), 36 deletions(-) diff --git a/lib/shinkai/sink/rtmp.ex b/lib/shinkai/sink/rtmp.ex index 88b9000..7371406 100644 --- a/lib/shinkai/sink/rtmp.ex +++ b/lib/shinkai/sink/rtmp.ex @@ -104,7 +104,19 @@ defmodule Shinkai.Sink.RTMP do end defp dispatch_packets(entries, packets, track) do - tags = Enum.map(packets, &packet_to_tag(track, &1)) + tags = + Enum.map(packets, fn packet -> + dts = div(packet.dts * @timescale, track.timescale) + cts = div((packet.pts - packet.dts) * @timescale, track.timescale) + + tag = + track.codec + |> packet_to_tag(packet, cts) + |> Serializer.serialize() + |> IO.iodata_to_binary() + + {dts, tag} + end) for {pid, _} <- entries, {timestamp, data} <- tags do case track.type do @@ -114,41 +126,36 @@ defmodule Shinkai.Sink.RTMP do end end - defp packet_to_tag(track, packet) do - dts = div(packet.dts * @timescale, track.timescale) - cts = div((packet.pts - packet.dts) * @timescale, track.timescale) - - tag = - case track.codec do - :h264 -> - maybe_prefix_payload(:h264, packet.data) - |> VideoData.AVC.new(:nalu, cts) - |> VideoData.new(:h264, if(packet.sync?, do: :keyframe, else: :interframe)) - - :aac -> - packet.data - |> AudioData.AAC.new(:raw) - |> AudioData.new(:aac, 1, 3, :stereo) - - :opus -> - %ExAudioData{codec_id: :opus, packet_type: :coded_frames, data: packet.data} - - codec when codec in [:h265, :av1] -> - packet_type = if codec == :h265 and cts != 0, do: :coded_frames, else: :coded_frames_x - - %ExVideoData{ - codec_id: codec, - frame_type: if(packet.sync?, do: :keyframe, else: :interframe), - packet_type: packet_type, - composition_time_offset: cts, - data: maybe_prefix_payload(codec, packet.data) - } - - codec -> - AudioData.new(packet.data, codec, 3, 1, :stereo) - end + defp packet_to_tag(:h264, packet, cts) do + maybe_prefix_payload(:h264, packet.data) + |> VideoData.AVC.new(:nalu, cts) + |> VideoData.new(:h264, if(packet.sync?, do: :keyframe, else: :interframe)) + end + + defp packet_to_tag(:aac, packet, _cts) do + packet.data + |> AudioData.AAC.new(:raw) + |> AudioData.new(:aac, 1, 3, :stereo) + end + + defp packet_to_tag(:opus, packet, _cts) do + %ExAudioData{codec_id: :opus, packet_type: :coded_frames, data: packet.data} + end + + defp packet_to_tag(codec, packet, cts) when codec in [:av1, :h265] do + packet_type = if codec == :h265 and cts != 0, do: :coded_frames, else: :coded_frames_x + + %ExVideoData{ + codec_id: codec, + frame_type: if(packet.sync?, do: :keyframe, else: :interframe), + packet_type: packet_type, + composition_time_offset: cts, + data: maybe_prefix_payload(codec, packet.data) + } + end - {dts, Serializer.serialize(tag) |> IO.iodata_to_binary()} + defp packet_to_tag(codec, packet, _cts) do + AudioData.new(packet.data, codec, 3, 1, :stereo) end defp maybe_prefix_payload(codec, payload) when codec in [:h264, :h265] do diff --git a/lib/shinkai/track.ex b/lib/shinkai/track.ex index 64caa9c..950dbf1 100644 --- a/lib/shinkai/track.ex +++ b/lib/shinkai/track.ex @@ -82,7 +82,7 @@ defmodule Shinkai.Track do dops = %Box.Dops{ output_channel_count: track.priv_data || 2, pre_skip: 0, - input_sample_rate: 48000, + input_sample_rate: 48_000, output_gain: 0, channel_mapping_family: 0 }