Skip to content

Deliver WebRTC packages to ion-SFU #245

@julioqueirozvr

Description

@julioqueirozvr

Hi, i'm developing an solution that brings multiple RTSP cameras to browser.
Using RTSPtoWebRTC on mesh (p2p) to the browser works well, but on 7/8 stream on browser, it crashes.
So, i'm testing the option to publish the WebRTC stream to an ion-sfu, that delivers to the browser.

I managed to make RTSPtoWebRTC connect on ion-sfu and deliver the packages to it, webrtc track is coming to the browser, but the video tag is showing nothing.
Maybe my RTP packets are corrupted? Anywhere here can help me? It's pretty urgent.
Front-end is not the problem, since webcam on SFU works fine.

Front

const pubVideo = document.getElementById("pub_video");
const subVideo = document.getElementById("sub_video");
const bntPubCam = document.getElementById("bnt_pubcam");
const bntPubScreen = document.getElementById("bnt_pubscreen");

const serverURL = "ws://localhost:7000/ws";

const config = {
  iceServers: [
    {
      urls: "stun:stun.l.google.com:19302",
    },
  ],
};

const signalLocal = new Signal.IonSFUJSONRPCSignal(serverURL);
const clientLocal = new IonSDK.Client(signalLocal, config);

signalLocal.onopen = () => clientLocal.join("teste");

const start = (type) => {
  if (type) {
    IonSDK.LocalStream.getUserMedia({
      resolution: "vga",
      audio: true,
      codec: "h264",
    })
      .then((media) => {
        pubVideo.srcObject = media;
        pubVideo.autoplay = true;
        pubVideo.controls = true;
        pubVideo.muted = true;
        bntPubCam.disabled = true;
        bntPubScreen.disabled = true;
        clientLocal.publish(media);
      })
      .catch(console.error);
  } else {
    IonSDK.LocalStream.getDisplayMedia({
      resolution: "vga",
      audio: true,
      codec: "h264",
    })
      .then((media) => {
        pubVideo.srcObject = media;
        pubVideo.autoplay = true;
        pubVideo.controls = true;
        pubVideo.muted = true;
        bntPubCam.disabled = true;
        bntPubScreen.disabled = true;
        clientLocal.publish(media);
      })
      .catch(console.error);
  }
};

clientLocal.ontrack = (track, stream) => {
  console.log("got track: ", track.id, "for stream: ", stream.id);
  track.onunmute = () => {
    subVideo.srcObject = stream;
    subVideo.autoplay = true;
    subVideo.muted = false;

    stream.onremovetrack = () => {
      subVideo.srcObject = null;
    };
  };
};

stream.go

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand/v2"
	"time"

	"github.com/deepch/vdk/format/rtspv2"
	"github.com/pion/rtp"
	"github.com/pion/rtp/codecs"
)

var (
	ErrorStreamExitNoVideoOnStream = errors.New("Stream Exit No Video On Stream")
	ErrorStreamExitRtspDisconnect  = errors.New("Stream Exit Rtsp Disconnect")
	ErrorStreamExitNoViewer        = errors.New("Stream Exit On Demand No Viewer")
)

