diff --git a/client/src/em/em_stream_client.c b/client/src/em/em_stream_client.c
index 28b7b2d6..6b376e6b 100644
--- a/client/src/em/em_stream_client.c
+++ b/client/src/em/em_stream_client.c
@@ -14,6 +14,10 @@
 #include "gst_common.h" // for em_sample
 #include "em/em_egl.h"
 
+#include "electricmaple.pb.h"
+
+#include
+
 #include "os/os_threading.h"
 
 #include
@@ -26,7 +30,9 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 
@@ -129,6 +135,8 @@ typedef enum
 } EmStreamClientProperty;
 #endif
 
+#define RTP_TWOBYTES_HDR_EXT_ID 1 // Must be in the [1,15] range
+
 // clang-format off
 #define SINK_CAPS \
 	"video/x-raw(" GST_CAPS_FEATURE_MEMORY_GL_MEMORY "), " \
@@ -261,6 +269,81 @@ em_stream_client_class_init(EmStreamClientClass *klass)
 
 #endif
 
+/*
+ * Extract the protobuf-encoded DownMessage carried in the RTP two-byte header
+ * extension of @p buffer, if there is one.
+ *
+ * The buffer is mapped read-only as an RTP packet for the duration of the call.
+ * @p msg is only meaningful when true is returned; it may have been partially
+ * written when decoding fails.
+ *
+ * Returns true if an extension element with id RTP_TWOBYTES_HDR_EXT_ID was found
+ * and decoded, false otherwise.
+ */
+static inline bool
+em_stream_client_extract_frame_data(GstBuffer *buffer, em_proto_DownMessage *msg)
+{
+	GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
+	bool ok = false;
+
+	// We only read the header extension, so a read-only mapping is enough.
+	if (!gst_rtp_buffer_map(buffer, GST_MAP_READ, &rtp_buffer)) {
+		ALOGE("Failed to map GstBuffer");
+		return false;
+	}
+
+	// Not all buffers have extension data attached, check.
+	if (!gst_rtp_buffer_get_extension(&rtp_buffer)) {
+		goto out;
+	}
+
+	// GStreamer returns a pointer into the mapped packet instead of copying, so
+	// the payload has to be decoded before the buffer is unmapped.
+	gpointer ext_data = NULL;
+	guint ext_size = 0;
+	if (!gst_rtp_buffer_get_extension_twobytes_header(&rtp_buffer, NULL, RTP_TWOBYTES_HDR_EXT_ID,
+	                                                  0 /* NOTE: We do not support multi-extension-elements.*/,
+	                                                  &ext_data, &ext_size)) {
+		ALOGE("Could not retrieve twobyte rtp extension on buffer!");
+		goto out;
+	}
+
+	pb_istream_t our_istream = pb_istream_from_buffer((const pb_byte_t *)ext_data, ext_size);
+	ok = pb_decode_ex(&our_istream, em_proto_DownMessage_fields, msg, PB_DECODE_NULLTERMINATED);
+	if (!ok) {
+		ALOGE("Error! %s", PB_GET_ERROR(&our_istream));
+	}
+
+out:
+	gst_rtp_buffer_unmap(&rtp_buffer);
+	return ok;
+}
+
+// Convert a protobuf quaternion to OpenXR, passing components as x, y, z, w.
+static inline XrQuaternionf
+quat_to_openxr(const em_proto_Quaternion *q)
+{
+	return (XrQuaternionf){q->x, q->y, q->z, q->w};
+}
+
+// Convert a protobuf 3-vector to OpenXR, passing components as x, y, z.
+static inline XrVector3f
+vec3_to_openxr(const em_proto_Vec3 *v)
+{
+	return (XrVector3f){v->x, v->y, v->z};
+}
+
+// Convert a protobuf pose to OpenXR, substituting {0, 0, 0, 1} for a missing
+// orientation and {0, 0, 0} for a missing position.
+static inline XrPosef
+pose_to_openxr(const em_proto_Pose *p)
+{
+	return (XrPosef){
+	    p->has_orientation ? quat_to_openxr(&p->orientation) : (XrQuaternionf){0, 0, 0, 1},
+	    p->has_position ? vec3_to_openxr(&p->position) : (XrVector3f){0, 0, 0},
+	};
+}
+
 /*
  * callbacks
  */
