diff --git a/av/av.go b/av/av.go index dfa03066..e8f009ea 100644 --- a/av/av.go +++ b/av/av.go @@ -1,4 +1,3 @@ - // Package av defines basic interfaces and data structures of container demux/mux and audio encode/decode. package av @@ -11,17 +10,17 @@ import ( type SampleFormat uint8 const ( - U8 = SampleFormat(iota + 1) // 8-bit unsigned integer - S16 // signed 16-bit integer - S32 // signed 32-bit integer - FLT // 32-bit float - DBL // 64-bit float - U8P // 8-bit unsigned integer in planar - S16P // signed 16-bit integer in planar - S32P // signed 32-bit integer in planar - FLTP // 32-bit float in planar - DBLP // 64-bit float in planar - U32 // unsigned 32-bit integer + U8 = SampleFormat(iota + 1) // 8-bit unsigned integer + S16 // signed 16-bit integer + S32 // signed 32-bit integer + FLT // 32-bit float + DBL // 64-bit float + U8P // 8-bit unsigned integer in planar + S16P // signed 16-bit integer in planar + S32P // signed 32-bit integer in planar + FLTP // 32-bit float in planar + DBLP // 64-bit float in planar + U32 // unsigned 32-bit integer ) func (self SampleFormat) BytesPerSample() int { @@ -116,11 +115,11 @@ func (self ChannelLayout) Count() (n int) { type CodecType uint32 var ( - H264 = MakeVideoCodecType(avCodecTypeMagic + 1) - AAC = MakeAudioCodecType(avCodecTypeMagic + 1) - PCM_MULAW = MakeAudioCodecType(avCodecTypeMagic + 2) - PCM_ALAW = MakeAudioCodecType(avCodecTypeMagic + 3) - SPEEX = MakeAudioCodecType(avCodecTypeMagic + 4) + H264 = MakeVideoCodecType(avCodecTypeMagic + 1) + AAC = MakeAudioCodecType(avCodecTypeMagic + 1) + PCM_MULAW = MakeAudioCodecType(avCodecTypeMagic + 2) + PCM_ALAW = MakeAudioCodecType(avCodecTypeMagic + 3) + SPEEX = MakeAudioCodecType(avCodecTypeMagic + 4) NELLYMOSER = MakeAudioCodecType(avCodecTypeMagic + 5) ) @@ -171,7 +170,7 @@ const avCodecTypeMagic = 233333 // can be converted to VideoCodecData or AudioCodecData using: // // codecdata.(AudioCodecData) or codecdata.(VideoCodecData) -// +// // for H264, CodecData is AVCDecoderConfigure bytes, includes SPS/PPS. type CodecData interface { Type() CodecType // Video/Audio codec type @@ -179,15 +178,15 @@ type CodecData interface { type VideoCodecData interface { CodecData - Width() int // Video width + Width() int // Video width Height() int // Video height } type AudioCodecData interface { CodecData - SampleFormat() SampleFormat // audio sample format - SampleRate() int // audio sample rate - ChannelLayout() ChannelLayout // audio channel layout + SampleFormat() SampleFormat // audio sample format + SampleRate() int // audio sample rate + ChannelLayout() ChannelLayout // audio channel layout PacketDuration([]byte) (time.Duration, error) // get audio compressed packet duration } @@ -196,16 +195,16 @@ type PacketWriter interface { } type PacketReader interface { - ReadPacket() (Packet,error) + ReadPacket() (Packet, error) } // Muxer describes the steps of writing compressed audio/video packets into container formats like MP4/FLV/MPEG-TS. -// +// // Container formats, rtmp.Conn, and transcode.Muxer implements Muxer interface. type Muxer interface { WriteHeader([]CodecData) error // write the file header - PacketWriter // write compressed audio/video packets - WriteTrailer() error // finish writing file, this func can be called only once + PacketWriter // write compressed audio/video packets + WriteTrailer() error // finish writing file, this func can be called only once } // Muxer with Close() method @@ -216,7 +215,7 @@ type MuxCloser interface { // Demuxer can read compressed audio/video packets from container formats like MP4/FLV/MPEG-TS. type Demuxer interface { - PacketReader // read compressed audio/video packets + PacketReader // read compressed audio/video packets Streams() ([]CodecData, error) // reads the file header, contains video/audio meta infomations } @@ -226,22 +225,28 @@ type DemuxCloser interface { Close() error } +const ( + I_FRAME = byte(0) + P_FRAME = byte(100) + B_FRAME = byte(101) +) + // Packet stores compressed audio/video data. type Packet struct { - IsKeyFrame bool // video packet is key frame - Idx int8 // stream index in container format + FrameType byte // video packet is key frame + Idx int8 // stream index in container format CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame - Time time.Duration // packet decode time - Data []byte // packet data + Time time.Duration // packet decode time + Data []byte // packet data } // Raw audio frame. type AudioFrame struct { - SampleFormat SampleFormat // audio sample format, e.g: S16,FLTP,... + SampleFormat SampleFormat // audio sample format, e.g: S16,FLTP,... ChannelLayout ChannelLayout // audio channel layout, e.g: CH_MONO,CH_STEREO,... - SampleCount int // sample count in this frame - SampleRate int // sample rate - Data [][]byte // data array for planar format len(Data) > 1 + SampleCount int // sample count in this frame + SampleRate int // sample rate + Data [][]byte // data array for planar format len(Data) > 1 } func (self AudioFrame) Duration() time.Duration { @@ -291,26 +296,25 @@ func (self AudioFrame) Concat(in AudioFrame) (out AudioFrame) { // AudioEncoder can encode raw audio frame into compressed audio packets. // cgo/ffmpeg inplements AudioEncoder, using ffmpeg.NewAudioEncoder to create it. type AudioEncoder interface { - CodecData() (AudioCodecData, error) // encoder's codec data can put into container - Encode(AudioFrame) ([][]byte, error) // encode raw audio frame into compressed pakcet(s) - Close() // close encoder, free cgo contexts - SetSampleRate(int) (error) // set encoder sample rate - SetChannelLayout(ChannelLayout) (error) // set encoder channel layout - SetSampleFormat(SampleFormat) (error) // set encoder sample format - SetBitrate(int) (error) // set encoder bitrate - SetOption(string,interface{}) (error) // encoder setopt, in ffmpeg is av_opt_set_dict() - GetOption(string,interface{}) (error) // encoder getopt + CodecData() (AudioCodecData, error) // encoder's codec data can put into container + Encode(AudioFrame) ([][]byte, error) // encode raw audio frame into compressed pakcet(s) + Close() // close encoder, free cgo contexts + SetSampleRate(int) error // set encoder sample rate + SetChannelLayout(ChannelLayout) error // set encoder channel layout + SetSampleFormat(SampleFormat) error // set encoder sample format + SetBitrate(int) error // set encoder bitrate + SetOption(string, interface{}) error // encoder setopt, in ffmpeg is av_opt_set_dict() + GetOption(string, interface{}) error // encoder getopt } // AudioDecoder can decode compressed audio packets into raw audio frame. // use ffmpeg.NewAudioDecoder to create it. type AudioDecoder interface { Decode([]byte) (bool, AudioFrame, error) // decode one compressed audio packet - Close() // close decode, free cgo contexts + Close() // close decode, free cgo contexts } // AudioResampler can convert raw audio frames in different sample rate/format/channel layout. type AudioResampler interface { Resample(AudioFrame) (AudioFrame, error) // convert raw audio frames } - diff --git a/av/avutil/avutil.go b/av/avutil/avutil.go index 1e980cd7..ace236ca 100644 --- a/av/avutil/avutil.go +++ b/av/avutil/avutil.go @@ -1,14 +1,15 @@ package avutil import ( - "io" - "strings" - "fmt" "bytes" - "github.com/nareix/joy4/av" + "fmt" + "io" "net/url" "os" "path" + "strings" + + "github.com/Danile71/joy4/av" ) type HandlerDemuxer struct { @@ -22,7 +23,7 @@ func (self *HandlerDemuxer) Close() error { type HandlerMuxer struct { av.Muxer - w io.WriteCloser + w io.WriteCloser stage int } @@ -54,18 +55,18 @@ func (self *HandlerMuxer) Close() (err error) { } type RegisterHandler struct { - Ext string - ReaderDemuxer func(io.Reader)av.Demuxer - WriterMuxer func(io.Writer)av.Muxer - UrlMuxer func(string)(bool,av.MuxCloser,error) - UrlDemuxer func(string)(bool,av.DemuxCloser,error) - UrlReader func(string)(bool,io.ReadCloser,error) - Probe func([]byte)bool - AudioEncoder func(av.CodecType)(av.AudioEncoder,error) - AudioDecoder func(av.AudioCodecData)(av.AudioDecoder,error) - ServerDemuxer func(string)(bool,av.DemuxCloser,error) - ServerMuxer func(string)(bool,av.MuxCloser,error) - CodecTypes []av.CodecType + Ext string + ReaderDemuxer func(io.Reader) av.Demuxer + WriterMuxer func(io.Writer) av.Muxer + UrlMuxer func(string) (bool, av.MuxCloser, error) + UrlDemuxer func(string) (bool, av.DemuxCloser, error) + UrlReader func(string) (bool, io.ReadCloser, error) + Probe func([]byte) bool + AudioEncoder func(av.CodecType) (av.AudioEncoder, error) + AudioDecoder func(av.AudioCodecData) (av.AudioDecoder, error) + ServerDemuxer func(string) (bool, av.DemuxCloser, error) + ServerMuxer func(string) (bool, av.MuxCloser, error) + CodecTypes []av.CodecType } type Handlers struct { @@ -167,7 +168,7 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) { } demuxer = &HandlerDemuxer{ Demuxer: handler.ReaderDemuxer(r), - r: r, + r: r, } return } @@ -196,7 +197,7 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) { } demuxer = &HandlerDemuxer{ Demuxer: handler.ReaderDemuxer(_r), - r: r, + r: r, } return } @@ -254,7 +255,7 @@ func (self *Handlers) FindCreate(uri string) (handler RegisterHandler, muxer av. } muxer = &HandlerMuxer{ Muxer: handler.WriterMuxer(w), - w: w, + w: w, } return } diff --git a/cgo/ffmpeg/audio.go b/cgo/ffmpeg/audio.go index fbd676dc..eef1c5e2 100644 --- a/cgo/ffmpeg/audio.go +++ b/cgo/ffmpeg/audio.go @@ -2,32 +2,28 @@ package ffmpeg /* #include "ffmpeg.h" -int wrap_avcodec_decode_audio4(AVCodecContext *ctx, AVFrame *frame, void *data, int size, int *got) { - struct AVPacket pkt = {.data = data, .size = size}; - return avcodec_decode_audio4(ctx, frame, got, &pkt); -} -int wrap_avresample_convert(AVAudioResampleContext *avr, int *out, int outsize, int outcount, int *in, int insize, int incount) { - return avresample_convert(avr, (void *)out, outsize, outcount, (void *)in, insize, incount); -} + + */ import "C" import ( - "unsafe" - "runtime" "fmt" + "runtime" "time" - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/av/avutil" - "github.com/nareix/joy4/codec/aacparser" + "unsafe" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/av/avutil" + "github.com/Danile71/joy4/codec/aacparser" ) const debug = false type Resampler struct { - inSampleFormat, OutSampleFormat av.SampleFormat + inSampleFormat, OutSampleFormat av.SampleFormat inChannelLayout, OutChannelLayout av.ChannelLayout - inSampleRate, OutSampleRate int - avr *C.AVAudioResampleContext + inSampleRate, OutSampleRate int + avr *C.SwrContext } func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) { @@ -42,8 +38,8 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) outChannels = 1 } outData := make([]*C.uint8_t, outChannels) - outSampleCount := int(C.avresample_get_out_samples(self.avr, C.int(in.SampleCount))) - outLinesize := outSampleCount*self.OutSampleFormat.BytesPerSample() + outSampleCount := int(C.swr_get_out_samples(self.avr, C.int(in.SampleCount))) + outLinesize := outSampleCount * self.OutSampleFormat.BytesPerSample() flush.Data = make([][]byte, outChannels) for i := 0; i < outChannels; i++ { flush.Data[i] = make([]byte, outLinesize) @@ -53,11 +49,12 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) flush.SampleFormat = self.OutSampleFormat flush.SampleRate = self.OutSampleRate - convertSamples := int(C.wrap_avresample_convert( + convertSamples := int(C.swr_convert( self.avr, - (*C.int)(unsafe.Pointer(&outData[0])), C.int(outLinesize), C.int(outSampleCount), - nil, C.int(0), C.int(0), + (**C.uint8_t)(unsafe.Pointer(&outData[0])), C.int(outSampleCount), + nil, C.int(0), )) + if convertSamples < 0 { err = fmt.Errorf("ffmpeg: avresample_convert_frame failed") return @@ -76,29 +73,27 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) }) } - C.avresample_free(&self.avr) + C.swr_free(&self.avr) self.inSampleFormat = in.SampleFormat self.inSampleRate = in.SampleRate self.inChannelLayout = in.ChannelLayout - avr := C.avresample_alloc_context() + avr := C.swr_alloc() C.av_opt_set_int(unsafe.Pointer(avr), C.CString("in_channel_layout"), C.int64_t(channelLayoutAV2FF(self.inChannelLayout)), 0) C.av_opt_set_int(unsafe.Pointer(avr), C.CString("out_channel_layout"), C.int64_t(channelLayoutAV2FF(self.OutChannelLayout)), 0) C.av_opt_set_int(unsafe.Pointer(avr), C.CString("in_sample_rate"), C.int64_t(self.inSampleRate), 0) C.av_opt_set_int(unsafe.Pointer(avr), C.CString("out_sample_rate"), C.int64_t(self.OutSampleRate), 0) C.av_opt_set_int(unsafe.Pointer(avr), C.CString("in_sample_fmt"), C.int64_t(sampleFormatAV2FF(self.inSampleFormat)), 0) C.av_opt_set_int(unsafe.Pointer(avr), C.CString("out_sample_fmt"), C.int64_t(sampleFormatAV2FF(self.OutSampleFormat)), 0) - C.avresample_open(avr) + C.swr_init(avr) self.avr = avr } - var inChannels, inLinesize int + var inChannels int inSampleCount := in.SampleCount if !self.inSampleFormat.IsPlanar() { inChannels = 1 - inLinesize = inSampleCount*in.SampleFormat.BytesPerSample()*self.inChannelLayout.Count() } else { inChannels = self.inChannelLayout.Count() - inLinesize = inSampleCount*in.SampleFormat.BytesPerSample() } inData := make([]*C.uint8_t, inChannels) for i := 0; i < inChannels; i++ { @@ -106,15 +101,15 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) } var outChannels, outLinesize, outBytesPerSample int - outSampleCount := int(C.avresample_get_out_samples(self.avr, C.int(in.SampleCount))) + outSampleCount := int(C.swr_get_out_samples(self.avr, C.int(in.SampleCount))) if !self.OutSampleFormat.IsPlanar() { outChannels = 1 - outBytesPerSample = self.OutSampleFormat.BytesPerSample()*self.OutChannelLayout.Count() - outLinesize = outSampleCount*outBytesPerSample + outBytesPerSample = self.OutSampleFormat.BytesPerSample() * self.OutChannelLayout.Count() + outLinesize = outSampleCount * outBytesPerSample } else { outChannels = self.OutChannelLayout.Count() outBytesPerSample = self.OutSampleFormat.BytesPerSample() - outLinesize = outSampleCount*outBytesPerSample + outLinesize = outSampleCount * outBytesPerSample } outData := make([]*C.uint8_t, outChannels) out.Data = make([][]byte, outChannels) @@ -126,10 +121,10 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) out.SampleFormat = self.OutSampleFormat out.SampleRate = self.OutSampleRate - convertSamples := int(C.wrap_avresample_convert( + convertSamples := int(C.swr_convert( self.avr, - (*C.int)(unsafe.Pointer(&outData[0])), C.int(outLinesize), C.int(outSampleCount), - (*C.int)(unsafe.Pointer(&inData[0])), C.int(inLinesize), C.int(inSampleCount), + (**C.uint8_t)(unsafe.Pointer(&outData[0])), C.int(outSampleCount), + (**C.uint8_t)(unsafe.Pointer(&inData[0])), C.int(inSampleCount), )) if convertSamples < 0 { err = fmt.Errorf("ffmpeg: avresample_convert_frame failed") @@ -151,19 +146,19 @@ func (self *Resampler) Resample(in av.AudioFrame) (out av.AudioFrame, err error) } func (self *Resampler) Close() { - C.avresample_free(&self.avr) + C.swr_free(&self.avr) } type AudioEncoder struct { - ff *ffctx - SampleRate int - Bitrate int - ChannelLayout av.ChannelLayout - SampleFormat av.SampleFormat + ff *ffctx + SampleRate int + Bitrate int + ChannelLayout av.ChannelLayout + SampleFormat av.SampleFormat FrameSampleCount int - framebuf av.AudioFrame - codecData av.AudioCodecData - resampler *Resampler + framebuf av.AudioFrame + codecData av.AudioCodecData + resampler *Resampler } func sampleFormatAV2FF(sampleFormat av.SampleFormat) (ffsamplefmt int32) { @@ -194,25 +189,25 @@ func sampleFormatAV2FF(sampleFormat av.SampleFormat) (ffsamplefmt int32) { func sampleFormatFF2AV(ffsamplefmt int32) (sampleFormat av.SampleFormat) { switch ffsamplefmt { - case C.AV_SAMPLE_FMT_U8: ///< unsigned 8 bits + case C.AV_SAMPLE_FMT_U8: ///< unsigned 8 bits sampleFormat = av.U8 - case C.AV_SAMPLE_FMT_S16: ///< signed 16 bits + case C.AV_SAMPLE_FMT_S16: ///< signed 16 bits sampleFormat = av.S16 - case C.AV_SAMPLE_FMT_S32: ///< signed 32 bits + case C.AV_SAMPLE_FMT_S32: ///< signed 32 bits sampleFormat = av.S32 - case C.AV_SAMPLE_FMT_FLT: ///< float + case C.AV_SAMPLE_FMT_FLT: ///< float sampleFormat = av.FLT - case C.AV_SAMPLE_FMT_DBL: ///< double + case C.AV_SAMPLE_FMT_DBL: ///< double sampleFormat = av.DBL - case C.AV_SAMPLE_FMT_U8P: ///< unsigned 8 bits, planar + case C.AV_SAMPLE_FMT_U8P: ///< unsigned 8 bits, planar sampleFormat = av.U8P - case C.AV_SAMPLE_FMT_S16P: ///< signed 16 bits, planar + case C.AV_SAMPLE_FMT_S16P: ///< signed 16 bits, planar sampleFormat = av.S16P - case C.AV_SAMPLE_FMT_S32P: ///< signed 32 bits, planar + case C.AV_SAMPLE_FMT_S32P: ///< signed 32 bits, planar sampleFormat = av.S32P - case C.AV_SAMPLE_FMT_FLTP: ///< float, planar + case C.AV_SAMPLE_FMT_FLTP: ///< float, planar sampleFormat = av.FLTP - case C.AV_SAMPLE_FMT_DBLP: ///< double, planar + case C.AV_SAMPLE_FMT_DBLP: ///< double, planar sampleFormat = av.DBLP } return @@ -319,10 +314,10 @@ func (self *AudioEncoder) Setup() (err error) { default: self.codecData = audioCodecData{ channelLayout: self.ChannelLayout, - sampleFormat: self.SampleFormat, - sampleRate: self.SampleRate, - codecId: ff.codecCtx.codec_id, - extradata: extradata, + sampleFormat: self.SampleFormat, + sampleRate: self.SampleRate, + codecId: ff.codecCtx.codec_id, + extradata: extradata, } } @@ -368,7 +363,7 @@ func (self *AudioEncoder) encodeOne(frame av.AudioFrame) (gotpkt bool, pkt []byt } fmt.Println(farr) } - cerr := C.avcodec_encode_audio2(ff.codecCtx, &cpkt, ff.frame, &cgotpkt) + cerr := C.encode(ff.codecCtx, &cpkt, &cgotpkt, ff.frame) if cerr < C.int(0) { err = fmt.Errorf("ffmpeg: avcodec_encode_audio2 failed: %d", cerr) return @@ -390,8 +385,8 @@ func (self *AudioEncoder) encodeOne(frame av.AudioFrame) (gotpkt bool, pkt []byt func (self *AudioEncoder) resample(in av.AudioFrame) (out av.AudioFrame, err error) { if self.resampler == nil { self.resampler = &Resampler{ - OutSampleFormat: self.SampleFormat, - OutSampleRate: self.SampleRate, + OutSampleFormat: self.SampleFormat, + OutSampleRate: self.SampleRate, OutChannelLayout: self.ChannelLayout, } } @@ -487,73 +482,73 @@ func audioFrameAssignToFF(frame av.AudioFrame, f *C.AVFrame) { } func channelLayoutFF2AV(layout C.uint64_t) (channelLayout av.ChannelLayout) { - if layout & C.AV_CH_FRONT_CENTER != 0 { + if layout&C.AV_CH_FRONT_CENTER != 0 { channelLayout |= av.CH_FRONT_CENTER } - if layout & C.AV_CH_FRONT_LEFT != 0 { + if layout&C.AV_CH_FRONT_LEFT != 0 { channelLayout |= av.CH_FRONT_LEFT } - if layout & C.AV_CH_FRONT_RIGHT != 0 { + if layout&C.AV_CH_FRONT_RIGHT != 0 { channelLayout |= av.CH_FRONT_RIGHT } - if layout & C.AV_CH_BACK_CENTER != 0 { + if layout&C.AV_CH_BACK_CENTER != 0 { channelLayout |= av.CH_BACK_CENTER } - if layout & C.AV_CH_BACK_LEFT != 0 { + if layout&C.AV_CH_BACK_LEFT != 0 { channelLayout |= av.CH_BACK_LEFT } - if layout & C.AV_CH_BACK_RIGHT != 0 { + if layout&C.AV_CH_BACK_RIGHT != 0 { channelLayout |= av.CH_BACK_RIGHT } - if layout & C.AV_CH_SIDE_LEFT != 0 { + if layout&C.AV_CH_SIDE_LEFT != 0 { channelLayout |= av.CH_SIDE_LEFT } - if layout & C.AV_CH_SIDE_RIGHT != 0 { + if layout&C.AV_CH_SIDE_RIGHT != 0 { channelLayout |= av.CH_SIDE_RIGHT } - if layout & C.AV_CH_LOW_FREQUENCY != 0 { + if layout&C.AV_CH_LOW_FREQUENCY != 0 { channelLayout |= av.CH_LOW_FREQ } return } func channelLayoutAV2FF(channelLayout av.ChannelLayout) (layout C.uint64_t) { - if channelLayout & av.CH_FRONT_CENTER != 0 { + if channelLayout&av.CH_FRONT_CENTER != 0 { layout |= C.AV_CH_FRONT_CENTER } - if channelLayout & av.CH_FRONT_LEFT != 0 { + if channelLayout&av.CH_FRONT_LEFT != 0 { layout |= C.AV_CH_FRONT_LEFT } - if channelLayout & av.CH_FRONT_RIGHT != 0 { + if channelLayout&av.CH_FRONT_RIGHT != 0 { layout |= C.AV_CH_FRONT_RIGHT } - if channelLayout & av.CH_BACK_CENTER != 0 { + if channelLayout&av.CH_BACK_CENTER != 0 { layout |= C.AV_CH_BACK_CENTER } - if channelLayout & av.CH_BACK_LEFT != 0 { + if channelLayout&av.CH_BACK_LEFT != 0 { layout |= C.AV_CH_BACK_LEFT } - if channelLayout & av.CH_BACK_RIGHT != 0 { + if channelLayout&av.CH_BACK_RIGHT != 0 { layout |= C.AV_CH_BACK_RIGHT } - if channelLayout & av.CH_SIDE_LEFT != 0 { + if channelLayout&av.CH_SIDE_LEFT != 0 { layout |= C.AV_CH_SIDE_LEFT } - if channelLayout & av.CH_SIDE_RIGHT != 0 { + if channelLayout&av.CH_SIDE_RIGHT != 0 { layout |= C.AV_CH_SIDE_RIGHT } - if channelLayout & av.CH_LOW_FREQ != 0 { + if channelLayout&av.CH_LOW_FREQ != 0 { layout |= C.AV_CH_LOW_FREQUENCY } return } type AudioDecoder struct { - ff *ffctx + ff *ffctx ChannelLayout av.ChannelLayout - SampleFormat av.SampleFormat - SampleRate int - Extradata []byte + SampleFormat av.SampleFormat + SampleRate int + Extradata []byte } func (self *AudioDecoder) Setup() (err error) { @@ -589,7 +584,9 @@ func (self *AudioDecoder) Decode(pkt []byte) (gotframe bool, frame av.AudioFrame ff := &self.ff.ff cgotframe := C.int(0) - cerr := C.wrap_avcodec_decode_audio4(ff.codecCtx, ff.frame, unsafe.Pointer(&pkt[0]), C.int(len(pkt)), &cgotframe) + + cerr := C.decode(ff.codecCtx, ff.frame, (*C.uchar)(unsafe.Pointer(&pkt[0])), C.int(len(pkt)), &cgotframe) + if cerr < C.int(0) { err = fmt.Errorf("ffmpeg: avcodec_decode_audio4 failed: %d", cerr) return @@ -709,11 +706,11 @@ func NewAudioDecoder(codec av.AudioCodecData) (dec *AudioDecoder, err error) { } type audioCodecData struct { - codecId uint32 - sampleFormat av.SampleFormat + codecId uint32 + sampleFormat av.SampleFormat channelLayout av.ChannelLayout - sampleRate int - extradata []byte + sampleRate int + extradata []byte } func (self audioCodecData) Type() av.CodecType { @@ -755,4 +752,3 @@ func AudioCodecHandler(h *avutil.RegisterHandler) { } } } - diff --git a/cgo/ffmpeg/ffmpeg.c b/cgo/ffmpeg/ffmpeg.c new file mode 100644 index 00000000..6d3bff83 --- /dev/null +++ b/cgo/ffmpeg/ffmpeg.c @@ -0,0 +1,93 @@ +#include +#include +#include +#include +#include +#include +#include +#include "ffmpeg.h" + +int decode(AVCodecContext *avctx, AVFrame *frame, uint8_t *data, int size, int *got_frame) +{ + int ret; + struct AVPacket pkt = {.data = data, .size = size}; + + *got_frame = 0; + + ret = avcodec_send_packet(avctx, &pkt); + + av_packet_unref(&pkt); + + if (ret < 0) + return ret == AVERROR_EOF ? 0 : ret; + + + ret = avcodec_receive_frame(avctx, frame); + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) + return ret; + if (ret >= 0) + *got_frame = 1; + + return 0; +} + +int encode(AVCodecContext *avctx, AVPacket *pkt, int *got_packet, AVFrame *frame) +{ + int ret; + + *got_packet = 0; + + ret = avcodec_send_frame(avctx, frame); + if (ret < 0) + return ret; + + ret = avcodec_receive_packet(avctx, pkt); + if (!ret) + *got_packet = 1; + if (ret == AVERROR(EAGAIN)) + return 0; + + return ret; +} + + + +int avcodec_encode_jpeg(AVCodecContext *pCodecCtx, AVFrame *pFrame,AVPacket *packet) { + AVCodec *jpegCodec = avcodec_find_encoder(AV_CODEC_ID_MJPEG); + int ret = -1; + + if (!jpegCodec) { + return ret; + } + + AVCodecContext *jpegContext = avcodec_alloc_context3(jpegCodec); + if (!jpegContext) { + jpegCodec = NULL; + return ret; + } + + jpegContext->pix_fmt = pCodecCtx->pix_fmt; + jpegContext->height = pFrame->height; + jpegContext->width = pFrame->width; + jpegContext->time_base= (AVRational){1,25}; + + ret = avcodec_open2(jpegContext, jpegCodec, NULL); + + if (ret < 0) { + goto error; + } + + int gotFrame; + + ret = encode(jpegContext, packet, &gotFrame, pFrame); + if (ret < 0) { + goto error; + } + + error: + avcodec_close(jpegContext); + avcodec_free_context(&jpegContext); + jpegCodec = NULL; + return ret; +} + diff --git a/cgo/ffmpeg/ffmpeg.go b/cgo/ffmpeg/ffmpeg.go index 813dd618..1cd49db3 100644 --- a/cgo/ffmpeg/ffmpeg.go +++ b/cgo/ffmpeg/ffmpeg.go @@ -1,10 +1,12 @@ package ffmpeg /* -#cgo LDFLAGS: -lavformat -lavutil -lavcodec -lavresample -lswscale +#cgo LDFLAGS: -lavformat -lavutil -lavcodec -lswresample -lswscale #include "ffmpeg.h" void ffinit() { + #if LIBAVCODEC_VERSION_INT < AV_VERSION_INT(58, 9, 100) av_register_all(); + #endif } */ import "C" @@ -14,15 +16,15 @@ import ( ) const ( - QUIET = int(C.AV_LOG_QUIET) - PANIC = int(C.AV_LOG_PANIC) - FATAL = int(C.AV_LOG_FATAL) - ERROR = int(C.AV_LOG_ERROR) + QUIET = int(C.AV_LOG_QUIET) + PANIC = int(C.AV_LOG_PANIC) + FATAL = int(C.AV_LOG_FATAL) + ERROR = int(C.AV_LOG_ERROR) WARNING = int(C.AV_LOG_WARNING) - INFO = int(C.AV_LOG_INFO) + INFO = int(C.AV_LOG_INFO) VERBOSE = int(C.AV_LOG_VERBOSE) - DEBUG = int(C.AV_LOG_DEBUG) - TRACE = int(C.AV_LOG_TRACE) + DEBUG = int(C.AV_LOG_DEBUG) + TRACE = int(C.AV_LOG_TRACE) ) func HasEncoder(name string) bool { @@ -71,4 +73,3 @@ func freeFFCtx(self *ffctx) { C.av_dict_free(&ff.options) } } - diff --git a/cgo/ffmpeg/ffmpeg.h b/cgo/ffmpeg/ffmpeg.h index 4dabdf70..a5511c87 100644 --- a/cgo/ffmpeg/ffmpeg.h +++ b/cgo/ffmpeg/ffmpeg.h @@ -1,8 +1,7 @@ - #include #include #include -#include +#include #include #include #include @@ -15,6 +14,7 @@ typedef struct { int profile; } FFCtx; + static inline int avcodec_profile_name_to_int(AVCodec *codec, const char *name) { const AVProfile *p; for (p = codec->profiles; p != NULL && p->profile != FF_PROFILE_UNKNOWN; p++) @@ -22,4 +22,6 @@ static inline int avcodec_profile_name_to_int(AVCodec *codec, const char *name) return p->profile; return FF_PROFILE_UNKNOWN; } - +int encode(AVCodecContext *avctx, AVPacket *pkt, int *got_packet, AVFrame *frame); +int decode(AVCodecContext *avctx, AVFrame *frame, uint8_t *data, int size, int *got_frame); +int avcodec_encode_jpeg(AVCodecContext *pCodecCtx, AVFrame *pFrame,AVPacket *packet); diff --git a/cgo/ffmpeg/video.go b/cgo/ffmpeg/video.go index 085b229a..468ae1e5 100644 --- a/cgo/ffmpeg/video.go +++ b/cgo/ffmpeg/video.go @@ -1,25 +1,21 @@ package ffmpeg -/* -#include "ffmpeg.h" -int wrap_avcodec_decode_video2(AVCodecContext *ctx, AVFrame *frame, void *data, int size, int *got) { - struct AVPacket pkt = {.data = data, .size = size}; - return avcodec_decode_video2(ctx, frame, got, &pkt); -} -*/ +//#cgo LDFLAGS: -lavformat -lavutil -lavcodec -lavresample -lswscale +// #include "ffmpeg.h" import "C" import ( - "unsafe" - "runtime" "fmt" "image" "reflect" - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/codec/h264parser" + "runtime" + "unsafe" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/codec/h264parser" ) type VideoDecoder struct { - ff *ffctx + ff *ffctx Extradata []byte } @@ -46,12 +42,13 @@ func fromCPtr(buf unsafe.Pointer, size int) (ret []uint8) { type VideoFrame struct { Image image.YCbCr - frame *C.AVFrame + Raw []byte + Size int } func (self *VideoFrame) Free() { self.Image = image.YCbCr{} - C.av_frame_free(&self.frame) + self.Raw = make([]byte, 0) } func freeVideoFrame(self *VideoFrame) { @@ -63,7 +60,60 @@ func (self *VideoDecoder) Decode(pkt []byte) (img *VideoFrame, err error) { cgotimg := C.int(0) frame := C.av_frame_alloc() - cerr := C.wrap_avcodec_decode_video2(ff.codecCtx, frame, unsafe.Pointer(&pkt[0]), C.int(len(pkt)), &cgotimg) + defer C.av_frame_free(&frame) + + cerr := C.decode(ff.codecCtx, frame, (*C.uchar)(unsafe.Pointer(&pkt[0])), C.int(len(pkt)), &cgotimg) + + if cerr < C.int(0) { + err = fmt.Errorf("ffmpeg: decode failed: %d", cerr) + return + } + + if cgotimg != C.int(0) { + w := int(frame.width) + h := int(frame.height) + ys := int(frame.linesize[0]) + cs := int(frame.linesize[1]) + + img = &VideoFrame{Image: image.YCbCr{ + Y: fromCPtr(unsafe.Pointer(frame.data[0]), ys*h), + Cb: fromCPtr(unsafe.Pointer(frame.data[1]), cs*h/2), + Cr: fromCPtr(unsafe.Pointer(frame.data[2]), cs*h/2), + YStride: ys, + CStride: cs, + SubsampleRatio: image.YCbCrSubsampleRatio420, + Rect: image.Rect(0, 0, w, h), + }} + + runtime.SetFinalizer(img, freeVideoFrame) + + packet := C.AVPacket{} + defer C.av_packet_unref(&packet) + + cerr := C.avcodec_encode_jpeg(ff.codecCtx, frame, &packet) + + if cerr != C.int(0) { + err = fmt.Errorf("ffmpeg: avcodec_encode_jpeg failed: %d", cerr) + return + } + + img.Size = int(packet.size) + img.Raw = make([]byte, img.Size) + copy(img.Raw, *(*[]byte)(unsafe.Pointer(&packet.data))) + } + + return +} + +func (self *VideoDecoder) DecodeBac(pkt []byte) (img *VideoFrame, err error) { + ff := &self.ff.ff + + cgotimg := C.int(0) + frame := C.av_frame_alloc() + defer C.av_frame_free(&frame) + + cerr := C.decode(ff.codecCtx, frame, (*C.uchar)(unsafe.Pointer(&pkt[0])), C.int(len(pkt)), &cgotimg) + if cerr < C.int(0) { err = fmt.Errorf("ffmpeg: avcodec_decode_video2 failed: %d", cerr) return @@ -76,15 +126,29 @@ func (self *VideoDecoder) Decode(pkt []byte) (img *VideoFrame, err error) { cs := int(frame.linesize[1]) img = &VideoFrame{Image: image.YCbCr{ - Y: fromCPtr(unsafe.Pointer(frame.data[0]), ys*h), - Cb: fromCPtr(unsafe.Pointer(frame.data[1]), cs*h/2), - Cr: fromCPtr(unsafe.Pointer(frame.data[2]), cs*h/2), - YStride: ys, - CStride: cs, + Y: fromCPtr(unsafe.Pointer(frame.data[0]), ys*h), + Cb: fromCPtr(unsafe.Pointer(frame.data[1]), cs*h/2), + Cr: fromCPtr(unsafe.Pointer(frame.data[2]), cs*h/2), + YStride: ys, + CStride: cs, SubsampleRatio: image.YCbCrSubsampleRatio420, - Rect: image.Rect(0, 0, w, h), - }, frame: frame} + Rect: image.Rect(0, 0, w, h), + }} runtime.SetFinalizer(img, freeVideoFrame) + + packet := C.AVPacket{} + defer C.av_packet_unref(&packet) + + cerr := C.avcodec_encode_jpeg(ff.codecCtx, frame, &packet) + + if cerr != C.int(0) { + err = fmt.Errorf("ffmpeg: avcodec_encode_jpeg failed: %d", cerr) + return + } + + img.Size = int(packet.size) + img.Raw = make([]byte, img.Size) + copy(img.Raw, *(*[]byte)(unsafe.Pointer(&packet.data))) } return @@ -114,11 +178,10 @@ func NewVideoDecoder(stream av.CodecData) (dec *VideoDecoder, err error) { if _dec.ff, err = newFFCtxByCodec(c); err != nil { return } - if err = _dec.Setup(); err != nil { + if err = _dec.Setup(); err != nil { return } dec = _dec return } - diff --git a/codec/aacparser/parser.go b/codec/aacparser/parser.go index 6432574a..2bfa6bff 100644 --- a/codec/aacparser/parser.go +++ b/codec/aacparser/parser.go @@ -1,12 +1,13 @@ package aacparser import ( - "github.com/nareix/joy4/utils/bits" - "github.com/nareix/joy4/av" - "time" - "fmt" "bytes" + "fmt" "io" + "time" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/utils/bits" ) // copied from libavcodec/mpeg4audio.h @@ -83,12 +84,12 @@ These are the channel configurations: var chanConfigTable = []av.ChannelLayout{ 0, av.CH_FRONT_CENTER, - av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT, - av.CH_FRONT_CENTER|av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT, - av.CH_FRONT_CENTER|av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT|av.CH_BACK_CENTER, - av.CH_FRONT_CENTER|av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT|av.CH_BACK_LEFT|av.CH_BACK_RIGHT, - av.CH_FRONT_CENTER|av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT|av.CH_BACK_LEFT|av.CH_BACK_RIGHT|av.CH_LOW_FREQ, - av.CH_FRONT_CENTER|av.CH_FRONT_LEFT|av.CH_FRONT_RIGHT|av.CH_SIDE_LEFT|av.CH_SIDE_RIGHT|av.CH_BACK_LEFT|av.CH_BACK_RIGHT|av.CH_LOW_FREQ, + av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT, + av.CH_FRONT_CENTER | av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT, + av.CH_FRONT_CENTER | av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT | av.CH_BACK_CENTER, + av.CH_FRONT_CENTER | av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT | av.CH_BACK_LEFT | av.CH_BACK_RIGHT, + av.CH_FRONT_CENTER | av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT | av.CH_BACK_LEFT | av.CH_BACK_RIGHT | av.CH_LOW_FREQ, + av.CH_FRONT_CENTER | av.CH_FRONT_LEFT | av.CH_FRONT_RIGHT | av.CH_SIDE_LEFT | av.CH_SIDE_RIGHT | av.CH_BACK_LEFT | av.CH_BACK_RIGHT | av.CH_LOW_FREQ, } func ParseADTSHeader(frame []byte) (config MPEG4AudioConfig, hdrlen int, framelen int, samples int, err error) { @@ -266,7 +267,7 @@ func WriteMPEG4AudioConfig(w io.Writer, config MPEG4AudioConfig) (err error) { type CodecData struct { ConfigBytes []byte - Config MPEG4AudioConfig + Config MPEG4AudioConfig } func (self CodecData) Type() av.CodecType { @@ -308,4 +309,3 @@ func NewCodecDataFromMPEG4AudioConfigBytes(config []byte) (self CodecData, err e } return } - diff --git a/codec/codec.go b/codec/codec.go index d37df77c..7f46aaf6 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -1,9 +1,10 @@ package codec import ( - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/codec/fake" "time" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/codec/fake" ) type PCMUCodecData struct { @@ -50,7 +51,7 @@ func (self SpeexCodecData) PacketDuration(data []byte) (time.Duration, error) { // libavcodec/libspeexdec.c // samples = samplerate/50 // duration = 0.02s - return time.Millisecond*20, nil + return time.Millisecond * 20, nil } func NewSpeexCodecData(sr int, cl av.ChannelLayout) SpeexCodecData { @@ -61,4 +62,3 @@ func NewSpeexCodecData(sr int, cl av.ChannelLayout) SpeexCodecData { codec.ChannelLayout_ = cl return codec } - diff --git a/codec/fake/fake.go b/codec/fake/fake.go index 51e056f4..bef77bfa 100644 --- a/codec/fake/fake.go +++ b/codec/fake/fake.go @@ -1,13 +1,13 @@ package fake import ( - "github.com/nareix/joy4/av" + "github.com/Danile71/joy4/av" ) type CodecData struct { - CodecType_ av.CodecType - SampleRate_ int - SampleFormat_ av.SampleFormat + CodecType_ av.CodecType + SampleRate_ int + SampleFormat_ av.SampleFormat ChannelLayout_ av.ChannelLayout } @@ -26,4 +26,3 @@ func (self CodecData) ChannelLayout() av.ChannelLayout { func (self CodecData) SampleRate() int { return self.SampleRate_ } - diff --git a/codec/h264parser/parser.go b/codec/h264parser/parser.go index 35c8d837..7f28adb3 100644 --- a/codec/h264parser/parser.go +++ b/codec/h264parser/parser.go @@ -1,12 +1,12 @@ - package h264parser import ( - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/utils/bits" - "github.com/nareix/joy4/utils/bits/pio" - "fmt" "bytes" + "fmt" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/utils/bits" + "github.com/Danile71/joy4/utils/bits/pio" ) const ( @@ -131,7 +131,7 @@ Annex B is commonly used in live and streaming formats such as transport streams 2. AVCC The other common method of storing an H.264 stream is the AVCC format. In this format, each NALU is preceded with its length (in big endian format). This method is easier to parse, but you lose the byte alignment features of Annex B. Just to complicate things, the length may be encoded using 1, 2 or 4 bytes. This value is stored in a header object. This header is often called ‘extradata’ or ‘sequence header’. Its basic format is as follows: -bits +bits 8 version ( always 0x01 ) 8 avc profile ( sps[0][1] ) 8 avc compatibility ( sps[0][2] ) @@ -199,8 +199,8 @@ Additionally, there is a new variable called NALULengthSizeMinusOne. This confus An advantage to this format is the ability to configure the decoder at the start and jump into the middle of a stream. This is a common use case where the media is available on a random access medium such as a hard drive, and is therefore used in common container formats such as MP4 and MKV. */ -var StartCodeBytes = []byte{0,0,1} -var AUDBytes = []byte{0,0,0,1,0x9,0xf0,0,0,0,1} // AUD +var StartCodeBytes = []byte{0, 0, 1} +var AUDBytes = []byte{0, 0, 0, 1, 0x9, 0xf0, 0, 0, 0, 1} // AUD func CheckNALUsType(b []byte) (typ int) { _, typ = SplitNALUs(b) @@ -499,9 +499,9 @@ func ParseSPS(data []byte) (self SPSInfo, err error) { } type CodecData struct { - Record []byte + Record []byte RecordInfo AVCDecoderConfRecord - SPSInfo SPSInfo + SPSInfo SPSInfo } func (self CodecData) Type() av.CodecType { @@ -589,8 +589,8 @@ func (self *AVCDecoderConfRecord) Unmarshal(b []byte) (n int, err error) { self.AVCProfileIndication = b[1] self.ProfileCompatibility = b[2] self.AVCLevelIndication = b[3] - self.LengthSizeMinusOne = b[4]&0x03 - spscount := int(b[5]&0x1f) + self.LengthSizeMinusOne = b[4] & 0x03 + spscount := int(b[5] & 0x1f) n += 6 for i := 0; i < spscount; i++ { @@ -638,10 +638,10 @@ func (self *AVCDecoderConfRecord) Unmarshal(b []byte) (n int, err error) { func (self AVCDecoderConfRecord) Len() (n int) { n = 7 for _, sps := range self.SPS { - n += 2+len(sps) + n += 2 + len(sps) } for _, pps := range self.PPS { - n += 2+len(pps) + n += 2 + len(pps) } return } @@ -651,8 +651,8 @@ func (self AVCDecoderConfRecord) Marshal(b []byte) (n int) { b[1] = self.AVCProfileIndication b[2] = self.ProfileCompatibility b[3] = self.AVCLevelIndication - b[4] = self.LengthSizeMinusOne|0xfc - b[5] = uint8(len(self.SPS))|0xe0 + b[4] = self.LengthSizeMinusOne | 0xfc + b[5] = uint8(len(self.SPS)) | 0xe0 n += 6 for _, sps := range self.SPS { @@ -690,7 +690,7 @@ func (self SliceType) String() string { } const ( - SLICE_P = iota+1 + SLICE_P = iota + 1 SLICE_B SLICE_I ) @@ -702,9 +702,9 @@ func ParseSliceHeaderFromNALU(packet []byte) (sliceType SliceType, err error) { return } - nal_unit_type := packet[0]&0x1f + nal_unit_type := packet[0] & 0x1f switch nal_unit_type { - case 1,2,5,19: + case 1, 2, 5, 19: // slice_layer_without_partitioning_rbsp // slice_data_partition_a_layer_rbsp @@ -727,11 +727,11 @@ func ParseSliceHeaderFromNALU(packet []byte) (sliceType SliceType, err error) { } switch u { - case 0,3,5,8: + case 0, 3, 5, 8: sliceType = SLICE_P - case 1,6: + case 1, 6: sliceType = SLICE_B - case 2,4,7,9: + case 2, 4, 7, 9: sliceType = SLICE_I default: err = fmt.Errorf("h264parser: slice_type=%d invalid", u) @@ -740,4 +740,3 @@ func ParseSliceHeaderFromNALU(packet []byte) (sliceType SliceType, err error) { return } - diff --git a/format/mjpeg/client.go b/format/mjpeg/client.go new file mode 100644 index 00000000..bbb7b455 --- /dev/null +++ b/format/mjpeg/client.go @@ -0,0 +1,1247 @@ +package mjpeg + +import ( + "bufio" + "bytes" + "crypto/md5" + "encoding/base64" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "net" + "net/textproto" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/av/avutil" + "github.com/Danile71/joy4/codec" + "github.com/Danile71/joy4/codec/aacparser" + "github.com/Danile71/joy4/codec/h264parser" + "github.com/Danile71/joy4/format/rtsp/sdp" + "github.com/Danile71/joy4/utils/bits/pio" +) + +var ErrCodecDataChange = fmt.Errorf("rtsp: codec data change, please call HandleCodecDataChange()") + +var DebugRtp = false +var DebugRtsp = false +var SkipErrRtpBlock = false + +const ( + stageDescribeDone = iota + 1 + stageSetupDone + stageWaitCodecData + stageCodecDataDone +) + +type Client struct { + DebugRtsp bool + DebugRtp bool + Headers []string + + SkipErrRtpBlock bool + + RtspTimeout time.Duration + RtpTimeout time.Duration + RtpKeepAliveTimeout time.Duration + rtpKeepaliveTimer time.Time + rtpKeepaliveEnterCnt int + + stage int + + setupIdx []int + setupMap []int + + authHeaders func(method string) []string + + url *url.URL + conn *connWithTimeout + brconn *bufio.Reader + requestUri string + cseq uint + streams []*Stream + streamsintf []av.CodecData + session string + body io.Reader +} + +type Request struct { + Header []string + Uri string + Method string +} + +type Response struct { + StatusCode int + Headers textproto.MIMEHeader + ContentLength int + Body []byte + + Block []byte +} + +func DialTimeout(uri string, timeout time.Duration) (self *Client, err error) { + var URL *url.URL + if URL, err = url.Parse(uri); err != nil { + return + } + + if _, _, err := net.SplitHostPort(URL.Host); err != nil { + URL.Host = URL.Host + ":554" + } + + dailer := net.Dialer{Timeout: timeout} + var conn net.Conn + if conn, err = dailer.Dial("tcp", URL.Host); err != nil { + return + } + + u2 := *URL + u2.User = nil + + connt := &connWithTimeout{Conn: conn} + + self = &Client{ + conn: connt, + brconn: bufio.NewReaderSize(connt, 256), + url: URL, + requestUri: u2.String(), + DebugRtp: DebugRtp, + DebugRtsp: DebugRtsp, + SkipErrRtpBlock: SkipErrRtpBlock, + } + return +} + +func Dial(uri string) (self *Client, err error) { + return DialTimeout(uri, 0) +} + +func (self *Client) allCodecDataReady() bool { + for _, si := range self.setupIdx { + stream := self.streams[si] + if stream.CodecData == nil { + return false + } + } + return true +} + +func (self *Client) probe() (err error) { + for { + if self.allCodecDataReady() { + break + } + if _, err = self.readPacket(); err != nil { + return + } + } + self.stage = stageCodecDataDone + return +} + +func (self *Client) prepare(stage int) (err error) { + for self.stage < stage { + switch self.stage { + case 0: + if _, err = self.Describe(); err != nil { + return + } + + case stageDescribeDone: + if err = self.SetupAll(); err != nil { + return + } + + case stageSetupDone: + if err = self.Play(); err != nil { + return + } + + case stageWaitCodecData: + if err = self.probe(); err != nil { + return + } + } + } + return +} + +func (self *Client) Streams() (streams []av.CodecData, err error) { + if err = self.prepare(stageCodecDataDone); err != nil { + return + } + for _, si := range self.setupIdx { + stream := self.streams[si] + streams = append(streams, stream.CodecData) + } + return +} + +func (self *Client) SendRtpKeepalive() (err error) { + if self.RtpKeepAliveTimeout > 0 { + if self.rtpKeepaliveTimer.IsZero() { + self.rtpKeepaliveTimer = time.Now() + } else if time.Now().Sub(self.rtpKeepaliveTimer) > self.RtpKeepAliveTimeout { + self.rtpKeepaliveTimer = time.Now() + if self.DebugRtsp { + fmt.Println("rtp: keep alive") + } + req := Request{ + Method: "OPTIONS", + Uri: self.requestUri, + } + if err = self.WriteRequest(req); err != nil { + return + } + } + } + return +} + +func (self *Client) WriteRequest(req Request) (err error) { + self.conn.Timeout = self.RtspTimeout + self.cseq++ + + buf := &bytes.Buffer{} + + fmt.Fprintf(buf, "%s %s RTSP/1.0\r\n", req.Method, req.Uri) + fmt.Fprintf(buf, "CSeq: %d\r\n", self.cseq) + + if self.authHeaders != nil { + headers := self.authHeaders(req.Method) + for _, s := range headers { + io.WriteString(buf, s) + io.WriteString(buf, "\r\n") + } + } + for _, s := range req.Header { + io.WriteString(buf, s) + io.WriteString(buf, "\r\n") + } + for _, s := range self.Headers { + io.WriteString(buf, s) + io.WriteString(buf, "\r\n") + } + io.WriteString(buf, "\r\n") + + bufout := buf.Bytes() + + if self.DebugRtsp { + fmt.Print("> ", string(bufout)) + } + + if _, err = self.conn.Write(bufout); err != nil { + return + } + + return +} + +func (self *Client) parseBlockHeader(h []byte) (length int, no int, valid bool) { + length = int(h[2])<<8 + int(h[3]) + no = int(h[1]) + if no/2 >= len(self.streams) { + return + } + + if no%2 == 0 { // rtp + if length < 8 { + return + } + + // V=2 + if h[4]&0xc0 != 0x80 { + return + } + + stream := self.streams[no/2] + if int(h[5]&0x7f) != stream.Sdp.PayloadType { + return + } + + timestamp := binary.BigEndian.Uint32(h[8:12]) + if stream.firsttimestamp != 0 { + timestamp -= stream.firsttimestamp + if timestamp < stream.timestamp { + return + } else if timestamp-stream.timestamp > uint32(stream.timeScale()*60*60) { + return + } + } + } else { // rtcp + } + + valid = true + return +} + +func (self *Client) parseHeaders(b []byte) (statusCode int, headers textproto.MIMEHeader, err error) { + var line string + r := textproto.NewReader(bufio.NewReader(bytes.NewReader(b))) + if line, err = r.ReadLine(); err != nil { + err = fmt.Errorf("rtsp: header invalid") + return + } + + if codes := strings.Split(line, " "); len(codes) >= 2 { + if statusCode, err = strconv.Atoi(codes[1]); err != nil { + err = fmt.Errorf("rtsp: header invalid: %s", err) + return + } + } + + headers, _ = r.ReadMIMEHeader() + return +} + +func (self *Client) handleResp(res *Response) (err error) { + if sess := res.Headers.Get("Session"); sess != "" && self.session == "" { + if fields := strings.Split(sess, ";"); len(fields) > 0 { + self.session = fields[0] + } + } + if res.StatusCode == 401 { + if err = self.handle401(res); err != nil { + return + } + } + return +} + +func (self *Client) handle401(res *Response) (err error) { + /* + RTSP/1.0 401 Unauthorized + CSeq: 2 + Date: Wed, May 04 2016 10:10:51 GMT + WWW-Authenticate: Digest realm="LIVE555 Streaming Media", nonce="c633aaf8b83127633cbe98fac1d20d87" + */ + authval := res.Headers.Get("WWW-Authenticate") + hdrval := strings.SplitN(authval, " ", 2) + var realm, nonce string + + if len(hdrval) == 2 { + for _, field := range strings.Split(hdrval[1], ",") { + field = strings.Trim(field, ", ") + if keyval := strings.Split(field, "="); len(keyval) == 2 { + key := keyval[0] + val := strings.Trim(keyval[1], `"`) + switch key { + case "realm": + realm = val + case "nonce": + nonce = val + } + } + } + + if realm != "" { + var username string + var password string + + if self.url.User == nil { + err = fmt.Errorf("rtsp: no username") + return + } + username = self.url.User.Username() + password, _ = self.url.User.Password() + + self.authHeaders = func(method string) []string { + var headers []string + if nonce == "" { + headers = []string{ + fmt.Sprintf(`Authorization: Basic %s`, base64.StdEncoding.EncodeToString([]byte(username+":"+password))), + } + } else { + hs1 := md5hash(username + ":" + realm + ":" + password) + hs2 := md5hash(method + ":" + self.requestUri) + response := md5hash(hs1 + ":" + nonce + ":" + hs2) + headers = []string{fmt.Sprintf( + `Authorization: Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s"`, + username, realm, nonce, self.requestUri, response)} + } + return headers + } + } + } + + return +} + +func (self *Client) findRTSP() (block []byte, data []byte, err error) { + const ( + R = iota + 1 + T + S + Header + Dollar + ) + var _peek [8]byte + peek := _peek[0:0] + stat := 0 + + for i := 0; ; i++ { + var b byte + if b, err = self.brconn.ReadByte(); err != nil { + return + } + switch b { + case 'R': + if stat == 0 { + stat = R + } + case 'T': + if stat == R { + stat = T + } + case 'S': + if stat == T { + stat = S + } + case 'P': + if stat == S { + stat = Header + } + case '$': + if stat != Dollar { + stat = Dollar + peek = _peek[0:0] + } + default: + if stat != Dollar { + stat = 0 + peek = _peek[0:0] + } + } + + if false && self.DebugRtp { + fmt.Println("rtsp: findRTSP", i, b) + } + + if stat != 0 { + peek = append(peek, b) + } + if stat == Header { + data = peek + return + } + + if stat == Dollar && len(peek) >= 12 { + if self.DebugRtp { + fmt.Println("rtsp: dollar at", i, len(peek)) + } + if blocklen, _, ok := self.parseBlockHeader(peek); ok { + left := blocklen + 4 - len(peek) + + if left <= 0 { + return + } + + block = append(peek, make([]byte, left)...) + if _, err = io.ReadFull(self.brconn, block[len(peek):]); err != nil { + return + } + return + } + stat = 0 + peek = _peek[0:0] + } + } + + return +} + +func (self *Client) readLFLF() (block []byte, data []byte, err error) { + const ( + LF = iota + 1 + LFLF + ) + peek := []byte{} + stat := 0 + dollarpos := -1 + lpos := 0 + pos := 0 + + for { + var b byte + if b, err = self.brconn.ReadByte(); err != nil { + return + } + switch b { + case '\n': + if stat == 0 { + stat = LF + lpos = pos + } else if stat == LF { + if pos-lpos <= 2 { + stat = LFLF + } else { + lpos = pos + } + } + case '$': + dollarpos = pos + } + peek = append(peek, b) + + if stat == LFLF { + data = peek + return + } else if dollarpos != -1 && dollarpos-pos >= 12 { + hdrlen := dollarpos - pos + start := len(peek) - hdrlen + if blocklen, _, ok := self.parseBlockHeader(peek[start:]); ok { + block = append(peek[start:], make([]byte, blocklen+4-hdrlen)...) + if _, err = io.ReadFull(self.brconn, block[hdrlen:]); err != nil { + return + } + return + } + dollarpos = -1 + } + + pos++ + } + + return +} + +func (self *Client) readResp(b []byte) (res Response, err error) { + if res.StatusCode, res.Headers, err = self.parseHeaders(b); err != nil { + return + } + res.ContentLength, _ = strconv.Atoi(res.Headers.Get("Content-Length")) + if res.ContentLength > 0 { + res.Body = make([]byte, res.ContentLength) + if _, err = io.ReadFull(self.brconn, res.Body); err != nil { + return + } + } + if err = self.handleResp(&res); err != nil { + return + } + return +} + +func (self *Client) poll() (res Response, err error) { + var block []byte + var rtsp []byte + var headers []byte + + self.conn.Timeout = self.RtspTimeout + for { + if block, rtsp, err = self.findRTSP(); err != nil { + return + } + if len(block) > 0 { + res.Block = block + return + } else { + if block, headers, err = self.readLFLF(); err != nil { + return + } + if len(block) > 0 { + res.Block = block + return + } + if res, err = self.readResp(append(rtsp, headers...)); err != nil { + return + } + } + return + } + + return +} + +func (self *Client) ReadResponse() (res Response, err error) { + for { + if res, err = self.poll(); err != nil { + return + } + if res.StatusCode > 0 { + return + } + } + return +} + +func (self *Client) SetupAll() (err error) { + idx := []int{} + for i := range self.streams { + idx = append(idx, i) + } + return self.Setup(idx) +} + +func (self *Client) Setup(idx []int) (err error) { + if err = self.prepare(stageDescribeDone); err != nil { + return + } + + self.setupMap = make([]int, len(self.streams)) + for i := range self.setupMap { + self.setupMap[i] = -1 + } + self.setupIdx = idx + + for i, si := range idx { + self.setupMap[si] = i + + uri := "" + control := self.streams[si].Sdp.Control + if strings.HasPrefix(control, "rtsp://") { + uri = control + } else { + uri = self.requestUri + "/" + control + } + req := Request{Method: "SETUP", Uri: uri} + req.Header = append(req.Header, fmt.Sprintf("Transport: RTP/AVP/TCP;unicast;interleaved=%d-%d", si*2, si*2+1)) + if self.session != "" { + req.Header = append(req.Header, "Session: "+self.session) + } + if err = self.WriteRequest(req); err != nil { + return + } + if _, err = self.ReadResponse(); err != nil { + return + } + } + + if self.stage == stageDescribeDone { + self.stage = stageSetupDone + } + return +} + +func md5hash(s string) string { + h := md5.Sum([]byte(s)) + return hex.EncodeToString(h[:]) +} + +func (self *Client) Describe() (streams []sdp.Media, err error) { + var res Response + + for i := 0; i < 2; i++ { + req := Request{ + Method: "DESCRIBE", + Uri: self.requestUri, + Header: []string{"Accept: application/sdp"}, + } + if err = self.WriteRequest(req); err != nil { + return + } + if res, err = self.ReadResponse(); err != nil { + return + } + if res.StatusCode == 200 { + break + } + } + if res.ContentLength == 0 { + err = fmt.Errorf("rtsp: Describe failed, StatusCode=%d", res.StatusCode) + return + } + + body := string(res.Body) + + if self.DebugRtsp { + fmt.Println("<", body) + } + + _, medias := sdp.Parse(body) + + self.streams = []*Stream{} + for _, media := range medias { + stream := &Stream{Sdp: media, client: self} + stream.makeCodecData() + self.streams = append(self.streams, stream) + streams = append(streams, media) + } + + if self.stage == 0 { + self.stage = stageDescribeDone + } + return +} + +func (self *Client) Options() (err error) { + req := Request{ + Method: "OPTIONS", + Uri: self.requestUri, + } + if self.session != "" { + req.Header = append(req.Header, "Session: "+self.session) + } + if err = self.WriteRequest(req); err != nil { + return + } + if _, err = self.ReadResponse(); err != nil { + return + } + return +} + +func (self *Client) HandleCodecDataChange() (_newcli *Client, err error) { + newcli := &Client{} + *newcli = *self + + newcli.streams = []*Stream{} + for _, stream := range self.streams { + newstream := &Stream{} + *newstream = *stream + newstream.client = newcli + + if newstream.isCodecDataChange() { + if err = newstream.makeCodecData(); err != nil { + return + } + newstream.clearCodecDataChange() + } + newcli.streams = append(newcli.streams, newstream) + } + + _newcli = newcli + return +} + +func (self *Stream) clearCodecDataChange() { + self.spsChanged = false + self.ppsChanged = false +} + +func (self *Stream) isCodecDataChange() bool { + if self.spsChanged && self.ppsChanged { + return true + } + return false +} + +func (self *Stream) timeScale() int { + t := self.Sdp.TimeScale + if t == 0 { + // https://tools.ietf.org/html/rfc5391 + t = 8000 + } + return t +} + +func (self *Stream) makeCodecData() (err error) { + media := self.Sdp + + if media.PayloadType >= 96 && media.PayloadType <= 127 { + switch media.Type { + case av.H264: + for _, nalu := range media.SpropParameterSets { + if len(nalu) > 0 { + self.handleH264Payload(0, nalu) + } + } + + if len(self.sps) == 0 || len(self.pps) == 0 { + if nalus, typ := h264parser.SplitNALUs(media.Config); typ != h264parser.NALU_RAW { + for _, nalu := range nalus { + if len(nalu) > 0 { + self.handleH264Payload(0, nalu) + } + } + } + } + + if len(self.sps) > 0 && len(self.pps) > 0 { + if self.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(self.sps, self.pps); err != nil { + err = fmt.Errorf("rtsp: h264 sps/pps invalid: %s", err) + return + } + } else { + err = fmt.Errorf("rtsp: missing h264 sps or pps") + return + } + + case av.AAC: + if len(media.Config) == 0 { + err = fmt.Errorf("rtsp: aac sdp config missing") + return + } + if self.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(media.Config); err != nil { + err = fmt.Errorf("rtsp: aac sdp config invalid: %s", err) + return + } + } + } else { + switch media.PayloadType { + case 0: + self.CodecData = codec.NewPCMMulawCodecData() + + case 8: + self.CodecData = codec.NewPCMAlawCodecData() + + default: + err = fmt.Errorf("rtsp: PayloadType=%d unsupported", media.PayloadType) + return + } + } + + return +} + +func (self *Stream) handleBuggyAnnexbH264Packet(timestamp uint32, packet []byte) (isBuggy bool, err error) { + if len(packet) >= 4 && packet[0] == 0 && packet[1] == 0 && packet[2] == 0 && packet[3] == 1 { + isBuggy = true + if nalus, typ := h264parser.SplitNALUs(packet); typ != h264parser.NALU_RAW { + for _, nalu := range nalus { + if len(nalu) > 0 { + if err = self.handleH264Payload(timestamp, nalu); err != nil { + return + } + } + } + } + } + return +} + +func (self *Stream) handleH264Payload(timestamp uint32, packet []byte) (err error) { + if len(packet) < 2 { + err = fmt.Errorf("rtp: h264 packet too short") + return + } + + var isBuggy bool + if isBuggy, err = self.handleBuggyAnnexbH264Packet(timestamp, packet); isBuggy { + return + } + + naluType := packet[0] & 0x1f + + /* + Table 7-1 – NAL unit type codes + 1 Coded slice of a non-IDR picture + 5 Coded slice of an IDR picture + 6 Supplemental enhancement information (SEI) + 7 Sequence parameter set + 8 Picture parameter set + 1-23 NAL unit Single NAL unit packet 5.6 + 24 STAP-A Single-time aggregation packet 5.7.1 + 25 STAP-B Single-time aggregation packet 5.7.1 + 26 MTAP16 Multi-time aggregation packet 5.7.2 + 27 MTAP24 Multi-time aggregation packet 5.7.2 + 28 FU-A Fragmentation unit 5.8 + 29 FU-B Fragmentation unit 5.8 + 30-31 reserved - + */ + + self.pkt.FrameType = 123 + switch { + case naluType >= 1 && naluType <= 5: + + self.pkt.FrameType = packet[4] + + self.gotpkt = true + // raw nalu to avcc + b := make([]byte, 4+len(packet)) + pio.PutU32BE(b[0:4], uint32(len(packet))) + copy(b[4:], packet) + + self.pkt.Data = b + self.timestamp = timestamp + + case naluType == 7: // sps + if self.client != nil && self.client.DebugRtp { + fmt.Println("rtsp: got sps") + } + if len(self.sps) == 0 { + self.sps = packet + self.makeCodecData() + } else if bytes.Compare(self.sps, packet) != 0 { + self.spsChanged = true + self.sps = packet + if self.client != nil && self.client.DebugRtp { + fmt.Println("rtsp: sps changed") + } + } + + case naluType == 8: // pps + if self.client != nil && self.client.DebugRtp { + fmt.Println("rtsp: got pps") + } + if len(self.pps) == 0 { + self.pps = packet + self.makeCodecData() + } else if bytes.Compare(self.pps, packet) != 0 { + self.ppsChanged = true + self.pps = packet + if self.client != nil && self.client.DebugRtp { + fmt.Println("rtsp: pps changed") + } + } + + case naluType == 28: // FU-A + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | FU indicator | FU header | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | + | | + | FU payload | + | | + | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | :...OPTIONAL RTP padding | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + Figure 14. RTP payload format for FU-A + + The FU indicator octet has the following format: + +---------------+ + |0|1|2|3|4|5|6|7| + +-+-+-+-+-+-+-+-+ + |F|NRI| Type | + +---------------+ + + + The FU header has the following format: + +---------------+ + |0|1|2|3|4|5|6|7| + +-+-+-+-+-+-+-+-+ + |S|E|R| Type | + +---------------+ + + S: 1 bit + When set to one, the Start bit indicates the start of a fragmented + NAL unit. When the following FU payload is not the start of a + fragmented NAL unit payload, the Start bit is set to zero. + + E: 1 bit + When set to one, the End bit indicates the end of a fragmented NAL + unit, i.e., the last byte of the payload is also the last byte of + the fragmented NAL unit. When the following FU payload is not the + last fragment of a fragmented NAL unit, the End bit is set to + zero. + + R: 1 bit + The Reserved bit MUST be equal to 0 and MUST be ignored by the + receiver. + + Type: 5 bits + The NAL unit payload type as defined in table 7-1 of [1]. + */ + fuIndicator := packet[0] + fuHeader := packet[1] + isStart := fuHeader&0x80 != 0 + isEnd := fuHeader&0x40 != 0 + if isStart { + self.fuStarted = true + self.fuBuffer = []byte{fuIndicator&0xe0 | fuHeader&0x1f} + } + if self.fuStarted { + self.fuBuffer = append(self.fuBuffer, packet[2:]...) + if isEnd { + self.fuStarted = false + if err = self.handleH264Payload(timestamp, self.fuBuffer); err != nil { + return + } + } + } + + case naluType == 24: // STAP-A + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | RTP Header | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |STAP-A NAL HDR | NALU 1 Size | NALU 1 HDR | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 1 Data | + : : + + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | NALU 2 Size | NALU 2 HDR | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 2 Data | + : : + | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | :...OPTIONAL RTP padding | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Figure 7. An example of an RTP packet including an STAP-A + containing two single-time aggregation units + */ + packet = packet[1:] + for len(packet) >= 2 { + size := int(packet[0])<<8 | int(packet[1]) + if size+2 > len(packet) { + break + } + if err = self.handleH264Payload(timestamp, packet[2:size+2]); err != nil { + return + } + packet = packet[size+2:] + } + return + + case naluType >= 6 && naluType <= 23: // other single NALU packet + case naluType == 25: // STAB-B + case naluType == 26: // MTAP-16 + case naluType == 27: // MTAP-24 + case naluType == 28: // FU-B + + default: + err = fmt.Errorf("rtsp: unsupported H264 naluType=%d", naluType) + return + } + + return +} + +func (self *Stream) handleRtpPacket(packet []byte) (err error) { + if self.isCodecDataChange() { + err = ErrCodecDataChange + return + } + + if self.client != nil && self.client.DebugRtp { + fmt.Println("rtp: packet", self.CodecData.Type(), "len", len(packet)) + dumpsize := len(packet) + if dumpsize > 32 { + dumpsize = 32 + } + fmt.Print(hex.Dump(packet[:dumpsize])) + } + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + if len(packet) < 8 { + err = fmt.Errorf("rtp: packet too short") + return + } + payloadOffset := 12 + int(packet[0]&0xf)*4 + if payloadOffset > len(packet) { + err = fmt.Errorf("rtp: packet too short") + return + } + timestamp := binary.BigEndian.Uint32(packet[4:8]) + payload := packet[payloadOffset:] + + /* + PT Encoding Name Audio/Video (A/V) Clock Rate (Hz) Channels Reference + 0 PCMU A 8000 1 [RFC3551] + 1 Reserved + 2 Reserved + 3 GSM A 8000 1 [RFC3551] + 4 G723 A 8000 1 [Vineet_Kumar][RFC3551] + 5 DVI4 A 8000 1 [RFC3551] + 6 DVI4 A 16000 1 [RFC3551] + 7 LPC A 8000 1 [RFC3551] + 8 PCMA A 8000 1 [RFC3551] + 9 G722 A 8000 1 [RFC3551] + 10 L16 A 44100 2 [RFC3551] + 11 L16 A 44100 1 [RFC3551] + 12 QCELP A 8000 1 [RFC3551] + 13 CN A 8000 1 [RFC3389] + 14 MPA A 90000 [RFC3551][RFC2250] + 15 G728 A 8000 1 [RFC3551] + 16 DVI4 A 11025 1 [Joseph_Di_Pol] + 17 DVI4 A 22050 1 [Joseph_Di_Pol] + 18 G729 A 8000 1 [RFC3551] + 19 Reserved A + 20 Unassigned A + 21 Unassigned A + 22 Unassigned A + 23 Unassigned A + 24 Unassigned V + 25 CelB V 90000 [RFC2029] + 26 JPEG V 90000 [RFC2435] + 27 Unassigned V + 28 nv V 90000 [RFC3551] + 29 Unassigned V + 30 Unassigned V + 31 H261 V 90000 [RFC4587] + 32 MPV V 90000 [RFC2250] + 33 MP2T AV 90000 [RFC2250] + 34 H263 V 90000 [Chunrong_Zhu] + 35-71 Unassigned ? + 72-76 Reserved for RTCP conflict avoidance [RFC3551] + 77-95 Unassigned ? + 96-127 dynamic ? [RFC3551] + */ + //payloadType := packet[1]&0x7f + + switch self.Sdp.Type { + case av.H264: + if err = self.handleH264Payload(timestamp, payload); err != nil { + return + } + + case av.AAC: + if len(payload) < 4 { + err = fmt.Errorf("rtp: aac packet too short") + return + } + payload = payload[4:] // TODO: remove this hack + self.gotpkt = true + self.pkt.Data = payload + self.timestamp = timestamp + + default: + self.gotpkt = true + self.pkt.Data = payload + self.timestamp = timestamp + } + + return +} + +func (self *Client) Play() (err error) { + req := Request{ + Method: "PLAY", + Uri: self.requestUri, + } + req.Header = append(req.Header, "Session: "+self.session) + if err = self.WriteRequest(req); err != nil { + return + } + + if self.allCodecDataReady() { + self.stage = stageCodecDataDone + } else { + self.stage = stageWaitCodecData + } + return +} + +func (self *Client) Teardown() (err error) { + req := Request{ + Method: "TEARDOWN", + Uri: self.requestUri, + } + req.Header = append(req.Header, "Session: "+self.session) + if err = self.WriteRequest(req); err != nil { + return + } + return +} + +func (self *Client) Close() (err error) { + return self.conn.Conn.Close() +} + +func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error) { + _, blockno, _ := self.parseBlockHeader(block) + if blockno%2 != 0 { + if self.DebugRtp { + fmt.Println("rtsp: rtcp block len", len(block)-4) + } + return + } + + i := blockno / 2 + if i >= len(self.streams) { + err = fmt.Errorf("rtsp: block no=%d invalid", blockno) + return + } + stream := self.streams[i] + + herr := stream.handleRtpPacket(block[4:]) + if herr != nil { + if !self.SkipErrRtpBlock { + err = herr + return + } + } + + if stream.gotpkt { + /* + TODO: sync AV by rtcp NTP timestamp + TODO: handle timestamp overflow + https://tools.ietf.org/html/rfc3550 + A receiver can then synchronize presentation of the audio and video packets by relating + their RTP timestamps using the timestamp pairs in RTCP SR packets. + */ + if stream.firsttimestamp == 0 { + stream.firsttimestamp = stream.timestamp + } + stream.timestamp -= stream.firsttimestamp + + ok = true + pkt = stream.pkt + pkt.Time = time.Duration(stream.timestamp) * time.Second / time.Duration(stream.timeScale()) + pkt.Idx = int8(self.setupMap[i]) + + if pkt.Time < stream.lasttime || pkt.Time-stream.lasttime > time.Minute*30 { + err = fmt.Errorf("rtp: time invalid stream#%d time=%v lasttime=%v", pkt.Idx, pkt.Time, stream.lasttime) + return + } + stream.lasttime = pkt.Time + + if self.DebugRtp { + fmt.Println("rtp: pktout", pkt.Idx, pkt.Time, len(pkt.Data)) + } + + stream.pkt = av.Packet{} + stream.gotpkt = false + } + + return +} + +func (self *Client) readPacket() (pkt av.Packet, err error) { + if err = self.SendRtpKeepalive(); err != nil { + return + } + + for { + var res Response + for { + if res, err = self.poll(); err != nil { + return + } + if len(res.Block) > 0 { + break + } + } + + var ok bool + if pkt, ok, err = self.handleBlock(res.Block); err != nil { + return + } + if ok { + return + } + } + + return +} + +func (self *Client) ReadPacket() (pkt av.Packet, err error) { + if err = self.prepare(stageCodecDataDone); err != nil { + return + } + return self.readPacket() +} + +func Handler(h *avutil.RegisterHandler) { + h.UrlDemuxer = func(uri string) (ok bool, demuxer av.DemuxCloser, err error) { + if !strings.HasPrefix(uri, "rtsp://") { + return + } + ok = true + demuxer, err = Dial(uri) + return + } +} diff --git a/format/mjpeg/conn.go b/format/mjpeg/conn.go new file mode 100644 index 00000000..10b0594e --- /dev/null +++ b/format/mjpeg/conn.go @@ -0,0 +1,25 @@ +package mjpeg + +import ( + "net" + "time" +) + +type connWithTimeout struct { + Timeout time.Duration + net.Conn +} + +func (self connWithTimeout) Read(p []byte) (n int, err error) { + if self.Timeout > 0 { + self.Conn.SetReadDeadline(time.Now().Add(self.Timeout)) + } + return self.Conn.Read(p) +} + +func (self connWithTimeout) Write(p []byte) (n int, err error) { + if self.Timeout > 0 { + self.Conn.SetWriteDeadline(time.Now().Add(self.Timeout)) + } + return self.Conn.Write(p) +} diff --git a/format/mjpeg/stream.go b/format/mjpeg/stream.go new file mode 100644 index 00000000..d2f293d9 --- /dev/null +++ b/format/mjpeg/stream.go @@ -0,0 +1,5 @@ +package mjpeg + +type Stream struct { + client *Client +} diff --git a/format/rtsp/client.go b/format/rtsp/client.go index 28f8592c..ad8bf4aa 100644 --- a/format/rtsp/client.go +++ b/format/rtsp/client.go @@ -8,13 +8,6 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "github.com/nareix/joy4/utils/bits/pio" - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/av/avutil" - "github.com/nareix/joy4/codec" - "github.com/nareix/joy4/codec/aacparser" - "github.com/nareix/joy4/codec/h264parser" - "github.com/nareix/joy4/format/rtsp/sdp" "io" "net" "net/textproto" @@ -22,6 +15,14 @@ import ( "strconv" "strings" "time" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/av/avutil" + "github.com/Danile71/joy4/codec" + "github.com/Danile71/joy4/codec/aacparser" + "github.com/Danile71/joy4/codec/h264parser" + "github.com/Danile71/joy4/format/rtsp/sdp" + "github.com/Danile71/joy4/utils/bits/pio" ) var ErrCodecDataChange = fmt.Errorf("rtsp: codec data change, please call HandleCodecDataChange()") @@ -31,7 +32,7 @@ var DebugRtsp = false var SkipErrRtpBlock = false const ( - stageDescribeDone = iota+1 + stageDescribeDone = iota + 1 stageSetupDone stageWaitCodecData stageCodecDataDone @@ -39,7 +40,7 @@ const ( type Client struct { DebugRtsp bool - DebugRtp bool + DebugRtp bool Headers []string SkipErrRtpBlock bool @@ -52,20 +53,20 @@ type Client struct { stage int - setupIdx []int - setupMap []int + setupIdx []int + setupMap []int authHeaders func(method string) []string - url *url.URL - conn *connWithTimeout + url *url.URL + conn *connWithTimeout brconn *bufio.Reader - requestUri string - cseq uint - streams []*Stream + requestUri string + cseq uint + streams []*Stream streamsintf []av.CodecData - session string - body io.Reader + session string + body io.Reader } type Request struct { @@ -76,7 +77,7 @@ type Request struct { type Response struct { StatusCode int - Headers textproto.MIMEHeader + Headers textproto.MIMEHeader ContentLength int Body []byte @@ -105,12 +106,12 @@ func DialTimeout(uri string, timeout time.Duration) (self *Client, err error) { connt := &connWithTimeout{Conn: conn} self = &Client{ - conn: connt, - brconn: bufio.NewReaderSize(connt, 256), - url: URL, - requestUri: u2.String(), - DebugRtp: DebugRtp, - DebugRtsp: DebugRtsp, + conn: connt, + brconn: bufio.NewReaderSize(connt, 256), + url: URL, + requestUri: u2.String(), + DebugRtp: DebugRtp, + DebugRtsp: DebugRtsp, SkipErrRtpBlock: SkipErrRtpBlock, } return @@ -121,7 +122,7 @@ func Dial(uri string) (self *Client, err error) { } func (self *Client) allCodecDataReady() bool { - for _, si:= range self.setupIdx { + for _, si := range self.setupIdx { stream := self.streams[si] if stream.CodecData == nil { return false @@ -268,7 +269,7 @@ func (self *Client) parseBlockHeader(h []byte) (length int, no int, valid bool) timestamp -= stream.firsttimestamp if timestamp < stream.timestamp { return - } else if timestamp - stream.timestamp > uint32(stream.timeScale()*60*60) { + } else if timestamp-stream.timestamp > uint32(stream.timeScale()*60*60) { return } } @@ -373,7 +374,7 @@ func (self *Client) handle401(res *Response) (err error) { func (self *Client) findRTSP() (block []byte, data []byte, err error) { const ( - R = iota+1 + R = iota + 1 T S Header @@ -383,7 +384,7 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) { peek := _peek[0:0] stat := 0 - for i := 0;; i++ { + for i := 0; ; i++ { var b byte if b, err = self.brconn.ReadByte(); err != nil { return @@ -434,7 +435,12 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) { fmt.Println("rtsp: dollar at", i, len(peek)) } if blocklen, _, ok := self.parseBlockHeader(peek); ok { - left := blocklen+4-len(peek) + left := blocklen + 4 - len(peek) + + if left <= 0 { + return + } + block = append(peek, make([]byte, left)...) if _, err = io.ReadFull(self.brconn, block[len(peek):]); err != nil { return @@ -451,7 +457,7 @@ func (self *Client) findRTSP() (block []byte, data []byte, err error) { func (self *Client) readLFLF() (block []byte, data []byte, err error) { const ( - LF = iota+1 + LF = iota + 1 LFLF ) peek := []byte{} @@ -471,7 +477,7 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) { stat = LF lpos = pos } else if stat == LF { - if pos - lpos <= 2 { + if pos-lpos <= 2 { stat = LFLF } else { lpos = pos @@ -485,9 +491,9 @@ func (self *Client) readLFLF() (block []byte, data []byte, err error) { if stat == LFLF { data = peek return - } else if dollarpos != -1 && dollarpos - pos >= 12 { - hdrlen := dollarpos-pos - start := len(peek)-hdrlen + } else if dollarpos != -1 && dollarpos-pos >= 12 { + hdrlen := dollarpos - pos + start := len(peek) - hdrlen if blocklen, _, ok := self.parseBlockHeader(peek[start:]); ok { block = append(peek[start:], make([]byte, blocklen+4-hdrlen)...) if _, err = io.ReadFull(self.brconn, block[hdrlen:]); err != nil { @@ -810,7 +816,7 @@ func (self *Stream) handleH264Payload(timestamp uint32, packet []byte) (err erro return } - naluType := packet[0]&0x1f + naluType := packet[0] & 0x1f /* Table 7-1 – NAL unit type codes @@ -828,18 +834,21 @@ func (self *Stream) handleH264Payload(timestamp uint32, packet []byte) (err erro 29 FU-B Fragmentation unit 5.8 30-31 reserved - */ + + self.pkt.FrameType = 123 switch { case naluType >= 1 && naluType <= 5: - if naluType == 5 { - self.pkt.IsKeyFrame = true - } - self.gotpkt = true - // raw nalu to avcc - b := make([]byte, 4+len(packet)) - pio.PutU32BE(b[0:4], uint32(len(packet))) - copy(b[4:], packet) - self.pkt.Data = b - self.timestamp = timestamp + + self.pkt.FrameType = packet[4] + + self.gotpkt = true + // raw nalu to avcc + b := make([]byte, 4+len(packet)) + pio.PutU32BE(b[0:4], uint32(len(packet))) + copy(b[4:], packet) + + self.pkt.Data = b + self.timestamp = timestamp case naluType == 7: // sps if self.client != nil && self.client.DebugRtp { @@ -940,30 +949,30 @@ func (self *Stream) handleH264Payload(timestamp uint32, packet []byte) (err erro case naluType == 24: // STAP-A /* - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | RTP Header | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - |STAP-A NAL HDR | NALU 1 Size | NALU 1 HDR | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NALU 1 Data | - : : - + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | | NALU 2 Size | NALU 2 HDR | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | NALU 2 Data | - : : - | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | :...OPTIONAL RTP padding | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | RTP Header | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |STAP-A NAL HDR | NALU 1 Size | NALU 1 HDR | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 1 Data | + : : + + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | NALU 2 Size | NALU 2 HDR | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | NALU 2 Data | + : : + | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | :...OPTIONAL RTP padding | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - Figure 7. An example of an RTP packet including an STAP-A - containing two single-time aggregation units + Figure 7. An example of an RTP packet including an STAP-A + containing two single-time aggregation units */ packet = packet[1:] for len(packet) >= 2 { - size := int(packet[0])<<8|int(packet[1]) + size := int(packet[0])<<8 | int(packet[1]) if size+2 > len(packet) { break } @@ -1141,7 +1150,7 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error return } - i := blockno/2 + i := blockno / 2 if i >= len(self.streams) { err = fmt.Errorf("rtsp: block no=%d invalid", blockno) return @@ -1158,11 +1167,11 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error if stream.gotpkt { /* - TODO: sync AV by rtcp NTP timestamp - TODO: handle timestamp overflow - https://tools.ietf.org/html/rfc3550 - A receiver can then synchronize presentation of the audio and video packets by relating - their RTP timestamps using the timestamp pairs in RTCP SR packets. + TODO: sync AV by rtcp NTP timestamp + TODO: handle timestamp overflow + https://tools.ietf.org/html/rfc3550 + A receiver can then synchronize presentation of the audio and video packets by relating + their RTP timestamps using the timestamp pairs in RTCP SR packets. */ if stream.firsttimestamp == 0 { stream.firsttimestamp = stream.timestamp @@ -1171,10 +1180,10 @@ func (self *Client) handleBlock(block []byte) (pkt av.Packet, ok bool, err error ok = true pkt = stream.pkt - pkt.Time = time.Duration(stream.timestamp)*time.Second / time.Duration(stream.timeScale()) + pkt.Time = time.Duration(stream.timestamp) * time.Second / time.Duration(stream.timeScale()) pkt.Idx = int8(self.setupMap[i]) - if pkt.Time < stream.lasttime || pkt.Time - stream.lasttime > time.Minute*30 { + if pkt.Time < stream.lasttime || pkt.Time-stream.lasttime > time.Minute*30 { err = fmt.Errorf("rtp: time invalid stream#%d time=%v lasttime=%v", pkt.Idx, pkt.Time, stream.lasttime) return } @@ -1236,4 +1245,3 @@ func Handler(h *avutil.RegisterHandler) { return } } - diff --git a/format/rtsp/sdp/parser.go b/format/rtsp/sdp/parser.go index a092ddfc..3b91857f 100644 --- a/format/rtsp/sdp/parser.go +++ b/format/rtsp/sdp/parser.go @@ -4,9 +4,10 @@ import ( "encoding/base64" "encoding/hex" "fmt" - "github.com/nareix/joy4/av" "strconv" "strings" + + "github.com/Danile71/joy4/av" ) type Session struct { diff --git a/format/rtsp/stream.go b/format/rtsp/stream.go index f3497cdb..1bd3719c 100644 --- a/format/rtsp/stream.go +++ b/format/rtsp/stream.go @@ -1,9 +1,10 @@ package rtsp import ( - "github.com/nareix/joy4/av" - "github.com/nareix/joy4/format/rtsp/sdp" "time" + + "github.com/Danile71/joy4/av" + "github.com/Danile71/joy4/format/rtsp/sdp" ) type Stream struct { @@ -19,11 +20,10 @@ type Stream struct { spsChanged bool ppsChanged bool - gotpkt bool - pkt av.Packet - timestamp uint32 + gotpkt bool + pkt av.Packet + timestamp uint32 firsttimestamp uint32 lasttime time.Duration } -