func serveStreams() {
	for k, v := range Config.Streams {
		if !v.OnDemand {
			go RTSPWorkerLoop(k, v.URL, v.OnDemand, v.DisableAudio, v.Debug)
		}
	}
}
func RTSPWorkerLoop(name, url string, OnDemand, DisableAudio, Debug bool) {
	defer Config.RunUnlock(name)
	for {
		log.Println("Stream Try Connect", name)
		err := RTSPWorker(name, url, OnDemand, DisableAudio, Debug)
		if err != nil {
			log.Println(err)
			Config.LastError = err
		}
		if OnDemand && !Config.HasViewer(name) {
			log.Println(ErrorStreamExitNoViewer)
			return
		}
		time.Sleep(1 * time.Second)
	}
}
func RTSPWorker(name, url string, OnDemand, DisableAudio, Debug bool) error {
	keyTest := time.NewTimer(20 * time.Second)
	clientTest := time.NewTimer(20 * time.Second)
	//add next TimeOut
	RTSPClient, err := rtspv2.Dial(rtspv2.RTSPClientOptions{URL: url, DisableAudio: DisableAudio, DialTimeout: 3 * time.Second, ReadWriteTimeout: 3 * time.Second, Debug: Debug})
	if err != nil {
		return err
	}
	defer RTSPClient.Close()
	if RTSPClient.CodecData != nil {
		Config.coAd(name, RTSPClient.CodecData)
	}

	fmt.Printf("%+v\n", RTSPClient.CodecData)

	var AudioOnly bool
	if len(RTSPClient.CodecData) == 1 && RTSPClient.CodecData[0].Type().IsAudio() {
		AudioOnly = true
	}

	publisher, err := NewIonPublisher(context.Background(), "localhost:7000", "teste", name)
	if err != nil {
		return fmt.Errorf("erro ao criar publisher para %s: %w", name, err)
	}

	ssrc := rand.Uint32()
	payloader := &codecs.H264Payloader{}
	packetizer := rtp.NewPacketizer(
		1400,
		125,
		ssrc,
		payloader,
		rtp.NewRandomSequencer(),
		90000,
	)

	for {
		select {
		case <-clientTest.C:
			if OnDemand {
				if !Config.HasViewer(name) {
					return ErrorStreamExitNoViewer
				} else {
					clientTest.Reset(20 * time.Second)
				}
			}
		case <-keyTest.C:
			return ErrorStreamExitNoVideoOnStream
		case signals := <-RTSPClient.Signals:
			switch signals {
			case rtspv2.SignalCodecUpdate:
				Config.coAd(name, RTSPClient.CodecData)
			case rtspv2.SignalStreamRTPStop:
				return ErrorStreamExitRtspDisconnect
			}
		case packetAV := <-RTSPClient.OutgoingPacketQueue:

			//HERE, THE RTP PACKETS ARE SEND TO ION-SFU
			if packetAV.IsKeyFrame {
				fmt.Printf("🔍 KeyFrame recebido! Dump dos primeiros bytes:\n% X\n", packetAV.Data[:20])
			}

			if AudioOnly || packetAV.IsKeyFrame {
				keyTest.Reset(20 * time.Second)
			}
			Config.cast(name, *packetAV)

			timestamp := uint32(packetAV.Time * 90) // 90kHz para H264

			packets := packetizer.Packetize(packetAV.Data, timestamp)
			for _, pkt := range packets {
				pkt.PayloadType = 125
				err := publisher.WriteRTP(pkt)
				if err != nil {
					log.Println("Erro ao enviar pacote RTP:", err)
				}
			}
		}
	}
}

ion_publisher.go (stablish the connection with ion-sfu and send rtp packets)

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/url"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pion/interceptor"
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"
)

type IonPublisher struct {
	peerConnection   *webrtc.PeerConnection
	videoTrack       *webrtc.TrackLocalStaticRTP
	wsConn           *websocket.Conn
}

func NewIonPublisher(ctx context.Context, ionAddr, room, uid string) (*IonPublisher, error) {
	// 1. Conectar ao WebSocket do Ion-SFU
	u := url.URL{Scheme: "ws", Host: ionAddr, Path: "/ws"}
	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("falha na conexão WebSocket: %w", err)
	}

	const h264FmtpLine = "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"

	// 3. Configure PeerConnection
	m := &webrtc.MediaEngine{}
	m.RegisterCodec(webrtc.RTPCodecParameters{
		RTPCodecCapability: webrtc.RTPCodecCapability{
			MimeType:    webrtc.MimeTypeH264,
			ClockRate:   90000,
			Channels:    0,
			SDPFmtpLine: h264FmtpLine,
		},
		PayloadType: 125,
	},
		webrtc.RTPCodecTypeVideo)

	i := &interceptor.Registry{}
	if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
		return nil, fmt.Errorf("erro nos interceptors: %w", err)
	}

	api := webrtc.NewAPI(webrtc.WithMediaEngine(m))
	peerConnection, err := api.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{{
			URLs: []string{"stun:stun.l.google.com:19302"},
		}},
		SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
	})
	if err != nil {
		return nil, fmt.Errorf("falha ao criar PeerConnection: %w", err)
	}

	// 4. Create H264 video track
	videoTrack, err := webrtc.NewTrackLocalStaticRTP(
		webrtc.RTPCodecCapability{
			MimeType:    webrtc.MimeTypeH264,
			ClockRate:   90000,
			Channels:    0,
			SDPFmtpLine: h264FmtpLine,
		},
		"video",
		"pion",
	)
	if err != nil {
		return nil, fmt.Errorf("falha ao criar track: %w", err)
	}

	if _, err = peerConnection.AddTrack(videoTrack); err != nil {
		return nil, fmt.Errorf("falha ao adicionar track: %w", err)
	}

	publisher := &IonPublisher{
		peerConnection: peerConnection,
		videoTrack:     videoTrack,
		wsConn:         conn,
	}

	// 5. Handler to ICE Candidates
	peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
		if c != nil {
			data, _ := json.Marshal(map[string]interface{}{
				"method": "trickle",
				"params": map[string]interface{}{
					"candidate": c.ToJSON(),
					"target":    0, // 0 = publisher, 1 = subscriber
				},
			})
			publisher.wsConn.WriteMessage(websocket.TextMessage, data)
		}
	})

	// 6. Generate Offer and send to SFU
	offer, err := peerConnection.CreateOffer(nil)
	if err != nil {
		return nil, fmt.Errorf("falha ao criar offer: %w", err)
	}
	log.Printf("Offer SDP:\n%s", offer.SDP)

	if err = peerConnection.SetLocalDescription(offer); err != nil {
		return nil, fmt.Errorf("falha ao configurar LocalDescription: %w", err)
	}

	// 2. send JOIN command
	offerMsg := map[string]interface{}{
		"method": "join",
		"params": map[string]interface{}{
			"sid": room, // ID da sala
			"uid": uid,  // ID do usuário
			"offer": map[string]interface{}{
				"type": "offer",
				"sdp":  offer.SDP,
			},
		},
		"jsonrpc": "2.0",
		"id":      1,
	}
	joinBytes, _ := json.Marshal(offerMsg)
	if err := conn.WriteMessage(websocket.TextMessage, joinBytes); err != nil {
		return nil, fmt.Errorf("falha ao enviar join: %w", err)
	}

	// 7. Handler WebSocket messages (Answer)
	go publisher.handleWebSocketMessages()

	return publisher, nil
}

