diff --git a/.gitignore b/.gitignore index 4726e3c..db754f5 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,7 @@ # Build results _build*/ -build/ +build*/ [Dd]ebug/ [Dd]ebugPublic/ [Rr]elease/ diff --git a/CMakeLists.txt b/CMakeLists.txt index d481cf6..50d5288 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,7 @@ add_subdirectory(submodule/spdlog) SET(ENABLE_APPS ON) SET(ENABLE_SHARED OFF) SET(ENABLE_ENCRYPTION ON) +SET(ENABLE_EXPERIMENTAL_BONDING ON) add_subdirectory(submodule/srt) #set_target_properties(srt-live-transmit diff --git a/submodule/srt b/submodule/srt index 702153f..42c36ed 160000 --- a/submodule/srt +++ b/submodule/srt @@ -1 +1 @@ -Subproject commit 702153f098de79fd922196d97f8108d22d03c9f6 +Subproject commit 42c36ed311069c3a5453a7710068d722fcbb2095 diff --git a/xtransmit/CMakeLists.txt b/xtransmit/CMakeLists.txt index 9a2a585..d8fe513 100644 --- a/xtransmit/CMakeLists.txt +++ b/xtransmit/CMakeLists.txt @@ -17,6 +17,15 @@ target_include_directories(srt-xtransmit PUBLIC ${PTHREAD_INCLUDE_DIR} ) +if (ENABLE_EXPERIMENTAL_BONDING) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_EXPERIMENTAL_BONDING=1") + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_EXPERIMENTAL_BONDING=1") +endif() + +if (ENABLE_STDCXX_SYNC) + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_STDCXX_SYNC=1") + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_STDCXX_SYNC=1") +endif() if (ENABLE_CXX17) set(REQUIRE_CXX_VER 17) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index 959b2dd..cf459b4 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -14,6 +14,7 @@ // xtransmit #include "socket_stats.hpp" #include "srt_socket.hpp" +#include "srt_socket_group.hpp" #include "udp_socket.hpp" #include "generate.hpp" #include "pacer.hpp" @@ -36,7 +37,6 @@ using shared_sock = std::shared_ptr; void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break) { vector message_to_send(cfg.message_size); - iota(message_to_send.begin(), message_to_send.end(), (char)0); const auto start_time = steady_clock::now(); const int num_messages = cfg.duration > 0 ? -1 : cfg.num_messages; @@ -94,12 +94,19 @@ void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break } } -void xtransmit::generate::run(const string& dst_url, const config& cfg, const atomic_bool& force_break) +void xtransmit::generate::run(const vector& dst_urls, const config& cfg, const atomic_bool& force_break) { - const UriParser uri(dst_url); + if (dst_urls.empty()) + { + spdlog::error(LOG_SC_GENERATE "No destination URI was provided"); + return; + } - shared_sock sock; - shared_sock connection; + vector urls; + for (const string& url : dst_urls) + { + urls.emplace_back(UriParser(url)); + } const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0; // make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :( @@ -121,39 +128,60 @@ void xtransmit::generate::run(const string& dst_url, const config& cfg, const at do { try { - if (uri.proto() == "udp") + shared_sock sock; + shared_sock connection; + + if (urls.size() == 1) { - connection = make_shared(uri); + if (urls[0].proto() == "udp") + { + connection = make_shared(urls[0]); + } + else + { + sock = make_shared(urls[0]); + socket::srt* s = dynamic_cast(sock.get()); + const bool accept = s->mode() == socket::srt::LISTENER; + if (accept) + s->listen(); + connection = accept ? s->accept() : s->connect(); + } } else { - sock = make_shared(uri); - socket::srt* s = static_cast(sock.get()); - const bool accept = s->mode() == socket::srt::LISTENER; + sock = make_shared(urls); + socket::srt_group* s = dynamic_cast(sock.get()); + const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) + { s->listen(); + } connection = accept ? s->accept() : s->connect(); } if (stats) stats->add_socket(connection); run_pipe(connection, cfg, force_break); + if (stats && cfg.reconnect) + stats->clear(); } catch (const socket::exception& e) { spdlog::warn(LOG_SC_GENERATE "{}", e.what()); + if (stats) + stats->clear(); } } while (cfg.reconnect && !force_break); } -CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, string& dst_url) +CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, vector& dst_urls) { const map to_bps{{"kbps", 1000}, {"Mbps", 1000000}, {"Gbps", 1000000000}}; const map to_ms{{"s", 1000}, {"ms", 1}}; const map to_sec{{"s", 1}, {"min", 60}, {"mins", 60}}; CLI::App* sc_generate = app.add_subcommand("generate", "Send generated data (SRT, UDP)")->fallthrough(); - sc_generate->add_option("dst", dst_url, "Destination URI"); + sc_generate->add_option("dst", dst_urls, "Destination URI")->expected(1, 10); sc_generate->add_option("--msgsize", cfg.message_size, "Size of a message to send"); sc_generate->add_option("--sendrate", cfg.sendrate, "Bitrate to generate") ->transform(CLI::AsNumberWithUnit(to_bps, CLI::AsNumberWithUnit::CASE_SENSITIVE)); diff --git a/xtransmit/generate.hpp b/xtransmit/generate.hpp index 4d3a4be..fbd0031 100644 --- a/xtransmit/generate.hpp +++ b/xtransmit/generate.hpp @@ -24,8 +24,9 @@ struct config std::string playback_csv; }; -void run(const std::string& dst_url, const config& cfg, const std::atomic_bool& force_break); +void run(const std::vector& dst_urls, const config& cfg, const std::atomic_bool& force_break); + +CLI::App* add_subcommand(CLI::App& app, config& cfg, std::vector& dst_urls); -CLI::App* add_subcommand(CLI::App& app, config& cfg, std::string& dst_url); } // namespace generate } // namespace xtransmit diff --git a/xtransmit/metrics.hpp b/xtransmit/metrics.hpp index 4e90b0f..30efe84 100644 --- a/xtransmit/metrics.hpp +++ b/xtransmit/metrics.hpp @@ -63,6 +63,7 @@ namespace metrics // TODO: latency measurements inline void validate_packet(const vector& payload) { + // TODO: validate payload const auto sys_time_now = system_clock::now(); const auto std_time_now = steady_clock::now(); diff --git a/xtransmit/metrics_reorder.hpp b/xtransmit/metrics_reorder.hpp index 8642af3..8741c83 100644 --- a/xtransmit/metrics_reorder.hpp +++ b/xtransmit/metrics_reorder.hpp @@ -40,16 +40,16 @@ class reorder { const uint64_t lost = pkt_seqno - m_stats.expected_seqno; m_stats.pkts_lost += lost; - //spdlog::warn(LOG_SC_RFC4737 "Detected loss of {} packets", lost); + spdlog::warn("[METRICS] Detected loss of {} packets (seqno [{}; {}))", lost, m_stats.expected_seqno, pkt_seqno); m_stats.expected_seqno = pkt_seqno + 1; } else // Packet reordering: pkt_seqno < m_seqno { ++m_stats.pkts_reordered; - const uint64_t reorder_dist = pkt_seqno - m_stats.expected_seqno; + const uint64_t reorder_dist = m_stats.expected_seqno - pkt_seqno; m_stats.reorder_dist = std::max(m_stats.reorder_dist, reorder_dist); - //spdlog::warn(LOG_SC_RFC4737 "Detected reordered packet, dist {}", reorder_dist); + spdlog::warn("[METRICS] Detected reordered packet, seqno {}, expected {} (dist {})", pkt_seqno, m_stats.expected_seqno, reorder_dist); } } diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index bb39c11..5bfd629 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -13,6 +13,7 @@ // xtransmit #include "socket_stats.hpp" #include "srt_socket.hpp" +#include "srt_socket_group.hpp" #include "udp_socket.hpp" #include "receive.hpp" #include "metrics.hpp" @@ -32,8 +33,6 @@ using shared_sock = std::shared_ptr; #define LOG_SC_RECEIVE "RECEIVE " - - void trace_message(const size_t bytes, const vector &buffer, int conn_id) { ::cout << "RECEIVED MESSAGE length " << bytes << " on conn ID " << conn_id; @@ -137,12 +136,19 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break } } -void xtransmit::receive::run(const string &src_url, const config &cfg, const atomic_bool &force_break) +void xtransmit::receive::run(const vector &src_urls, const config &cfg, const atomic_bool &force_break) { - const UriParser uri(src_url); + if (src_urls.empty()) + { + spdlog::error(LOG_SC_RECEIVE "No destination URI was provided"); + return; + } - shared_sock sock; - shared_sock conn; + vector urls; + for (const string& url : src_urls) + { + urls.emplace_back(url); + } unique_ptr stats; @@ -164,17 +170,33 @@ void xtransmit::receive::run(const string &src_url, const config &cfg, const ato do { try { - if (uri.proto() == "udp") + shared_sock sock; + shared_sock conn; + + if (urls.size() == 1) { - conn = make_shared(uri); + if (urls[0].proto() == "udp") + { + conn = make_shared(urls[0]); + } + else + { + sock = make_shared(urls[0]); + socket::srt* s = dynamic_cast(sock.get()); + const bool accept = s->mode() == socket::srt::LISTENER; + if (accept) + s->listen(); + conn = accept ? s->accept() : s->connect(); + } } else { - sock = make_shared(uri); - socket::srt* s = static_cast(sock.get()); - const bool accept = s->mode() == socket::srt::LISTENER; - if (accept) + sock = make_shared(urls); + socket::srt_group* s = dynamic_cast(sock.get()); + const bool accept = s->mode() == socket::srt_group::LISTENER; + if (accept) { s->listen(); + } conn = accept ? s->accept() : s->connect(); } @@ -187,16 +209,18 @@ void xtransmit::receive::run(const string &src_url, const config &cfg, const ato catch (const socket::exception & e) { spdlog::warn(LOG_SC_RECEIVE "{}", e.what()); + if (stats) + stats->clear(); } } while (cfg.reconnect && !force_break); } -CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, string& src_url) +CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, vector& src_urls) { const map to_ms{ {"s", 1000}, {"ms", 1} }; CLI::App* sc_receive = app.add_subcommand("receive", "Receive data (SRT, UDP)")->fallthrough(); - sc_receive->add_option("src", src_url, "Source URI"); + sc_receive->add_option("--input,-i,src", src_urls, "Source URI"); sc_receive->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload"); sc_receive->add_option("--statsfile", cfg.stats_file, "output stats report filename"); sc_receive->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)") @@ -208,6 +232,7 @@ CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, string& sc_receive->add_option("--metricsfreq", cfg.metrics_freq_ms, "Metrics report frequency") ->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE)); sc_receive->add_flag("--twoway", cfg.send_reply, "Both send and receive data"); + sc_receive->add_option("--input-group", cfg.inputs, "More input group URLs for SRT bonding"); return sc_receive; } diff --git a/xtransmit/receive.hpp b/xtransmit/receive.hpp index d95b54e..9c3dc2b 100644 --- a/xtransmit/receive.hpp +++ b/xtransmit/receive.hpp @@ -1,6 +1,7 @@ #pragma once #include #include +#include // Third party libraries #include "CLI/CLI.hpp" @@ -21,14 +22,15 @@ namespace xtransmit { int message_size = 1316; int stats_freq_ms = 0; std::string stats_file; + std::vector inputs; }; - void run(const std::string& url, const config& cfg, + void run(const std::vector& urls, const config& cfg, const std::atomic_bool& force_break); - CLI::App* add_subcommand(CLI::App& app, config& cfg, std::string& src_url); + CLI::App* add_subcommand(CLI::App& app, config& cfg, std::vector& src_urls); } // namespace receive diff --git a/xtransmit/route.cpp b/xtransmit/route.cpp index efd8151..9dfaf7d 100644 --- a/xtransmit/route.cpp +++ b/xtransmit/route.cpp @@ -78,7 +78,7 @@ namespace route if(uri.proto() == "srt") { shared_sock socket = make_shared(uri); - socket::srt* s = static_cast(socket.get()); + socket::srt* s = dynamic_cast(socket.get()); const bool accept = s->mode() == socket::srt::LISTENER; if (accept) s->listen(); @@ -95,6 +95,12 @@ namespace route void xtransmit::route::run(const string& src_url, const string& dst_url, const config& cfg, const atomic_bool& force_break) { + if (src_url.empty() || dst_url.empty()) + { + spdlog::error(LOG_SC_ROUTE "Empty source/destination was provided"); + return; + } + try { const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0; // make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :( @@ -130,8 +136,8 @@ CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, string& s const map to_ms{ {"s", 1000}, {"ms", 1} }; CLI::App* sc_route = app.add_subcommand("route", "Route data (SRT, UDP)")->fallthrough(); - sc_route->add_option("src", src_url, "Source URI"); - sc_route->add_option("dst", dst_url, "Destination URI"); + sc_route->add_option("-i,src", src_url, "Source URI")->expected(1); + sc_route->add_option("-o,dst", dst_url, "Destination URI")->expected(1); sc_route->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload"); sc_route->add_flag("--bidir", cfg.bidir, "Enable bidirectional transmission"); sc_route->add_option("--statsfile", cfg.stats_file, "output stats report filename"); diff --git a/xtransmit/scheduler.hpp b/xtransmit/scheduler.hpp new file mode 100644 index 0000000..3e51a11 --- /dev/null +++ b/xtransmit/scheduler.hpp @@ -0,0 +1,128 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace xtransmit +{ +using namespace std; +using namespace std::chrono; + +class task +{ +public: + explicit task(std::function&& f) + : f(std::move(f)) + { + } + + function f; +}; + +class scheduler +{ +public: + explicit scheduler(unsigned int max_n_tasks = 4) + : done_(false) + , thread_(&scheduler::timer_loop, this) + { + } + + scheduler(const scheduler&) = delete; + + scheduler(scheduler&&) noexcept = delete; + + scheduler& operator=(const scheduler&) = delete; + + scheduler& operator=(scheduler&&) noexcept = delete; + + ~scheduler() + { + done_ = true; + sync_.cv.notify_one(); + if (thread_.joinable()) + thread_.join(); + } + + template + void schedule_on(const steady_clock::time_point time, Callable&& f, Args&&... args) + { + shared_ptr t = + make_shared(bind(forward(f), forward(args)...)); + add_task(time, move(t)); + } + + template + void schedule_in(const steady_clock::duration time, Callable&& f, Args&&... args) + { + schedule_on(steady_clock::now() + time, forward(f), forward(args)...); + } + +private: + atomic done_; + struct + { + mutex mtx; + condition_variable cv; + } sync_; + + multimap> tasks_; + mutex lock_; + thread thread_; + + void timer_loop() + { + while (!this->done_) + { + this->manage_tasks(); + + if (this->tasks_.empty()) + { + unique_lock lock(sync_.mtx); + sync_.cv.wait(lock); + } + else + { + const auto time_of_first_task = (*tasks_.begin()).first; + unique_lock lock(sync_.mtx); + sync_.cv.wait_until(lock, time_of_first_task); + } + } + } + + + void add_task(const steady_clock::time_point time, shared_ptr t) + { + lock_guard l(lock_); + tasks_.emplace(time, move(t)); + sync_.cv.notify_one(); + } + + void manage_tasks() + { + lock_guard l(lock_); + + auto end_of_tasks_to_run = tasks_.upper_bound(steady_clock::now()); + if (end_of_tasks_to_run != tasks_.begin()) + { + // for all tasks_ that have been triggered + for (auto i = tasks_.begin(); i != end_of_tasks_to_run; ++i) + { + auto& task = (*i).second; + task->f(); + } + + // remove the completed tasks_ + tasks_.erase(tasks_.begin(), end_of_tasks_to_run); + } + } +}; + +} // namespace xtransmit diff --git a/xtransmit/socket.hpp b/xtransmit/socket.hpp index 0f261b4..f10b7a9 100644 --- a/xtransmit/socket.hpp +++ b/xtransmit/socket.hpp @@ -59,6 +59,7 @@ class isocket virtual int write(const const_buffer &buffer, int timeout_ms = -1) = 0; public: + /** Check if statistics is supported by a socket implementation. * * @returns true if statistics is supported, false otherwise. @@ -75,7 +76,7 @@ class isocket virtual const std::string statistics_csv(bool print_header) const { return std::string(); } - virtual int id() const = 0; + virtual SOCKET id() const = 0; }; } // namespace socket diff --git a/xtransmit/srt_socket.cpp b/xtransmit/srt_socket.cpp index a582ad8..f2584c8 100644 --- a/xtransmit/srt_socket.cpp +++ b/xtransmit/srt_socket.cpp @@ -79,12 +79,12 @@ socket::srt::~srt() { if (!m_blocking_mode) { - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing. Releasing epolls", m_bind_socket); + spdlog::debug(LOG_SOCK_SRT "@{} srt::~srt. releasing epolls", m_bind_socket); if (m_epoll_connect != -1) srt_epoll_release(m_epoll_connect); srt_epoll_release(m_epoll_io); } - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing", m_bind_socket); + spdlog::debug(LOG_SOCK_SRT "0x{:X} srt::~srt: closing", m_bind_socket); srt_close(m_bind_socket); } @@ -242,7 +242,7 @@ std::future socket::srt::async_read(std::vector &buffer) return std::future(); } -void socket::srt::assert_options_valid(const std::map& options) +void socket::srt::assert_options_valid(const map& options, const unordered_set& extra) { #ifdef ENABLE_CXX17 for (const auto& [key, val] : options) @@ -263,7 +263,7 @@ void socket::srt::assert_options_valid(const std::map& options) break; } - if (opt_found || key == "bind" || key == "mode") + if (opt_found || extra.count(key)) continue; stringstream ss; @@ -275,7 +275,7 @@ void socket::srt::assert_options_valid(const std::map& options) void socket::srt::assert_options_valid() const { - assert_options_valid(m_options); + assert_options_valid(m_options, {"bind", "mode"}); } diff --git a/xtransmit/srt_socket.hpp b/xtransmit/srt_socket.hpp index 0c7307f..92c56b0 100644 --- a/xtransmit/srt_socket.hpp +++ b/xtransmit/srt_socket.hpp @@ -5,6 +5,7 @@ #include #include #include +#include // xtransmit #include "buffer.hpp" @@ -51,9 +52,10 @@ class srt /// Verifies URI options provided are valid. /// /// @param [in] options a map of options key-value pairs to validate + /// @param [in] extra a set of extra options that are valid, e.g. "mode", "bind" /// @throw socket::exception on failure /// - static void assert_options_valid(const std::map& options); + static void assert_options_valid(const std::map& options, const std::unordered_set& extra); private: void configure(const std::map& options); @@ -88,7 +90,7 @@ class srt bool is_caller() const final { return m_mode == CALLER; } public: - int id() const final { return m_bind_socket; } + SOCKET id() const final { return m_bind_socket; } int statistics(SRT_TRACEBSTATS& stats, bool instant = true); bool supports_statistics() const final { return true; } const std::string statistics_csv(bool print_header) const final; @@ -99,7 +101,7 @@ class srt void raise_exception(const string&& place, const string&& reason) const; private: - int m_bind_socket = SRT_INVALID_SOCK; + SRTSOCKET m_bind_socket = SRT_INVALID_SOCK; int m_epoll_connect = -1; int m_epoll_io = -1; diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp new file mode 100644 index 0000000..3c85afb --- /dev/null +++ b/xtransmit/srt_socket_group.cpp @@ -0,0 +1,816 @@ +#include +#include +#include // std::ostream_iterator + +// submodules +#include "spdlog/spdlog.h" + +// xtransmit +#include "srt_socket_group.hpp" +#include "srt_socket.hpp" +#include "misc.hpp" // HAS_PUTTIME + +// srt utils +#include "verbose.hpp" +#include "socketoptions.hpp" +#include "apputil.hpp" +#include "common.h" // SRT library's SockStatusStr(..) + +using namespace std; +using namespace xtransmit; +using shared_srt_group = shared_ptr; + +#define LOG_SRT_GROUP "SOCKET::SRT_GROUP " + +SocketOption::Mode detect_srt_mode(const UriParser& uri) +{ + string modestr = "default"; + string adapter; + + const auto& options = uri.parameters(); + + if (options.count("mode")) + { + modestr = options.at("mode"); + } + + if (options.count("adapter")) + { + adapter = options.at("adapter"); + } + + return SrtInterpretMode(modestr, uri.host(), adapter); +} + +SRT_GROUP_TYPE socket::srt_group::detect_group_type(const options& opts) +{ + const string key("grouptype"); + + if (!opts.count(key)) + return SRT_GTYPE_BROADCAST; + + const string gmode = opts.at(key); + if (gmode == "broadcast") + return SRT_GTYPE_BROADCAST; + + if (gmode == "backup") + return SRT_GTYPE_BACKUP; + + throw socket::exception(LOG_SRT_GROUP ": Failed to detect group mode. Value provided: " + gmode); +} + +static int detect_link_weight(const UriParser& uri) +{ + auto& options = uri.parameters(); + const string key("weight"); + + if (!options.count(key)) + return 0; + + const string weight_str = options.at(key); + int weight = 0; + try { + weight = std::stoi(weight_str); + } + catch (std::invalid_argument const &) + { + throw socket::exception(LOG_SRT_GROUP ": Bad input. weight=" + weight_str); + } + catch (std::out_of_range const &e) + { + throw socket::exception(LOG_SRT_GROUP ": Integer overflow. weight=" + weight_str); + } + + // the allowed value for weight is between 0 and 32767 + if (weight < 0 || weight >32767) + throw socket::exception(LOG_SRT_GROUP ": Wrong link weight provided. The allowed value is between 0 and 32767."); + + return weight; +} + +SocketOption::Mode validate_srt_group(const vector& urls) +{ + SocketOption::Mode prev_mode = SocketOption::FAILURE; + // All URLs ha + for (const auto url : urls) + { + if (url.type() != UriParser::SRT) + { + spdlog::error(LOG_SRT_GROUP "URI {} is not SRT.", url.uri()); + return SocketOption::FAILURE; + } + + const auto mode = detect_srt_mode(url); + if (mode <= SocketOption::FAILURE || mode > SocketOption::RENDEZVOUS) + { + spdlog::error(LOG_SRT_GROUP "Failed to detect SRT mode for URI {}.", url.uri()); + return SocketOption::FAILURE; + } + + if (prev_mode != SocketOption::FAILURE && mode != prev_mode) + { + spdlog::error(LOG_SRT_GROUP + "Failed to match SRT modes for provided URIs. URI {} has mode {}. Previous mode is {}", + url.uri(), + SocketOption::mode_names[mode], + SocketOption::mode_names[prev_mode]); + return SocketOption::FAILURE; + } + + prev_mode = mode; + } + + return prev_mode; +} + +// TODO: m_options per socket: +// - m_opts_group +// - m_opts_link[n] +socket::srt_group::srt_group(const vector& uris) +{ + // validate_srt_group(..) also checks for empty 'uris' + m_mode = (connection_mode)validate_srt_group(uris); + if (m_mode == FAILURE) + throw socket::exception("Group mode validation failed!"); + if (m_mode == RENDEZVOUS) + throw socket::exception("Rendezvous mode is not supported by socket groups!"); + + for (auto uri : uris) + { + // Will throw an exception if invalid options were provided. + srt::assert_options_valid(uri.parameters(), {"bind", "mode", "weight", "grouptype"}); + m_opts_link.push_back(uri.parameters()); + } + + const SRT_GROUP_TYPE gtype = detect_group_type(m_opts_link[0]); + + if (m_opts_link[0].count("blocking")) + { + m_blocking_mode = !false_names.count(m_opts_link[0].at("blocking")); + m_opts_link[0].erase("blocking"); + } + + if (!m_blocking_mode) + { + m_epoll_connect = srt_epoll_create(); + if (m_epoll_connect == -1) + throw socket::exception(srt_getlasterror_str()); + + m_epoll_io = srt_epoll_create(); + if (m_epoll_io == -1) + throw socket::exception(srt_getlasterror_str()); + } + + // Create SRT socket group + if (m_mode == LISTENER) + { + spdlog::trace(LOG_SRT_GROUP "Creating a group of listeners"); + create_listeners(uris); + } + else + { + const char* gtype_str = (gtype == SRT_GTYPE_BACKUP) ? "main/backup" : "broadcast"; + spdlog::trace(LOG_SRT_GROUP "Creating a group of callers (type {}).", gtype_str); + create_callers(uris, gtype); + } +} + +socket::srt_group::srt_group(srt_group& group, int group_id) + : m_bind_socket(group_id) + , m_blocking_mode(group.m_blocking_mode) + , m_mode(group.m_mode) +{ + if (!m_blocking_mode) + { + m_epoll_io = srt_epoll_create(); + if (m_epoll_io == -1) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } +} + +socket::srt_group::~srt_group() +{ + if (!m_blocking_mode) + { + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing. Releasing epolls", m_bind_socket); + if (m_epoll_connect != -1) + srt_epoll_release(m_epoll_connect); + if (m_epoll_io != -1) + srt_epoll_release(m_epoll_io); + } + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing SRT group", m_bind_socket); + release_targets(); + release_listeners(); + srt_close(m_bind_socket); +} + +void socket::srt_group::create_listeners(const vector& src_uri) +{ + // Create listeners according to the parameters + for (size_t i = 0; i < src_uri.size(); ++i) + { + const UriParser& url = src_uri[i]; + sockaddr_any sa = CreateAddr(url.host(), url.portno()); + + SRTSOCKET s = srt_create_socket(); + if (s == SRT_INVALID_SOCK) + throw socket::exception(srt_getlasterror_str()); + + int gcon = 1; + if (SRT_SUCCESS != srt_setsockflag(s, SRTO_GROUPCONNECT, &gcon, sizeof gcon)) + throw socket::exception(srt_getlasterror_str()); + + if (SRT_SUCCESS != configure_pre(s, i)) + throw socket::exception(srt_getlasterror_str()); + + if (SRT_SUCCESS != srt_bind(s, sa.get(), sa.size())) + throw socket::exception(srt_getlasterror_str()); + + if (!m_blocking_mode) + { + const int modes = SRT_EPOLL_IN | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_connect, s, &modes)) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, s, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } + + spdlog::trace(LOG_SRT_GROUP "Created listener 0x{:X} on {}:{}", s, url.host(), url.portno()); + + m_listeners.push_back(s); + } +} + +void socket::srt_group::create_callers(const vector& uris, SRT_GROUP_TYPE gtype) +{ + m_bind_socket = srt_create_group(gtype); + if (m_bind_socket == SRT_INVALID_SOCK) + raise_exception("srt_create_group"); + + if (SRT_SUCCESS != configure_pre(m_bind_socket, 0)) + throw socket::exception(srt_getlasterror_str()); + + for (const auto& uri : uris) + { + sockaddr_any sa; + try + { + sa = CreateAddr(uri.host(), uri.portno()); + } + catch (const std::invalid_argument& e) + { + raise_exception("connect::create_addr", e.what()); + } + + const sockaddr* bindsa = nullptr; + + SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(bindsa, sa.get(), sa.size()); + + gd.weight = detect_link_weight(uri); + m_targets.push_back(gd); + } + + if (!m_blocking_mode) + { + const int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_connect, m_bind_socket, &modes)) + throw socket::exception(srt_getlasterror_str()); + + const int io_modes = SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR; + if (SRT_ERROR == srt_epoll_add_usock(m_epoll_io, m_bind_socket, &io_modes)) + throw socket::exception(srt_getlasterror_str()); + } + + set_connect_callback(); +} + +void socket::srt_group::listen() +{ + set_listen_callback(); + + for (const auto sockid : m_listeners) + { + if (srt_listen(sockid, 5) == SRT_ERROR) + raise_exception("listen failed with {}", srt_getlasterror_str()); + } +} + +shared_srt_group socket::srt_group::accept() +{ + // spdlog::debug(LOG_SRT_GROUP "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection", + // m_bind_socket, m_host, m_port, m_blocking_mode ? "SYNC" : "ASYNC"); + + SRTSOCKET accepted_sock = SRT_INVALID_SOCK; + // Wait for REAL connected state if nonblocking mode + if (!m_blocking_mode) + { + // Socket readiness for connection is checked by polling on READ allowed sockets in case of listeners. + // In the group connection mode we wait for the first accepted connection. + constexpr int timeout_ms = -1; + int len = 2; + SRTSOCKET ready[2]; + if (srt_epoll_wait(m_epoll_connect, ready, &len, 0, 0, timeout_ms, 0, 0, 0, 0) == SRT_ERROR) + raise_exception("accept::epoll_wait"); + + spdlog::trace(LOG_SRT_GROUP "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], len > 1 ? ready[1] : 0); + + sockaddr_in scl; + int sclen = sizeof scl; + const SRTSOCKET lstnr_sock = ready[0]; + accepted_sock = srt_accept(lstnr_sock, (sockaddr*)&scl, &sclen); + if (accepted_sock == SRT_INVALID_SOCK) + raise_exception("accept", ready[1]); + } + else + { + accepted_sock = srt_accept_bond(m_listeners.data(), m_listeners.size(), -1); + if (accepted_sock == SRT_INVALID_SOCK) + raise_exception("accept_bond failed with {}", srt_getlasterror_str()); + } + + spdlog::info(LOG_SRT_GROUP "Accepted connection sock 0x{:X}", accepted_sock); + const int res = configure_post(accepted_sock, 0); // TODO: are there POST options per link? + if (res == SRT_ERROR) + raise_exception("accept::configure_post"); + + return make_shared(*this, accepted_sock); +} + +void socket::srt_group::print_member_socket(SRTSOCKET sock) +{ + int weight = -1; // unknown + int gtype = 0; + int gtype_len = sizeof gtype; + + if (srt_getsockflag(sock, SRTO_GROUPTYPE, (void*) >ype, >ype_len) == SRT_SUCCESS + && gtype == SRT_GTYPE_BACKUP) + { + const SRTSOCKET group_id = srt_groupof(sock); + SRT_SOCKGROUPDATA gdata[3] = {}; + size_t gdata_len = 3; + const int gsize = srt_group_data(group_id, gdata, &gdata_len); + for (int i = 0; i < gsize; ++i) + { + if (gdata[i].id != sock) + continue; + + weight = gdata[i].weight; + break; + } + } + + gtype = gtype < 0 ? 0 : (gtype > 3 ? 0 : gtype); + const char* gtype_str[] = { "NO GROUP", "BROADCAST", "BACKUP", "BALANCING"}; + spdlog::trace(LOG_SRT_GROUP "Member socket 0x{:X}, {} weight = {}", sock, + gtype_str[gtype], weight); +} + +int socket::srt_group::on_listen_callback(SRTSOCKET sock) +{ + m_scheduler.schedule_in(std::chrono::microseconds(20), &socket::srt_group::print_member_socket, this, sock); + return 0; +} + +int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid) +{ + if (opaq == nullptr) + { + spdlog::warn(LOG_SRT_GROUP "listen_callback_fn does not have a pointer to the group"); + return 0; + } + + sockaddr_any sa(peeraddr); + spdlog::trace(LOG_SRT_GROUP "Accepted member socket 0x{:X}, remote IP {}", sock, sa.str()); + + // TODO: this group may no longer exist. Use some global array to track valid groups. + socket::srt_group* group = reinterpret_cast(opaq); + return group->on_listen_callback(sock); +} + +void socket::srt_group::set_listen_callback() +{ + for (const auto sockid : m_listeners) + { + if (srt_listen_callback(sockid, listen_callback_fn, (void*) this) == SRT_ERROR) + raise_exception("listen failed with {}", srt_getlasterror_str()); + } +} + +void socket::srt_group::connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token) +{ + if (opaq == nullptr) + { + spdlog::warn(LOG_SRT_GROUP "connect_callback_fn does not have a pointer to the group"); + return; + } + + // TODO: this group may no longer exist. Use some global array to track valid groups. + socket::srt_group* group = reinterpret_cast(opaq); + + group->on_connect_callback(sock, error, peer, token); +} + +void socket::srt_group::on_connect_callback(SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) +{ + if (error == SRT_SUCCESS) + { + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SRT_GROUP "Member socket connected 0x{:X} (token {}).", sock, token); + return; + } + + spdlog::warn(LOG_SRT_GROUP "Member socket 0x{:X} (token {}) connection error: ({}) {}.", sock, token, error, + srt_strerror(error, 0)); + + bool reconn_scheduled = false; + for (auto target : m_targets) + { + if (target.token != token) + continue; + + auto connfn = [](SRTSOCKET group, SRT_SOCKGROUPCONFIG target) { + spdlog::trace(LOG_SRT_GROUP "0x{:X}: Reconnecting member socket (token {})", group, target.token); + const int st = srt_connect_group(group, &target, 1); + if (st == SRT_ERROR) + spdlog::warn(LOG_SRT_GROUP "0x{:X}: Member reconnection failed (token {})", group, target.token); + }; + + spdlog::trace(LOG_SRT_GROUP "0x{:X}: Scheduling member reconnection (token {})", m_bind_socket, token); + reconn_scheduled = true; + m_scheduler.schedule_in(std::chrono::seconds(1), connfn, m_bind_socket, target); + } + + if (!reconn_scheduled) + spdlog::warn(LOG_SRT_GROUP "0x{:X}: Could not schedule member reconnection (token {})", m_bind_socket, token); + + return; +} + +void socket::srt_group::set_connect_callback() +{ + srt_connect_callback(m_bind_socket, connect_callback_fn, (void*) this); +} + +void socket::srt_group::raise_exception(const string&& place, SRTSOCKET sock) const +{ + const int udt_result = srt_getlasterror(nullptr); + const string message = srt_getlasterror_str(); + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message); + throw socket::exception(place + ": " + message); +} + +void socket::srt_group::raise_exception(const string&& place, const string&& reason) const +{ + spdlog::debug(LOG_SRT_GROUP "0x{:X} {} ERROR {}", m_bind_socket, place, reason); + throw socket::exception(place + ": " + reason); +} + +void socket::srt_group::release_targets() +{ + for (auto& gd : m_targets) + srt_delete_config(gd.config); + m_targets.clear(); +} + +void socket::srt_group::release_listeners() +{ + for (auto sock : m_listeners) { + spdlog::trace(LOG_SRT_GROUP "Closing listener 0x{:X}", sock); + srt_close(sock); + } + m_listeners.clear(); +} + +shared_srt_group socket::srt_group::connect() +{ + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} Connecting group to remote SRT", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + + if (!m_blocking_mode && false) + { + // This branch does not assign a token to the target + // therefore it is not possible to schedule a reconnection. + // srt_connect_group is to be used instead in both blocking and non-blocking modes. + spdlog::debug( + LOG_SRT_GROUP "non blocking"); + for (auto target : m_targets) + { + sockaddr_any target_addr(target.peeraddr); + const int st = srt_connect(m_bind_socket, target_addr.get(), target_addr.size()); + if (st == SRT_ERROR) + raise_exception("srt_group::connect_member"); + } + + // In case of a caller a connection event triggers write-readiness. + int len = 2; + SRTSOCKET ready[2]; + if (srt_epoll_wait(m_epoll_connect, 0, 0, ready, &len, -1, 0, 0, 0, 0) != -1) + { + if (srt_getsockstate(m_bind_socket) != SRTS_CONNECTED) + { + const int reason = srt_getrejectreason(m_bind_socket); + raise_exception("connect failed", srt_rejectreason_str(reason)); + } + } + else + { + raise_exception("srt_group::connect.epoll_wait"); + } + } + else + { + spdlog::debug( + LOG_SRT_GROUP "srt_connect_group"); + const int st = srt_connect_group(m_bind_socket, m_targets.data(), m_targets.size()); + if (st == SRT_ERROR) + raise_exception("srt_group::connect"); + } + + spdlog::debug( + LOG_SRT_GROUP "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + + return shared_from_this(); +} + +int socket::srt_group::configure_pre(SRTSOCKET sock, int link_index) +{ + SRT_ASSERT(link_index < m_opts_link.size()); + int maybe = m_blocking_mode ? 1 : 0; + const int result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &maybe, sizeof maybe); + if (result == -1) + return result; + + const auto configure_link = [&](int li) -> int { + // host is only checked for emptiness and depending on that the connection mode is selected. + // Here we are not exactly interested with that information. + std::vector failures; + + // NOTE: here host = "", so the 'connmode' will be returned as LISTENER always, + // but it doesn't matter here. We don't use 'connmode' for anything else than + // checking for failures. + // TODO: use per-link options too + SocketOption::Mode conmode = SrtConfigurePre(sock, m_host, m_opts_link[li], &failures); + + if (conmode == SocketOption::FAILURE) + { + stringstream ss; + for (const auto v : failures) ss << v << ", "; + spdlog::error(LOG_SRT_GROUP "WARNING: failed to set options: {}", ss.str()); + return SRT_ERROR; + } + + return SRT_SUCCESS; + }; + + if (configure_link(0) != SRT_SUCCESS) + return SRT_ERROR; + + if (link_index != 0) + return configure_link(link_index); + + return SRT_SUCCESS; +} + +int socket::srt_group::configure_post(SRTSOCKET sock, int link_index) +{ + SRT_ASSERT(link_index < m_opts_link.size()); + int is_blocking = m_blocking_mode ? 1 : 0; + + int result = srt_setsockopt(sock, 0, SRTO_SNDSYN, &is_blocking, sizeof is_blocking); + if (result == -1) + return result; + result = srt_setsockopt(sock, 0, SRTO_RCVSYN, &is_blocking, sizeof is_blocking); + if (result == -1) + return result; + + // host is only checked for emptiness and depending on that the connection mode is selected. + // Here we are not exactly interested with that information. + vector failures; + + SrtConfigurePost(sock, m_opts_link[link_index], &failures); + + if (!failures.empty()) + { + if (Verbose::on) + { + stringstream ss; + for (const auto v : failures) ss << v << ", "; + spdlog::error(LOG_SRT_GROUP "WARNING: failed to set options: {}", ss.str()); + } + } + + return 0; +} + +size_t socket::srt_group::read(const mutable_buffer& buffer, int timeout_ms) +{ + if (!m_blocking_mode) + { + int ready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int len = 2; + + const int epoll_res = srt_epoll_wait(m_epoll_io, ready, &len, nullptr, nullptr, timeout_ms, 0, 0, 0, 0); + if (epoll_res == SRT_ERROR) + { + if (srt_getlasterror(nullptr) == SRT_ETIMEOUT) + return 0; + + raise_exception("read::epoll"); + } + } + + const int res = srt_recvmsg2(m_bind_socket, static_cast(buffer.data()), (int)buffer.size(), nullptr); + if (SRT_ERROR == res) + { + if (srt_getlasterror(nullptr) != SRT_EASYNCRCV) + raise_exception("read::recv"); + + spdlog::warn(LOG_SRT_GROUP "recvmsg returned error 6002: read error, try again"); + return 0; + } + + return static_cast(res); +} + +int socket::srt_group::write(const const_buffer& buffer, int timeout_ms) +{ + if (!m_blocking_mode) + { + int ready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int len = 2; + int rready[2] = {SRT_INVALID_SOCK, SRT_INVALID_SOCK}; + int rlen = 2; + // TODO: check error fds + const int res = srt_epoll_wait(m_epoll_io, rready, &rlen, ready, &len, timeout_ms, 0, 0, 0, 0); + if (res == SRT_ERROR) + raise_exception("write::epoll"); + } + + const int res = + srt_sendmsg2(m_bind_socket, static_cast(buffer.data()), static_cast(buffer.size()), nullptr); + if (res == SRT_ERROR) + { + if (srt_getlasterror(nullptr) == SRT_EASYNCSND) + return 0; + + raise_exception("socket::write::send", srt_getlasterror_str()); + } + + return res; +} + +socket::srt_group::connection_mode socket::srt_group::mode() const { return m_mode; } + +int socket::srt_group::statistics(SRT_TRACEBSTATS& stats, bool instant) +{ + return srt_bstats(m_bind_socket, &stats, instant); +} + +const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, uint16_t weight, bool print_header) +{ + std::ostringstream output; + +#define HAS_PKT_REORDER_TOL (SRT_VERSION_MAJOR >= 1) && (SRT_VERSION_MINOR >= 4) && (SRT_VERSION_PATCH > 0) +// pktSentUnique, pktRecvUnique were added in SRT v1.4.2 +#define HAS_UNIQUE_PKTS \ + (SRT_VERSION_MAJOR == 1) && ((SRT_VERSION_MINOR > 4) || ((SRT_VERSION_MINOR == 4) && (SRT_VERSION_PATCH >= 2))) + + if (print_header) + { +#ifdef HAS_PUT_TIME + output << "Timepoint,"; +#endif + output << "Time,SocketID,weight,pktFlowWindow,pktCongestionWindow,pktFlightSize,"; + output << "msRTT,mbpsBandwidth,mbpsMaxBW,pktSent,"; +#if HAS_UNIQUE_PKTS + output << "pktSentUnique,"; +#endif + output << "pktSndLoss,pktSndDrop,pktRetrans,byteSent,"; + output << "byteAvailSndBuf,byteSndDrop,mbpsSendRate,usPktSndPeriod,msSndBuf,pktRecv,"; +#if HAS_UNIQUE_PKTS + output << "pktRecvUnique,"; +#endif + output << "pktRcvLoss,pktRcvDrop,pktRcvRetrans,pktRcvBelated,"; + output << "byteRecv,byteAvailRcvBuf,byteRcvLoss,byteRcvDrop,mbpsRecvRate,msRcvBuf,msRcvTsbPdDelay"; +#if HAS_PKT_REORDER_TOL + output << ",pktReorderTolerance"; +#endif + output << endl; + return output.str(); + } + +#ifdef HAS_PUT_TIME + output << print_timestamp_now() << ','; +#endif // HAS_PUT_TIME + + output << stats.msTimeStamp << ','; + output << socketid << ','; + output << weight << ","; + output << stats.pktFlowWindow << ','; + output << stats.pktCongestionWindow << ','; + output << stats.pktFlightSize << ','; + + output << stats.msRTT << ','; + output << stats.mbpsBandwidth << ','; + output << stats.mbpsMaxBW << ','; + output << stats.pktSent << ','; +#if HAS_UNIQUE_PKTS + output << stats.pktSentUnique << ","; +#endif + output << stats.pktSndLoss << ','; + output << stats.pktSndDrop << ','; + + output << stats.pktRetrans << ','; + output << stats.byteSent << ','; + output << stats.byteAvailSndBuf << ','; + output << stats.byteSndDrop << ','; + output << stats.mbpsSendRate << ','; + output << stats.usPktSndPeriod << ','; + output << stats.msSndBuf << ','; + + output << stats.pktRecv << ','; +#if HAS_UNIQUE_PKTS + output << stats.pktRecvUnique << ","; +#endif + output << stats.pktRcvLoss << ','; + output << stats.pktRcvDrop << ','; + output << stats.pktRcvRetrans << ','; + output << stats.pktRcvBelated << ','; + + output << stats.byteRecv << ','; + output << stats.byteAvailRcvBuf << ','; + output << stats.byteRcvLoss << ','; + output << stats.byteRcvDrop << ','; + output << stats.mbpsRecvRate << ','; + output << stats.msRcvBuf << ','; + output << stats.msRcvTsbPdDelay; + +#if HAS_PKT_REORDER_TOL + output << "," << stats.pktReorderTolerance; +#endif + + output << endl; + + return output.str(); + +#undef HAS_PUT_TIME +#undef HAS_UNIQUE_PKTS +} + +const string socket::srt_group::statistics_csv(bool print_header) const +{ + if (print_header) + return stats_to_csv(m_bind_socket, SRT_TRACEBSTATS(), 0, print_header);; + + SRT_TRACEBSTATS stats = {}; + if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true)) + raise_exception("statistics"); + string csv_stats = stats_to_csv(m_bind_socket, stats, 0, print_header); + + size_t group_size = 0; + if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve the number of group members", m_bind_socket); + return csv_stats; + } + + vector group_data(group_size); + const int num_members = srt_group_data(m_bind_socket, group_data.data(), &group_size); + if (num_members == SRT_ERROR) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data, {}", m_bind_socket, srt_getlasterror_str()); + return csv_stats; + } + + for (int i = 0; i < num_members; ++i) + { + const int id = group_data[i].id; + const SRT_SOCKSTATUS status = group_data[i].sockstate; + + if (group_data[i].sockstate != SRTS_CONNECTED) + { + spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket 0x{:X} state is {}, skipping.", m_bind_socket, id, srt_logging::SockStatusStr(status)); + continue; + } + + if (SRT_ERROR == srt_bstats(id, &stats, true)) + { + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve stats for member 0x{:X}. {}", m_bind_socket, id, srt_getlasterror_str()); + continue; + } + + csv_stats += stats_to_csv(id, stats, group_data[i].weight, false); + } + + return csv_stats; +} diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp new file mode 100644 index 0000000..5942553 --- /dev/null +++ b/xtransmit/srt_socket_group.hpp @@ -0,0 +1,127 @@ +#pragma once +#include +#include +#include +#include +#include +#include + +// xtransmit +#include "buffer.hpp" +#include "socket.hpp" +#include "scheduler.hpp" + +// OpenSRT +#include "srt.h" +#include "uriparser.hpp" + +namespace xtransmit +{ +namespace socket +{ + +class srt_group + : public std::enable_shared_from_this + , public isocket +{ + using string = std::string; + using shared_srt_group = std::shared_ptr; + +public: + explicit srt_group(const std::vector& uris); + + srt_group(srt_group& group, int group_id); + + virtual ~srt_group(); + +public: + shared_srt_group connect(); + shared_srt_group accept(); + + /** + * Start listening on the incomming connection requests. + * + * May throw a socket_exception. + */ + void listen() noexcept(false); + + +private: + void configure(const std::map& options); + + void identify_connection_mode(const std::vector& uris); + + void set_listen_callback(); + void set_connect_callback(); + /// Set SRT socket options with PRE binding. + /// @param [in] sock member socket + /// @param [in] link_index link index in m_opts_link + int configure_pre(SRTSOCKET sock, int link_index); + int configure_post(SRTSOCKET sock, int link_index); + void create_listeners(const std::vector& uris); + void create_callers(const std::vector& uris, SRT_GROUP_TYPE gtype); + void release_targets(); + void release_listeners(); + + void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token); + static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token); + int on_listen_callback(SRTSOCKET sock); + static int listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion, + const struct sockaddr* peeraddr, const char* streamid); + + using options = std::map; + static SRT_GROUP_TYPE detect_group_type(const options& opts); + + void print_member_socket(SRTSOCKET sock); + +public: + /** + * @returns The number of bytes received. + * + * @throws socket_exception Thrown on failure. + */ + size_t read(const mutable_buffer& buffer, int timeout_ms = -1) final; + int write(const const_buffer& buffer, int timeout_ms = -1) final; + + enum connection_mode + { + FAILURE = -1, + LISTENER = 0, + CALLER = 1, + RENDEZVOUS = 2 + }; + + connection_mode mode() const; + + bool is_caller() const final { return m_mode == CALLER; } + +public: + SOCKET id() const final { return m_bind_socket; } + int statistics(SRT_TRACEBSTATS& stats, bool instant = true); + bool supports_statistics() const final { return true; } + const std::string statistics_csv(bool print_header) const final; + static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, uint16_t weight, bool print_header); + +private: + void raise_exception(const string&& place, SRTSOCKET sock = SRT_INVALID_SOCK) const; + void raise_exception(const string&& place, const string&& reason) const; + +private: + SRTSOCKET m_bind_socket = SRT_INVALID_SOCK; + std::vector m_listeners; + std::vector m_targets; + int m_epoll_connect = -1; + int m_epoll_io = -1; + + connection_mode m_mode = FAILURE; + bool m_blocking_mode = true; + string m_host; + int m_port = -1; + std::vector m_opts_link; // Options per member link. [0] - also defines common options. + // Link index to token can be determined from m_targets + + scheduler m_scheduler; +}; + +} // namespace socket +} // namespace xtransmit diff --git a/xtransmit/udp_socket.hpp b/xtransmit/udp_socket.hpp index d75cc65..4c47065 100644 --- a/xtransmit/udp_socket.hpp +++ b/xtransmit/udp_socket.hpp @@ -32,7 +32,7 @@ class udp public: bool is_caller() const final { return m_host != ""; } - int id() const final { return m_bind_socket; } + SOCKET id() const final { return m_bind_socket; } public: /** diff --git a/xtransmit/xtransmit-app.cpp b/xtransmit/xtransmit-app.cpp index d677908..ab27827 100644 --- a/xtransmit/xtransmit-app.cpp +++ b/xtransmit/xtransmit-app.cpp @@ -65,7 +65,7 @@ string create_srt_logfa_description() // Each group on a new line stringstream ss; - ss << "SRT log functional areas: ["; + ss << "SRT log functional areas: \n["; int en10 = 0; for (auto entry : revmap) { @@ -159,15 +159,17 @@ int main(int argc, char** argv) }); string src, dst; + vector src_urls; + vector dst_urls; generate::config cfg_generate; - CLI::App* sc_generate = generate::add_subcommand(app, cfg_generate, dst); + CLI::App* sc_generate = generate::add_subcommand(app, cfg_generate, dst_urls); xtransmit::receive::config cfg_receive; - CLI::App* sc_receive = receive::add_subcommand(app, cfg_receive, src); + CLI::App* sc_receive = receive::add_subcommand(app, cfg_receive, src_urls); xtransmit::route::config cfg_route; - CLI::App* sc_route = route::add_subcommand(app, cfg_route, src, dst); + CLI::App* sc_route = route::add_subcommand(app, cfg_route, src, dst); #if ENABLE_FILE_TRANSFER CLI::App* sc_file = app.add_subcommand("file", "Send/receive a single file or folder contents")->fallthrough(); @@ -189,12 +191,20 @@ int main(int argc, char** argv) // https://cliutils.gitlab.io/CLI11Tutorial/chapters/an-advanced-example.html if (sc_generate->parsed()) { - generate::run(dst, cfg_generate, force_break); + for (const auto url : dst_urls) + { + spdlog::info("DST URL: {}", url); + } + generate::run(dst_urls, cfg_generate, force_break); return 0; } else if (sc_receive->parsed()) { - xtransmit::receive::run(src, cfg_receive, force_break); + for (const auto url : src_urls) + { + spdlog::info("SRC URL: {}", url); + } + xtransmit::receive::run(src_urls, cfg_receive, force_break); return 0; } else if (sc_route->parsed())