diff --git a/README.md b/README.md index 8fa0b8a..031f805 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Application Options: # Testing the WHEP client -The WHEP client only requires a few arguments, namely the WHEP endpoint to subscribe to (e.g., an endpoint created in the [Simple WHEP Server](https://github.com/meetecho/simple-whep-server)) and the audio and/or video caps of the codecs you expect to receive. If codecs match, incoming streams are automatically created from the negotiation process, and rendered accordingly. +The WHEP client only requires a few arguments, namely the WHEP endpoint to subscribe to (e.g., an endpoint created in the [Simple WHEP Server](https://github.com/meetecho/simple-whep-server)) and the audio and/or video caps of the codecs you expect to receive. It will initially attempt to send an SDP offer, but if the server changes stance to reply with a server-sent offer instead (via a 406 response), it will automatically switch mode accordingly and prepare an answer to send back. If at the end of the exchange (however it happened) codecs match, incoming streams are automatically created from the negotiation process, and rendered accordingly. A simple example, that assumes the specified endpoint requires the "verysecret" token via Bearer authorization, is the following: diff --git a/src/whep-client.c b/src/whep-client.c index 89a67fa..e88cffc 100644 --- a/src/whep-client.c +++ b/src/whep-client.c @@ -41,6 +41,7 @@ enum whep_state { WHEP_STATE_CONNECTED, WHEP_STATE_SUBSCRIBING, WHEP_STATE_OFFER_PREPARED, + WHEP_STATE_ANSWER_PREPARED, WHEP_STATE_STARTED, WHEP_STATE_API_ERROR, WHEP_STATE_ERROR @@ -55,6 +56,7 @@ static gboolean no_trickle = FALSE, gathering_done = FALSE, static const char *stun_server = NULL, **turn_server = NULL; static char *auto_stun_server = NULL, **auto_turn_server = NULL; static int latency = -1; +static gboolean server_sent_offer = FALSE; /* API properties */ static enum whep_state state = 0; @@ -68,9 +70,10 @@ static GAsyncQueue *candidates = NULL; /* Helper methods and callbacks */ static gboolean whep_check_plugins(void); static void whep_options(void); -static gboolean whep_initialize(void); +static gboolean whep_initialize(gboolean offer); static void whep_negotiation_needed(GstElement *element, gpointer user_data); static void whep_offer_available(GstPromise *promise, gpointer user_data); +static void whep_answer_available(GstPromise *promise, gpointer user_data); static void whep_candidate(GstElement *webrtc G_GNUC_UNUSED, guint mlineindex, char *candidate, gpointer user_data G_GNUC_UNUSED); static gboolean whep_send_candidates(gpointer user_data); @@ -83,8 +86,9 @@ static void whep_ice_connection_state(GstElement *webrtc, GParamSpec *pspec, static void whep_dtls_connection_state(GstElement *dtls, GParamSpec *pspec, gpointer user_data G_GNUC_UNUSED); static void whep_connect(GstWebRTCSessionDescription *offer); +static void whep_answer(GstWebRTCSessionDescription *answer); static void whep_process_link_header(char *link); -static gboolean whep_parse_offer(char *sdp_offer); +static gboolean whep_parse_sdp(char *sdp_object); static void whep_disconnect(char *reason); static void whep_incoming_stream(GstElement *webrtc, GstPad *pad, gpointer user_data); @@ -231,7 +235,7 @@ int main(int argc, char *argv[]) { if(follow_link) whep_options(); /* Initialize the stack (and then connect to the WHEP endpoint) */ - if(!whep_initialize()) + if(!whep_initialize(TRUE)) exit(1); /* Loop forever */ @@ -354,7 +358,7 @@ static gboolean source_events(GstPad *pad, GstObject *parent, GstEvent *event) { } /* Helper method to initialize the GStreamer WebRTC stack */ -static gboolean whep_initialize(void) { +static gboolean whep_initialize(gboolean offer) { /* Prepare the pipeline, using the info we got from the command line */ pipeline = gst_pipeline_new(NULL); pc = gst_element_factory_make("webrtcbin", NULL); @@ -367,14 +371,14 @@ static gboolean whep_initialize(void) { g_object_set(pc, "ice-transport-policy", 1, NULL); gst_bin_add(GST_BIN(pipeline), pc); /* Add transceivers to receive audio and/or video */ - if(audio_caps) { + if(offer && audio_caps) { GstWebRTCRTPTransceiver *transceiver = NULL; GstCaps *caps = gst_caps_from_string(audio_caps); g_signal_emit_by_name(pc, "add-transceiver", GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY, caps, &transceiver); gst_caps_unref(caps); gst_object_unref(transceiver); } - if(video_caps) { + if(offer && video_caps) { GstWebRTCRTPTransceiver *transceiver = NULL; GstCaps *caps = gst_caps_from_string(video_caps); g_signal_emit_by_name(pc, "add-transceiver", GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY, caps, &transceiver); @@ -409,7 +413,8 @@ static gboolean whep_initialize(void) { } } /* Let's configure the function to be invoked when an SDP offer can be prepared */ - g_signal_connect(pc, "on-negotiation-needed", G_CALLBACK(whep_negotiation_needed), NULL); + if(offer) + g_signal_connect(pc, "on-negotiation-needed", G_CALLBACK(whep_negotiation_needed), NULL); /* We need a different callback to be notified about candidates to trickle to Janus */ g_signal_connect(pc, "on-ice-candidate", G_CALLBACK(whep_candidate), NULL); /* We also add a couple of callbacks to be notified about connection state changes */ @@ -496,6 +501,39 @@ static void whep_offer_available(GstPromise *promise, gpointer user_data) { } } +/* Callback invoked when we have an SDP answer ready to be sent (server-sent offers) */ +static GstWebRTCSessionDescription *answer = NULL; +static void whep_answer_available(GstPromise *promise, gpointer user_data) { + WHEP_PREFIX(LOG_INFO, "Answer created\n"); + /* Make sure we're in the right state */ + g_assert_cmphex(state, ==, WHEP_STATE_ANSWER_PREPARED); + g_assert_cmphex(gst_promise_wait(promise), ==, GST_PROMISE_RESULT_REPLIED); + const GstStructure *reply = gst_promise_get_reply(promise); + gst_structure_get(reply, "answer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL); + gst_promise_unref(promise); + + /* Set the local description locally */ + WHEP_PREFIX(LOG_INFO, "Setting local description\n"); + promise = gst_promise_new(); + g_signal_emit_by_name(pc, "set-local-description", answer, promise); + gst_promise_interrupt(promise); + gst_promise_unref(promise); + + /* Now that a DTLS stack is available, try monitoring the DTLS state too */ + GstElement *dtls = gst_bin_get_by_name(GST_BIN(pc), "dtlsdec0"); + g_signal_connect(dtls, "notify::connection-state", G_CALLBACK(whep_dtls_connection_state), NULL); + gst_object_unref(dtls); + + /* Now that the answer is ready, send it to the server via PATCH + * (unless we're not tricking, in which case we wait for gathering to be + * completed, and then add all candidates to this answer before sending it) */ + if(!no_trickle || gathering_done) { + whep_answer(answer); + gst_webrtc_session_description_free(answer); + answer = NULL; + } +} + /* Callback invoked when a candidate to trickle becomes available */ static void whep_candidate(GstElement *webrtc G_GNUC_UNUSED, guint mlineindex, char *candidate, gpointer user_data G_GNUC_UNUSED) { @@ -606,9 +644,15 @@ static void whep_ice_gathering_state(GstElement *webrtc, GParamSpec *pspec, gathering_done = TRUE; /* If we're not trickling, send the SDP with all candidates now */ if(no_trickle) { - whep_connect(offer); - gst_webrtc_session_description_free(offer); - offer = NULL; + if(!server_sent_offer) { + whep_connect(offer); + gst_webrtc_session_description_free(offer); + offer = NULL; + } else { + whep_answer(answer); + gst_webrtc_session_description_free(answer); + answer = NULL; + } } break; default: @@ -716,7 +760,7 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { WHEP_LOG(LOG_VERB, "%s\n", sdp_offer); /* Partially parse the SDP to find ICE credentials and the mid for the bundle m-line */ - if(!whep_parse_offer(sdp_offer)) { + if(!whep_parse_sdp(sdp_offer)) { whep_disconnect("SDP error"); return; } @@ -726,7 +770,9 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { GBytes *bytes = NULL; guint status = whep_http_send(&session, "POST", (char *)server_url, sdp_offer, "application/sdp", &bytes); g_free(sdp_offer); - if(status != 201) { + /* 201 means the server accepted our client-sent offer, 406 means the + * server may be asking to switch stance to s server-sent offer instead*/ + if(status != 201 && status != 406) { /* Didn't get the success we were expecting */ WHEP_LOG(LOG_ERR, " [%u] %s\n", status, status ? soup_message_get_reason_phrase(session.msg) : "HTTP error"); g_object_unref(session.msg); @@ -736,6 +782,21 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { whep_disconnect("HTTP error"); return; } + if(status == 406) { + WHEP_PREFIX(LOG_INFO, "Server switched to server-sent offers (406)\n"); + /* FIXME Reset the pipeline */ + pipeline = NULL; + pc = NULL; + WHEP_PREFIX(LOG_INFO, "Re-initializing the pipeline\n"); + if(!whep_initialize(FALSE)) { + g_object_unref(session.msg); + g_object_unref(session.http_conn); + if(bytes != NULL) + g_bytes_unref(bytes); + whep_disconnect("Stance change error"); + return; + } + } /* Get the response */ const char *content_type = soup_message_headers_get_content_type(soup_message_get_response_headers(session.msg), NULL); if(content_type == NULL || strcasecmp(content_type, "application/sdp")) { @@ -749,7 +810,7 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { } /* Get the body */ if(bytes == NULL || g_bytes_get_size(bytes) == 0) { - WHEP_LOG(LOG_ERR, "Missing SDP answer\n"); + WHEP_LOG(LOG_ERR, "Missing SDP\n"); g_object_unref(session.msg); g_object_unref(session.http_conn); if(bytes != NULL) @@ -757,12 +818,12 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { whep_disconnect("SDP error"); return; } - char *answer = g_malloc(g_bytes_get_size(bytes) + 1); - memcpy(answer, g_bytes_get_data(bytes, NULL), g_bytes_get_size(bytes)); - answer[g_bytes_get_size(bytes)] = '\0'; + char *sdp_body = g_malloc(g_bytes_get_size(bytes) + 1); + memcpy(sdp_body, g_bytes_get_data(bytes, NULL), g_bytes_get_size(bytes)); + sdp_body[g_bytes_get_size(bytes)] = '\0'; g_bytes_unref(bytes); - if(strstr(answer, "v=0\r\n") != answer) { - WHEP_LOG(LOG_ERR, "Invalid SDP answer\n"); + if(strstr(sdp_body, "v=0\r\n") != sdp_body) { + WHEP_LOG(LOG_ERR, "Invalid SDP\n"); g_object_unref(session.msg); g_object_unref(session.http_conn); whep_disconnect("SDP error"); @@ -834,14 +895,14 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { g_source_unref(patch_timer); } - /* Process the SDP answer */ - WHEP_PREFIX(LOG_INFO, "Received SDP answer (%zu bytes)\n", strlen(answer)); - WHEP_LOG(LOG_VERB, "%s\n", answer); + /* Process the SDP offer or answer */ + WHEP_PREFIX(LOG_INFO, "Received SDP (%zu bytes)\n", strlen(sdp_body)); + WHEP_LOG(LOG_VERB, "%s\n", sdp_body); /* Check if there are any candidates in the SDP: we'll need to fake trickles in case */ - if(strstr(answer, "candidate") != NULL) { + if(strstr(sdp_body, "candidate") != NULL) { int mlines = 0, i = 0; - gchar **lines = g_strsplit(answer, "\r\n", -1); + gchar **lines = g_strsplit(sdp_body, "\r\n", -1); gchar *line = NULL; while(lines[i] != NULL) { line = lines[i]; @@ -868,14 +929,14 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { WHEP_LOG(LOG_ERR, "Error initializing SDP object (%d)\n", ret); g_object_unref(session.msg); g_object_unref(session.http_conn); - g_free(answer); + g_free(sdp_body); whep_disconnect("SDP error"); return; } - ret = gst_sdp_message_parse_buffer((guint8 *)answer, strlen(answer), sdp); + ret = gst_sdp_message_parse_buffer((guint8 *)sdp_body, strlen(sdp_body), sdp); g_object_unref(session.msg); g_object_unref(session.http_conn); - g_free(answer); + g_free(sdp_body); if(ret != GST_SDP_OK) { /* Something went wrong */ gst_sdp_message_free(sdp); @@ -883,7 +944,8 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { whep_disconnect("SDP error"); return; } - GstWebRTCSessionDescription *gst_sdp = gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER, sdp); + GstWebRTCSessionDescription *gst_sdp = gst_webrtc_session_description_new( + status == 201 ? GST_WEBRTC_SDP_TYPE_ANSWER : GST_WEBRTC_SDP_TYPE_OFFER, sdp); /* Set remote description on our pipeline */ WHEP_PREFIX(LOG_INFO, "Setting remote description\n"); @@ -892,6 +954,93 @@ static void whep_connect(GstWebRTCSessionDescription *offer) { gst_promise_interrupt(promise); gst_promise_unref(promise); gst_webrtc_session_description_free(gst_sdp); + + if(status == 201) { + /* Client-side offer, we're done */ + return; + } + /* If we got here, we're in server-sent offer mode, which means we + * must create an answer to send back via an HTTP PATCH request */ + /* Finally, create an answer to send back */ + WHEP_PREFIX(LOG_INFO, "Creating answer\n"); + state = WHEP_STATE_ANSWER_PREPARED; + promise = gst_promise_new_with_change_func(whep_answer_available, NULL, NULL); + g_signal_emit_by_name(pc, "create-answer", NULL, promise); +} + +/* Helper method to send our answer to the WHEP resource */ +static void whep_answer(GstWebRTCSessionDescription *answer) { + /* Convert the SDP object to a string */ + char *sdp_answer = gst_sdp_message_as_text(answer->sdp); + WHEP_PREFIX(LOG_INFO, "Sending SDP answer (%zu bytes)\n", strlen(sdp_answer)); + + /* If we're not trickling, add our candidates to the SDP */ + if(no_trickle) { + /* Prepare the candidate attributes */ + char attributes[4096], expanded_sdp[8192]; + attributes[0] = '\0'; + expanded_sdp[0] = '\0'; + char *candidate = NULL; + while((candidate = g_async_queue_try_pop(candidates)) != NULL) { + WHEP_PREFIX(LOG_VERB, "Adding candidate to SDP: %s\n", candidate); + g_strlcat(attributes, "a=", sizeof(attributes)); + g_strlcat(attributes, candidate, sizeof(attributes)); + g_strlcat(attributes, "\r\n", sizeof(attributes)); + g_free(candidate); + } + /* Add them to all m-lines */ + int mlines = 0, i = 0; + gchar **lines = g_strsplit(sdp_answer, "\r\n", -1); + gchar *line = NULL; + while(lines[i] != NULL) { + line = lines[i]; + if(strstr(line, "m=") == line) { + /* New m-line */ + mlines++; + if(mlines > 1) + g_strlcat(expanded_sdp, attributes, sizeof(expanded_sdp)); + } + if(strlen(line) > 2) { + g_strlcat(expanded_sdp, line, sizeof(expanded_sdp)); + g_strlcat(expanded_sdp, "\r\n", sizeof(expanded_sdp)); + } + i++; + } + g_clear_pointer(&lines, g_strfreev); + g_strlcat(expanded_sdp, attributes, sizeof(expanded_sdp)); + g_free(sdp_answer); + sdp_answer = g_strdup(expanded_sdp); + } + WHEP_LOG(LOG_VERB, "%s\n", sdp_answer); + + /* Partially parse the SDP to find ICE credentials and the mid for the bundle m-line */ + if(!whep_parse_sdp(sdp_answer)) { + whep_disconnect("SDP error"); + return; + } + + /* Create an HTTP connection */ + whep_http_session session = { 0 }; + guint status = whep_http_send(&session, "PATCH", (char *)resource_url, sdp_answer, "application/sdp", NULL); + g_free(sdp_answer); + if(status != 200 && status != 204) { + /* Didn't get the success we were expecting */ + WHEP_LOG(LOG_ERR, " [%u] %s\n", status, status ? soup_message_get_reason_phrase(session.msg) : "HTTP error"); + g_object_unref(session.msg); + g_object_unref(session.http_conn); + whep_disconnect("HTTP error"); + return; + } + /* Check if there's an ETag we should send in upcoming requests */ + const char *etag = soup_message_headers_get_one(soup_message_get_response_headers(session.msg), "etag"); + if(etag == NULL) { + WHEP_LOG(LOG_WARN, "No ETag header, won't be able to set If-Match when trickling\n"); + } else { + latest_etag = g_strdup(etag); + } + + /* Negotiation done */ + WHEP_PREFIX(LOG_INFO, "Negotiation completed\n"); } /* Helper method to disconnect from the WHEP endpoint */ @@ -1012,8 +1161,8 @@ static guint whep_http_send(whep_http_session *session, char *method, } /* Helper method to parse SDP offers and extract stuff we need */ -static gboolean whep_parse_offer(char *sdp_offer) { - gchar **parts = g_strsplit(sdp_offer, "\n", -1); +static gboolean whep_parse_sdp(char *sdp_object) { + gchar **parts = g_strsplit(sdp_object, "\n", -1); gboolean mline = FALSE, success = TRUE, done = FALSE; if(parts) { int index = 0;