diff --git a/cmd/wtf/main.go b/cmd/wtf/main.go new file mode 100644 index 0000000..1985e78 --- /dev/null +++ b/cmd/wtf/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "os" + + "github.com/tinyzimmer/go-glib/glib" + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/app" +) + +func MonitorPipeline(mainLoop *glib.MainLoop, pipeline *gst.Pipeline) func(msg *gst.Message) bool { + return func(msg *gst.Message) bool { + switch msg.Type() { + case gst.MessageEOS: + pipeline.BlockSetState(gst.StateNull) + mainLoop.Quit() + case gst.MessageError: + err := msg.ParseError() + fmt.Println("ERROR:", err.Error()) + if debug := err.DebugString(); debug != "" { + fmt.Println("DEBUG:", debug) + } + mainLoop.Quit() + default: + fmt.Println(msg) + } + return true + } +} + +func input() (*gst.Pipeline, *app.Sink) { + r, err := gst.NewPipelineFromString("videotestsrc ! vp8enc ! rtpvp8pay ! appsink name=sink") + if err != nil { + panic(err) + } + + sink, err := r.GetElementByName("sink") + if err != nil { + panic(err) + } + + return r, app.SinkFromElement(sink) +} + +func output() (*gst.Pipeline, *app.Source) { + w, err := gst.NewPipelineFromString("appsrc name=source format=time ! rtpvp8depay ! vp8dec ! autovideosink") + if err != nil { + panic(err) + } + + src, err := w.GetElementByName("source") + if err != nil { + panic(err) + } + + return w, app.SrcFromElement(src) +} + +func main() { + gst.Init(&os.Args) + + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + + r, sink := input() + w, src := output() + + r.GetPipelineBus().AddWatch(MonitorPipeline(mainLoop, r)) + w.GetPipelineBus().AddWatch(MonitorPipeline(mainLoop, w)) + + sink.SetCallbacks(&app.SinkCallbacks{ + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { + sample := sink.PullSample() + if sample == nil { + return gst.FlowEOS + } + return src.PushSample(sample) + }, + }) + + r.SetState(gst.StatePlaying) + w.SetState(gst.StatePlaying) + + mainLoop.Run() +} diff --git a/examples/transcode-file/README.md b/examples/transcode-file/README.md index b4df2f9..94a6bdc 100644 --- a/examples/transcode-file/README.md +++ b/examples/transcode-file/README.md @@ -8,8 +8,4 @@ transcode-file is a simple application that shows how to transcode an ivf file o go run main.go -addr localhost:50051 -i input.ivf -o output.ivf ``` -You should see an `output.ivf` file produced. The output file does not contain the last ~1s -of video. This is a known issue due to the transcoder expecting live video sources, not -sources that terminate. - -If you intend to transcode mostly non-live video sources, it would be easier to use ffmpeg :) \ No newline at end of file +You should see an `output.ivf` file produced. diff --git a/go.mod b/go.mod index 6c88054..16cc10b 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,10 @@ require ( require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.7 // indirect + github.com/mattn/go-pointer v0.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/tinyzimmer/go-glib v0.0.24 // indirect + github.com/tinyzimmer/go-gst v0.2.32 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index 4e2182a..f4de9c3 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= +github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -130,6 +132,10 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tinyzimmer/go-glib v0.0.24 h1:ktZZC22/9t88kGRgNEFV/SESgIWhGHE+q7Z7Qj++luw= +github.com/tinyzimmer/go-glib v0.0.24/go.mod h1:ltV0gO6xNFzZhsIRbFXv8RTq9NGoNT2dmAER4YmZfaM= +github.com/tinyzimmer/go-gst v0.2.32 h1:bwJ1VfLyoeQPxuE7LgCTwwvMXFufnFoSws7QhaCfsY8= +github.com/tinyzimmer/go-gst v0.2.32/go.mod h1:V4h+HPS3mVGSwUJ7IBi3WAkJWITZasebERyqk3TEXUM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= diff --git a/internal/gst/README.md b/internal/gst/README.md deleted file mode 100644 index 1e0d9f1..0000000 --- a/internal/gst/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# muxable/transcoder/internal/gst - -Really tiny GStreamer bindings, forked from github.com/notedit/gst. \ No newline at end of file diff --git a/internal/gst/bin.go b/internal/gst/bin.go deleted file mode 100644 index 1149998..0000000 --- a/internal/gst/bin.go +++ /dev/null @@ -1,61 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 -#include -#include -*/ -import "C" -import ( - "fmt" - "runtime" - "unsafe" -) - -type Bin struct { - Element -} - -func ParseBinFromDescription(binStr string) (bin *Bin, err error) { - var gError *C.GError - - pDesc := (*C.gchar)(unsafe.Pointer(C.CString(binStr))) - defer C.g_free(C.gpointer(unsafe.Pointer(pDesc))) - - gstElt := C.gst_parse_bin_from_description(pDesc, C.int(0), &gError) - - if gError != nil { - err = fmt.Errorf("create bin error for %s", binStr) - return - } - - bin = &Bin{} - bin.GstElement = gstElt - - runtime.SetFinalizer(bin, func(bin *Bin) { - C.gst_object_unref(C.gpointer(unsafe.Pointer(bin.GstElement))) - }) - - return -} - -func (b *Bin) GetByName(name string) (*Element) { - n := (*C.gchar)(unsafe.Pointer(C.CString(name))) - defer C.g_free(C.gpointer(unsafe.Pointer(n))) - CElement := C.gst_bin_get_by_name((*C.GstBin)(unsafe.Pointer(b.GstElement)), n) - - if CElement == nil { - return nil - } - return &Element{ - GstElement: CElement, - } -} - -func (b *Bin) Add(child *Element) { - C.gst_bin_add((*C.GstBin)(unsafe.Pointer(b.GstElement)), child.GstElement) -} - -func (b *Bin) Remove(child *Element) { - C.gst_bin_remove((*C.GstBin)(unsafe.Pointer(b.GstElement)), child.GstElement) -} diff --git a/internal/gst/caps.go b/internal/gst/caps.go deleted file mode 100644 index 1dae829..0000000 --- a/internal/gst/caps.go +++ /dev/null @@ -1,22 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 -#include -*/ -import ( - "C" -) -import ( - "unsafe" -) - -func IsValidCapsString(capsStr string) (bool) { - pCapsStr := (*C.gchar)(unsafe.Pointer(C.CString(capsStr))) - defer C.g_free(C.gpointer(unsafe.Pointer(pCapsStr))) - - gstCaps := C.gst_caps_from_string(pCapsStr) - defer C.gst_caps_unref(gstCaps) - - return gstCaps != nil -} \ No newline at end of file diff --git a/internal/gst/element.go b/internal/gst/element.go deleted file mode 100644 index 415de2f..0000000 --- a/internal/gst/element.go +++ /dev/null @@ -1,141 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 gstreamer-app-1.0 -#include -#include -#include -#include - -gpointer compat_memdup(gconstpointer mem, gsize byte_size) { -#if GLIB_CHECK_VERSION(2, 68, 0) - return g_memdup2(mem, byte_size); -#else - return g_memdup(mem, byte_size); -#endif -} -*/ -import "C" -import ( - "errors" - "fmt" - "io" - "unsafe" - - "github.com/pion/rtp" -) - -type Element struct { - GstElement *C.GstElement -} - -func (e *Element) SetState(state StateOptions) StateChangeReturn { - Cint := C.gst_element_set_state(e.GstElement, C.GstState(state)) - return StateChangeReturn(Cint) -} - -func (e *Element) EndOfStream() (err error) { - // EndOfStream signals that the appsrc will not receive any further - // input via PushBuffer and permits the pipeline to finish properly. - - gstReturn := C.gst_app_src_end_of_stream((*C.GstAppSrc)(unsafe.Pointer(e.GstElement))) - if gstReturn != C.GST_FLOW_OK { - err = errors.New("could not send end_of_stream") - } - return -} - -func (e *Element) PushBuffer(data []byte) (err error) { - b := C.CBytes(data) - defer C.free(b) - - p := C.compat_memdup(C.gconstpointer(b), C.ulong(len(data))) - cdata := C.gst_buffer_new_wrapped(p, C.ulong(len(data))) - - gstReturn := C.gst_app_src_push_buffer((*C.GstAppSrc)(unsafe.Pointer(e.GstElement)), cdata) - - if gstReturn != C.GST_FLOW_OK { - err = errors.New("could not push buffer on appsrc element") - return - } - - return -} - -func (e *Element) PullSample() (sample *Sample, err error) { - CGstSample := C.gst_app_sink_pull_sample((*C.GstAppSink)(unsafe.Pointer(e.GstElement))) - if CGstSample == nil { - err = errors.New("could not pull a sample from appsink") - return - } - - gstBuffer := C.gst_sample_get_buffer(CGstSample) - - if gstBuffer == nil { - err = errors.New("could not pull a sample from appsink") - return - } - - mapInfo := (*C.GstMapInfo)(unsafe.Pointer(C.malloc(C.sizeof_GstMapInfo))) - defer C.free(unsafe.Pointer(mapInfo)) - - if int(C.gst_buffer_map(gstBuffer, mapInfo, C.GST_MAP_READ)) == 0 { - err = fmt.Errorf("could not map gstBuffer %#v", gstBuffer) - return - } - - CData := (*[1 << 30]byte)(unsafe.Pointer(mapInfo.data)) - data := make([]byte, int(mapInfo.size)) - copy(data, CData[:]) - - duration := uint64((*C.GstBuffer)(unsafe.Pointer(gstBuffer)).duration) - pts := uint64((*C.GstBuffer)(unsafe.Pointer(gstBuffer)).pts) - dts := uint64((*C.GstBuffer)(unsafe.Pointer(gstBuffer)).dts) - offset := uint64((*C.GstBuffer)(unsafe.Pointer(gstBuffer)).offset) - - sample = &Sample{ - Data: data, - Duration: duration, - Pts: pts, - Dts: dts, - Offset: offset, - } - - C.gst_buffer_unmap(gstBuffer, mapInfo) - C.gst_sample_unref(CGstSample) - - return -} - -func (e *Element) IsEOS() bool { - Cbool := C.gst_app_sink_is_eos((*C.GstAppSink)(unsafe.Pointer(e.GstElement))) - return Cbool == 1 -} - -func (e *Element) WriteRTP(p *rtp.Packet) error { - buf, err := p.Marshal() - if err != nil { - return err - } - return e.PushBuffer(buf) -} - -func (e *Element) ReadRTP() (*rtp.Packet, error) { - sample, err := e.PullSample() - if err != nil { - if e.IsEOS() { - return nil, io.EOF - } - return nil, err - } - - p := &rtp.Packet{} - if err := p.Unmarshal(sample.Data); err != nil { - return nil, err - } - return p, nil -} - -func (e *Element) Close() error { - return e.EndOfStream() -} \ No newline at end of file diff --git a/internal/gst/gst.go b/internal/gst/gst.go deleted file mode 100644 index 9f4e72b..0000000 --- a/internal/gst/gst.go +++ /dev/null @@ -1,12 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 -#include -#include -*/ -import "C" - -func init() { - C.gst_init(nil, nil); -} \ No newline at end of file diff --git a/internal/gst/pipeline.go b/internal/gst/pipeline.go deleted file mode 100644 index a4d203d..0000000 --- a/internal/gst/pipeline.go +++ /dev/null @@ -1,35 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 -#include -#include -*/ -import "C" -import ( - "errors" - "runtime" - "unsafe" -) - -type Pipeline struct { - Bin -} - -func PipelineNew() (e *Pipeline, err error) { - gstElt := C.gst_pipeline_new(nil) - if gstElt == nil { - err = errors.New("could not create a Gstreamer pipeline") - return - } - - e = &Pipeline{} - - e.GstElement = gstElt - - runtime.SetFinalizer(e, func(e *Pipeline) { - C.gst_object_unref(C.gpointer(unsafe.Pointer(e.GstElement))) - }) - - return -} diff --git a/internal/gst/sample.go b/internal/gst/sample.go deleted file mode 100644 index f87c75a..0000000 --- a/internal/gst/sample.go +++ /dev/null @@ -1,9 +0,0 @@ -package gst - -type Sample struct { - Data []byte - Duration uint64 - Pts uint64 - Dts uint64 - Offset uint64 -} diff --git a/internal/gst/state.go b/internal/gst/state.go deleted file mode 100644 index dc63893..0000000 --- a/internal/gst/state.go +++ /dev/null @@ -1,27 +0,0 @@ -package gst - -/* -#cgo pkg-config: gstreamer-1.0 -#include -#include -*/ -import "C" - -type StateOptions int - -const ( - StateVoidPending StateOptions = C.GST_STATE_VOID_PENDING - StateNull StateOptions = C.GST_STATE_NULL - StateReady StateOptions = C.GST_STATE_READY - StatePaused StateOptions = C.GST_STATE_PAUSED - StatePlaying StateOptions = C.GST_STATE_PLAYING -) - -type StateChangeReturn int - -const ( - StateChangeFailure StateChangeReturn = C.GST_STATE_CHANGE_FAILURE - StateChangeSuccess StateChangeReturn = C.GST_STATE_CHANGE_SUCCESS - StateChangeAsync StateChangeReturn = C.GST_STATE_CHANGE_ASYNC - StateChangePreroll StateChangeReturn = C.GST_STATE_CHANGE_NO_PREROLL -) diff --git a/internal/peerconnection/peerconnection.go b/internal/peerconnection/peerconnection.go index 42996d4..c37d18f 100644 --- a/internal/peerconnection/peerconnection.go +++ b/internal/peerconnection/peerconnection.go @@ -18,7 +18,6 @@ func NewTranscoderPeerConnection(configuration webrtc.Configuration) (*webrtc.Pe RTPCodecCapability: webrtc.RTPCodecCapability{ MimeType: "video/H265", ClockRate: 90000, - Channels: 0, SDPFmtpLine: "", RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, }, diff --git a/internal/server/codec.go b/internal/server/codec.go index e8c4afc..114c96a 100644 --- a/internal/server/codec.go +++ b/internal/server/codec.go @@ -66,23 +66,43 @@ var DefaultOutputCodecs = map[string]webrtc.RTPCodecParameters{ }, webrtc.MimeTypeVP8: { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000}, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, + }, PayloadType: 100, }, webrtc.MimeTypeVP9: { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP9, ClockRate: 90000}, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, + }, PayloadType: 101, }, webrtc.MimeTypeH264: { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000}, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, + }, PayloadType: 102, }, webrtc.MimeTypeH265: { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH265, ClockRate: 90000}, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH265, + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, + }, PayloadType: 103, }, webrtc.MimeTypeAV1: { - RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeAV1, ClockRate: 90000}, + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + RTCPFeedback: []webrtc.RTCPFeedback{{Type: "goog-remb", Parameter: ""}, {Type: "ccm", Parameter: "fir"}, {Type: "nack", Parameter: ""}, {Type: "nack", Parameter: "pli"}}, + }, PayloadType: 104, }, @@ -208,13 +228,13 @@ func PipelineString(from, to webrtc.RTPCodecParameters, encoder string) (string, inputCaps := fmt.Sprintf("application/x-rtp,media=(string)video,%s", fromParameters.ToCaps(from)) return fmt.Sprintf( - "appsrc format=time is-live=true name=source ! %s ! %s ! queue ! decodebin ! autovideosink", - inputCaps, fromParameters.Depayloader), nil + "appsrc is-live=true do-timestamp=true format=time name=source ! %s ! %s ! queue ! decodebin ! queue ! videoconvert ! videorate ! %s ! %s ! queue ! appsink name=sink sync=false async=false", + inputCaps, fromParameters.Depayloader, encoder, toParameters.Payloader), nil } else if strings.HasPrefix(from.MimeType, "audio") { inputCaps := fmt.Sprintf("application/x-rtp,media=(string)audio,%s", fromParameters.ToCaps(from)) return fmt.Sprintf( - "appsrc format=time is-live=true name=source ! %s ! %s ! queue ! decodebin ! queue ! audioconvert ! audioresample ! %s ! %s mtu=1200 ! appsink name=sink sync=false async=false", + "appsrc is-live=true do-timestamp=true format=time name=source ! %s ! %s ! queue ! decodebin ! queue ! audioconvert ! audioresample ! %s ! %s mtu=1200 ! appsink name=sink sync=false async=false", inputCaps, fromParameters.Depayloader, encoder, toParameters.Payloader), nil } return "", fmt.Errorf("unsupported codec %s", from.MimeType) diff --git a/internal/server/codec_test.go b/internal/server/codec_test.go deleted file mode 100644 index 161c9cf..0000000 --- a/internal/server/codec_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package server - -import ( - "testing" - - "github.com/muxable/transcoder/internal/gst" -) - -func TestSupportedCodecs(t *testing.T) { - for _, c := range SupportedCodecs { - // try creating elements - if _, err := gst.ParseBinFromDescription(c.Depayloader); err != nil { - t.Errorf("failed to create element from %s: %v", c.Depayloader, err) - } - if _, err := gst.ParseBinFromDescription(c.Payloader); err != nil { - t.Errorf("failed to create element from %s: %v", c.Payloader, err) - } - if _, err := gst.ParseBinFromDescription(c.DefaultEncoder); err != nil { - t.Errorf("failed to create element from %s: %v", c.DefaultEncoder, err) - } - } - // verify that invalid bins fail. - if _, err := gst.ParseBinFromDescription("invalid"); err == nil { - t.Errorf("expected error for invalid bin") - } -} diff --git a/pkg/server/server.go b/pkg/server/server.go index 4b17bef..1378365 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -7,6 +7,7 @@ import ( "github.com/muxable/transcoder/api" "github.com/muxable/transcoder/internal/peerconnection" + "github.com/muxable/transcoder/internal/server" "github.com/muxable/transcoder/pkg/transcode" "github.com/pion/rtpio/pkg/rtpio" "github.com/pion/webrtc/v3" @@ -187,13 +188,13 @@ func (s *TranscoderServer) Transcode(ctx context.Context, request *api.Transcode // tr is the remote track that matches the request. transcoder, err := transcode.NewTranscoder(matched.TrackRemote.Codec(), transcode.WithSynchronizer(matched.Synchronizer), - transcode.ToMimeType(request.MimeType), + transcode.ToOutputCodec(server.DefaultOutputCodecs[request.MimeType]), transcode.ViaGStreamerEncoder(request.GstreamerPipeline)) if err != nil { return nil, err } - tl, err := webrtc.NewTrackLocalStaticRTP(transcoder.OutputCodec(), matched.TrackRemote.ID(), matched.TrackRemote.StreamID()) + tl, err := webrtc.NewTrackLocalStaticRTP(transcoder.OutputCodec().RTPCodecCapability, matched.TrackRemote.ID(), matched.TrackRemote.StreamID()) if err != nil { return nil, err } diff --git a/pkg/transcode/synchronizer.go b/pkg/transcode/synchronizer.go index cb87aff..22fdebd 100644 --- a/pkg/transcode/synchronizer.go +++ b/pkg/transcode/synchronizer.go @@ -1,9 +1,7 @@ package transcode import ( - "fmt" - - "github.com/muxable/transcoder/internal/gst" + "github.com/tinyzimmer/go-gst/gst" ) type Synchronizer struct { @@ -11,7 +9,7 @@ type Synchronizer struct { } func NewSynchronizer() (*Synchronizer, error) { - pipeline, err := gst.PipelineNew() + pipeline, err := gst.NewPipeline("") if err != nil { return nil, err } @@ -24,8 +22,5 @@ func NewSynchronizer() (*Synchronizer, error) { } func (s *Synchronizer) Close() error { - if r := s.element.SetState(gst.StateNull); r != gst.StateChangeSuccess { - return fmt.Errorf("failed to set state to null: %+v", r) - } - return nil + return s.element.SetState(gst.StateNull) } diff --git a/pkg/transcode/transcode.go b/pkg/transcode/transcode.go index 1d22be1..9a55ca5 100644 --- a/pkg/transcode/transcode.go +++ b/pkg/transcode/transcode.go @@ -1,20 +1,21 @@ package transcode import ( + "errors" "fmt" "io" "strings" - "github.com/muxable/transcoder/internal/gst" "github.com/muxable/transcoder/internal/server" "github.com/pion/rtp" - "github.com/pion/rtpio/pkg/rtpio" "github.com/pion/webrtc/v3" + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/app" ) type Transcoder struct { - sink rtpio.RTPWriteCloser - source rtpio.RTPReader + sink *app.Source + source *app.Sink synchronizer *Synchronizer encodingPipeline string @@ -48,30 +49,32 @@ func NewTranscoder(from webrtc.RTPCodecParameters, options ...TranscoderOption) return nil, err } - bin, err := gst.ParseBinFromDescription(transcodingPipelineStr) + bin, err := gst.NewBinFromString(transcodingPipelineStr, false) if err != nil { return nil, err } - source := bin.GetByName("source") - sink := bin.GetByName("sink") - if source != nil { - t.sink = source + src, err := bin.GetElementByName("source") + if err != nil { + return nil, err } - if sink != nil { - t.source = sink + sink, err := bin.GetElementByName("sink") + if err != nil { + return nil, err } + t.sink = app.SrcFromElement(src) + t.source = app.SinkFromElement(sink) if t.synchronizer == nil { - pipeline, err := gst.PipelineNew() + pipeline, err := gst.NewPipeline("") if err != nil { return nil, err } - pipeline.Add(&bin.Element) + pipeline.Add(bin.Element) pipeline.SetState(gst.StatePlaying) - t.bin = &pipeline.Bin + t.bin = pipeline.Bin } else { - t.synchronizer.element.Add(&bin.Element) + t.synchronizer.element.Add(bin.Element) t.bin = bin } @@ -85,11 +88,18 @@ func (t *Transcoder) OutputCodec() webrtc.RTPCodecParameters { } func (t *Transcoder) ReadRTP() (*rtp.Packet, error) { - if t.source == nil { + sample := t.source.PullSample() + if sample == nil { return nil, io.EOF } - p, err := t.source.ReadRTP() - if err != nil { + buffer := sample.GetBuffer() + if buffer == nil { + return nil, fmt.Errorf("no buffer in sample") + } + buf := buffer.Map(gst.MapRead).Bytes() + defer buffer.Unmap() + p := &rtp.Packet{} + if err := p.Unmarshal(buf); err != nil { return nil, err } return p, nil @@ -100,23 +110,35 @@ func (t *Transcoder) WriteRTP(p *rtp.Packet) error { if t.sink == nil { return nil } - return t.sink.WriteRTP(p) + buf, err := p.Marshal() + if err != nil { + return err + } + + buffer := gst.NewBufferWithSize(int64(len(buf))) + + buffer.Map(gst.MapWrite).WriteData(buf) + buffer.Unmap() + + if r := t.sink.PushBuffer(buffer); r != gst.FlowOK { + return fmt.Errorf("failed to push buffer: %v", r) + } + return nil } func (t *Transcoder) Close() error { t.bin.SetState(gst.StateNull) if t.synchronizer != nil { - t.synchronizer.element.Remove(&t.bin.Element) + t.synchronizer.element.Remove(t.bin.Element) } - if t.sink == nil { - return nil + if err := t.sink.EndStream(); err != gst.FlowEOS { + return errors.New("failed to end stream") } - if err := t.sink.Close(); err != nil { - return err - } + t.source.Unref() + t.sink.Unref() return nil } diff --git a/test/e2e_test.go b/test/e2e_test.go index 0db2343..1e6cf87 100644 --- a/test/e2e_test.go +++ b/test/e2e_test.go @@ -1,138 +1,173 @@ -package test +package main import ( "fmt" - "log" + "os" "strings" - "sync" "testing" - "github.com/muxable/transcoder/internal/gst" "github.com/muxable/transcoder/internal/server" "github.com/muxable/transcoder/pkg/transcode" - "github.com/pion/rtpio/pkg/rtpio" + "github.com/pion/rtp" "github.com/pion/webrtc/v3" + "github.com/tinyzimmer/go-glib/glib" + "github.com/tinyzimmer/go-gst/gst" + "github.com/tinyzimmer/go-gst/gst/app" ) -func writer(s string) (*gst.Element, error) { - pipeline, err := gst.PipelineNew() - if err != nil { - return nil, err +func MonitorPipeline(mainLoop *glib.MainLoop, pipeline *gst.Pipeline) func(msg *gst.Message) bool { + return func(msg *gst.Message) bool { + switch msg.Type() { + case gst.MessageEOS: + pipeline.BlockSetState(gst.StateNull) + mainLoop.Quit() + case gst.MessageError: + err := msg.ParseError() + fmt.Println("ERROR:", err.Error()) + if debug := err.DebugString(); debug != "" { + fmt.Println("DEBUG:", debug) + } + mainLoop.Quit() + default: + // fmt.Println(msg) + } + return true } - bin, err := gst.ParseBinFromDescription(s) +} + +func input(ps string) (*gst.Pipeline, *app.Sink) { + r, err := gst.NewPipelineFromString(ps) if err != nil { - return nil, err + panic(err) } - pipeline.SetState(gst.StatePlaying) - pipeline.Add(&bin.Element) - bin.SetState(gst.StatePlaying) + sink, err := r.GetElementByName("sink") + if err != nil { + panic(err) + } - return bin.GetByName("source"), nil + return r, app.SinkFromElement(sink) } -func reader(s string) (*gst.Element, error) { - pipeline, err := gst.PipelineNew() +func output(ps string) (*gst.Pipeline, *app.Source, *gst.Element) { + w, err := gst.NewPipelineFromString(ps) if err != nil { - return nil, err + panic(err) } - bin, err := gst.ParseBinFromDescription(s) + + src, err := w.GetElementByName("source") if err != nil { - return nil, err + panic(err) } - pipeline.SetState(gst.StatePlaying) - pipeline.Add(&bin.Element) - bin.SetState(gst.StatePlaying) + test, err := w.GetElementByName("test") + if err != nil { + panic(err) + } - return bin.GetByName("sink"), nil + return w, app.SrcFromElement(src), test } -func TestTranscodingVideo(t *testing.T) { - for mime, codec := range server.SupportedCodecs { - if mime != webrtc.MimeTypeVP8 { - continue - } - if !strings.HasPrefix(mime, "video") { - continue - } - - log.Printf("playing %s", mime) - - outputCodec := server.DefaultOutputCodecs[mime] - - tc, err := transcode.NewTranscoder( - server.DefaultOutputCodecs[webrtc.MimeTypeVP8], - transcode.ToOutputCodec(outputCodec)) - if err != nil { - t.Errorf("failed to create transcoder: %v", err) - continue - } +func TestTranscoding(t *testing.T) { + gst.Init(&os.Args) - reader, err := reader(fmt.Sprintf("filesrc location=input.ivf ! decodebin ! vp8enc ! rtpvp8pay pt=%d mtu=1200 ! appsink name=sink", server.DefaultOutputCodecs[webrtc.MimeTypeVP8].PayloadType)) - if err != nil { - t.Errorf("failed to create bin: %v", err) - } - writer, err := writer(fmt.Sprintf("appsrc format=time is-live=true name=source ! application/x-rtp,%s ! %s ! queue ! decodebin ! autovideosink", codec.ToCaps(outputCodec), codec.Depayloader)) - if err != nil { - t.Errorf("failed to create bin: %v", err) - } - - var wg sync.WaitGroup - wg.Add(2) - go func() { - rtpio.CopyRTP(tc, reader) - tc.Close() - wg.Done() - }() - go func() { - rtpio.CopyRTP(writer, tc) - writer.Close() - wg.Done() - }() - wg.Wait() + for mime, codec := range server.SupportedCodecs { + t.Run(mime, func(t *testing.T) { + + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + + var ic webrtc.RTPCodecParameters + oc := server.DefaultOutputCodecs[mime] + + var rs, ws string + if strings.HasPrefix(mime, "audio") { + ic = server.DefaultOutputCodecs[webrtc.MimeTypeOpus] + rs = fmt.Sprintf("audiotestsrc num-buffers=100 ! opusenc ! rtpopuspay pt=%d mtu=1200 ! appsink name=sink", server.DefaultOutputCodecs[webrtc.MimeTypeOpus].PayloadType) + ws = fmt.Sprintf("appsrc format=time name=source ! application/x-rtp,%s ! %s ! queue ! decodebin ! audioconvert ! testsink name=test", codec.ToCaps(oc), codec.Depayloader) + } else { + ic = server.DefaultOutputCodecs[webrtc.MimeTypeVP8] + rs = fmt.Sprintf("videotestsrc num-buffers=100 ! vp8enc ! rtpvp8pay pt=%d mtu=1200 ! appsink name=sink", server.DefaultOutputCodecs[webrtc.MimeTypeVP8].PayloadType) + ws = fmt.Sprintf("appsrc format=time name=source ! application/x-rtp,%s ! %s ! queue ! decodebin ! videoconvert ! testsink name=test", codec.ToCaps(oc), codec.Depayloader) + } + + r, sink := input(rs) + w, src, test := output(ws) + + tc, err := transcode.NewTranscoder( + ic, + transcode.ToOutputCodec(oc)) + if err != nil { + t.Errorf("failed to create transcoder: %v", err) + return + } + + r.GetPipelineBus().AddWatch(MonitorPipeline(mainLoop, r)) + w.GetPipelineBus().AddWatch(MonitorPipeline(mainLoop, w)) + + sink.SetCallbacks(&app.SinkCallbacks{ + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { + sample := sink.PullSample() + if sample == nil { + return gst.FlowEOS + } + buffer := sample.GetBuffer() + if buffer == nil { + return gst.FlowError + } + buf := buffer.Map(gst.MapRead).Bytes() + defer buffer.Unmap() + + p := &rtp.Packet{} + if err := p.Unmarshal(buf); err != nil { + t.Errorf("failed to unmarshal packet: %v", err) + return gst.FlowError + } + if err := tc.WriteRTP(p); err != nil { + t.Errorf("failed to write packet: %v", err) + return gst.FlowError + } + return gst.FlowOK + }, + }) + + src.SetCallbacks(&app.SourceCallbacks{ + NeedDataFunc: func(src *app.Source, length uint) { + p, err := tc.ReadRTP() + if err != nil { + t.Errorf("failed to read packet: %v", err) + return + } + buf, err := p.Marshal() + if err != nil { + t.Errorf("failed to marshal packet: %v", err) + return + } + + buffer := gst.NewBufferWithSize(int64(len(buf))) + + buffer.Map(gst.MapWrite).WriteData(buf) + buffer.Unmap() + + src.PushBuffer(buffer) + }, + }) + + r.SetState(gst.StatePlaying) + w.SetState(gst.StatePlaying) + defer r.SetState(gst.StateNull) + defer w.SetState(gst.StateNull) + + mainLoop.Run() + + bc, err := test.GetProperty("buffer-count") + if err != nil { + t.Errorf("failed to get buffer count: %v", err) + return + } + if bc.(int64) == 0 { + t.Errorf("buffer count is %d, expected >0", bc.(int64)) + return + } + }) } } - -// func TestTranscodingAudio(t *testing.T) { -// for mime, codec := range server.SupportedCodecs { -// if !strings.HasPrefix(mime, "audio") { -// continue -// } - -// log.Printf("playing %s", mime) - -// outputCodec := server.DefaultOutputCodecs[mime] - -// tc, err := transcode.NewTranscoder( -// server.DefaultOutputCodecs[webrtc.MimeTypeOpus], -// transcode.ToOutputCodec(outputCodec)) -// if err != nil { -// t.Errorf("failed to create transcoder: %v", err) -// continue -// } - -// reader, err := reader(fmt.Sprintf("filesrc location=input.ogg ! oggdemux ! rtpopuspay pt=%d mtu=1200 ! appsink name=sink", server.DefaultOutputCodecs[webrtc.MimeTypeOpus].PayloadType)) -// if err != nil { -// t.Errorf("failed to create bin: %v", err) -// } -// writer, err := writer(fmt.Sprintf("appsrc format=time is-live=true name=source ! application/x-rtp,%s ! %s ! queue ! decodebin ! audioconvert ! pulsesink provide-clock=false", codec.ToCaps(outputCodec), codec.Depayloader)) -// if err != nil { -// t.Errorf("failed to create bin: %v", err) -// } - -// var wg sync.WaitGroup -// wg.Add(2) -// go func() { -// rtpio.CopyRTP(tc, reader) -// tc.Close() -// wg.Done() -// }() -// go func() { -// rtpio.CopyRTP(writer, tc) -// writer.Close() -// wg.Done() -// }() -// wg.Wait() -// } -// } diff --git a/test/output.avi b/test/output.avi new file mode 100644 index 0000000..e69de29 diff --git a/test/output.ivf b/test/output.ivf new file mode 100644 index 0000000..e69de29