From df2c4d96c0eb56be1fba96f9a8a6d8d758013e9c Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 17 Aug 2020 13:45:03 +0200 Subject: [PATCH 01/24] Added SRT bonding --- CMakeLists.txt | 1 + xtransmit/CMakeLists.txt | 9 + xtransmit/generate.cpp | 69 +++- xtransmit/generate.hpp | 5 +- xtransmit/receive.cpp | 69 +++- xtransmit/receive.hpp | 6 +- xtransmit/route.cpp | 12 +- xtransmit/route.hpp | 2 +- xtransmit/srt_socket.cpp | 4 +- xtransmit/srt_socket_group.cpp | 620 +++++++++++++++++++++++++++++++++ xtransmit/srt_socket_group.hpp | 107 ++++++ xtransmit/xtransmit-app.cpp | 27 +- 12 files changed, 895 insertions(+), 36 deletions(-) create mode 100644 xtransmit/srt_socket_group.cpp create mode 100644 xtransmit/srt_socket_group.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d481cf6..50d5288 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,7 @@ add_subdirectory(submodule/spdlog) SET(ENABLE_APPS ON) SET(ENABLE_SHARED OFF) SET(ENABLE_ENCRYPTION ON) +SET(ENABLE_EXPERIMENTAL_BONDING ON) add_subdirectory(submodule/srt) #set_target_properties(srt-live-transmit diff --git a/xtransmit/CMakeLists.txt b/xtransmit/CMakeLists.txt index 9a2a585..d8fe513 100644 --- a/xtransmit/CMakeLists.txt +++ b/xtransmit/CMakeLists.txt @@ -17,6 +17,15 @@ target_include_directories(srt-xtransmit PUBLIC ${PTHREAD_INCLUDE_DIR} ) +if (ENABLE_EXPERIMENTAL_BONDING) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_EXPERIMENTAL_BONDING=1") + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_EXPERIMENTAL_BONDING=1") +endif() + +if (ENABLE_STDCXX_SYNC) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_STDCXX_SYNC=1") + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_STDCXX_SYNC=1") +endif() if (ENABLE_CXX17) set(REQUIRE_CXX_VER 17) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index 959b2dd..ffd5527 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -14,6 +14,7 @@ // xtransmit #include "socket_stats.hpp" #include "srt_socket.hpp" +#include "srt_socket_group.hpp" #include "udp_socket.hpp" #include "generate.hpp" #include "pacer.hpp" @@ -33,10 +34,29 @@ using shared_sock = std::shared_ptr; #define LOG_SC_GENERATE "GENERATE " +int srt_gen_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid) +{ + spdlog::trace(LOG_SC_GENERATE "Accepted member socket 0x{:X}.", sock); + return 0; +} + +void srt_gen_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +{ + if (error != SRT_SUCCESS) + { + spdlog::warn(LOG_SC_GENERATE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, + srt_strerror(error, 0)); + return; + } + + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SC_GENERATE "Member socket connected 0x{:X} (token {}).", sock, token); +} + void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break) { vector message_to_send(cfg.message_size); - iota(message_to_send.begin(), message_to_send.end(), (char)0); const auto start_time = steady_clock::now(); const int num_messages = cfg.duration > 0 ? -1 : cfg.num_messages; @@ -94,9 +114,19 @@ void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break } } -void xtransmit::generate::run(const string& dst_url, const config& cfg, const atomic_bool& force_break) +void xtransmit::generate::run(const vector& dst_urls, const config& cfg, const atomic_bool& force_break) { - const UriParser uri(dst_url); + if (dst_urls.empty()) + { + spdlog::error(LOG_SC_GENERATE "No destination URI was provided"); + return; + } + + vector urls; + for (const string& url : dst_urls) + { + urls.emplace_back(UriParser(url)); + } shared_sock sock; shared_sock connection; @@ -121,17 +151,36 @@ void xtransmit::generate::run(const string& dst_url, const config& cfg, const at do { try { - if (uri.proto() == "udp") + if (urls.size() == 1) { - connection = make_shared(uri); + if (urls[0].proto() == "udp") + { + connection = make_shared(urls[0]); + } + else + { + sock = make_shared(urls[0]); + socket::srt* s = static_cast(sock.get()); + const bool accept = s->mode() == socket::srt::LISTENER; + if (accept) + s->listen(); + connection = accept ? s->accept() : s->connect(); + } } else { - sock = make_shared(uri); - socket::srt* s = static_cast(sock.get()); - const bool accept = s->mode() == socket::srt::LISTENER; + sock = make_shared(urls); + socket::srt_group* s = static_cast(sock.get()); + const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) + { + s->set_listen_callback(srt_gen_listen_callback, nullptr); s->listen(); + } + else + { + s->set_connect_callback(srt_gen_connect_callback, nullptr); + } connection = accept ? s->accept() : s->connect(); } @@ -146,14 +195,14 @@ void xtransmit::generate::run(const string& dst_url, const config& cfg, const at } while (cfg.reconnect && !force_break); } -CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, string& dst_url) +CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, vector& dst_urls) { const map to_bps{{"kbps", 1000}, {"Mbps", 1000000}, {"Gbps", 1000000000}}; const map to_ms{{"s", 1000}, {"ms", 1}}; const map to_sec{{"s", 1}, {"min", 60}, {"mins", 60}}; CLI::App* sc_generate = app.add_subcommand("generate", "Send generated data (SRT, UDP)")->fallthrough(); - sc_generate->add_option("dst", dst_url, "Destination URI"); + sc_generate->add_option("dst", dst_urls, "Destination URI")->expected(1, 10); sc_generate->add_option("--msgsize", cfg.message_size, "Size of a message to send"); sc_generate->add_option("--sendrate", cfg.sendrate, "Bitrate to generate") ->transform(CLI::AsNumberWithUnit(to_bps, CLI::AsNumberWithUnit::CASE_SENSITIVE)); diff --git a/xtransmit/generate.hpp b/xtransmit/generate.hpp index 4d3a4be..fbd0031 100644 --- a/xtransmit/generate.hpp +++ b/xtransmit/generate.hpp @@ -24,8 +24,9 @@ struct config std::string playback_csv; }; -void run(const std::string& dst_url, const config& cfg, const std::atomic_bool& force_break); +void run(const std::vector& dst_urls, const config& cfg, const std::atomic_bool& force_break); + +CLI::App* add_subcommand(CLI::App& app, config& cfg, std::vector& dst_urls); -CLI::App* add_subcommand(CLI::App& app, config& cfg, std::string& dst_url); } // namespace generate } // namespace xtransmit diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index bb39c11..f370dea 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -13,6 +13,7 @@ // xtransmit #include "socket_stats.hpp" #include "srt_socket.hpp" +#include "srt_socket_group.hpp" #include "udp_socket.hpp" #include "receive.hpp" #include "metrics.hpp" @@ -137,9 +138,39 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break } } -void xtransmit::receive::run(const string &src_url, const config &cfg, const atomic_bool &force_break) +int srt_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid) { - const UriParser uri(src_url); + spdlog::trace(LOG_SC_RECEIVE "Accepted member socket 0x{:X}.", sock); + return 0; +} + +void srt_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +{ + if (error != SRT_SUCCESS) + { + spdlog::warn(LOG_SC_RECEIVE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, + srt_strerror(error, 0)); + return; + } + + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SC_RECEIVE "Member socket connected 0x{:X} (token {}).", sock, token); +} + +void xtransmit::receive::run(const vector &src_urls, const config &cfg, const atomic_bool &force_break) +{ + if (src_urls.empty()) + { + spdlog::error(LOG_SC_RECEIVE "No destination URI was provided"); + return; + } + + vector urls; + for (const string& url : src_urls) + { + urls.emplace_back(url); + } shared_sock sock; shared_sock conn; @@ -164,17 +195,34 @@ void xtransmit::receive::run(const string &src_url, const config &cfg, const ato do { try { - if (uri.proto() == "udp") + if (urls.size() == 1) { - conn = make_shared(uri); + if (urls[0].proto() == "udp") + { + conn = make_shared(urls[0]); + } + else + { + sock = make_shared(urls[0]); + socket::srt* s = static_cast(sock.get()); + const bool accept = s->mode() == socket::srt::LISTENER; + if (accept) + s->listen(); + conn = accept ? s->accept() : s->connect(); + } } else { - sock = make_shared(uri); - socket::srt* s = static_cast(sock.get()); - const bool accept = s->mode() == socket::srt::LISTENER; - if (accept) + sock = make_shared(urls); + socket::srt_group* s = static_cast(sock.get()); + const bool accept = s->mode() == socket::srt_group::LISTENER; + if (accept) { + s->set_listen_callback(&srt_listen_callback, nullptr); s->listen(); + } + else { + s->set_connect_callback(&srt_connect_callback, nullptr); + } conn = accept ? s->accept() : s->connect(); } @@ -191,12 +239,12 @@ void xtransmit::receive::run(const string &src_url, const config &cfg, const ato } while (cfg.reconnect && !force_break); } -CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, string& src_url) +CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, vector& src_urls) { const map to_ms{ {"s", 1000}, {"ms", 1} }; CLI::App* sc_receive = app.add_subcommand("receive", "Receive data (SRT, UDP)")->fallthrough(); - sc_receive->add_option("src", src_url, "Source URI"); + sc_receive->add_option("--input,-i,src", src_urls, "Source URI"); sc_receive->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload"); sc_receive->add_option("--statsfile", cfg.stats_file, "output stats report filename"); sc_receive->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)") @@ -208,6 +256,7 @@ CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, string& sc_receive->add_option("--metricsfreq", cfg.metrics_freq_ms, "Metrics report frequency") ->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE)); sc_receive->add_flag("--twoway", cfg.send_reply, "Both send and receive data"); + sc_receive->add_option("--input-group", cfg.inputs, "More input group URLs for SRT bonding"); return sc_receive; } diff --git a/xtransmit/receive.hpp b/xtransmit/receive.hpp index d95b54e..9c3dc2b 100644 --- a/xtransmit/receive.hpp +++ b/xtransmit/receive.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include // Third party libraries #include "CLI/CLI.hpp" @@ -21,14 +22,15 @@ namespace xtransmit { int message_size = 1316; int stats_freq_ms = 0; std::string stats_file; + std::vector inputs; }; - void run(const std::string& url, const config& cfg, + void run(const std::vector& urls, const config& cfg, const std::atomic_bool& force_break); - CLI::App* add_subcommand(CLI::App& app, config& cfg, std::string& src_url); + CLI::App* add_subcommand(CLI::App& app, config& cfg, std::vector& src_urls); } // namespace receive diff --git a/xtransmit/route.cpp b/xtransmit/route.cpp index efd8151..2c84c39 100644 --- a/xtransmit/route.cpp +++ b/xtransmit/route.cpp @@ -95,6 +95,12 @@ namespace route void xtransmit::route::run(const string& src_url, const string& dst_url, const config& cfg, const atomic_bool& force_break) { + if (src_url.empty() || dst_url.empty()) + { + spdlog::error(LOG_SC_ROUTE "Empty source/destination was provided"); + return; + } + try { const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0; // make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :( @@ -125,13 +131,13 @@ void xtransmit::route::run(const string& src_url, const string& dst_url, } } -CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, string& src_url, string& dst_url) +CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, vector& src_urls, vector& dst_urls) { const map to_ms{ {"s", 1000}, {"ms", 1} }; CLI::App* sc_route = app.add_subcommand("route", "Route data (SRT, UDP)")->fallthrough(); - sc_route->add_option("src", src_url, "Source URI"); - sc_route->add_option("dst", dst_url, "Destination URI"); + sc_route->add_option("-i,src", src_urls, "Source URI")->expected(1, 10); + sc_route->add_option("-o,dst", dst_urls, "Destination URI")->expected(1, 10); sc_route->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload"); sc_route->add_flag("--bidir", cfg.bidir, "Enable bidirectional transmission"); sc_route->add_option("--statsfile", cfg.stats_file, "output stats report filename"); diff --git a/xtransmit/route.hpp b/xtransmit/route.hpp index a82c505..cc604f4 100644 --- a/xtransmit/route.hpp +++ b/xtransmit/route.hpp @@ -22,7 +22,7 @@ namespace xtransmit { const config& cfg, const std::atomic_bool& force_break); CLI::App* add_subcommand(CLI::App& app, config& cfg, - std::string& src_url, std::string& dst_url); + std::vector& src_urls, std::vector& dst_urls); } // namespace forward diff --git a/xtransmit/srt_socket.cpp b/xtransmit/srt_socket.cpp index a582ad8..670cc6b 100644 --- a/xtransmit/srt_socket.cpp +++ b/xtransmit/srt_socket.cpp @@ -79,12 +79,12 @@ socket::srt::~srt() { if (!m_blocking_mode) { - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing. Releasing epolls", m_bind_socket); + spdlog::debug(LOG_SOCK_SRT "@{} srt::~srt. releasing epolls", m_bind_socket); if (m_epoll_connect != -1) srt_epoll_release(m_epoll_connect); srt_epoll_release(m_epoll_io); } - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing", m_bind_socket); + spdlog::debug(LOG_SOCK_SRT "0x{:X} srt::~srt: closing", m_bind_socket); srt_close(m_bind_socket); } diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp new file mode 100644 index 0000000..42f5459 --- /dev/null +++ b/xtransmit/srt_socket_group.cpp @@ -0,0 +1,620 @@ +#include +#include +#include // std::ostream_iterator + +// submodules +#include "spdlog/spdlog.h" + +// xtransmit +#include "srt_socket_group.hpp" +#include "srt_socket.hpp" +#include "misc.hpp" // HAS_PUTTIME + +// srt utils +#include "verbose.hpp" +#include "socketoptions.hpp" +#include "apputil.hpp" +#include "common.h" // SRT library's SockStatusStr(..) + +using namespace std; +using namespace xtransmit; +using shared_srt_group = shared_ptr; + +#define LOG_SRT_GROUP "SOCKET::SRT_GROUP " + +SocketOption::Mode detect_srt_mode(const UriParser& uri) +{ + string modestr = "default"; + string adapter; + + const auto& options = uri.parameters(); + + if (options.count("mode")) + { + modestr = options.at("mode"); + } + + if (options.count("adapter")) + { + adapter = options.at("adapter"); + } + + return SrtInterpretMode(modestr, uri.host(), adapter); +} + +SocketOption::Mode validate_srt_group(const vector& urls) +{ + SocketOption::Mode prev_mode = SocketOption::FAILURE; + // All URLs ha + for (const auto url : urls) + { + if (url.type() != UriParser::SRT) + { + spdlog::error(LOG_SRT_GROUP "URI {} is not SRT.", url.uri()); + return SocketOption::FAILURE; + } + + const auto mode = detect_srt_mode(url); + if (mode <= SocketOption::FAILURE || mode > SocketOption::RENDEZVOUS) + { + spdlog::error(LOG_SRT_GROUP "Failed to detect SRT mode for URI {}.", url.uri()); + return SocketOption::FAILURE; + } + + if (prev_mode != SocketOption::FAILURE && mode != prev_mode) + { + spdlog::error(LOG_SRT_GROUP + "Failed to match SRT modes for provided URIs. URI {} has mode {}. Previous mode is {}", + url.uri(), + SocketOption::mode_names[mode], + SocketOption::mode_names[prev_mode]); + return SocketOption::FAILURE; + } + + prev_mode = mode; + } + + return prev_mode; +} + +socket::srt_group::srt_group(const vector& uris) +{ + // validate_srt_group(..) also checks for empty 'uris' + const connection_mode m_mode = (connection_mode)validate_srt_group(uris); + if (m_mode == FAILURE) + throw socket::exception("Group mode validation failed!"); + if (m_mode == RENDEZVOUS) + throw socket::exception("Rendezvous mode is not supported by socket groups!"); + + m_options = uris[0].parameters(); + + if (m_options.count("blocking")) + { + m_blocking_mode = !false_names.count(m_options.at("blocking")); + m_options.erase("blocking"); + } + + if (!m_blocking_mode) + { + m_epoll_connect = srt_epoll_create(); + if (m_epoll_connect == -1) + throw socket::exception(srt_getlasterror_str()); + + m_epoll_io = srt_epoll_create(); + if (m_epoll_io == -1) + throw socket::exception(srt_getlasterror_str()); + } + + // Will throw an exception if invalid options were provided. + srt::assert_options_valid(m_options); + + // Create SRT socket group + if (m_mode == LISTENER) + { + create_listeners(uris); + } + else + { + create_callers(uris); + } +} + +socket::srt_group::srt_group(srt_group& group, int group_id) + : m_bind_socket(group_id) + , m_blocking_mode(group.m_blocking_mode) +{ + if (!m_blocking_mode) + { + m_epoll_io = srt_epoll_create(); + if (m_epoll_io == -1) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } +} + +socket::srt_group::~srt_group() +{ + if (!m_blocking_mode) + { + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing. Releasing epolls", m_bind_socket); + if (m_epoll_connect != -1) + srt_epoll_release(m_epoll_connect); + if (m_epoll_io != -1) + srt_epoll_release(m_epoll_io); + } + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing SRT group", m_bind_socket); + release_targets(); + srt_close(m_bind_socket); +} + +void socket::srt_group::create_listeners(const vector& src_uri) +{ + // Create listeners according to the parameters + for (size_t i = 0; i < src_uri.size(); ++i) + { + const UriParser& url = src_uri[i]; + sockaddr_any sa = CreateAddr(url.host(), url.portno()); + + SRTSOCKET s = srt_create_socket(); + if (s == SRT_INVALID_SOCK) + throw socket::exception(srt_getlasterror_str()); + + int gcon = 1; + if (SRT_SUCCESS != srt_setsockflag(s, SRTO_GROUPCONNECT, &gcon, sizeof gcon)) + throw socket::exception(srt_getlasterror_str()); + + if (SRT_SUCCESS != srt_bind(s, sa.get(), sa.size())) + throw socket::exception(srt_getlasterror_str()); + + if (SRT_SUCCESS != configure_pre(s)) + throw socket::exception(srt_getlasterror_str()); + + if (!m_blocking_mode) + { + const int modes = SRT_EPOLL_IN | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_connect, s, &modes)) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, s, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } + + m_listeners.push_back(s); + } +} + +void socket::srt_group::create_callers(const vector& uris) +{ + m_bind_socket = srt_create_group(SRT_GTYPE_BROADCAST); + if (m_bind_socket == SRT_INVALID_SOCK) + raise_exception("srt_create_group"); + + for (const auto& uri : uris) + { + sockaddr_any sa; + try + { + sa = CreateAddr(uri.host(), uri.portno()); + } + catch (const std::invalid_argument& e) + { + raise_exception("connect::create_addr", e.what()); + } + + const sockaddr* bindsa = nullptr; + + const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(bindsa, sa.get(), sa.size()); + m_targets.push_back(gd); + } + + if (!m_blocking_mode) + { + const int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_connect, m_bind_socket, &modes)) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } +} + +void socket::srt_group::listen() +{ + for (const auto sockid : m_listeners) + { + if (srt_listen(sockid, 5) == SRT_ERROR) + raise_exception("listen failed with {}", srt_getlasterror_str()); + } +} + +shared_srt_group socket::srt_group::accept() +{ + // spdlog::debug(LOG_SRT_GROUP "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection", + // m_bind_socket, m_host, m_port, m_blocking_mode ? "SYNC" : "ASYNC"); + + SRTSOCKET accepted_sock = SRT_INVALID_SOCK; + // Wait for REAL connected state if nonblocking mode + if (!m_blocking_mode) + { + std::this_thread::sleep_for(3s); + // Socket readiness for connection is checked by polling on READ allowed sockets in case of listeners. + // In the group connection mode we wait for the first accepted connection. + constexpr int timeout_ms = -1; + int len = 2; + SRTSOCKET ready[2]; + if (srt_epoll_wait(m_epoll_connect, ready, &len, 0, 0, timeout_ms, 0, 0, 0, 0) == SRT_ERROR) + raise_exception("accept::epoll_wait"); + + spdlog::trace(LOG_SRT_GROUP "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], len > 1 ? ready[1] : 0); + + sockaddr_in scl; + int sclen = sizeof scl; + const SRTSOCKET lstnr_sock = ready[0]; + accepted_sock = srt_accept(lstnr_sock, (sockaddr*)&scl, &sclen); + if (accepted_sock == SRT_INVALID_SOCK) + raise_exception("accept", ready[1]); + } + else + { + accepted_sock = srt_accept_bond(m_listeners.data(), m_listeners.size(), -1); + if (accepted_sock == SRT_INVALID_SOCK) + raise_exception("accept_bond failed with {}", srt_getlasterror_str()); + } + + spdlog::info(LOG_SRT_GROUP "Accepted connection sock 0x{:X}", accepted_sock); + const int res = configure_post(accepted_sock); + if (res == SRT_ERROR) + raise_exception("accept::configure_post"); + + return make_shared(*this, accepted_sock); +} + +void socket::srt_group::set_listen_callback(srt_listen_callback_fn* hook_fn, void* hook_opaque) +{ + for (const auto sockid : m_listeners) + { + if (srt_listen_callback(sockid, hook_fn, hook_opaque) == SRT_ERROR) + raise_exception("listen failed with {}", srt_getlasterror_str()); + } +} + +void socket::srt_group::set_connect_callback(srt_connect_callback_fn* hook_fn, void* hook_opaque) +{ + srt_connect_callback(m_bind_socket, hook_fn, hook_opaque); +} + +void socket::srt_group::raise_exception(const string&& place, SRTSOCKET sock) const +{ + const int udt_result = srt_getlasterror(nullptr); + const string message = srt_getlasterror_str(); + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message); + throw socket::exception(place + ": " + message); +} + +void socket::srt_group::raise_exception(const string&& place, const string&& reason) const +{ + spdlog::debug(LOG_SRT_GROUP "0x{:X} {} ERROR {}", m_bind_socket, place, reason); + throw socket::exception(place + ": " + reason); +} + +void socket::srt_group::release_targets() +{ + for (auto& gd : m_targets) + srt_delete_config(gd.config); + m_targets.clear(); +} + +shared_srt_group socket::srt_group::connect() +{ + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + + if (!m_blocking_mode) + { + for (auto target : m_targets) + { + sockaddr_any target_addr(target.peeraddr); + const int st = srt_connect(m_bind_socket, target_addr.get(), target_addr.size()); + if (st == SRT_ERROR) + raise_exception("srt_group::connect_member"); + } + + // In case of a caller a connection event triggers write-readiness. + int len = 2; + SRTSOCKET ready[2]; + if (srt_epoll_wait(m_epoll_connect, 0, 0, ready, &len, -1, 0, 0, 0, 0) != -1) + { + const SRT_SOCKSTATUS state = srt_getsockstate(m_bind_socket); + if (state != SRTS_CONNECTED) + { + const int reason = srt_getrejectreason(m_bind_socket); + raise_exception("connect failed", srt_rejectreason_str(reason)); + } + } + else + { + raise_exception("srt_group::connect.epoll_wait"); + } + } + else + { + const int st = srt_connect_group(m_bind_socket, m_targets.data(), m_targets.size()); + if (st == SRT_ERROR) + raise_exception("srt_group::connect"); + } + + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + release_targets(); + return shared_from_this(); +} + +int socket::srt_group::configure_pre(SRTSOCKET sock) +{ + int maybe = m_blocking_mode ? 1 : 0; + const int result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe); + if (result == -1) + return result; + + // host is only checked for emptiness and depending on that the connection mode is selected. + // Here we are not exactly interested with that information. + std::vector failures; + + // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always, + // but it doesn't matter here. We don't use 'connmode' for anything else than + // checking for failures. + SocketOption::Mode conmode = SrtConfigurePre(sock, m_host, m_options, &failures); + + if (conmode == SocketOption::FAILURE) + { + if (Verbose::on) + { + Verb() << "WARNING: failed to set options: "; + copy(failures.begin(), failures.end(), ostream_iterator(*Verbose::cverb, ", ")); + Verb(); + } + + return SRT_ERROR; + } + + m_mode = static_cast(conmode); + + return SRT_SUCCESS; +} + +int socket::srt_group::configure_post(SRTSOCKET sock) +{ + int is_blocking = m_blocking_mode ? 1 : 0; + + int result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &is_blocking, sizeof is_blocking); + if (result == -1) + return result; + result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &is_blocking, sizeof is_blocking); + if (result == -1) + return result; + + // host is only checked for emptiness and depending on that the connection mode is selected. + // Here we are not exactly interested with that information. + vector failures; + + SrtConfigurePost(sock, m_options, &failures); + + if (!failures.empty()) + { + if (Verbose::on) + { + Verb() << "WARNING: failed to set options: "; + copy(failures.begin(), failures.end(), ostream_iterator(*Verbose::cverb, ", ")); + Verb(); + } + } + + return 0; +} + +size_t socket::srt_group::read(const mutable_buffer& buffer, int timeout_ms) +{ + if (!m_blocking_mode) + { + int ready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int len = 2; + + const int epoll_res = srt_epoll_wait(m_epoll_io, ready, &len, nullptr, nullptr, timeout_ms, 0, 0, 0, 0); + if (epoll_res == SRT_ERROR) + { + if (srt_getlasterror(nullptr) == SRT_ETIMEOUT) + return 0; + + raise_exception("read::epoll"); + } + } + + const int res = srt_recvmsg2(m_bind_socket, static_cast(buffer.data()), (int)buffer.size(), nullptr); + if (SRT_ERROR == res) + { + if (srt_getlasterror(nullptr) != SRT_EASYNCRCV) + raise_exception("read::recv"); + + spdlog::warn(LOG_SRT_GROUP "recvmsg returned error 6002: read error, try again"); + return 0; + } + + return static_cast(res); +} + +int socket::srt_group::write(const const_buffer& buffer, int timeout_ms) +{ + if (!m_blocking_mode) + { + int ready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int len = 2; + int rready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int rlen = 2; + // TODO: check error fds + const int res = srt_epoll_wait(m_epoll_io, rready, &rlen, ready, &len, timeout_ms, 0, 0, 0, 0); + if (res == SRT_ERROR) + raise_exception("write::epoll"); + } + + const int res = + srt_sendmsg2(m_bind_socket, static_cast(buffer.data()), static_cast(buffer.size()), nullptr); + if (res == SRT_ERROR) + { + if (srt_getlasterror(nullptr) == SRT_EASYNCSND) + return 0; + + raise_exception("socket::write::send", srt_getlasterror_str()); + } + + return res; +} + +socket::srt_group::connection_mode socket::srt_group::mode() const { return m_mode; } + +int socket::srt_group::statistics(SRT_TRACEBSTATS& stats, bool instant) +{ + return srt_bstats(m_bind_socket, &stats, instant); +} + +const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header) +{ + std::ostringstream output; + +#define HAS_PKT_REORDER_TOL (SRT_VERSION_MAJOR >= 1) && (SRT_VERSION_MINOR >= 4) && (SRT_VERSION_PATCH > 0) +// pktSentUnique, pktRecvUnique were added in SRT v1.4.2 +#define HAS_UNIQUE_PKTS \ + (SRT_VERSION_MAJOR == 1) && ((SRT_VERSION_MINOR > 4) || ((SRT_VERSION_MINOR == 4) && (SRT_VERSION_PATCH >= 2))) + + if (print_header) + { +#ifdef HAS_PUT_TIME + output << "Timepoint,"; +#endif + output << "Time,SocketID,pktFlowWindow,pktCongestionWindow,pktFlightSize,"; + output << "msRTT,mbpsBandwidth,mbpsMaxBW,pktSent,pktSndLoss,pktSndDrop,"; + output << "pktRetrans,byteSent,byteAvailSndBuf,byteSndDrop,mbpsSendRate,usPktSndPeriod,msSndBuf,"; + output << "pktRecv,pktRcvLoss,pktRcvDrop,pktRcvRetrans,pktRcvBelated,"; + output << "byteRecv,byteAvailRcvBuf,byteRcvLoss,byteRcvDrop,mbpsRecvRate,msRcvBuf,msRcvTsbPdDelay"; +#if HAS_PKT_REORDER_TOL + output << ",pktReorderTolerance"; +#endif +#if HAS_UNIQUE_PKTS + output << ",pktSentUnique,pktRecvUnique"; +#endif + output << endl; + return output.str(); + } + +#ifdef HAS_PUT_TIME + output << print_timestamp_now() << ','; +#endif // HAS_PUT_TIME + + output << stats.msTimeStamp << ','; + output << socketid << ','; + output << stats.pktFlowWindow << ','; + output << stats.pktCongestionWindow << ','; + output << stats.pktFlightSize << ','; + + output << stats.msRTT << ','; + output << stats.mbpsBandwidth << ','; + output << stats.mbpsMaxBW << ','; + output << stats.pktSent << ','; + output << stats.pktSndLoss << ','; + output << stats.pktSndDrop << ','; + + output << stats.pktRetrans << ','; + output << stats.byteSent << ','; + output << stats.byteAvailSndBuf << ','; + output << stats.byteSndDrop << ','; + output << stats.mbpsSendRate << ','; + output << stats.usPktSndPeriod << ','; + output << stats.msSndBuf << ','; + + output << stats.pktRecv << ','; + output << stats.pktRcvLoss << ','; + output << stats.pktRcvDrop << ','; + output << stats.pktRcvRetrans << ','; + output << stats.pktRcvBelated << ','; + + output << stats.byteRecv << ','; + output << stats.byteAvailRcvBuf << ','; + output << stats.byteRcvLoss << ','; + output << stats.byteRcvDrop << ','; + output << stats.mbpsRecvRate << ','; + output << stats.msRcvBuf << ','; + output << stats.msRcvTsbPdDelay; + +#if HAS_PKT_REORDER_TOL + output << "," << stats.pktReorderTolerance; +#endif + +#if HAS_UNIQUE_PKTS + output << "," << stats.pktSentUnique; + output << "," << stats.pktRecvUnique; +#endif + + output << endl; + + return output.str(); + +#undef HAS_PUT_TIME +#undef HAS_UNIQUE_PKTS +} + +const string socket::srt_group::statistics_csv(bool print_header) const +{ + //SRT_ASSERT(m_bind_socket != SRT_INVALID_SOCK); + SRT_TRACEBSTATS stats; + if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true)) + raise_exception("statistics"); + + string csv_stats = stats_to_csv(m_bind_socket, stats, print_header); + + if (print_header) + return csv_stats; + + size_t group_size = 0; + if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve the number of group members", m_bind_socket); + return csv_stats; + } + + vector group_data(group_size); + const int num_members = srt_group_data(m_bind_socket, group_data.data(), &group_size); + if (num_members == SRT_ERROR) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data", m_bind_socket); + return csv_stats; + } + + for (int i = 0; i < num_members; ++i) + { + const int id = group_data[i].id; + const SRT_SOCKSTATUS status = group_data[i].sockstate; + + if (group_data[i].sockstate != SRTS_CONNECTED) + { + spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket state is {}, skipping.", id, srt_logging::SockStatusStr(status)); + continue; + } + + if (SRT_ERROR == srt_bstats(id, &stats, true)) + { + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group member stats. {}", id, srt_getlasterror_str()); + break; + } + + csv_stats += stats_to_csv(id, stats, false); + } + + return csv_stats; +} diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp new file mode 100644 index 0000000..61425ac --- /dev/null +++ b/xtransmit/srt_socket_group.hpp @@ -0,0 +1,107 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +// xtransmit +#include "buffer.hpp" +#include "socket.hpp" + +// OpenSRT +#include "srt.h" +#include "uriparser.hpp" + +namespace xtransmit +{ +namespace socket +{ + +class srt_group + : public std::enable_shared_from_this + , public isocket +{ + using string = std::string; + using shared_srt_group = std::shared_ptr; + +public: + explicit srt_group(const std::vector& uris); + + srt_group(srt_group& group, int group_id); + + virtual ~srt_group(); + +public: + shared_srt_group connect(); + shared_srt_group accept(); + + /** + * Start listening on the incomming connection requests. + * + * May throw a socket_exception. + */ + void listen() noexcept(false); + + void set_listen_callback(srt_listen_callback_fn* hook_fn, void* hook_opaque); + void set_connect_callback(srt_connect_callback_fn* hook_fn, void* hook_opaque); + +private: + void configure(const std::map& options); + + void identify_connection_mode(const std::vector& uris); + int configure_pre(SRTSOCKET sock); + int configure_post(SRTSOCKET sock); + void create_listeners(const std::vector& uris); + void create_callers(const std::vector& uris); + void release_targets(); + +public: + /** + * @returns The number of bytes received. + * + * @throws socket_exception Thrown on failure. + */ + size_t read(const mutable_buffer& buffer, int timeout_ms = -1) final; + int write(const const_buffer& buffer, int timeout_ms = -1) final; + + enum connection_mode + { + FAILURE = -1, + LISTENER = 0, + CALLER = 1, + RENDEZVOUS = 2 + }; + + connection_mode mode() const; + + bool is_caller() const final { return m_mode == CALLER; } + +public: + int id() const final { return m_bind_socket; } + int statistics(SRT_TRACEBSTATS& stats, bool instant = true); + bool supports_statistics() const final { return true; } + const std::string statistics_csv(bool print_header) const final; + static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header); + +private: + void raise_exception(const string&& place, SRTSOCKET sock = SRT_INVALID_SOCK) const; + void raise_exception(const string&& place, const string&& reason) const; + +private: + int m_bind_socket = SRT_INVALID_SOCK; + std::vector m_listeners; + std::vector m_targets; + int m_epoll_connect = -1; + int m_epoll_io = -1; + + connection_mode m_mode = FAILURE; + bool m_blocking_mode = true; + string m_host; + int m_port = -1; + std::map m_options; // All other options, as provided in the URI +}; + +} // namespace socket +} // namespace xtransmit diff --git a/xtransmit/xtransmit-app.cpp b/xtransmit/xtransmit-app.cpp index d677908..fdc789a 100644 --- a/xtransmit/xtransmit-app.cpp +++ b/xtransmit/xtransmit-app.cpp @@ -65,7 +65,7 @@ string create_srt_logfa_description() // Each group on a new line stringstream ss; - ss << "SRT log functional areas: ["; + ss << "SRT log functional areas: \n["; int en10 = 0; for (auto entry : revmap) { @@ -159,15 +159,17 @@ int main(int argc, char** argv) }); string src, dst; + vector src_urls; + vector dst_urls; generate::config cfg_generate; - CLI::App* sc_generate = generate::add_subcommand(app, cfg_generate, dst); + CLI::App* sc_generate = generate::add_subcommand(app, cfg_generate, dst_urls); xtransmit::receive::config cfg_receive; - CLI::App* sc_receive = receive::add_subcommand(app, cfg_receive, src); + CLI::App* sc_receive = receive::add_subcommand(app, cfg_receive, src_urls); xtransmit::route::config cfg_route; - CLI::App* sc_route = route::add_subcommand(app, cfg_route, src, dst); + CLI::App* sc_route = route::add_subcommand(app, cfg_route, src_urls, dst_urls); #if ENABLE_FILE_TRANSFER CLI::App* sc_file = app.add_subcommand("file", "Send/receive a single file or folder contents")->fallthrough(); @@ -189,16 +191,29 @@ int main(int argc, char** argv) // https://cliutils.gitlab.io/CLI11Tutorial/chapters/an-advanced-example.html if (sc_generate->parsed()) { - generate::run(dst, cfg_generate, force_break); + generate::run(dst_urls, cfg_generate, force_break); return 0; } else if (sc_receive->parsed()) { - xtransmit::receive::run(src, cfg_receive, force_break); + for (const auto url : src_urls) + { + cout << "URL: " << url << "\n"; + } + xtransmit::receive::run(src_urls, cfg_receive, force_break); return 0; } else if (sc_route->parsed()) { + for (const auto url : src_urls) + { + cout << "SRC URL: " << url << "\n"; + } + for (const auto url : dst_urls) + { + cout << "DST URL: " << url << "\n"; + } + xtransmit::route::run(src, dst, cfg_route, force_break); return 0; } From 5bee0d056836307544e5e5948d4407adf87473c7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 4 Dec 2020 13:31:52 +0100 Subject: [PATCH 02/24] Removed this_thread::slepp before accepting --- xtransmit/srt_socket_group.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 42f5459..3b6348c 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -241,7 +241,6 @@ shared_srt_group socket::srt_group::accept() // Wait for REAL connected state if nonblocking mode if (!m_blocking_mode) { - std::this_thread::sleep_for(3s); // Socket readiness for connection is checked by polling on READ allowed sockets in case of listeners. // In the group connection mode we wait for the first accepted connection. constexpr int timeout_ms = -1; From 0f03f4ff487a0c2116ff1db8ee4e04d73d22a45f Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 4 Dec 2020 17:00:44 +0100 Subject: [PATCH 03/24] Fixed closing listening sockets of a group --- xtransmit/receive.cpp | 2 +- xtransmit/srt_socket_group.cpp | 12 ++++++++++++ xtransmit/srt_socket_group.hpp | 1 + 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index f370dea..9af3826 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -139,7 +139,7 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break } int srt_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, - const struct sockaddr* peeraddr, const char* streamid) + const struct sockaddr* peeraddr, const char* streamid) { spdlog::trace(LOG_SC_RECEIVE "Accepted member socket 0x{:X}.", sock); return 0; diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 3b6348c..5c929bd 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -147,6 +147,7 @@ socket::srt_group::~srt_group() } spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing SRT group", m_bind_socket); release_targets(); + release_listeners(); srt_close(m_bind_socket); } @@ -183,6 +184,8 @@ void socket::srt_group::create_listeners(const vector& src_uri) throw socket::exception(srt_getlasterror_str()); } + spdlog::trace(LOG_SRT_GROUP "Created listener 0x{:X} on {}:{}", s, url.host(), url.portno()); + m_listeners.push_back(s); } } @@ -309,6 +312,15 @@ void socket::srt_group::release_targets() m_targets.clear(); } +void socket::srt_group::release_listeners() +{ + for (auto sock : m_listeners) { + spdlog::trace(LOG_SRT_GROUP "Closing listener 0x{:X}", sock); + srt_close(sock); + } + m_listeners.clear(); +} + shared_srt_group socket::srt_group::connect() { spdlog::debug( diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 61425ac..06077b9 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -56,6 +56,7 @@ class srt_group void create_listeners(const std::vector& uris); void create_callers(const std::vector& uris); void release_targets(); + void release_listeners(); public: /** From f4fe1d18ee8647074a131021486a4f00648e1802 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 4 Dec 2020 17:27:18 +0100 Subject: [PATCH 04/24] Close old sockets before reconnecting --- xtransmit/generate.cpp | 8 ++++---- xtransmit/receive.cpp | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index ffd5527..d1aba18 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -35,7 +35,7 @@ using shared_sock = std::shared_ptr; #define LOG_SC_GENERATE "GENERATE " int srt_gen_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, - const struct sockaddr* peeraddr, const char* streamid) + const struct sockaddr* peeraddr, const char* streamid) { spdlog::trace(LOG_SC_GENERATE "Accepted member socket 0x{:X}.", sock); return 0; @@ -128,9 +128,6 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, urls.emplace_back(UriParser(url)); } - shared_sock sock; - shared_sock connection; - const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0; // make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :( unique_ptr stats; @@ -151,6 +148,9 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, do { try { + shared_sock sock; + shared_sock connection; + if (urls.size() == 1) { if (urls[0].proto() == "udp") diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index 9af3826..f7728c4 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -172,9 +172,6 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, urls.emplace_back(url); } - shared_sock sock; - shared_sock conn; - unique_ptr stats; const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0; @@ -195,6 +192,9 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, do { try { + shared_sock sock; + shared_sock conn; + if (urls.size() == 1) { if (urls[0].proto() == "udp") From a17dd84bdb8f0fbbd0a2afa18b8ccb3d6e83d0b2 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 7 Dec 2020 19:26:35 +0100 Subject: [PATCH 05/24] working on member reconnect --- xtransmit/generate.cpp | 4 ++++ xtransmit/receive.cpp | 2 ++ xtransmit/srt_socket_group.cpp | 38 ++++++++++++++++++++++++++++++++-- xtransmit/srt_socket_group.hpp | 3 +++ 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index d1aba18..69decf2 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -187,10 +187,14 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, if (stats) stats->add_socket(connection); run_pipe(connection, cfg, force_break); + if (stats && cfg.reconnect) + stats->clear(); } catch (const socket::exception& e) { spdlog::warn(LOG_SC_GENERATE "{}", e.what()); + if (stats) + stats->clear(); } } while (cfg.reconnect && !force_break); } diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index f7728c4..bb47c28 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -235,6 +235,8 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, catch (const socket::exception & e) { spdlog::warn(LOG_SC_RECEIVE "{}", e.what()); + if (stats) + stats->clear(); } } while (cfg.reconnect && !force_break); } diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 5c929bd..039721a 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -284,10 +284,44 @@ void socket::srt_group::set_listen_callback(srt_listen_callback_fn* hook_fn, voi raise_exception("listen failed with {}", srt_getlasterror_str()); } } +// for (auto target : m_targets) + +void socket::srt_group::connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token) +{ + if (opaq == nullptr) + { + spdlog::warn(LOG_SRT_GROUP "connect_callback_fn does not have a pointer to the group"); + return; + } + + // TODO: this group may no longer exist. Use some global array to track valid groups. + socket::srt_group* group = reinterpret_cast(opaq); + + group->on_connect_callback(sock, error, peer, token); +} + +void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +{ + if (error == SRT_SUCCESS) + { + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SRT_GROUP "Member socket connected 0x{:X} (token {}).", sock, token); + return; + } + + spdlog::warn(LOG_SRT_GROUP "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, + srt_strerror(error, 0)); + + // TODO: schedule reconnection. + + return; + +} void socket::srt_group::set_connect_callback(srt_connect_callback_fn* hook_fn, void* hook_opaque) { - srt_connect_callback(m_bind_socket, hook_fn, hook_opaque); + //srt_connect_callback(m_bind_socket, hook_fn, hook_opaque); + srt_connect_callback(m_bind_socket, connect_callback_fn, (void*) this); } void socket::srt_group::raise_exception(const string&& place, SRTSOCKET sock) const @@ -324,7 +358,7 @@ void socket::srt_group::release_listeners() shared_srt_group socket::srt_group::connect() { spdlog::debug( - LOG_SRT_GROUP "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + LOG_SRT_GROUP "0x{:X} {} Connecting group to remote SRT", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); if (!m_blocking_mode) { diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 06077b9..3114447 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -58,6 +58,9 @@ class srt_group void release_targets(); void release_listeners(); + void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token); + static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token); + public: /** * @returns The number of bytes received. From cc84aa10e2b29029b93f2d97fb1a4f9fab79e20a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 8 Dec 2020 19:07:45 +0100 Subject: [PATCH 06/24] Scheduling member link reconnection --- xtransmit/scheduler.hpp | 128 +++++++++++++++++++++++++++++++++ xtransmit/srt_socket_group.cpp | 35 +++++++-- xtransmit/srt_socket_group.hpp | 3 + 3 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 xtransmit/scheduler.hpp diff --git a/xtransmit/scheduler.hpp b/xtransmit/scheduler.hpp new file mode 100644 index 0000000..3e51a11 --- /dev/null +++ b/xtransmit/scheduler.hpp @@ -0,0 +1,128 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace xtransmit +{ +using namespace std; +using namespace std::chrono; + +class task +{ +public: + explicit task(std::function&& f) + : f(std::move(f)) + { + } + + function f; +}; + +class scheduler +{ +public: + explicit scheduler(unsigned int max_n_tasks = 4) + : done_(false) + , thread_(&scheduler::timer_loop, this) + { + } + + scheduler(const scheduler&) = delete; + + scheduler(scheduler&&) noexcept = delete; + + scheduler& operator=(const scheduler&) = delete; + + scheduler& operator=(scheduler&&) noexcept = delete; + + ~scheduler() + { + done_ = true; + sync_.cv.notify_one(); + if (thread_.joinable()) + thread_.join(); + } + + template + void schedule_on(const steady_clock::time_point time, Callable&& f, Args&&... args) + { + shared_ptr t = + make_shared(bind(forward(f), forward(args)...)); + add_task(time, move(t)); + } + + template + void schedule_in(const steady_clock::duration time, Callable&& f, Args&&... args) + { + schedule_on(steady_clock::now() + time, forward(f), forward(args)...); + } + +private: + atomic done_; + struct + { + mutex mtx; + condition_variable cv; + } sync_; + + multimap> tasks_; + mutex lock_; + thread thread_; + + void timer_loop() + { + while (!this->done_) + { + this->manage_tasks(); + + if (this->tasks_.empty()) + { + unique_lock lock(sync_.mtx); + sync_.cv.wait(lock); + } + else + { + const auto time_of_first_task = (*tasks_.begin()).first; + unique_lock lock(sync_.mtx); + sync_.cv.wait_until(lock, time_of_first_task); + } + } + } + + + void add_task(const steady_clock::time_point time, shared_ptr t) + { + lock_guard l(lock_); + tasks_.emplace(time, move(t)); + sync_.cv.notify_one(); + } + + void manage_tasks() + { + lock_guard l(lock_); + + auto end_of_tasks_to_run = tasks_.upper_bound(steady_clock::now()); + if (end_of_tasks_to_run != tasks_.begin()) + { + // for all tasks_ that have been triggered + for (auto i = tasks_.begin(); i != end_of_tasks_to_run; ++i) + { + auto& task = (*i).second; + task->f(); + } + + // remove the completed tasks_ + tasks_.erase(tasks_.begin(), end_of_tasks_to_run); + } + } +}; + +} // namespace xtransmit diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 039721a..ce43c64 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -312,7 +312,26 @@ void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const soc spdlog::warn(LOG_SRT_GROUP "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, srt_strerror(error, 0)); - // TODO: schedule reconnection. + bool reconn_scheduled = false; + for (auto target : m_targets) + { + if (target.token != token) + continue; + + auto connfn = [](SRTSOCKET group, SRT_SOCKGROUPCONFIG target) { + spdlog::trace(LOG_SRT_GROUP "0x{:X}: Reconnecting member socket (token {})", group, target.token); + const int st = srt_connect_group(group, &target, 1); + if (st == SRT_ERROR) + spdlog::warn(LOG_SRT_GROUP "0x{:X}: Member reconnection failed (token {})", group, target.token); + }; + + spdlog::trace(LOG_SRT_GROUP "0x{:X}: Scheduling member reconnection (token {})", m_bind_socket, token); + reconn_scheduled = true; + m_scheduler.schedule_in(1s, connfn, m_bind_socket, target); + } + + if (!reconn_scheduled) + spdlog::warn(LOG_SRT_GROUP "0x{:X}: Could not schedule member reconnection (token {})", m_bind_socket, token); return; @@ -360,8 +379,13 @@ shared_srt_group socket::srt_group::connect() spdlog::debug( LOG_SRT_GROUP "0x{:X} {} Connecting group to remote SRT", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); - if (!m_blocking_mode) + if (!m_blocking_mode && false) { + // This branch does not assign a token to the target + // therefiore it is not possible to schedule a reconnection. + // srt_connect_group is to be used instead in both blocking and non-blocking modes. + spdlog::debug( + LOG_SRT_GROUP "non blocking"); for (auto target : m_targets) { sockaddr_any target_addr(target.peeraddr); @@ -375,8 +399,7 @@ shared_srt_group socket::srt_group::connect() SRTSOCKET ready[2]; if (srt_epoll_wait(m_epoll_connect, 0, 0, ready, &len, -1, 0, 0, 0, 0) != -1) { - const SRT_SOCKSTATUS state = srt_getsockstate(m_bind_socket); - if (state != SRTS_CONNECTED) + if (srt_getsockstate(m_bind_socket) != SRTS_CONNECTED) { const int reason = srt_getrejectreason(m_bind_socket); raise_exception("connect failed", srt_rejectreason_str(reason)); @@ -389,6 +412,8 @@ shared_srt_group socket::srt_group::connect() } else { + spdlog::debug( + LOG_SRT_GROUP "srt_connect_group"); const int st = srt_connect_group(m_bind_socket, m_targets.data(), m_targets.size()); if (st == SRT_ERROR) raise_exception("srt_group::connect"); @@ -396,7 +421,7 @@ shared_srt_group socket::srt_group::connect() spdlog::debug( LOG_SRT_GROUP "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); - release_targets(); + return shared_from_this(); } diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 3114447..a60112e 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -9,6 +9,7 @@ // xtransmit #include "buffer.hpp" #include "socket.hpp" +#include "scheduler.hpp" // OpenSRT #include "srt.h" @@ -105,6 +106,8 @@ class srt_group string m_host; int m_port = -1; std::map m_options; // All other options, as provided in the URI + + scheduler m_scheduler; }; } // namespace socket From 74e90286c953b36e715095df3a5bea0579091247 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 9 Dec 2020 19:05:44 +0100 Subject: [PATCH 07/24] Adding backup mode TODO: detect grouptype and weight URI query --- xtransmit/srt_socket_group.cpp | 60 ++++++++++++++++++++++++++++++++-- xtransmit/srt_socket_group.hpp | 2 +- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index ce43c64..7604ef3 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -42,6 +42,53 @@ SocketOption::Mode detect_srt_mode(const UriParser& uri) return SrtInterpretMode(modestr, uri.host(), adapter); } +static SRT_GROUP_TYPE detect_group_type(const UriParser& uri) +{ + const auto& options = uri.parameters(); + const string key("grouptype"); + + if (!options.count(key)) + return SRT_GTYPE_BROADCAST; + + const string gmode = options.at(key); + if (gmode == "broadcast") + return SRT_GTYPE_BROADCAST; + + if (gmode == "backup") + return SRT_GTYPE_BACKUP; + + throw socket::exception(LOG_SRT_GROUP ": Failed to detect group mode. Value provided: " + gmode); +} + +static int detect_link_weight(UriParser& uri) +{ + auto& options = uri.parameters(); + const string key("weight"); + + if (!options.count(key)) + return 0; + + const string weight_str = options.at(key); + int weight = 0; + try { + weight = std::stoi(weight_str); + } + catch (std::invalid_argument const &) + { + throw socket::exception(LOG_SRT_GROUP ": Bad input. weight=" + weight_str); + } + catch (std::out_of_range const &e) + { + throw socket::exception(LOG_SRT_GROUP ": Integer overflow. weight=" + weight_str); + } + + // the allowed value for weight is between 0 and 32767 + if (weight < 0 || weight >32767) + throw socket::exception(LOG_SRT_GROUP ": Wrong link weight provided. The allowed value is between 0 and 32767."); + + return weight; +} + SocketOption::Mode validate_srt_group(const vector& urls) { SocketOption::Mode prev_mode = SocketOption::FAILURE; @@ -77,6 +124,9 @@ SocketOption::Mode validate_srt_group(const vector& urls) return prev_mode; } +// TODO: m_options per socket: +// - m_opts_group +// - m_opts_link[n] socket::srt_group::srt_group(const vector& uris) { // validate_srt_group(..) also checks for empty 'uris' @@ -86,6 +136,7 @@ socket::srt_group::srt_group(const vector& uris) if (m_mode == RENDEZVOUS) throw socket::exception("Rendezvous mode is not supported by socket groups!"); + const SRT_GROUP_TYPE gtype = detect_group_type(uris[0]); m_options = uris[0].parameters(); if (m_options.count("blocking")) @@ -111,11 +162,14 @@ socket::srt_group::srt_group(const vector& uris) // Create SRT socket group if (m_mode == LISTENER) { + spdlog::error(LOG_SRT_GROUP "Creating a group of listeners."); create_listeners(uris); } else { - create_callers(uris); + const char* gtype_str = (gtype == SRT_GTYPE_BACKUP) ? "main/backup" : "broadcast"; + spdlog::trace(LOG_SRT_GROUP "Creating a group of callers (type {}).", gtype_str); + create_callers(uris, gtype); } } @@ -190,9 +244,9 @@ void socket::srt_group::create_listeners(const vector& src_uri) } } -void socket::srt_group::create_callers(const vector& uris) +void socket::srt_group::create_callers(const vector& uris, SRT_GROUP_TYPE gtype) { - m_bind_socket = srt_create_group(SRT_GTYPE_BROADCAST); + m_bind_socket = srt_create_group(gtype); if (m_bind_socket == SRT_INVALID_SOCK) raise_exception("srt_create_group"); diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index a60112e..12d495c 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -55,7 +55,7 @@ class srt_group int configure_pre(SRTSOCKET sock); int configure_post(SRTSOCKET sock); void create_listeners(const std::vector& uris); - void create_callers(const std::vector& uris); + void create_callers(const std::vector& uris, SRT_GROUP_TYPE gtype); void release_targets(); void release_listeners(); From 253659243ceb84a63f91283d7d49944a4eee7be4 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 10 Dec 2020 16:51:07 +0100 Subject: [PATCH 08/24] Adding per-link options. Note. This commit makes listener failing. The state of the object is weird. --- xtransmit/generate.cpp | 4 +- xtransmit/receive.cpp | 4 +- xtransmit/route.cpp | 2 +- xtransmit/srt_socket.cpp | 6 +-- xtransmit/srt_socket.hpp | 4 +- xtransmit/srt_socket_group.cpp | 82 +++++++++++++++++++--------------- xtransmit/srt_socket_group.hpp | 14 ++++-- 7 files changed, 68 insertions(+), 48 deletions(-) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index 69decf2..4954561 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -160,7 +160,7 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, else { sock = make_shared(urls[0]); - socket::srt* s = static_cast(sock.get()); + socket::srt* s = dynamic_cast(sock.get()); const bool accept = s->mode() == socket::srt::LISTENER; if (accept) s->listen(); @@ -170,7 +170,7 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, else { sock = make_shared(urls); - socket::srt_group* s = static_cast(sock.get()); + socket::srt_group* s = dynamic_cast(sock.get()); const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index bb47c28..1cfa15b 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -204,7 +204,7 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, else { sock = make_shared(urls[0]); - socket::srt* s = static_cast(sock.get()); + socket::srt* s = dynamic_cast(sock.get()); const bool accept = s->mode() == socket::srt::LISTENER; if (accept) s->listen(); @@ -214,7 +214,7 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, else { sock = make_shared(urls); - socket::srt_group* s = static_cast(sock.get()); + socket::srt_group* s = dynamic_cast(sock.get()); const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { s->set_listen_callback(&srt_listen_callback, nullptr); diff --git a/xtransmit/route.cpp b/xtransmit/route.cpp index 2c84c39..e8629de 100644 --- a/xtransmit/route.cpp +++ b/xtransmit/route.cpp @@ -78,7 +78,7 @@ namespace route if(uri.proto() == "srt") { shared_sock socket = make_shared(uri); - socket::srt* s = static_cast(socket.get()); + socket::srt* s = dynamic_cast(socket.get()); const bool accept = s->mode() == socket::srt::LISTENER; if (accept) s->listen(); diff --git a/xtransmit/srt_socket.cpp b/xtransmit/srt_socket.cpp index 670cc6b..f2584c8 100644 --- a/xtransmit/srt_socket.cpp +++ b/xtransmit/srt_socket.cpp @@ -242,7 +242,7 @@ std::future socket::srt::async_read(std::vector &buffer) return std::future(); } -void socket::srt::assert_options_valid(const std::map& options) +void socket::srt::assert_options_valid(const map& options, const unordered_set& extra) { #ifdef ENABLE_CXX17 for (const auto& [key, val] : options) @@ -263,7 +263,7 @@ void socket::srt::assert_options_valid(const std::map& options) break; } - if (opt_found || key == "bind" || key == "mode") + if (opt_found || extra.count(key)) continue; stringstream ss; @@ -275,7 +275,7 @@ void socket::srt::assert_options_valid(const std::map& options) void socket::srt::assert_options_valid() const { - assert_options_valid(m_options); + assert_options_valid(m_options, {"bind", "mode"}); } diff --git a/xtransmit/srt_socket.hpp b/xtransmit/srt_socket.hpp index 0c7307f..2d65310 100644 --- a/xtransmit/srt_socket.hpp +++ b/xtransmit/srt_socket.hpp @@ -5,6 +5,7 @@ #include #include #include +#include // xtransmit #include "buffer.hpp" @@ -51,9 +52,10 @@ class srt /// Verifies URI options provided are valid. /// /// @param [in] options a map of options key-value pairs to validate + /// @param [in] extra a set of extra options that are valid, e.g. "mode", "bind" /// @throw socket::exception on failure /// - static void assert_options_valid(const std::map& options); + static void assert_options_valid(const std::map& options, const std::unordered_set& extra); private: void configure(const std::map& options); diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 7604ef3..58f2426 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -42,15 +42,14 @@ SocketOption::Mode detect_srt_mode(const UriParser& uri) return SrtInterpretMode(modestr, uri.host(), adapter); } -static SRT_GROUP_TYPE detect_group_type(const UriParser& uri) +SRT_GROUP_TYPE socket::srt_group::detect_group_type(const options& opts) { - const auto& options = uri.parameters(); const string key("grouptype"); - if (!options.count(key)) + if (!opts.count(key)) return SRT_GTYPE_BROADCAST; - const string gmode = options.at(key); + const string gmode = opts.at(key); if (gmode == "broadcast") return SRT_GTYPE_BROADCAST; @@ -136,13 +135,19 @@ socket::srt_group::srt_group(const vector& uris) if (m_mode == RENDEZVOUS) throw socket::exception("Rendezvous mode is not supported by socket groups!"); - const SRT_GROUP_TYPE gtype = detect_group_type(uris[0]); - m_options = uris[0].parameters(); + for (auto uri : uris) + { + // Will throw an exception if invalid options were provided. + srt::assert_options_valid(uri.parameters(), {"bind", "mode", "weight", "grouptype"}); + m_opts_link.push_back(uri.parameters()); + } + + const SRT_GROUP_TYPE gtype = detect_group_type(m_opts_link[0]); - if (m_options.count("blocking")) + if (m_opts_link[0].count("blocking")) { - m_blocking_mode = !false_names.count(m_options.at("blocking")); - m_options.erase("blocking"); + m_blocking_mode = !false_names.count(m_opts_link[0].at("blocking")); + m_opts_link[0].erase("blocking"); } if (!m_blocking_mode) @@ -156,9 +161,6 @@ socket::srt_group::srt_group(const vector& uris) throw socket::exception(srt_getlasterror_str()); } - // Will throw an exception if invalid options were provided. - srt::assert_options_valid(m_options); - // Create SRT socket group if (m_mode == LISTENER) { @@ -176,6 +178,7 @@ socket::srt_group::srt_group(const vector& uris) socket::srt_group::srt_group(srt_group& group, int group_id) : m_bind_socket(group_id) , m_blocking_mode(group.m_blocking_mode) + , m_mode(group.m_mode) { if (!m_blocking_mode) { @@ -224,7 +227,7 @@ void socket::srt_group::create_listeners(const vector& src_uri) if (SRT_SUCCESS != srt_bind(s, sa.get(), sa.size())) throw socket::exception(srt_getlasterror_str()); - if (SRT_SUCCESS != configure_pre(s)) + if (SRT_SUCCESS != configure_pre(s, i)) throw socket::exception(srt_getlasterror_str()); if (!m_blocking_mode) @@ -323,7 +326,7 @@ shared_srt_group socket::srt_group::accept() } spdlog::info(LOG_SRT_GROUP "Accepted connection sock 0x{:X}", accepted_sock); - const int res = configure_post(accepted_sock); + const int res = configure_post(accepted_sock, 0); // TODO: are there POST options per link? if (res == SRT_ERROR) raise_exception("accept::configure_post"); @@ -479,41 +482,48 @@ shared_srt_group socket::srt_group::connect() return shared_from_this(); } -int socket::srt_group::configure_pre(SRTSOCKET sock) +int socket::srt_group::configure_pre(SRTSOCKET sock, int link_index) { + SRT_ASSERT(link_index < m_opts_link.size()); int maybe = m_blocking_mode ? 1 : 0; const int result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe); if (result == -1) return result; - // host is only checked for emptiness and depending on that the connection mode is selected. - // Here we are not exactly interested with that information. - std::vector failures; + const auto configure = [&](int li) -> int { + // host is only checked for emptiness and depending on that the connection mode is selected. + // Here we are not exactly interested with that information. + std::vector failures; - // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always, - // but it doesn't matter here. We don't use 'connmode' for anything else than - // checking for failures. - SocketOption::Mode conmode = SrtConfigurePre(sock, m_host, m_options, &failures); + // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always, + // but it doesn't matter here. We don't use 'connmode' for anything else than + // checking for failures. + // TODO: use per-link options too + SocketOption::Mode conmode = SrtConfigurePre(sock, m_host, m_opts_link[li], &failures); - if (conmode == SocketOption::FAILURE) - { - if (Verbose::on) + if (conmode == SocketOption::FAILURE) { - Verb() << "WARNING: failed to set options: "; - copy(failures.begin(), failures.end(), ostream_iterator(*Verbose::cverb, ", ")); - Verb(); + stringstream ss; + for (const auto v : failures) ss << v << ", "; + spdlog::error(LOG_SRT_GROUP "WARNING: failed to set options: {}", ss.str()); + return SRT_ERROR; } + return SRT_SUCCESS; + }; + + if (configure(0) != SRT_SUCCESS) return SRT_ERROR; - } - m_mode = static_cast(conmode); + if (link_index != 0) + return configure(link_index); return SRT_SUCCESS; } -int socket::srt_group::configure_post(SRTSOCKET sock) +int socket::srt_group::configure_post(SRTSOCKET sock, int link_index) { + SRT_ASSERT(link_index < m_opts_link.size()); int is_blocking = m_blocking_mode ? 1 : 0; int result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &is_blocking, sizeof is_blocking); @@ -527,15 +537,15 @@ int socket::srt_group::configure_post(SRTSOCKET sock) // Here we are not exactly interested with that information. vector failures; - SrtConfigurePost(sock, m_options, &failures); + SrtConfigurePost(sock, m_opts_link[link_index], &failures); if (!failures.empty()) { if (Verbose::on) { - Verb() << "WARNING: failed to set options: "; - copy(failures.begin(), failures.end(), ostream_iterator(*Verbose::cverb, ", ")); - Verb(); + stringstream ss; + for (const auto v : failures) ss << v << ", "; + spdlog::error(LOG_SRT_GROUP "WARNING: failed to set options: {}", ss.str()); } } @@ -599,7 +609,7 @@ int socket::srt_group::write(const const_buffer& buffer, int timeout_ms) return res; } -socket::srt_group::connection_mode socket::srt_group::mode() const { return m_mode; } +socket::srt_group::connection_mode socket::srt_group::mode() const { cout << "mode = " << m_mode << endl; return m_mode; } int socket::srt_group::statistics(SRT_TRACEBSTATS& stats, bool instant) { diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 12d495c..5fba5e7 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -52,8 +52,12 @@ class srt_group void configure(const std::map& options); void identify_connection_mode(const std::vector& uris); - int configure_pre(SRTSOCKET sock); - int configure_post(SRTSOCKET sock); + + /// Set SRT socket options with PRE binding. + /// @param [in] sock member socket + /// @param [in] link_index link index in m_opts_link + int configure_pre(SRTSOCKET sock, int link_index); + int configure_post(SRTSOCKET sock, int link_index); void create_listeners(const std::vector& uris); void create_callers(const std::vector& uris, SRT_GROUP_TYPE gtype); void release_targets(); @@ -62,6 +66,9 @@ class srt_group void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token); static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token); + using options = std::map; + static SRT_GROUP_TYPE detect_group_type(const options& opts); + public: /** * @returns The number of bytes received. @@ -105,7 +112,8 @@ class srt_group bool m_blocking_mode = true; string m_host; int m_port = -1; - std::map m_options; // All other options, as provided in the URI + std::vector m_opts_link; // Options per member link. [0] - also defines common options. + // Link index to token can be determined from m_targets scheduler m_scheduler; }; From 2511d98e9cc6411413d32f42b052f532dc2e8ae3 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 10 Dec 2020 17:52:07 +0100 Subject: [PATCH 09/24] Fixed group connection mode. Parsing link weight. --- xtransmit/srt_socket_group.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 58f2426..1e240fa 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -59,7 +59,7 @@ SRT_GROUP_TYPE socket::srt_group::detect_group_type(const options& opts) throw socket::exception(LOG_SRT_GROUP ": Failed to detect group mode. Value provided: " + gmode); } -static int detect_link_weight(UriParser& uri) +static int detect_link_weight(const UriParser& uri) { auto& options = uri.parameters(); const string key("weight"); @@ -129,7 +129,7 @@ SocketOption::Mode validate_srt_group(const vector& urls) socket::srt_group::srt_group(const vector& uris) { // validate_srt_group(..) also checks for empty 'uris' - const connection_mode m_mode = (connection_mode)validate_srt_group(uris); + m_mode = (connection_mode)validate_srt_group(uris); if (m_mode == FAILURE) throw socket::exception("Group mode validation failed!"); if (m_mode == RENDEZVOUS) @@ -164,7 +164,7 @@ socket::srt_group::srt_group(const vector& uris) // Create SRT socket group if (m_mode == LISTENER) { - spdlog::error(LOG_SRT_GROUP "Creating a group of listeners."); + spdlog::trace(LOG_SRT_GROUP "Creating a group of listeners."); create_listeners(uris); } else @@ -267,7 +267,9 @@ void socket::srt_group::create_callers(const vector& uris, SRT_GROUP_ const sockaddr* bindsa = nullptr; - const SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(bindsa, sa.get(), sa.size()); + SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(bindsa, sa.get(), sa.size()); + + gd.weight = detect_link_weight(uri); m_targets.push_back(gd); } @@ -490,7 +492,7 @@ int socket::srt_group::configure_pre(SRTSOCKET sock, int link_index) if (result == -1) return result; - const auto configure = [&](int li) -> int { + const auto configure_link = [&](int li) -> int { // host is only checked for emptiness and depending on that the connection mode is selected. // Here we are not exactly interested with that information. std::vector failures; @@ -512,11 +514,11 @@ int socket::srt_group::configure_pre(SRTSOCKET sock, int link_index) return SRT_SUCCESS; }; - if (configure(0) != SRT_SUCCESS) + if (configure_link(0) != SRT_SUCCESS) return SRT_ERROR; if (link_index != 0) - return configure(link_index); + return configure_link(link_index); return SRT_SUCCESS; } @@ -609,7 +611,7 @@ int socket::srt_group::write(const const_buffer& buffer, int timeout_ms) return res; } -socket::srt_group::connection_mode socket::srt_group::mode() const { cout << "mode = " << m_mode << endl; return m_mode; } +socket::srt_group::connection_mode socket::srt_group::mode() const { return m_mode; } int socket::srt_group::statistics(SRT_TRACEBSTATS& stats, bool instant) { From a95d5f5c35f453ab05e95057b56ad2d9ac0c2a7a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 21 Dec 2020 17:35:32 +0100 Subject: [PATCH 10/24] Set group caller PRE options --- xtransmit/srt_socket_group.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 1e240fa..7024423 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -253,6 +253,9 @@ void socket::srt_group::create_callers(const vector& uris, SRT_GROUP_ if (m_bind_socket == SRT_INVALID_SOCK) raise_exception("srt_create_group"); + if (SRT_SUCCESS != configure_pre(m_bind_socket, 0)) + throw socket::exception(srt_getlasterror_str()); + for (const auto& uri : uris) { sockaddr_any sa; From f792d18465978e7954dc95e436d5ed774d9cc2c2 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 20 Jan 2021 16:26:22 +0100 Subject: [PATCH 11/24] Fixed listener-side PRE configs --- xtransmit/srt_socket_group.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 7024423..8c175b6 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -224,10 +224,10 @@ void socket::srt_group::create_listeners(const vector& src_uri) if (SRT_SUCCESS != srt_setsockflag(s, SRTO_GROUPCONNECT, &gcon, sizeof gcon)) throw socket::exception(srt_getlasterror_str()); - if (SRT_SUCCESS != srt_bind(s, sa.get(), sa.size())) + if (SRT_SUCCESS != configure_pre(s, i)) throw socket::exception(srt_getlasterror_str()); - if (SRT_SUCCESS != configure_pre(s, i)) + if (SRT_SUCCESS != srt_bind(s, sa.get(), sa.size())) throw socket::exception(srt_getlasterror_str()); if (!m_blocking_mode) From 5055a22b6a2ff4f5f59591f73d7a0a1c1ea2ef62 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 21 Jan 2021 18:07:19 +0100 Subject: [PATCH 12/24] Printing SRC and DST URIs --- xtransmit/xtransmit-app.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/xtransmit/xtransmit-app.cpp b/xtransmit/xtransmit-app.cpp index fdc789a..a91dcca 100644 --- a/xtransmit/xtransmit-app.cpp +++ b/xtransmit/xtransmit-app.cpp @@ -191,6 +191,10 @@ int main(int argc, char** argv) // https://cliutils.gitlab.io/CLI11Tutorial/chapters/an-advanced-example.html if (sc_generate->parsed()) { + for (const auto url : dst_urls) + { + spdlog::info("DST URL: {}", url); + } generate::run(dst_urls, cfg_generate, force_break); return 0; } @@ -198,7 +202,7 @@ int main(int argc, char** argv) { for (const auto url : src_urls) { - cout << "URL: " << url << "\n"; + spdlog::info("SRC URL: {}", url); } xtransmit::receive::run(src_urls, cfg_receive, force_break); return 0; @@ -207,11 +211,11 @@ int main(int argc, char** argv) { for (const auto url : src_urls) { - cout << "SRC URL: " << url << "\n"; + spdlog::info("SRC URL: {}", url); } for (const auto url : dst_urls) { - cout << "DST URL: " << url << "\n"; + spdlog::info("DST URL: {}", url); } xtransmit::route::run(src, dst, cfg_route, force_break); From 92e2320ee3a55f5e1e85839bc1f1079643493187 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 22 Jan 2021 16:13:41 +0100 Subject: [PATCH 13/24] Added connect and listen callback to socket group --- xtransmit/generate.cpp | 25 ------------ xtransmit/receive.cpp | 26 ------------ xtransmit/srt_socket_group.cpp | 72 ++++++++++++++++++++++++++++++---- xtransmit/srt_socket_group.hpp | 9 ++++- 4 files changed, 72 insertions(+), 60 deletions(-) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index 4954561..cf459b4 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -34,26 +34,6 @@ using shared_sock = std::shared_ptr; #define LOG_SC_GENERATE "GENERATE " -int srt_gen_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, - const struct sockaddr* peeraddr, const char* streamid) -{ - spdlog::trace(LOG_SC_GENERATE "Accepted member socket 0x{:X}.", sock); - return 0; -} - -void srt_gen_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) -{ - if (error != SRT_SUCCESS) - { - spdlog::warn(LOG_SC_GENERATE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, - srt_strerror(error, 0)); - return; - } - - // After SRT v1.4.2 connection callback is no longer called on connection success. - spdlog::trace(LOG_SC_GENERATE "Member socket connected 0x{:X} (token {}).", sock, token); -} - void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break) { vector message_to_send(cfg.message_size); @@ -174,13 +154,8 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { - s->set_listen_callback(srt_gen_listen_callback, nullptr); s->listen(); } - else - { - s->set_connect_callback(srt_gen_connect_callback, nullptr); - } connection = accept ? s->accept() : s->connect(); } diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index 1cfa15b..5bfd629 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -33,8 +33,6 @@ using shared_sock = std::shared_ptr; #define LOG_SC_RECEIVE "RECEIVE " - - void trace_message(const size_t bytes, const vector &buffer, int conn_id) { ::cout << "RECEIVED MESSAGE length " << bytes << " on conn ID " << conn_id; @@ -138,26 +136,6 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break } } -int srt_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, - const struct sockaddr* peeraddr, const char* streamid) -{ - spdlog::trace(LOG_SC_RECEIVE "Accepted member socket 0x{:X}.", sock); - return 0; -} - -void srt_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) -{ - if (error != SRT_SUCCESS) - { - spdlog::warn(LOG_SC_RECEIVE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, - srt_strerror(error, 0)); - return; - } - - // After SRT v1.4.2 connection callback is no longer called on connection success. - spdlog::trace(LOG_SC_RECEIVE "Member socket connected 0x{:X} (token {}).", sock, token); -} - void xtransmit::receive::run(const vector &src_urls, const config &cfg, const atomic_bool &force_break) { if (src_urls.empty()) @@ -217,12 +195,8 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, socket::srt_group* s = dynamic_cast(sock.get()); const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { - s->set_listen_callback(&srt_listen_callback, nullptr); s->listen(); } - else { - s->set_connect_callback(&srt_connect_callback, nullptr); - } conn = accept ? s->accept() : s->connect(); } diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 8c175b6..f42da08 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -164,7 +164,7 @@ socket::srt_group::srt_group(const vector& uris) // Create SRT socket group if (m_mode == LISTENER) { - spdlog::trace(LOG_SRT_GROUP "Creating a group of listeners."); + spdlog::trace(LOG_SRT_GROUP "Creating a group of listeners"); create_listeners(uris); } else @@ -286,10 +286,14 @@ void socket::srt_group::create_callers(const vector& uris, SRT_GROUP_ if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) throw socket::exception(srt_getlasterror_str()); } + + set_connect_callback(); } void socket::srt_group::listen() { + set_listen_callback(); + for (const auto sockid : m_listeners) { if (srt_listen(sockid, 5) == SRT_ERROR) @@ -338,15 +342,71 @@ shared_srt_group socket::srt_group::accept() return make_shared(*this, accepted_sock); } -void socket::srt_group::set_listen_callback(srt_listen_callback_fn* hook_fn, void* hook_opaque) +void socket::srt_group::print_member_socket(SRTSOCKET sock) +{ + sockaddr_any sa; + int sa_len = sa.storage_size(); + srt_getpeername(sock, sa.get(), &sa_len); + + int weight = -1; // unknown + int gtype = -1; + int gtype_len = sizeof gtype; + + if (srt_getsockflag(sock, SRTO_GROUPTYPE, (void*) >ype, >ype_len) == SRT_SUCCESS + && gtype == SRT_GTYPE_BACKUP) + { + const SRTSOCKET group_id = srt_groupof(sock); + spdlog::trace(LOG_SRT_GROUP "group ID {}.", group_id); + SRT_SOCKGROUPDATA gdata[3] = {}; + size_t gdata_len = 3; + const int gsize = srt_group_data(group_id, gdata, &gdata_len); + for (int i = 0; i < gsize; ++i) + { + if (gdata[i].id != sock) + continue; + + weight = gdata[i].weight; + break; + } + } + + gtype += 1; + gtype = gtype < 0 ? 0 : (gtype > 3 ? 0 : gtype); + const char* gtype_str[] = { "NO GROUP", "BROADCAST", "BACKUP", "BALANCING"}; + spdlog::trace(LOG_SRT_GROUP "Member socket 0x{:X}, {} weight = {} remote IP {}", sock, + gtype_str[gtype], weight, sa.str()); +} + +int socket::srt_group::on_listen_callback(SRTSOCKET sock) +{ + m_scheduler.schedule_in(20ms, &socket::srt_group::print_member_socket, this, sock); + return 0; +} + +int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid) +{ + if (opaq == nullptr) + { + spdlog::warn(LOG_SRT_GROUP "listen_callback_fn does not have a pointer to the group"); + return 0; + } + + spdlog::trace(LOG_SRT_GROUP "Accepted member socket 0x{:X}", sock); + + // TODO: this group may no longer exist. Use some global array to track valid groups. + socket::srt_group* group = reinterpret_cast(opaq); + return group->on_listen_callback(sock); +} + +void socket::srt_group::set_listen_callback() { for (const auto sockid : m_listeners) { - if (srt_listen_callback(sockid, hook_fn, hook_opaque) == SRT_ERROR) + if (srt_listen_callback(sockid, listen_callback_fn, (void*) this) == SRT_ERROR) raise_exception("listen failed with {}", srt_getlasterror_str()); } } -// for (auto target : m_targets) void socket::srt_group::connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token) { @@ -396,12 +456,10 @@ void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const soc spdlog::warn(LOG_SRT_GROUP "0x{:X}: Could not schedule member reconnection (token {})", m_bind_socket, token); return; - } -void socket::srt_group::set_connect_callback(srt_connect_callback_fn* hook_fn, void* hook_opaque) +void socket::srt_group::set_connect_callback() { - //srt_connect_callback(m_bind_socket, hook_fn, hook_opaque); srt_connect_callback(m_bind_socket, connect_callback_fn, (void*) this); } diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 5fba5e7..4459bbc 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -45,14 +45,14 @@ class srt_group */ void listen() noexcept(false); - void set_listen_callback(srt_listen_callback_fn* hook_fn, void* hook_opaque); - void set_connect_callback(srt_connect_callback_fn* hook_fn, void* hook_opaque); private: void configure(const std::map& options); void identify_connection_mode(const std::vector& uris); + void set_listen_callback(); + void set_connect_callback(); /// Set SRT socket options with PRE binding. /// @param [in] sock member socket /// @param [in] link_index link index in m_opts_link @@ -65,10 +65,15 @@ class srt_group void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token); static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token); + int on_listen_callback(SRTSOCKET sock); + static int listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid); using options = std::map; static SRT_GROUP_TYPE detect_group_type(const options& opts); + void print_member_socket(SRTSOCKET sock); + public: /** * @returns The number of bytes received. From 7a3c71072a1a392836ca0bc60bfcce0da47f8315 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 22 Jan 2021 16:22:12 +0100 Subject: [PATCH 14/24] Corrected group log a bit --- xtransmit/srt_socket_group.cpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index f42da08..9cb882e 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -344,10 +344,6 @@ shared_srt_group socket::srt_group::accept() void socket::srt_group::print_member_socket(SRTSOCKET sock) { - sockaddr_any sa; - int sa_len = sa.storage_size(); - srt_getpeername(sock, sa.get(), &sa_len); - int weight = -1; // unknown int gtype = -1; int gtype_len = sizeof gtype; @@ -356,7 +352,6 @@ void socket::srt_group::print_member_socket(SRTSOCKET sock) && gtype == SRT_GTYPE_BACKUP) { const SRTSOCKET group_id = srt_groupof(sock); - spdlog::trace(LOG_SRT_GROUP "group ID {}.", group_id); SRT_SOCKGROUPDATA gdata[3] = {}; size_t gdata_len = 3; const int gsize = srt_group_data(group_id, gdata, &gdata_len); @@ -373,8 +368,8 @@ void socket::srt_group::print_member_socket(SRTSOCKET sock) gtype += 1; gtype = gtype < 0 ? 0 : (gtype > 3 ? 0 : gtype); const char* gtype_str[] = { "NO GROUP", "BROADCAST", "BACKUP", "BALANCING"}; - spdlog::trace(LOG_SRT_GROUP "Member socket 0x{:X}, {} weight = {} remote IP {}", sock, - gtype_str[gtype], weight, sa.str()); + spdlog::trace(LOG_SRT_GROUP "Member socket 0x{:X}, {} weight = {}", sock, + gtype_str[gtype], weight); } int socket::srt_group::on_listen_callback(SRTSOCKET sock) @@ -392,7 +387,8 @@ int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsvers return 0; } - spdlog::trace(LOG_SRT_GROUP "Accepted member socket 0x{:X}", sock); + sockaddr_any sa(peeraddr); + spdlog::trace(LOG_SRT_GROUP "Accepted member socket 0x{:X}, remote IP {}", sock, sa.str()); // TODO: this group may no longer exist. Use some global array to track valid groups. socket::srt_group* group = reinterpret_cast(opaq); From b410b78253e24775af7e57d4c2840580352507b2 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 28 Jan 2021 15:23:13 +0100 Subject: [PATCH 15/24] Resolved C++14 string operator --- xtransmit/srt_socket_group.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 9cb882e..5eb7b76 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -374,7 +374,7 @@ void socket::srt_group::print_member_socket(SRTSOCKET sock) int socket::srt_group::on_listen_callback(SRTSOCKET sock) { - m_scheduler.schedule_in(20ms, &socket::srt_group::print_member_socket, this, sock); + m_scheduler.schedule_in(std::chrono::microseconds(20), &socket::srt_group::print_member_socket, this, sock); return 0; } @@ -445,7 +445,7 @@ void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const soc spdlog::trace(LOG_SRT_GROUP "0x{:X}: Scheduling member reconnection (token {})", m_bind_socket, token); reconn_scheduled = true; - m_scheduler.schedule_in(1s, connfn, m_bind_socket, target); + m_scheduler.schedule_in(std::chrono::seconds(1), connfn, m_bind_socket, target); } if (!reconn_scheduled) From f66de1f2554eae704945ee0d940953a2ae8a1161 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 8 Feb 2021 17:48:48 +0100 Subject: [PATCH 16/24] Fixed reorder distance metric --- xtransmit/metrics_reorder.hpp | 2 +- xtransmit/srt_socket_group.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/xtransmit/metrics_reorder.hpp b/xtransmit/metrics_reorder.hpp index 8642af3..290e264 100644 --- a/xtransmit/metrics_reorder.hpp +++ b/xtransmit/metrics_reorder.hpp @@ -46,7 +46,7 @@ class reorder else // Packet reordering: pkt_seqno < m_seqno { ++m_stats.pkts_reordered; - const uint64_t reorder_dist = pkt_seqno - m_stats.expected_seqno; + const uint64_t reorder_dist = m_stats.expected_seqno - pkt_seqno; m_stats.reorder_dist = std::max(m_stats.reorder_dist, reorder_dist); //spdlog::warn(LOG_SC_RFC4737 "Detected reordered packet, dist {}", reorder_dist); diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 5eb7b76..cbfb89d 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -498,7 +498,7 @@ shared_srt_group socket::srt_group::connect() if (!m_blocking_mode && false) { // This branch does not assign a token to the target - // therefiore it is not possible to schedule a reconnection. + // therefore it is not possible to schedule a reconnection. // srt_connect_group is to be used instead in both blocking and non-blocking modes. spdlog::debug( LOG_SRT_GROUP "non blocking"); From 85b5571a6670bc2f45e0d394f772c42c83535838 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 26 Feb 2021 17:50:31 +0100 Subject: [PATCH 17/24] Reorganized stats a bit. Moved unique packets closed to non-unique packets. Fixed first stats record. --- .gitignore | 2 +- xtransmit/socket.hpp | 1 + xtransmit/srt_socket_group.cpp | 37 ++++++++++++++++++---------------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 4726e3c..db754f5 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,7 @@ # Build results _build*/ -build/ +build*/ [Dd]ebug/ [Dd]ebugPublic/ [Rr]elease/ diff --git a/xtransmit/socket.hpp b/xtransmit/socket.hpp index 0f261b4..8970fbe 100644 --- a/xtransmit/socket.hpp +++ b/xtransmit/socket.hpp @@ -59,6 +59,7 @@ class isocket virtual int write(const const_buffer &buffer, int timeout_ms = -1) = 0; public: + /** Check if statistics is supported by a socket implementation. * * @returns true if statistics is supported, false otherwise. diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index cbfb89d..3e332ac 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -690,15 +690,19 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS output << "Timepoint,"; #endif output << "Time,SocketID,pktFlowWindow,pktCongestionWindow,pktFlightSize,"; - output << "msRTT,mbpsBandwidth,mbpsMaxBW,pktSent,pktSndLoss,pktSndDrop,"; - output << "pktRetrans,byteSent,byteAvailSndBuf,byteSndDrop,mbpsSendRate,usPktSndPeriod,msSndBuf,"; - output << "pktRecv,pktRcvLoss,pktRcvDrop,pktRcvRetrans,pktRcvBelated,"; + output << "msRTT,mbpsBandwidth,mbpsMaxBW,pktSent,"; +#if HAS_UNIQUE_PKTS + output << "pktSentUnique,"; +#endif + output << "pktSndLoss,pktSndDrop,pktRetrans,byteSent,"; + output << "byteAvailSndBuf,byteSndDrop,mbpsSendRate,usPktSndPeriod,msSndBuf,pktRecv,"; +#if HAS_UNIQUE_PKTS + output << "pktRecvUnique,"; +#endif + output << "pktRcvLoss,pktRcvDrop,pktRcvRetrans,pktRcvBelated,"; output << "byteRecv,byteAvailRcvBuf,byteRcvLoss,byteRcvDrop,mbpsRecvRate,msRcvBuf,msRcvTsbPdDelay"; #if HAS_PKT_REORDER_TOL output << ",pktReorderTolerance"; -#endif -#if HAS_UNIQUE_PKTS - output << ",pktSentUnique,pktRecvUnique"; #endif output << endl; return output.str(); @@ -718,6 +722,9 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS output << stats.mbpsBandwidth << ','; output << stats.mbpsMaxBW << ','; output << stats.pktSent << ','; +#if HAS_UNIQUE_PKTS + output << stats.pktSentUnique << ","; +#endif output << stats.pktSndLoss << ','; output << stats.pktSndDrop << ','; @@ -730,6 +737,9 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS output << stats.msSndBuf << ','; output << stats.pktRecv << ','; +#if HAS_UNIQUE_PKTS + output << stats.pktRecvUnique << ","; +#endif output << stats.pktRcvLoss << ','; output << stats.pktRcvDrop << ','; output << stats.pktRcvRetrans << ','; @@ -747,11 +757,6 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS output << "," << stats.pktReorderTolerance; #endif -#if HAS_UNIQUE_PKTS - output << "," << stats.pktSentUnique; - output << "," << stats.pktRecvUnique; -#endif - output << endl; return output.str(); @@ -762,16 +767,14 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS const string socket::srt_group::statistics_csv(bool print_header) const { - //SRT_ASSERT(m_bind_socket != SRT_INVALID_SOCK); - SRT_TRACEBSTATS stats; + if (print_header) + return stats_to_csv(m_bind_socket, SRT_TRACEBSTATS(), print_header);; + + SRT_TRACEBSTATS stats = {}; if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true)) raise_exception("statistics"); - string csv_stats = stats_to_csv(m_bind_socket, stats, print_header); - if (print_header) - return csv_stats; - size_t group_size = 0; if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS) { From 61a785777604c399ba33887c85078c51fd683abf Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 2 Mar 2021 17:42:38 +0100 Subject: [PATCH 18/24] Removed bonding to route subcommand --- xtransmit/route.cpp | 6 +++--- xtransmit/route.hpp | 2 +- xtransmit/xtransmit-app.cpp | 11 +---------- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/xtransmit/route.cpp b/xtransmit/route.cpp index e8629de..9dfaf7d 100644 --- a/xtransmit/route.cpp +++ b/xtransmit/route.cpp @@ -131,13 +131,13 @@ void xtransmit::route::run(const string& src_url, const string& dst_url, } } -CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, vector& src_urls, vector& dst_urls) +CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, string& src_url, string& dst_url) { const map to_ms{ {"s", 1000}, {"ms", 1} }; CLI::App* sc_route = app.add_subcommand("route", "Route data (SRT, UDP)")->fallthrough(); - sc_route->add_option("-i,src", src_urls, "Source URI")->expected(1, 10); - sc_route->add_option("-o,dst", dst_urls, "Destination URI")->expected(1, 10); + sc_route->add_option("-i,src", src_url, "Source URI")->expected(1); + sc_route->add_option("-o,dst", dst_url, "Destination URI")->expected(1); sc_route->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload"); sc_route->add_flag("--bidir", cfg.bidir, "Enable bidirectional transmission"); sc_route->add_option("--statsfile", cfg.stats_file, "output stats report filename"); diff --git a/xtransmit/route.hpp b/xtransmit/route.hpp index cc604f4..a82c505 100644 --- a/xtransmit/route.hpp +++ b/xtransmit/route.hpp @@ -22,7 +22,7 @@ namespace xtransmit { const config& cfg, const std::atomic_bool& force_break); CLI::App* add_subcommand(CLI::App& app, config& cfg, - std::vector& src_urls, std::vector& dst_urls); + std::string& src_url, std::string& dst_url); } // namespace forward diff --git a/xtransmit/xtransmit-app.cpp b/xtransmit/xtransmit-app.cpp index a91dcca..ab27827 100644 --- a/xtransmit/xtransmit-app.cpp +++ b/xtransmit/xtransmit-app.cpp @@ -169,7 +169,7 @@ int main(int argc, char** argv) CLI::App* sc_receive = receive::add_subcommand(app, cfg_receive, src_urls); xtransmit::route::config cfg_route; - CLI::App* sc_route = route::add_subcommand(app, cfg_route, src_urls, dst_urls); + CLI::App* sc_route = route::add_subcommand(app, cfg_route, src, dst); #if ENABLE_FILE_TRANSFER CLI::App* sc_file = app.add_subcommand("file", "Send/receive a single file or folder contents")->fallthrough(); @@ -209,15 +209,6 @@ int main(int argc, char** argv) } else if (sc_route->parsed()) { - for (const auto url : src_urls) - { - spdlog::info("SRC URL: {}", url); - } - for (const auto url : dst_urls) - { - spdlog::info("DST URL: {}", url); - } - xtransmit::route::run(src, dst, cfg_route, force_break); return 0; } From 38b951611545e727351c1e84e57fa5466417b6b8 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 11 Mar 2021 16:43:32 +0100 Subject: [PATCH 19/24] Added weight to group CSV stats --- xtransmit/srt_socket_group.cpp | 11 ++++++----- xtransmit/srt_socket_group.hpp | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 3e332ac..d5ca06a 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -675,7 +675,7 @@ int socket::srt_group::statistics(SRT_TRACEBSTATS& stats, bool instant) return srt_bstats(m_bind_socket, &stats, instant); } -const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header) +const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, uint16_t weight, bool print_header) { std::ostringstream output; @@ -689,7 +689,7 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS #ifdef HAS_PUT_TIME output << "Timepoint,"; #endif - output << "Time,SocketID,pktFlowWindow,pktCongestionWindow,pktFlightSize,"; + output << "Time,SocketID,weight,pktFlowWindow,pktCongestionWindow,pktFlightSize,"; output << "msRTT,mbpsBandwidth,mbpsMaxBW,pktSent,"; #if HAS_UNIQUE_PKTS output << "pktSentUnique,"; @@ -714,6 +714,7 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS output << stats.msTimeStamp << ','; output << socketid << ','; + output << weight << ","; output << stats.pktFlowWindow << ','; output << stats.pktCongestionWindow << ','; output << stats.pktFlightSize << ','; @@ -768,12 +769,12 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS const string socket::srt_group::statistics_csv(bool print_header) const { if (print_header) - return stats_to_csv(m_bind_socket, SRT_TRACEBSTATS(), print_header);; + return stats_to_csv(m_bind_socket, SRT_TRACEBSTATS(), 0, print_header);; SRT_TRACEBSTATS stats = {}; if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true)) raise_exception("statistics"); - string csv_stats = stats_to_csv(m_bind_socket, stats, print_header); + string csv_stats = stats_to_csv(m_bind_socket, stats, 0, print_header); size_t group_size = 0; if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS) @@ -809,7 +810,7 @@ const string socket::srt_group::statistics_csv(bool print_header) const break; } - csv_stats += stats_to_csv(id, stats, false); + csv_stats += stats_to_csv(id, stats, group_data[i].weight, false); } return csv_stats; diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index 4459bbc..e6eb011 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -100,7 +100,7 @@ class srt_group int statistics(SRT_TRACEBSTATS& stats, bool instant = true); bool supports_statistics() const final { return true; } const std::string statistics_csv(bool print_header) const final; - static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header); + static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, uint16_t weight, bool print_header); private: void raise_exception(const string&& place, SRTSOCKET sock = SRT_INVALID_SOCK) const; From c60366c74060cecd1f4ecefe6785d2401f969148 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 19 Mar 2021 12:46:21 +0100 Subject: [PATCH 20/24] Added warning to metrics about loss detected --- xtransmit/metrics.hpp | 1 + xtransmit/metrics_reorder.hpp | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/xtransmit/metrics.hpp b/xtransmit/metrics.hpp index 4e90b0f..30efe84 100644 --- a/xtransmit/metrics.hpp +++ b/xtransmit/metrics.hpp @@ -63,6 +63,7 @@ namespace metrics // TODO: latency measurements inline void validate_packet(const vector& payload) { + // TODO: validate payload const auto sys_time_now = system_clock::now(); const auto std_time_now = steady_clock::now(); diff --git a/xtransmit/metrics_reorder.hpp b/xtransmit/metrics_reorder.hpp index 290e264..8741c83 100644 --- a/xtransmit/metrics_reorder.hpp +++ b/xtransmit/metrics_reorder.hpp @@ -40,7 +40,7 @@ class reorder { const uint64_t lost = pkt_seqno - m_stats.expected_seqno; m_stats.pkts_lost += lost; - //spdlog::warn(LOG_SC_RFC4737 "Detected loss of {} packets", lost); + spdlog::warn("[METRICS] Detected loss of {} packets (seqno [{}; {}))", lost, m_stats.expected_seqno, pkt_seqno); m_stats.expected_seqno = pkt_seqno + 1; } else // Packet reordering: pkt_seqno < m_seqno @@ -49,7 +49,7 @@ class reorder const uint64_t reorder_dist = m_stats.expected_seqno - pkt_seqno; m_stats.reorder_dist = std::max(m_stats.reorder_dist, reorder_dist); - //spdlog::warn(LOG_SC_RFC4737 "Detected reordered packet, dist {}", reorder_dist); + spdlog::warn("[METRICS] Detected reordered packet, seqno {}, expected {} (dist {})", pkt_seqno, m_stats.expected_seqno, reorder_dist); } } From a7fe78dccb7847b30c88f90ab4b426d8f0964dfb Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 19 Mar 2021 16:15:48 +0100 Subject: [PATCH 21/24] Member stats: continue after a broken member --- xtransmit/srt_socket_group.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index d5ca06a..703bbe4 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -789,7 +789,7 @@ const string socket::srt_group::statistics_csv(bool print_header) const if (num_members == SRT_ERROR) { // Not throwing an exception as group stats was retrieved. - spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data", m_bind_socket); + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data, {}", m_bind_socket, srt_getlasterror_str()); return csv_stats; } @@ -800,14 +800,14 @@ const string socket::srt_group::statistics_csv(bool print_header) const if (group_data[i].sockstate != SRTS_CONNECTED) { - spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket state is {}, skipping.", id, srt_logging::SockStatusStr(status)); + spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket 0x{:X} state is {}, skipping.", m_bind_socket, id, srt_logging::SockStatusStr(status)); continue; } if (SRT_ERROR == srt_bstats(id, &stats, true)) { - spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group member stats. {}", id, srt_getlasterror_str()); - break; + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve stats for member 0x{:X}. {}", m_bind_socket, id, srt_getlasterror_str()); + continue; } csv_stats += stats_to_csv(id, stats, group_data[i].weight, false); From 995c2d522a6da97dde1efe22be07d2521819938a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 29 Mar 2021 15:21:13 +0200 Subject: [PATCH 22/24] Fixed member group type log on accept --- xtransmit/srt_socket_group.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index 703bbe4..b9b7b51 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -345,7 +345,7 @@ shared_srt_group socket::srt_group::accept() void socket::srt_group::print_member_socket(SRTSOCKET sock) { int weight = -1; // unknown - int gtype = -1; + int gtype = 0; int gtype_len = sizeof gtype; if (srt_getsockflag(sock, SRTO_GROUPTYPE, (void*) >ype, >ype_len) == SRT_SUCCESS @@ -365,7 +365,6 @@ void socket::srt_group::print_member_socket(SRTSOCKET sock) } } - gtype += 1; gtype = gtype < 0 ? 0 : (gtype > 3 ? 0 : gtype); const char* gtype_str[] = { "NO GROUP", "BROADCAST", "BACKUP", "BALANCING"}; spdlog::trace(LOG_SRT_GROUP "Member socket 0x{:X}, {} weight = {}", sock, From 1ad6eb2f914eab179cfdc557ecb05017a7b05156 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 31 Mar 2021 10:05:54 +0200 Subject: [PATCH 23/24] Minor: using SOCKET type instead of int --- xtransmit/socket.hpp | 2 +- xtransmit/srt_socket.hpp | 4 ++-- xtransmit/srt_socket_group.hpp | 4 ++-- xtransmit/udp_socket.hpp | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/xtransmit/socket.hpp b/xtransmit/socket.hpp index 8970fbe..f10b7a9 100644 --- a/xtransmit/socket.hpp +++ b/xtransmit/socket.hpp @@ -76,7 +76,7 @@ class isocket virtual const std::string statistics_csv(bool print_header) const { return std::string(); } - virtual int id() const = 0; + virtual SOCKET id() const = 0; }; } // namespace socket diff --git a/xtransmit/srt_socket.hpp b/xtransmit/srt_socket.hpp index 2d65310..92c56b0 100644 --- a/xtransmit/srt_socket.hpp +++ b/xtransmit/srt_socket.hpp @@ -90,7 +90,7 @@ class srt bool is_caller() const final { return m_mode == CALLER; } public: - int id() const final { return m_bind_socket; } + SOCKET id() const final { return m_bind_socket; } int statistics(SRT_TRACEBSTATS& stats, bool instant = true); bool supports_statistics() const final { return true; } const std::string statistics_csv(bool print_header) const final; @@ -101,7 +101,7 @@ class srt void raise_exception(const string&& place, const string&& reason) const; private: - int m_bind_socket = SRT_INVALID_SOCK; + SRTSOCKET m_bind_socket = SRT_INVALID_SOCK; int m_epoll_connect = -1; int m_epoll_io = -1; diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp index e6eb011..5942553 100644 --- a/xtransmit/srt_socket_group.hpp +++ b/xtransmit/srt_socket_group.hpp @@ -96,7 +96,7 @@ class srt_group bool is_caller() const final { return m_mode == CALLER; } public: - int id() const final { return m_bind_socket; } + SOCKET id() const final { return m_bind_socket; } int statistics(SRT_TRACEBSTATS& stats, bool instant = true); bool supports_statistics() const final { return true; } const std::string statistics_csv(bool print_header) const final; @@ -107,7 +107,7 @@ class srt_group void raise_exception(const string&& place, const string&& reason) const; private: - int m_bind_socket = SRT_INVALID_SOCK; + SRTSOCKET m_bind_socket = SRT_INVALID_SOCK; std::vector m_listeners; std::vector m_targets; int m_epoll_connect = -1; diff --git a/xtransmit/udp_socket.hpp b/xtransmit/udp_socket.hpp index d75cc65..4c47065 100644 --- a/xtransmit/udp_socket.hpp +++ b/xtransmit/udp_socket.hpp @@ -32,7 +32,7 @@ class udp public: bool is_caller() const final { return m_host != ""; } - int id() const final { return m_bind_socket; } + SOCKET id() const final { return m_bind_socket; } public: /** From 70a3163272b8f7e17dbfeff086df3ffcc9e63e18 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 7 May 2021 14:46:51 +0200 Subject: [PATCH 24/24] Connection erro log message --- submodule/srt | 2 +- xtransmit/srt_socket_group.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/submodule/srt b/submodule/srt index 702153f..42c36ed 160000 --- a/submodule/srt +++ b/submodule/srt @@ -1 +1 @@ -Subproject commit 702153f098de79fd922196d97f8108d22d03c9f6 +Subproject commit 42c36ed311069c3a5453a7710068d722fcbb2095 diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index b9b7b51..3c85afb 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -426,7 +426,7 @@ void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const soc return; } - spdlog::warn(LOG_SRT_GROUP "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, + spdlog::warn(LOG_SRT_GROUP "Member socket 0x{:X} (token {}) connection error: ({}) {}.", sock, token, error, srt_strerror(error, 0)); bool reconn_scheduled = false;