@@ -346,6 +429,7 @@ on_new_sample_cb(GstAppSink *appsink, gpointer user_data)
 	GstSample *prevSample = NULL;
 	GstSample *sample = gst_app_sink_pull_sample(appsink);
 	g_assert_nonnull(sample);
+
 	{
 		g_autoptr(GMutexLocker) locker = g_mutex_locker_new(&sc->sample_mutex);
 		prevSample = sc->sample;
@@ -612,9 +696,31 @@ em_stream_client_try_pull_sample(EmStreamClient *sc, struct timespec *out_decode
 	}
 	*out_decode_end = decode_end;
 
+	struct em_sc_sample *ret = calloc(1, sizeof(struct em_sc_sample));
+
 	// ALOGE("FRED: GOT A SAMPLE !!!");
 	GstBuffer *buffer = gst_sample_get_buffer(sample);
 	GstCaps *caps = gst_sample_get_caps(sample);
+
+	// Extract Downstream metadata from the rtp header. Only look at the message
+	// when extraction succeeded, so a failed or partial decode is never used.
+	// NOTE(review): this same buffer is mapped as a video frame further down;
+	// presumably it only parses as RTP if the extension survives decoding - confirm.
+	em_proto_DownMessage msg = em_proto_DownMessage_init_default;
+	if (em_stream_client_extract_frame_data(buffer, &msg)) {
+		ALOGI("RYLIE: got downstream frame message");
+		if (msg.has_frame_data && msg.frame_data.has_P_localSpace_view0 &&
+		    msg.frame_data.has_P_localSpace_view1) {
+			// OK we have a message for this one.
+			ALOGI("RYLIE: got downstream frame message with poses!");
+			// TODO is it too late to get it here?
+			ret->base.have_poses = true;
+			ret->base.poses[0] = pose_to_openxr(&msg.frame_data.P_localSpace_view0);
+			ret->base.poses[1] = pose_to_openxr(&msg.frame_data.P_localSpace_view1);
+
+			// TODO: use msg.frame_id (and others) and populate properly inside stream client.
+		}
+	}
 
 	GstVideoInfo info;
 	gst_video_info_from_caps(&info, caps);
@@ -630,8 +736,6 @@ em_stream_client_try_pull_sample(EmStreamClient *sc, struct timespec *out_decode
 	}
 #endif
 
-	struct em_sc_sample *ret = calloc(1, sizeof(struct em_sc_sample));
-
 	GstVideoFrame frame;
 	GstMapFlags flags = (GstMapFlags)(GST_MAP_READ | GST_MAP_GL);
 	gst_video_frame_map(&frame, &info, buffer, flags);
diff --git a/client/src/em/gst_common.h b/client/src/em/gst_common.h
index d8f51f01..df9a1780 100644
--- a/client/src/em/gst_common.h
+++ b/client/src/em/gst_common.h
@@ -36,4 +36,9 @@ struct em_sample
 {
 	GLuint frame_texture_id;
 	GLenum frame_texture_target;
+
+	// True when poses[] was filled in from the server's downstream message.
+	bool have_poses;
+	// Per-view poses (index 0 from view0, index 1 from view1); only valid if have_poses.
+	XrPosef poses[2];
 };
diff --git a/proto/electricmaple.proto b/proto/electricmaple.proto
index 3b18cf23..18649286 100644
--- a/proto/electricmaple.proto
+++ b/proto/electricmaple.proto
@@ -101,8 +101,9 @@ message UpMessage {
 
 message DownFrameDataMessage {
   int64 frame_sequence_id = 1;
-  Pose P_localSpace_viewSpace = 2;
-  int64 display_time = 3;
+  Pose P_localSpace_view0 = 2; // Left view
+  Pose P_localSpace_view1 = 3; // Right view
+  int64 display_time = 4;
 
   // TODO fovs here
 }
diff --git a/proto/generated/electricmaple.pb.h b/proto/generated/electricmaple.pb.h
index 667fc0f2..e536998b 100644
--- a/proto/generated/electricmaple.pb.h
+++ b/proto/generated/electricmaple.pb.h
@@ -127,8 +127,10 @@ typedef struct _em_proto_UpMessage {
 
 typedef struct _em_proto_DownFrameDataMessage {
     int64_t frame_sequence_id;
-    bool has_P_localSpace_viewSpace;
-    em_proto_Pose P_localSpace_viewSpace;
+    bool has_P_localSpace_view0;
+    em_proto_Pose P_localSpace_view0; /* Left view */
+    bool has_P_localSpace_view1;
+    em_proto_Pose P_localSpace_view1; /* Right view */
     int64_t display_time; /* TODO fovs here */
 } em_proto_DownFrameDataMessage;
 
@@ -177,7 +179,7 @@ extern "C" {
 #define em_proto_TouchControllerRight_init_default {false, em_proto_InputClickTouch_init_default, false, em_proto_InputClickTouch_init_default, false, em_proto_InputClickTouch_init_default, false, em_proto_TouchControllerCommon_init_default}
 #define em_proto_UpFrameMessage_init_default {0, 0, 0, 0}
 #define em_proto_UpMessage_init_default {0, false, em_proto_TrackingMessage_init_default, false, em_proto_UpFrameMessage_init_default}
-#define em_proto_DownFrameDataMessage_init_default {0, false, em_proto_Pose_init_default, 0}
+#define em_proto_DownFrameDataMessage_init_default {0, false, em_proto_Pose_init_default, false, em_proto_Pose_init_default, 0}
 #define em_proto_DownMessage_init_default {false, em_proto_DownFrameDataMessage_init_default}
 #define em_proto_Quaternion_init_zero {0, 0, 0, 0}
 #define em_proto_Vec3_init_zero {0, 0, 0}
@@ -192,7 +194,7 @@ extern "C" {
 #define em_proto_TouchControllerRight_init_zero {false, em_proto_InputClickTouch_init_zero, false, em_proto_InputClickTouch_init_zero, false, em_proto_InputClickTouch_init_zero, false, em_proto_TouchControllerCommon_init_zero}
 #define em_proto_UpFrameMessage_init_zero {0, 0, 0, 0}
 #define em_proto_UpMessage_init_zero {0, false, em_proto_TrackingMessage_init_zero, false, em_proto_UpFrameMessage_init_zero}
-#define em_proto_DownFrameDataMessage_init_zero {0, false, em_proto_Pose_init_zero, 0}
+#define em_proto_DownFrameDataMessage_init_zero {0, false, em_proto_Pose_init_zero, false, em_proto_Pose_init_zero, 0}
 #define em_proto_DownMessage_init_zero {false, em_proto_DownFrameDataMessage_init_zero}
 
 /* Field tags (for use in manual encoding/decoding) */
@@ -243,8 +245,9 @@ extern "C" {
 #define em_proto_UpMessage_tracking_tag 2
 #define em_proto_UpMessage_frame_tag 3
 #define em_proto_DownFrameDataMessage_frame_sequence_id_tag 1
-#define em_proto_DownFrameDataMessage_P_localSpace_viewSpace_tag 2
-#define em_proto_DownFrameDataMessage_display_time_tag 3
+#define em_proto_DownFrameDataMessage_P_localSpace_view0_tag 2
+#define em_proto_DownFrameDataMessage_P_localSpace_view1_tag 3
+#define em_proto_DownFrameDataMessage_display_time_tag 4
 #define em_proto_DownMessage_frame_data_tag 1
 
 /* Struct field encoding specification for nanopb */
@@ -371,11 +374,13 @@ X(a, STATIC, OPTIONAL, MESSAGE, frame, 3)
 
 #define em_proto_DownFrameDataMessage_FIELDLIST(X, a) \
 X(a, STATIC, SINGULAR, INT64, frame_sequence_id, 1) \
-X(a, STATIC, OPTIONAL, MESSAGE, P_localSpace_viewSpace, 2) \
-X(a, STATIC, SINGULAR, INT64, display_time, 3)
+X(a, STATIC, OPTIONAL, MESSAGE, P_localSpace_view0, 2) \
+X(a, STATIC, OPTIONAL, MESSAGE, P_localSpace_view1, 3) \
+X(a, STATIC, SINGULAR, INT64, display_time, 4)
 #define em_proto_DownFrameDataMessage_CALLBACK NULL
 #define em_proto_DownFrameDataMessage_DEFAULT NULL
-#define em_proto_DownFrameDataMessage_P_localSpace_viewSpace_MSGTYPE em_proto_Pose
+#define em_proto_DownFrameDataMessage_P_localSpace_view0_MSGTYPE em_proto_Pose
+#define em_proto_DownFrameDataMessage_P_localSpace_view1_MSGTYPE em_proto_Pose
 
 #define em_proto_DownMessage_FIELDLIST(X, a) \
 X(a, STATIC, OPTIONAL, MESSAGE, frame_data, 1)
@@ -417,8 +422,8 @@ extern const pb_msgdesc_t em_proto_DownMessage_msg;
 #define em_proto_DownMessage_fields &em_proto_DownMessage_msg
 
 /* Maximum encoded size of messages (where known) */
-#define em_proto_DownFrameDataMessage_size 63
-#define em_proto_DownMessage_size 65
+#define em_proto_DownFrameDataMessage_size 104
+#define em_proto_DownMessage_size 106
 #define em_proto_InputClickTouch_size 4
 #define em_proto_InputThumbstick_size 16
 #define em_proto_InputValueTouch_size 7
diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt
index 4cbb3849..9398da89 100644
--- a/server/CMakeLists.txt
+++ b/server/CMakeLists.txt
@@ -44,6 +44,7 @@ find_package(PkgConfig REQUIRED)
 # GLib packages we'll need
 pkg_check_modules(GLIB REQUIRED glib-2.0)
 pkg_check_modules(GST_SDP REQUIRED gstreamer-sdp-1.0)
+pkg_check_modules(GST_RTP REQUIRED gstreamer-rtp-1.0)
 pkg_check_modules(GST_WEBRTC REQUIRED gstreamer-webrtc-1.0)
 pkg_check_modules(GST REQUIRED gstreamer-plugins-base-1.0)
 pkg_check_modules(GST REQUIRED gstreamer-plugins-bad-1.0)
diff --git a/server/src/ems/CMakeLists.txt b/server/src/ems/CMakeLists.txt
index 702fd128..9df50804 100644
--- a/server/src/ems/CMakeLists.txt
+++ b/server/src/ems/CMakeLists.txt
@@ -27,6 +27,7 @@ target_link_libraries(
 	comp_util
 	comp_multi
 	ems_gst
+	em_proto
 	)
 
 target_include_directories(comp_ems PUBLIC . ${GST_INCLUDE_DIRS})
diff --git a/server/src/ems/ems_compositor.cpp b/server/src/ems/ems_compositor.cpp
index 68965aff..813de911 100644
--- a/server/src/ems/ems_compositor.cpp
+++ b/server/src/ems/ems_compositor.cpp
@@ -17,6 +17,9 @@
 
 
 #include "ems_compositor.h"
+
+#include "electricmaple.pb.h"
+
 
 #include "gstreamer/gst_internal.h"
 #include "os/os_time.h"
@@ -35,6 +38,7 @@
 
 #include "vk/vk_cmd.h"
 #include "vk/vk_cmd_pool.h"
+
 
 #include
 #include
@@ -333,6 +337,30 @@ compositor_init_sys_info(struct ems_compositor *c, struct xrt_device *xdev)
 	return true;
 }
 
+/*
+ * Convert an xrt_pose into the protobuf Pose sent to the client, marking both
+ * the position and the orientation as present.
+ */
+static inline em_proto_Pose
+to_proto(const struct xrt_pose &pose)
+{
+	em_proto_Pose ret = em_proto_Pose_init_default;
+
+	// Assign members by name rather than with a positional initializer, so the
+	// result cannot silently depend on the field order nanopb generated.
+	ret.has_position = true;
+	ret.position.x = pose.position.x;
+	ret.position.y = pose.position.y;
+	ret.position.z = pose.position.z;
+
+	ret.has_orientation = true;
+	ret.orientation.w = pose.orientation.w;
+	ret.orientation.x = pose.orientation.x;
+	ret.orientation.y = pose.orientation.y;
+	ret.orientation.z = pose.orientation.z;
+
+	return ret;
+}
 
 /*
  *
@@ -508,7 +536,21 @@ pack_blit_and_encode(struct ems_compositor *c,
 	wrap->base_frame.source_timestamp = wrap->base_frame.timestamp;
 	wrap->base_frame.source_sequence = c->image_sequence++;
 	wrap->base_frame.source_id = 0;
-	wrap = NULL;
+
+	// Set the latest downstream message before pushing the frame.
+	em_proto_DownMessage msg = em_proto_DownMessage_init_default;
+	msg.has_frame_data = true;
+	msg.frame_data.frame_sequence_id = wrap->base_frame.source_sequence;
+	msg.frame_data.has_P_localSpace_view0 = true;
+	msg.frame_data.P_localSpace_view0 = to_proto(lvd->pose);
+	msg.frame_data.has_P_localSpace_view1 = true;
+	msg.frame_data.P_localSpace_view1 = to_proto(rvd->pose);
+	// msg.frame_data.display_time; /* Needed ?*/
+
+	// Important: only clear wrap after msg.frame_data.frame_sequence_id was read from it above.
+	wrap = NULL;
+
+	ems_gstreamer_pipeline_set_down_msg(c->gstreamer_pipeline, &msg);
 
 	if (!c->pipeline_playing) {
 		ems_gstreamer_pipeline_play(c->gstreamer_pipeline);
diff --git a/server/src/ems/gst/CMakeLists.txt b/server/src/ems/gst/CMakeLists.txt
index 5e3b89ed..b90ffed9 100644
--- a/server/src/ems/gst/CMakeLists.txt
+++ b/server/src/ems/gst/CMakeLists.txt
@@ -14,6 +14,7 @@ target_link_libraries(
 	aux_gstreamer
 	${GST_LIBRARIES}
 	${GST_SDP_LIBRARIES}
+	${GST_RTP_LIBRARIES}
 	${GST_WEBRTC_LIBRARIES}
 	${GLIB_LIBRARIES}
 	${LIBSOUP_LIBRARIES}
diff --git a/server/src/ems/gst/ems_gstreamer_pipeline.c b/server/src/ems/gst/ems_gstreamer_pipeline.c
index 376c344f..484856b2 100644
--- a/server/src/ems/gst/ems_gstreamer_pipeline.c
+++ b/server/src/ems/gst/ems_gstreamer_pipeline.c
@@ -22,9 +22,11 @@
 #include "util/u_misc.h"
 #include "util/u_debug.h"
 
-#include "pb_decode.h"
 #include "electricmaple.pb.h"
 
+#include
+#include
+
 // Monado includes
 #include "gstreamer/gst_internal.h"
 #include "gstreamer/gst_pipeline.h"
@@ -34,6 +36,7 @@
 #include
 #include
 #include
+#include
 
 #define GST_USE_UNSTABLE_API
 #include
@@ -51,6 +54,10 @@
 #define DEFAULT_VIDEOSINK " videoconvert ! autovideosink "
 #endif
 
+// TODO: Can we define the below at a higher level so it can also be
+// picked-up by em_stream_client ?
+#define RTP_TWOBYTES_HDR_EXT_ID 1 // Must be in the [1,15] range
+#define RTP_TWOBYTES_HDR_EXT_MAX_SIZE 255
 
 EmsSignalingServer *signaling_server;
 
@@ -67,11 +74,11 @@ struct ems_gstreamer_pipeline
 	GObject *data_channel;
 	guint timeout_src_id;
 
+	GBytes *downMsg_bytes; // Serialized DownMessage stored by ems_gstreamer_pipeline_set_down_msg
 
 	struct ems_callbacks *callbacks;
 };
 
-
 static gboolean
 sigint_handler(gpointer user_data)
 {
@@ -243,6 +250,73 @@ data_channel_message_string_cb(GstWebRTCDataChannel *datachannel, gchar *str, st
 	U_LOG_I("Received data channel message: %s\n", str);
 }
 
+/*
+ * Pad probe that attaches the most recently stored DownMessage to outgoing RTP
+ * packets as a two-byte header extension.
+ *
+ * Only packets with the RTP marker bit set get the extension. user_data must
+ * be the struct ems_gstreamer_pipeline the probe was installed with.
+ *
+ * Returns GST_PAD_PROBE_OK to stay installed, GST_PAD_PROBE_REMOVE after an
+ * error.
+ */
+GstPadProbeReturn
+webrtcbin_srcpad_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
+{
+	(void)pad;
+
+	struct ems_gstreamer_pipeline *egp = (struct ems_gstreamer_pipeline *)user_data;
+
+	if (GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
+		U_LOG_E("Received BufferList in webrtcbin srcpad! No support for BufferList!\n");
+		return GST_PAD_PROBE_REMOVE;
+	}
+
+	GstBuffer *buffer = gst_pad_probe_info_get_buffer(info);
+
+	// Making the buffer writable may hand back a copy, so store the result in
+	// the probe info; otherwise the original would continue downstream instead.
+	buffer = gst_buffer_make_writable(buffer);
+	if (buffer == NULL) {
+		return GST_PAD_PROBE_OK;
+	}
+	GST_PAD_PROBE_INFO_DATA(info) = buffer;
+
+	GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
+	if (!gst_rtp_buffer_map(buffer, GST_MAP_WRITE, &rtp_buffer)) {
+		U_LOG_E("Failed to map GstBuffer\n");
+		return GST_PAD_PROBE_REMOVE;
+	}
+
+	GstPadProbeReturn ret = GST_PAD_PROBE_OK;
+
+	// Add extension data only on the last packet of an access unit, indicated by
+	// the marker bit, and only when there are stored bytes to attach.
+	if (!gst_rtp_buffer_get_marker(&rtp_buffer) || egp->downMsg_bytes == NULL) {
+		goto out; // Nothing's wrong, we keep the pad probe active.
+	}
+
+	// Inject extension data
+	gsize extension_size = 0;
+	gconstpointer extension_data = g_bytes_get_data(egp->downMsg_bytes, &extension_size);
+
+	if (extension_size > RTP_TWOBYTES_HDR_EXT_MAX_SIZE) {
+		U_LOG_E("size of data in rtp header is too large ! Implement multi-extension-element support !");
+		ret = GST_PAD_PROBE_REMOVE;
+		goto out;
+	}
+
+	if (!gst_rtp_buffer_add_extension_twobytes_header(&rtp_buffer, 0 /* appbits */, RTP_TWOBYTES_HDR_EXT_ID,
+	                                                  extension_data, (guint)extension_size)) {
+		U_LOG_E("Failed to add extension data !");
+		ret = GST_PAD_PROBE_REMOVE;
+	}
+
+out:
+	// Every path that mapped the buffer has to unmap it again.
+	gst_rtp_buffer_unmap(&rtp_buffer);
+	return ret;
+}
 
 static void
 webrtc_client_connected_cb(EmsSignalingServer *server, EmsClientId client_id, struct ems_gstreamer_pipeline *egp)
@@ -250,6 +324,7 @@ webrtc_client_connected_cb(EmsSignalingServer *server, EmsClientId client_id, st
 	GstBin *pipeline = GST_BIN(egp->base.pipeline);
 	gchar *name;
 	GstElement *webrtcbin;
+	GstPad *webrtcbin_srcpad;
 	GstCaps *caps;
 	GstStateChangeReturn ret;
 	GstWebRTCRTPTransceiver *transceiver;
@@ -287,6 +362,20 @@ webrtc_client_connected_cb(EmsSignalingServer *server, EmsClientId client_id, st
 		                 egp);
 	}
 
+	// Get the srcpad associated with our webrtcbin element
+	// NOTE(review): assumes webrtcbin exposes a static pad named "src" - confirm.
+	webrtcbin_srcpad = gst_element_get_static_pad(webrtcbin, "src");
+
+	if (webrtcbin_srcpad != NULL) {
+		// Add a probe to call our callback when buffers get to the src pad
+		gst_pad_add_probe(webrtcbin_srcpad, GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
+		                  webrtcbin_srcpad_probe, egp, NULL /*destroy_data*/);
+
+		gst_object_unref(webrtcbin_srcpad);
+	} else {
+		U_LOG_E("Could not retrieve webrtcbin srcpad !");
+	}
+
 	ret = gst_element_set_state(webrtcbin, GST_STATE_PLAYING);
 	g_assert(ret != GST_STATE_CHANGE_FAILURE);
 
@@ -532,6 +621,34 @@ loop_thread(void *data)
  * Exported functions.
  *
  */
+/*
+ * Serialize msg with nanopb and store the bytes on the pipeline, replacing
+ * whatever was stored before. msg itself is not retained. If encoding fails
+ * the previously stored bytes are kept.
+ *
+ * NOTE(review): the stored bytes are read by webrtcbin_srcpad_probe, presumably
+ * on a streaming thread - confirm whether this hand-over needs locking.
+ */
+void
+ems_gstreamer_pipeline_set_down_msg(struct gstreamer_pipeline *gp, em_proto_DownMessage *msg)
+{
+	struct ems_gstreamer_pipeline *egp = (struct ems_gstreamer_pipeline *)gp;
+
+	uint8_t buf[em_proto_DownMessage_size];
+	pb_ostream_t os = pb_ostream_from_buffer(buf, sizeof(buf));
+
+	// msg already points at the message struct; passing &msg would make nanopb
+	// read the pointer variable itself as if it were the message.
+	if (!pb_encode(&os, em_proto_DownMessage_fields, msg)) {
+		U_LOG_E("Failed to encode DownMessage: %s", PB_GET_ERROR(&os));
+		return;
+	}
+
+	// Swap in the new bytes before dropping the old ones. g_bytes_unref() accepts NULL.
+	GBytes *old_bytes = egp->downMsg_bytes;
+	egp->downMsg_bytes = g_bytes_new(buf, os.bytes_written);
+	g_bytes_unref(old_bytes);
+}
 
 void
 ems_gstreamer_pipeline_play(struct gstreamer_pipeline *gp)
diff --git a/server/src/ems/gst/ems_gstreamer_pipeline.h b/server/src/ems/gst/ems_gstreamer_pipeline.h
index cddfa436..77fa4b42 100644
--- a/server/src/ems/gst/ems_gstreamer_pipeline.h
+++ b/server/src/ems/gst/ems_gstreamer_pipeline.h
@@ -13,14 +13,22 @@
 
 #include "gstreamer/gst_pipeline.h"
 
-
 #ifdef __cplusplus
 extern "C" {
 #endif
 
-struct gstreamer_pipeline;
 
+struct gstreamer_pipeline;
 struct ems_callbacks;
 
+typedef struct _em_proto_DownMessage em_proto_DownMessage;
+
+/*!
+ * Serialize @p msg and store it on the pipeline, replacing any previously
+ * stored message. @p msg is not retained.
+ */
+void
+ems_gstreamer_pipeline_set_down_msg(struct gstreamer_pipeline *gp, em_proto_DownMessage *msg);
+
 void
 ems_gstreamer_pipeline_play(struct gstreamer_pipeline *gp);