func (p *IonPublisher) handleWebSocketMessages() {
	var pendingCandidates []webrtc.ICECandidateInit

	for {
		_, msg, err := p.wsConn.ReadMessage()
		if err != nil {
			log.Printf("erro ao ler mensagem do WebSocket: %v", err)
			return
		}

		var resp map[string]interface{}
		if err := json.Unmarshal(msg, &resp); err != nil {
			log.Printf("erro ao decodificar JSON: %v", err)
			continue
		}

		// Handler de "answer"
		if result, ok := resp["result"].(map[string]interface{}); ok {
			if sdp, hasSDP := result["sdp"].(string); hasSDP {
				answer := webrtc.SessionDescription{
					Type: webrtc.SDPTypeAnswer,
					SDP:  sdp,
				}
				if err := p.peerConnection.SetRemoteDescription(answer); err != nil {
					log.Printf("erro ao aplicar RemoteDescription: %v", err)
					return
				}
				log.Println("RemoteDescription aplicada")

				// Após aplicar a RemoteDescription, adicione candidatos pendentes
				for _, c := range pendingCandidates {
					if err := p.peerConnection.AddICECandidate(c); err != nil {
						log.Printf("Erro ao adicionar ICE candidate pendente: %v", err)
					}
				}
				pendingCandidates = nil // limpar
			}
		}

		// Handler de trickle ICE
		if method, ok := resp["method"].(string); ok && method == "trickle" {
			params := resp["params"].(map[string]interface{})
			candData, err := json.Marshal(params["candidate"])
			if err != nil {
				log.Printf("erro ao processar candidato ICE: %v", err)
				continue
			}
			var candidate webrtc.ICECandidateInit
			if err := json.Unmarshal(candData, &candidate); err != nil {
				log.Printf("erro ao decodificar ICECandidateInit: %v", err)
				continue
			}

			// Se RemoteDescription ainda não foi aplicada, armazenar
			if p.peerConnection.RemoteDescription() == nil {
				pendingCandidates = append(pendingCandidates, candidate)
			} else {
				if err := p.peerConnection.AddICECandidate(candidate); err != nil {
					log.Printf("Erro ao adicionar ICE candidate: %v", err)
				}
			}
		}
	}
}

//Send RTSP packets
func (p *IonPublisher) WriteRTP(packet *rtp.Packet) error {
	// Enviar o pacote original
	return p.videoTrack.WriteRTP(packet)
}

func (p *IonPublisher) Close() {
	p.peerConnection.Close()
	p.wsConn.Close()
}

Any tips on what can be the problem?
On my architecture, an SFU could help me to resolve my performance issues on browser?
Aprecciate any help, thanks

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions