diff --git a/c++-virtan/.gitignore b/c++-virtan/.gitignore new file mode 100644 index 0000000..f1c6a7c --- /dev/null +++ b/c++-virtan/.gitignore @@ -0,0 +1,2 @@ +/build/ +/.idea/ \ No newline at end of file diff --git a/c++-virtan/CMakeLists.txt b/c++-virtan/CMakeLists.txt new file mode 100644 index 0000000..c2991e8 --- /dev/null +++ b/c++-virtan/CMakeLists.txt @@ -0,0 +1,40 @@ +cmake_minimum_required(VERSION 3.0) +project(server) + +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +find_package(Boost REQUIRED COMPONENTS system date_time chrono regex program_options) +find_package(Threads REQUIRED) + +set(libraries ) +list(APPEND libraries ${Boost_LIBRARIES}) +list(APPEND libraries ${Boost_LIBRARIES} Threads::Threads) +if(WIN32) + list(APPEND libraries "ws2_32" "mswsock") +endif() + +if(NOT Boost_USE_STATIC_LIBS) + list(APPEND compile_definitions "BOOST_ALL_DYN_LINK") +endif() +if(WIN32) + list(APPEND compile_definitions + WIN32 + WIN32_LEAN_AND_MEAN + WINVER=0x0501 + _WIN32_WINNT=0x0501 + _WIN32_WINDOWS=0x0501 + _WIN32_IE=0x0600 + _UNICODE + UNICODE + _WINSOCK_DEPRECATED_NO_WARNINGS) +endif() + +if(MINGW) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -municode") +endif() + +add_executable(server "server.cc") +target_compile_features(server PRIVATE cxx_std_11 cxx_auto_type) +target_compile_definitions(server PRIVATE ${compile_definitions}) +target_include_directories(server PRIVATE ${Boost_INCLUDE_DIRS}) +target_link_libraries(server PRIVATE ${libraries}) diff --git a/c++-virtan/Dockerfile b/c++-virtan/Dockerfile new file mode 100644 index 0000000..e10e790 --- /dev/null +++ b/c++-virtan/Dockerfile @@ -0,0 +1,60 @@ +FROM ubuntu:20.04 AS build + +RUN apt-get update && \ + apt-get install -y --no-install-suggests --no-install-recommends \ + ca-certificates \ + curl \ + git \ + g++ \ + make \ + libstdc++-9-dev && \ + rm -rf /var/lib/apt/lists/* + +ENV CMAKE_HOME="/opt/cmake" + +ARG CMAKE_VERSION="3.18.4" + +RUN mkdir -p "${CMAKE_HOME}" && \ + cmake_url="https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" && \ + echo "Downloading CMake ${CMAKE_VERSION} from ${cmake_url} to ${CMAKE_HOME}" && \ + curl -jksSL "${cmake_url}" | tar -xzf - -C "${CMAKE_HOME}" --strip-components 1 + +ENV PATH="${CMAKE_HOME}/bin:${PATH}" \ + BOOST_HOME="/usr/lib/boost" + +ARG BOOST_VERSION="1.74.0" +ARG BOOST_URL="https://dl.bintray.com/mabrarov/generic/boost" + +RUN mkdir -p "${BOOST_HOME}" && \ + boost_archive_url="${BOOST_URL}/${BOOST_VERSION}/boost-${BOOST_VERSION}-x64-gcc$(gcc -dumpversion | sed -r 's/^([[:digit:]]+)(\..*)?$/\1/;t;d').tar.gz" && \ + echo "Downloading Boost from ${boost_archive_url} to ${BOOST_HOME}" && \ + curl --connect-timeout 300 --max-time 1800 --retry 10 --retry-delay 10 \ + -jksSL "${boost_archive_url}" \ + | tar -xz -C "${BOOST_HOME}" --strip-components 1 + +ADD ["CMakeLists.txt", "server.cc", "/usr/src/cpp-virtan/"] + +RUN source_dir="/usr/src/cpp-virtan" && \ + build_dir="${source_dir}/build" && \ + mkdir -p "${build_dir}" && \ + cmake \ + -D CMAKE_SKIP_BUILD_RPATH=ON \ + -D CMAKE_BUILD_TYPE=Release \ + -D Boost_USE_MULTITHREADED=ON \ + -D Boost_USE_STATIC_LIBS=ON \ + -D Boost_NO_SYSTEM_PATHS=ON \ + -D BOOST_INCLUDEDIR="${BOOST_HOME}/include" \ + -D BOOST_LIBRARYDIR="${BOOST_HOME}/lib" \ + -S "${source_dir}" \ + -B "${build_dir}" && \ + cmake --build "${build_dir}" + +FROM gcr.io/distroless/cc-debian10 + +COPY --from=build ["/usr/src/cpp-virtan/build/server", "/opt/cpp-virtan/server"] + +ENTRYPOINT ["/opt/cpp-virtan/server"] + +CMD ["--help"] + +LABEL name="cpp-virtan" diff --git a/c++-virtan/Makefile b/c++-virtan/Makefile deleted file mode 100644 index b247aef..0000000 --- a/c++-virtan/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -CXXFLAGS = -Os -march=native -mtune=native -std=c++0x -Wall -DEBUG_CXXFLAGS = -g -O0 -CPPFLAGS = -I/opt/local/include -LDFLAGS = -L/opt/local/lib -lboost_thread-mt -lboost_system-mt -pthread -SERVER = server - -default: $(SERVER) - -$(SERVER): $(addsuffix .cc, $(SERVER)) - -debug: - @CXXFLAGS="$(CXXFLAGS) $(DEBUG_CXXFLAGS)" make -e $(SERVER) - -.PHONY: clean -clean: - @rm -rf server server.dSYM diff --git a/c++-virtan/README.md b/c++-virtan/README.md new file mode 100644 index 0000000..92bd061 --- /dev/null +++ b/c++-virtan/README.md @@ -0,0 +1,40 @@ +# Building + +## Requirements + +1. C++ toolchain (compiler, linker, standard C++ library) +1. Make +1. [CMake](https://cmake.org/) +1. [Boost](https://www.boost.org/) + +## Steps to build + +Assuming current directory is directory where this file is located + +```bash +cmake -D CMAKE_SKIP_BUILD_RPATH=ON -D CMAKE_BUILD_TYPE=Release . +cmake --build . +``` + +# Docker image + +## Building + +### Requirements + +1. Docker 17.06+ for building +1. Current directory is directory where this repository is cloned + +### Steps to build + +```bash +docker build -t cpp-virtan c++-virtan +``` + +## Running with Docker + +```bash +docker run --rm cpp-virtan $params +``` + +where `$params` are parameters passed to application diff --git a/c++-virtan/server.cc b/c++-virtan/server.cc index ebb2bc8..88a1386 100644 --- a/c++-virtan/server.cc +++ b/c++-virtan/server.cc @@ -1,79 +1,371 @@ -#include +#if defined(WIN32) +#include +#endif + +#include +#include +#include +#include #include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include + +namespace asio_helpers { + +template +void* allocate(std::size_t size, Context& context) { + using namespace boost::asio; + return asio_handler_allocate(size, std::addressof(context)); +} + +template +void deallocate(void* pointer, std::size_t size, Context& context) { + using namespace boost::asio; + asio_handler_deallocate(pointer, size, std::addressof(context)); +} + +template +void invoke(Function&& function, Context& context) { + using namespace boost::asio; + asio_handler_invoke(std::forward(function), std::addressof(context)); +} + +#if BOOST_VERSION >= 105400 + +template +bool is_continuation(Context& context) { + using namespace boost::asio; + return asio_handler_is_continuation(std::addressof(context)); +} + +#else + +template +bool is_continuation(Context& /*context*/) { + return false; +} + +#endif -using namespace std; -using namespace boost; -using namespace boost::asio; -using namespace boost::asio::ip; +} // namespace asio_helpers -struct connection { - connection(io_service &s) : - sock(s) {} +namespace { - void start() { - //sock.set_option(tcp::no_delay(true)); - sock.async_read_some(buffer(data, max_length), - bind(&connection::read, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); +template +class handler_allocator { +public: + handler_allocator() : in_use_(false) {} + ~handler_allocator() = default; + handler_allocator(const handler_allocator&) = delete; + handler_allocator& operator=(const handler_allocator&) = delete; + + bool owns(void* p) const { + return std::addressof(storage_) == p; + } + + void* allocate(std::size_t size) { + if (in_use_ || size > alloc_size) { + return nullptr; + } + in_use_ = true; + return std::addressof(storage_); + } + + void deallocate(void* p) { + if (p) { + in_use_ = false; } + } + +private: + typename std::aligned_storage::type storage_; + bool in_use_; +}; + +template +class custom_alloc_handler { +private: + typedef custom_alloc_handler this_type; + +public: + template + custom_alloc_handler(Allocator& allocator, H&& handler): + allocator_(std::addressof(allocator)), handler_(std::forward(handler)) {} - void read(const system::error_code &e, size_t bytes_transferred) { - if(!e) { async_write(sock, buffer(data, bytes_transferred), - bind(&connection::write, this, asio::placeholders::error)); } - else delete this; + friend void* asio_handler_allocate(std::size_t size, this_type* context) { + if (void* p = context->allocator_->allocate(size)) { + return p; } + return asio_helpers::allocate(size, context->handler_); + } - void write(const system::error_code &e) { - if(!e) start(); - else delete this; + friend void asio_handler_deallocate(void* pointer, std::size_t size, this_type* context) { + if (context->allocator_->owns(pointer)) { + context->allocator_->deallocate(pointer); + } else { + asio_helpers::deallocate(pointer, size, context->handler_); } + } - tcp::socket sock; - enum { max_length = 4096 }; - char data[max_length]; + template + friend void asio_handler_invoke(Function&& function, this_type* context) { + asio_helpers::invoke(std::forward(function), context->handler_); + } + + template + friend void asio_handler_invoke(Function& function, this_type* context) { + asio_helpers::invoke(function, context->handler_); + } + + template + friend void asio_handler_invoke(const Function& function, this_type* context) { + asio_helpers::invoke(function, context->handler_); + } + + friend bool asio_handler_is_continuation(this_type* context) { + return asio_helpers::is_continuation(context->handler_); + } + + template + void operator()(Arg&&... arg) { + handler_(std::forward(arg)...); + } + + template + void operator()(Arg&&... arg) const { + handler_(std::forward(arg)...); + } + +private: + Allocator* allocator_; + Handler handler_; }; -struct acceptor { - acceptor(int native_sock) : - a(s, tcp::v4(), native_sock) { start_accept(); } +template +custom_alloc_handler::type> +make_custom_alloc_handler(Allocator& allocator, Handler&& handler) { + typedef typename std::decay::type handler_type; + return custom_alloc_handler(allocator, std::forward(handler)); +} + +template +struct shared_ptr_factory_helper : T { - void start_accept() { - connection *c = new connection(s); - a.async_accept(c->sock, bind(&acceptor::accept, this, c, boost::asio::placeholders::error)); + template + explicit shared_ptr_factory_helper(Arg&&... arg) : T(std::forward(arg)...) {} + +}; + +class connection : public std::enable_shared_from_this { +public: + static std::shared_ptr create(boost::asio::io_service& service) { + return std::make_shared>(service); + } + + connection(const connection&) = delete; + connection& operator=(const connection&) = delete; + + void start() { + socket_.async_read_some(boost::asio::buffer(data_), + make_custom_alloc_handler(allocator_, std::bind(&connection::read, shared_from_this(), + std::placeholders::_1, std::placeholders::_2))); + } + + boost::asio::ip::tcp::socket& socket() { + return socket_; + } + +protected: + explicit connection(boost::asio::io_service& service) : socket_(service) {} + ~connection() = default; + +private: + void read(const boost::system::error_code& e, std::size_t bytes_transferred) { + if (!e) { + boost::asio::async_write(socket_, boost::asio::buffer(data_, bytes_transferred), + make_custom_alloc_handler(allocator_, std::bind(&connection::write, shared_from_this(), + std::placeholders::_1))); } + } + + void write(const boost::system::error_code& e) { + if (!e) { + start(); + } + } + + handler_allocator<128> allocator_; + std::array data_; + boost::asio::ip::tcp::socket socket_; +}; + +class acceptor : public std::enable_shared_from_this { +public: + static std::shared_ptr create(boost::asio::io_service& service, + const boost::asio::ip::tcp::acceptor::native_handle_type& native_acceptor) { + return std::make_shared>(service, native_acceptor); + } + + acceptor(const acceptor&) = delete; + acceptor& operator=(const acceptor&) = delete; + + void start() { + auto c = connection::create(service_); + acceptor_.async_accept(c->socket(), make_custom_alloc_handler(allocator_, + std::bind(&acceptor::accept, shared_from_this(), c, std::placeholders::_1))); + } + +protected: + acceptor(boost::asio::io_service& service, + const boost::asio::ip::tcp::acceptor::native_handle_type& native_acceptor) : + service_(service), acceptor_(service_, boost::asio::ip::tcp::v4(), native_acceptor) {} + ~acceptor() = default; - void accept(connection *c, const system::error_code &e) { - if(!e) c->start(); - else delete c; - start_accept(); +private: + void accept(const std::shared_ptr& c, const boost::system::error_code& e) { + if (!e) { + c->start(); } + start(); + } - io_service s; - tcp::acceptor a; + boost::asio::io_service& service_; + boost::asio::ip::tcp::acceptor acceptor_; + handler_allocator<256> allocator_; }; -void servicing(unsigned short port) { - acceptor accpt(port); - //io_service::work worker(accpt.s); - accpt.s.run(); +#if BOOST_VERSION >= 106600 + +typedef int io_context_concurrency_hint; + +io_context_concurrency_hint to_io_context_concurrency_hint(std::size_t hint) { + return 1 == hint ? BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO + : boost::numeric_cast(hint); +} + +#else + +typedef std::size_t io_context_concurrency_hint; + +io_context_concurrency_hint to_io_context_concurrency_hint(std::size_t hint) { + return hint; +} + +#endif + +const char* help_option_name = "help"; +const char* host_option_name = "host"; +const char* port_option_name = "port"; +const char* threads_option_name = "threads"; + +boost::program_options::options_description build_program_options_description() { + boost::program_options::options_description description("Usage"); + description.add_options() + ( + help_option_name, + "produce help message" + ) + ( + host_option_name, + boost::program_options::value()->default_value( + boost::asio::ip::address_v4::any().to_string()), + "address to listen" + ) + ( + port_option_name, + boost::program_options::value(), + "port to listen" + ) + ( + threads_option_name, + boost::program_options::value()->default_value(24), + "number of threads" + ); + return std::move(description); +} + +#if defined(WIN32) +boost::program_options::variables_map parse_program_options( + const boost::program_options::options_description& options_description, + int argc, _TCHAR* argv[]) { +#else +boost::program_options::variables_map parse_program_options( + const boost::program_options::options_description& options_description, + int argc, char* argv[]) { +#endif + boost::program_options::variables_map values; + boost::program_options::store( + boost::program_options::parse_command_line(argc, argv, options_description), + values); + boost::program_options::notify(values); + return std::move(values); } -int main(int args, char **argv) { - if(args < 2) { - cout << "Usage: " << argv[0] << " [threads = 24]" << endl; - return 1; +} // anonymous namespace + +#if defined(WIN32) +int _tmain(int argc, _TCHAR** argv) { +#else +int main(int argc, char* argv[]) { +#endif + try { + auto po_description = build_program_options_description(); + auto po_values = parse_program_options(po_description, argc, argv); + if (po_values.count(help_option_name)) { + std::cout << po_description; + return EXIT_SUCCESS; + } + if (!po_values.count(port_option_name)) { + std::cerr << "port is required\n" << po_description; + return EXIT_FAILURE; + } + auto host = po_values[host_option_name].as(); + auto port = po_values[port_option_name].as(); + auto thread_num = po_values[threads_option_name].as(); + boost::asio::io_service io_service; + boost::asio::ip::tcp::acceptor fake_a(io_service, + boost::asio::ip::tcp::endpoint( + boost::asio::ip::address::from_string(host), + port)); + auto native_handle = fake_a.native_handle(); + std::vector> io_services; + io_services.reserve(thread_num); + std::vector threads; + threads.reserve(thread_num); + for (std::size_t i = 0; i < thread_num; ++i) { + io_services.push_back(std::make_unique( + to_io_context_concurrency_hint(1))); + auto& service = **io_services.rbegin(); + threads.emplace_back([native_handle, &service]() { + acceptor::create(service, native_handle)->start(); + service.run(); + }); + } + boost::asio::signal_set signal_set(io_service, SIGINT, SIGTERM); + signal_set.async_wait([&io_services](const boost::system::error_code& /*error*/, int /*signal*/) { + for (const auto& service : io_services) { + service->stop(); + } + }); + io_service.run(); + for (auto& thread : threads) { + thread.join(); } - unsigned short port = atoi(argv[1]); - size_t threads = args > 2 ? atoi(argv[2]) : 24; - thread *vthreads = new thread[threads]; - io_service fake_s; - tcp::acceptor fake_a(fake_s, tcp::endpoint(tcp::v4(), port)); - //int defer_accept = 1; - //setsockopt(fake_a.native_handle(), IPPROTO_TCP, TCP_DEFER_ACCEPT, &defer_accept, sizeof(defer_accept)); - for(size_t i = 0; i < threads; ++i) vthreads[i] = thread(servicing, fake_a.native_handle()); - sleep((unsigned int) 0 - 1); - return 0; + return EXIT_SUCCESS; + } catch (const boost::program_options::error& e) { + std::cerr << "Error reading options: " << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << "Unexpected error: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Unknown error" << std::endl; + } + return EXIT_FAILURE; } diff --git a/client-c++-virtan/.gitignore b/client-c++-virtan/.gitignore new file mode 100644 index 0000000..f1c6a7c --- /dev/null +++ b/client-c++-virtan/.gitignore @@ -0,0 +1,2 @@ +/build/ +/.idea/ \ No newline at end of file diff --git a/client-c++-virtan/CMakeLists.txt b/client-c++-virtan/CMakeLists.txt new file mode 100644 index 0000000..eedf5e9 --- /dev/null +++ b/client-c++-virtan/CMakeLists.txt @@ -0,0 +1,40 @@ +cmake_minimum_required(VERSION 3.0) +project(client) + +set(Boost_USE_STATIC_LIBS ON) +set(Boost_USE_MULTITHREADED ON) +find_package(Boost REQUIRED COMPONENTS system date_time chrono regex program_options) +find_package(Threads REQUIRED) + +set(libraries ) +list(APPEND libraries ${Boost_LIBRARIES}) +list(APPEND libraries ${Boost_LIBRARIES} Threads::Threads) +if(WIN32) + list(APPEND libraries "ws2_32" "mswsock") +endif() + +if(NOT Boost_USE_STATIC_LIBS) + list(APPEND compile_definitions "BOOST_ALL_DYN_LINK") +endif() +if(WIN32) + list(APPEND compile_definitions + WIN32 + WIN32_LEAN_AND_MEAN + WINVER=0x0501 + _WIN32_WINNT=0x0501 + _WIN32_WINDOWS=0x0501 + _WIN32_IE=0x0600 + _UNICODE + UNICODE + _WINSOCK_DEPRECATED_NO_WARNINGS) +endif() + +if(MINGW) + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -municode") +endif() + +add_executable(client "client.cc") +target_compile_features(client PRIVATE cxx_std_11 cxx_auto_type) +target_compile_definitions(client PRIVATE ${compile_definitions}) +target_include_directories(client PRIVATE ${Boost_INCLUDE_DIRS}) +target_link_libraries(client PRIVATE ${libraries}) diff --git a/client-c++-virtan/Dockerfile b/client-c++-virtan/Dockerfile new file mode 100644 index 0000000..98be139 --- /dev/null +++ b/client-c++-virtan/Dockerfile @@ -0,0 +1,60 @@ +FROM ubuntu:20.04 AS build + +RUN apt-get update && \ + apt-get install -y --no-install-suggests --no-install-recommends \ + ca-certificates \ + curl \ + git \ + g++ \ + make \ + libstdc++-9-dev && \ + rm -rf /var/lib/apt/lists/* + +ENV CMAKE_HOME="/opt/cmake" + +ARG CMAKE_VERSION="3.18.4" + +RUN mkdir -p "${CMAKE_HOME}" && \ + cmake_url="https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.tar.gz" && \ + echo "Downloading CMake ${CMAKE_VERSION} from ${cmake_url} to ${CMAKE_HOME}" && \ + curl -jksSL "${cmake_url}" | tar -xzf - -C "${CMAKE_HOME}" --strip-components 1 + +ENV PATH="${CMAKE_HOME}/bin:${PATH}" \ + BOOST_HOME="/usr/lib/boost" + +ARG BOOST_VERSION="1.74.0" +ARG BOOST_URL="https://dl.bintray.com/mabrarov/generic/boost" + +RUN mkdir -p "${BOOST_HOME}" && \ + boost_archive_url="${BOOST_URL}/${BOOST_VERSION}/boost-${BOOST_VERSION}-x64-gcc$(gcc -dumpversion | sed -r 's/^([[:digit:]]+)(\..*)?$/\1/;t;d').tar.gz" && \ + echo "Downloading Boost from ${boost_archive_url} to ${BOOST_HOME}" && \ + curl --connect-timeout 300 --max-time 1800 --retry 10 --retry-delay 10 \ + -jksSL "${boost_archive_url}" \ + | tar -xz -C "${BOOST_HOME}" --strip-components 1 + +ADD ["CMakeLists.txt", "client.cc", "/usr/src/client-cpp-virtan/"] + +RUN source_dir="/usr/src/client-cpp-virtan" && \ + build_dir="${source_dir}/build" && \ + mkdir -p "${build_dir}" && \ + cmake \ + -D CMAKE_SKIP_BUILD_RPATH=ON \ + -D CMAKE_BUILD_TYPE=Release \ + -D Boost_USE_MULTITHREADED=ON \ + -D Boost_USE_STATIC_LIBS=ON \ + -D Boost_NO_SYSTEM_PATHS=ON \ + -D BOOST_INCLUDEDIR="${BOOST_HOME}/include" \ + -D BOOST_LIBRARYDIR="${BOOST_HOME}/lib" \ + -S "${source_dir}" \ + -B "${build_dir}" && \ + cmake --build "${build_dir}" + +FROM gcr.io/distroless/cc-debian10 + +COPY --from=build ["/usr/src/client-cpp-virtan/build/client", "/opt/client-cpp-virtan/client"] + +ENTRYPOINT ["/opt/client-cpp-virtan/client"] + +CMD ["--help"] + +LABEL name="client-cpp-virtan" diff --git a/client-c++-virtan/Makefile b/client-c++-virtan/Makefile deleted file mode 100644 index d27f9dc..0000000 --- a/client-c++-virtan/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -CXXFLAGS = -Os -march=native -mtune=native -std=c++0x -Wall -DEBUG_CXXFLAGS = -g -O0 -CPPFLAGS = -I/opt/local/include -LDFLAGS = -L/opt/local/lib -lboost_thread-mt -lboost_system-mt -pthread -CLIENT = client - -default: $(CLIENT) - -$(CLIENT): $(addsuffix .cc, $(CLIENT)) - -debug: - @CXXFLAGS="$(CXXFLAGS) $(DEBUG_CXXFLAGS)" make -e $(CLIENT) - -.PHONY: clean -clean: - @rm -rf client client.dSYM diff --git a/client-c++-virtan/README.md b/client-c++-virtan/README.md new file mode 100644 index 0000000..164207e --- /dev/null +++ b/client-c++-virtan/README.md @@ -0,0 +1,40 @@ +# Building + +## Requirements + +1. C++ toolchain (compiler, linker, standard C++ library) +1. Make +1. [CMake](https://cmake.org/) +1. [Boost](https://www.boost.org/) + +## Steps to build + +Assuming current directory is directory where this file is located + +```bash +cmake -D CMAKE_SKIP_BUILD_RPATH=ON -D CMAKE_BUILD_TYPE=Release . +cmake --build . +``` + +# Docker image + +## Building + +### Requirements + +1. Docker 17.06+ for building +1. Current directory is directory where this repository is cloned + +### Steps to build + +```bash +docker build -t client-cpp-virtan client-c++-virtan +``` + +## Running with Docker + +```bash +docker run --rm client-cpp-virtan $params +``` + +where `$params` are parameters passed to application diff --git a/client-c++-virtan/client.cc b/client-c++-virtan/client.cc index 1bc3fe9..23a1667 100644 --- a/client-c++-virtan/client.cc +++ b/client-c++-virtan/client.cc @@ -1,212 +1,603 @@ -#include -#include +#if defined(WIN32) +#include +#endif + +#include +#include +#include +#include #include +#include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include -using namespace std; -using namespace boost; -using namespace boost::asio; -using namespace boost::asio::ip; - -const size_t clients_max = 10000; -const size_t bear_after_mcs = 1000; -const size_t send_each_mcs = 10000; -const size_t client_timeout_mcs = 30*1000*1000; - -size_t threads_; -thread *vthreads; -io_service *services; -size_t services_i = 0; -tcp::resolver::iterator connect_to; - -volatile size_t children = 0; -volatile size_t connection_active = 0; -volatile size_t connection_errors = 0; -volatile size_t exchange_errors = 0; -volatile size_t connection_summ_mcs = 0; -volatile size_t latency_summ_mcs = 0; -volatile size_t msgs = 0; +namespace asio_helpers { -class timer { - public: - timer() { reset(); } +template +void* allocate(std::size_t size, Context& context) { + using namespace boost::asio; + return asio_handler_allocate(size, std::addressof(context)); +} + +template +void deallocate(void* pointer, std::size_t size, Context& context) { + using namespace boost::asio; + asio_handler_deallocate(pointer, size, std::addressof(context)); +} + +template +void invoke(Function&& function, Context& context) { + using namespace boost::asio; + asio_handler_invoke(std::forward(function), std::addressof(context)); +} + +#if BOOST_VERSION >= 105400 + +template +bool is_continuation(Context& context) { + using namespace boost::asio; + return asio_handler_is_continuation(std::addressof(context)); +} - void start() { gettimeofday(&start_time, NULL); } +#else - size_t current() { - struct timeval now; - gettimeofday(&now, NULL); - size_t diff = (((size_t) now.tv_sec) * 1000000) + now.tv_usec - (((size_t) start_time.tv_sec) * 1000000) - start_time.tv_usec; - return diff; - } +template +bool is_continuation(Context& /*context*/) { + return false; +} + +#endif + +} // namespace asio_helpers + +namespace { + +template +class handler_allocator { +public: + handler_allocator() : in_use_(false) {} + ~handler_allocator() = default; + handler_allocator(const handler_allocator&) = delete; + handler_allocator& operator=(const handler_allocator&) = delete; + + bool owns(void* p) const { + return std::addressof(storage_) == p; + } + + void* allocate(std::size_t size) { + if (in_use_ || size > alloc_size) { + return nullptr; + } + in_use_ = true; + return std::addressof(storage_); + } - void reset() { - start(); - } + void deallocate(void* p) { + if (p) { + in_use_ = false; + } + } - private: - struct timeval start_time; +private: + typename std::aligned_storage::type storage_; + bool in_use_; }; -void servicing(size_t i) { - io_service::work worker(services[i]); - services[i].run(); +template +class custom_alloc_handler { +private: + typedef custom_alloc_handler this_type; + +public: + template + custom_alloc_handler(Allocator& allocator, H&& handler): + allocator_(std::addressof(allocator)), handler_(std::forward(handler)) {} + + friend void* asio_handler_allocate(std::size_t size, this_type* context) { + if (void* p = context->allocator_->allocate(size)) { + return p; + } + return asio_helpers::allocate(size, context->handler_); + } + + friend void asio_handler_deallocate(void* pointer, std::size_t size, this_type* context) { + if (context->allocator_->owns(pointer)) { + context->allocator_->deallocate(pointer); + } else { + asio_helpers::deallocate(pointer, size, context->handler_); + } + } + + template + friend void asio_handler_invoke(Function&& function, this_type* context) { + asio_helpers::invoke(std::forward(function), context->handler_); + } + + template + friend void asio_handler_invoke(Function& function, this_type* context) { + asio_helpers::invoke(function, context->handler_); + } + + template + friend void asio_handler_invoke(const Function& function, this_type* context) { + asio_helpers::invoke(function, context->handler_); + } + + friend bool asio_handler_is_continuation(this_type* context) { + return asio_helpers::is_continuation(context->handler_); + } + + template + void operator()(Arg&&... arg) { + handler_(std::forward(arg)...); + } + + template + void operator()(Arg&&... arg) const { + handler_(std::forward(arg)...); + } + +private: + Allocator* allocator_; + Handler handler_; +}; + +template +custom_alloc_handler::type> +make_custom_alloc_handler(Allocator& allocator, Handler&& handler) { + typedef typename std::decay::type handler_type; + return custom_alloc_handler(allocator, std::forward(handler)); } -struct connection_handler { - connection_handler() : - service_id(__sync_fetch_and_add(&services_i, 1)), - my_service(services[service_id % threads_]), - s(my_service), - bear_tmr(my_service, posix_time::microseconds(bear_after_mcs)), - exch_tmr(my_service), - errored(false) - { - if(service_id >= clients_max) return; - bear_tmr.async_wait(bind(&connection_handler::new_connection_handler, this, asio::placeholders::error)); - connect_timer.reset(); - __sync_fetch_and_add(&connection_active, 1); - async_connect(s, connect_to, bind(&connection_handler::connect, this, asio::placeholders::error)); - } - - void new_connection_handler(const system::error_code &e) { - // cout << "new connection\n"; - assert(!e); - new connection_handler; - } - - void connect(const system::error_code &e) { - // cout << "connect_staff\n"; - __sync_fetch_and_add(&children, 1); - __sync_fetch_and_sub(&connection_active, 1); - if(e) { - errored = true; - __sync_fetch_and_add(&connection_errors, 1); - return; - } - size_t spent_mcs = connect_timer.current(); - if(spent_mcs > client_timeout_mcs) { - errored = true; - __sync_fetch_and_add(&connection_errors, 1); - return; - } - __sync_fetch_and_add(&connection_summ_mcs, spent_mcs); - memset(send_buf, ' ', 32); - snprintf(send_buf, 32, "%d", rand()); - send_buf[31] = '\n'; - send_buf[32] = 0; - s.set_option(tcp::no_delay(true)); - //typedef boost::asio::detail::socket_option::integer snd_buf; - //typedef boost::asio::detail::socket_option::integer rcv_buf; - //s.set_option(snd_buf(18350)); - //s.set_option(rcv_buf(18350)); - read_staff(system::error_code()); - send_staff(system::error_code()); - } - - void exchange(const system::error_code &e) { - // cout << "exchange_staff\n"; - assert(!e); - } - - void send_staff(const system::error_code &e) { - // cout << "send_staff\n"; - assert(!e); - if(errored) return; - exch_tmr.expires_from_now(posix_time::microseconds(rand() % (2 * send_each_mcs))); - exch_tmr.async_wait(bind(&connection_handler::send_staff, this, asio::placeholders::error)); - tqueue.push(timer()); - async_write(s, mutable_buffers_1(send_buf, 32), bind(&connection_handler::stub, this, asio::placeholders::error)); - } - - void stub(const system::error_code &e) { - // cout << "stub\n"; - if(errored) return; - if(e) { - // cerr << "send error\n"; - errored = true; - __sync_fetch_and_add(&exchange_errors, 1); - return; - } - } - - void read_staff(const system::error_code &e) { - // cout << "read_staff\n"; - if(errored) return; - if(e) { - errored = true; - __sync_fetch_and_add(&exchange_errors, 1); - return; - } - memset(read_buf, 0, 33); - async_read(s, mutable_buffers_1(read_buf, 32), bind(&connection_handler::compare_staff, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); - } - - void compare_staff(const system::error_code &e, size_t transferred) { - // cout << "compare_staff\n"; - if(errored) return; - if(e || transferred != 32 || strncmp(send_buf, read_buf, 32)) { - //if(strncmp(send_buf, read_buf, 32)) cerr << transferred << " \"" << send_buf << "\" != \"" << read_buf << "\"" << endl; - errored = true; - __sync_fetch_and_add(&exchange_errors, 1); - return; - } - size_t spent_mcs = tqueue.front().current(); - tqueue.pop(); - if(spent_mcs > client_timeout_mcs) { - errored = true; - __sync_fetch_and_add(&exchange_errors, 1); - return; - } - __sync_fetch_and_add(&msgs, 1); - __sync_fetch_and_add(&latency_summ_mcs, spent_mcs); - read_staff(e); - } - - size_t service_id; - io_service &my_service; - tcp::socket s; - deadline_timer bear_tmr; - deadline_timer exch_tmr; - char send_buf[33]; - char read_buf[33]; - timer connect_timer; - timer exchange_timer; - bool errored; - queue tqueue; +const std::size_t clients_max = 10000; +const std::size_t bear_after_mcs = 1000; +const std::size_t send_each_mcs = 10000; +const std::size_t client_timeout_mcs = 30 * 1000 * 1000; +const std::size_t buffer_size = 32; + +std::atomic_bool stopped(false); +std::atomic_size_t connection_index(0); +std::atomic_size_t child_num(0); +std::atomic_size_t active_connection_num(0); +std::atomic_size_t connection_error_num(0); +std::atomic_size_t exchange_error_num(0); +std::atomic_size_t connection_sum_mcs(0); +std::atomic_size_t latency_sum_mcs(0); +std::atomic_size_t msg_num(0); + +class timer { +public: + timer() { + reset(); + } + + void start() { + start_time_ = std::chrono::steady_clock::now(); + } + + std::size_t current() const { + auto now = std::chrono::steady_clock::now(); + return boost::numeric_cast( + std::chrono::duration_cast( + now - start_time_).count()); + } + + void reset() { + start(); + } + +private: + std::chrono::steady_clock::time_point start_time_; +}; + +struct connection_data_storage { + connection_data_storage() = default; + ~connection_data_storage() = default; + connection_data_storage(const connection_data_storage&) = delete; + connection_data_storage& operator=(const connection_data_storage&) = delete; + + handler_allocator<256> bear_timer_allocator; + handler_allocator<256> write_allocator; + handler_allocator<256> read_allocator; + handler_allocator<256> exchange_timer_allocator; + std::array write_buf; + std::array read_buf; +}; + +class connection; +typedef std::vector> connection_vector; + +class connection { +public: + connection(boost::asio::io_service& service, + connection_data_storage& connection_data, + const std::mt19937& rand_engine) : + my_service_(service), + socket_(my_service_), + bear_timer_(my_service_), + exchange_timer_(my_service_), + errored_(false), + write_in_progress_(false), + start_write_when_write_completes_(false), + exchange_timer_wait_in_progress_(false), + start_exchange_timer_wait_when_wait_completes_(false), + rand_engine_(rand_engine), + data_rand_(0, RAND_MAX), + exchange_rand_(0, send_each_mcs), + storage_(connection_data) { + std::fill(storage_.write_buf.begin(), storage_.write_buf.end() - 1, data_rand_(rand_engine_)); + *(storage_.write_buf.rbegin()) = '\n'; + timer_queue_.reserve(16); + } + + void async_start(const boost::asio::ip::tcp::resolver::iterator& connect_to, + const connection_vector& connections) { + my_service_.post(make_custom_alloc_handler(storage_.bear_timer_allocator, + std::bind(&connection::start, this, connect_to, std::cref(connections)))); + } + +private: + void start(const boost::asio::ip::tcp::resolver::iterator& connect_to, + const connection_vector& connections) { + bear_timer_.expires_from_now(boost::posix_time::microseconds(bear_after_mcs)); + bear_timer_.async_wait(make_custom_alloc_handler(storage_.bear_timer_allocator, + std::bind(&connection::handle_bear_timer, this, connect_to, + std::cref(connections), std::placeholders::_1))); + connect_timer_.reset(); + ++active_connection_num; + boost::asio::async_connect(socket_, connect_to, make_custom_alloc_handler( + storage_.write_allocator, std::bind(&connection::handle_connect, + this, std::placeholders::_1))); + } + + void handle_bear_timer(const boost::asio::ip::tcp::resolver::iterator& connect_to, + const connection_vector& connections, const boost::system::error_code& e) { + if (stopped) { + return; + } + if (e) { + return; + } + if (connection_index >= clients_max) { + return; + } + auto new_connection_index = connection_index.fetch_add(1); + if (new_connection_index >= clients_max) { + return; + } + auto& new_connection = *connections[new_connection_index]; + new_connection.async_start(connect_to, connections); + } + + void handle_connect(const boost::system::error_code& e) { + if (stopped) { + return; + } + ++child_num; + --active_connection_num; + if (e) { + errored_ = true; + ++connection_error_num; + stop_operations(); + return; + } + auto spent_mcs = connect_timer_.current(); + if (spent_mcs > client_timeout_mcs) { + errored_ = true; + ++connection_error_num; + stop_operations(); + return; + } + connection_sum_mcs += spent_mcs; + socket_.set_option(boost::asio::ip::tcp::no_delay(true)); + socket_.set_option(boost::asio::socket_base::linger(true, 0)); + start_read(); + start_write(); + } + + void start_write() { + if (write_in_progress_) { + start_write_when_write_completes_ = true; + return; + } + exchange_timer_.expires_from_now(boost::posix_time::microseconds( + exchange_rand_(rand_engine_))); + start_exchange_timer_wait(); + timer_queue_.emplace_back(timer()); + boost::asio::async_write(socket_, boost::asio::buffer(storage_.write_buf), + make_custom_alloc_handler(storage_.write_allocator, std::bind( + &connection::handle_write, this, std::placeholders::_1))); + write_in_progress_ = true; + } + + void handle_write(const boost::system::error_code& e) { + write_in_progress_ = false; + if (stopped) { + return; + } + if (errored_) { + return; + } + if (e) { + errored_ = true; + ++exchange_error_num; + stop_operations(); + return; + } + if (start_write_when_write_completes_) { + start_write_when_write_completes_ = false; + start_write(); + } + } + + void handle_exchange_timer(const boost::system::error_code& e) { + exchange_timer_wait_in_progress_ = false; + if (stopped) { + return; + } + if (errored_) { + return; + } + if (e && e != boost::asio::error::operation_aborted) { + errored_ = true; + stop_operations(); + return; + } + if (start_exchange_timer_wait_when_wait_completes_) { + start_exchange_timer_wait_when_wait_completes_ = false; + start_exchange_timer_wait(); + return; + } + if (e != boost::asio::error::operation_aborted) { + start_write(); + } + } + + void start_exchange_timer_wait() { + if (exchange_timer_wait_in_progress_) { + start_exchange_timer_wait_when_wait_completes_ = true; + return; + } + exchange_timer_.async_wait(make_custom_alloc_handler(storage_.exchange_timer_allocator, + std::bind(&connection::handle_exchange_timer, this, std::placeholders::_1))); + exchange_timer_wait_in_progress_ = true; + } + + void start_read() { + std::fill(storage_.read_buf.begin(), storage_.read_buf.end(), 0); + boost::asio::async_read(socket_, boost::asio::buffer(storage_.read_buf), + make_custom_alloc_handler(storage_.read_allocator, std::bind( + &connection::handle_read, this, std::placeholders::_1, std::placeholders::_2))); + } + + void handle_read(const boost::system::error_code& e, std::size_t transferred) { + if (stopped) { + return; + } + if (errored_) { + return; + } + if (e || transferred != 32 || !std::equal( + storage_.write_buf.begin(), storage_.write_buf.end(), + storage_.read_buf.begin(), storage_.read_buf.end())) { + errored_ = true; + ++exchange_error_num; + stop_operations(); + return; + } + auto spent_mcs = timer_queue_.front().current(); + timer_queue_.erase(timer_queue_.begin()); + if (spent_mcs > client_timeout_mcs) { + errored_ = true; + ++exchange_error_num; + stop_operations(); + return; + } + ++msg_num; + latency_sum_mcs += spent_mcs; + start_read(); + } + + void stop_operations() { + boost::system::error_code ignored; + socket_.close(ignored); + exchange_timer_.cancel(ignored); + } + + boost::asio::io_service& my_service_; + boost::asio::ip::tcp::socket socket_; + boost::asio::deadline_timer bear_timer_; + boost::asio::deadline_timer exchange_timer_; + timer connect_timer_; + bool errored_; + bool write_in_progress_; + bool start_write_when_write_completes_; + bool exchange_timer_wait_in_progress_; + bool start_exchange_timer_wait_when_wait_completes_; + std::vector timer_queue_; + std::mt19937 rand_engine_; + std::uniform_int_distribution data_rand_; + std::uniform_int_distribution exchange_rand_; + connection_data_storage& storage_; }; void print_stat(bool final = false) { - if(final) cout << "\nFinal statistics:\n"; - size_t conn_avg = connection_summ_mcs / max((size_t) children - connection_errors, (size_t) 1); - size_t lat_avg = latency_summ_mcs / max((size_t) msgs, (size_t) 1); - cout << "Chld: " << children << " ConnErr: " << (final ? connection_errors + connection_active : connection_errors) << " ExchErr: " << exchange_errors << " ConnAvg: " << (((double) conn_avg) / 1000) << "ms LatAvg: " << (((double) lat_avg) / 1000) << "ms Msgs: " << msgs << "\n"; + if (final) { + std::cout << "\nFinal statistics:\n"; + } + std::size_t conn_avg = connection_sum_mcs / (std::max)( + child_num - connection_error_num, static_cast(1)); + std::size_t lat_avg = latency_sum_mcs / (std::max)( + static_cast(msg_num), static_cast(1)); + std::cout << "Chld: " << child_num + << " ConnErr: " + << (final ? static_cast(connection_error_num + active_connection_num) + : static_cast(connection_error_num)) + << " ExchErr: " << exchange_error_num + << " ConnAvg: " << static_cast(conn_avg) / 1000 + << "ms LatAvg: " << static_cast(lat_avg) / 1000 + << "ms Msgs: " << msg_num << "\n"; +} + +#if BOOST_VERSION >= 106600 + +typedef int io_context_concurrency_hint; + +io_context_concurrency_hint to_io_context_concurrency_hint(std::size_t hint) { + return 1 == hint ? BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO + : boost::numeric_cast(hint); +} + +#else // BOOST_VERSION >= 106600 + +typedef std::size_t io_context_concurrency_hint; + +io_context_concurrency_hint to_io_context_concurrency_hint(std::size_t hint) { + return hint; } -int main(int args, char **argv) { - if(args < 2) { - cout << "Usage: " << argv[0] << " [port = 32000 [threads = 24]]" << endl; - return 1; - } - string host(argv[1]); - string port(args > 2 ? argv[2] : "32000"); - string threads(args > 3 ? argv[3] : "24"); - threads_ = atoi(threads.c_str()); - vthreads = new thread[threads_]; - services = new io_service[threads_]; - for(size_t i = 0; i < threads_; ++i) vthreads[i] = thread(servicing, i); - sleep(1); - cout << "Starting tests" << endl; - tcp::resolver resolver(services[0]); - tcp::resolver::query query(host, port); - connect_to = resolver.resolve(query); - new connection_handler; - for(size_t i = 0; i < 60; i += 5) { - print_stat(); - sleep(5); +#endif // BOOST_VERSION >= 106600 + +const std::size_t print_stats_interval = 5; + +const char* help_option_name = "help"; +const char* host_option_name = "host"; +const char* port_option_name = "port"; +const char* threads_option_name = "threads"; +const char* duration_option_name = "duration"; + +boost::program_options::options_description build_program_options_description() { + boost::program_options::options_description description("Usage"); + description.add_options() + ( + help_option_name, + "produce help message" + ) + ( + host_option_name, + boost::program_options::value(), + "target host" + ) + ( + port_option_name, + boost::program_options::value()->default_value(32000), + "target port" + ) + ( + threads_option_name, + boost::program_options::value()->default_value(24), + "number of threads" + ) + ( + duration_option_name, + boost::program_options::value()->default_value(60), + "duration, seconds" + ); + return std::move(description); +} + +#if defined(WIN32) +boost::program_options::variables_map parse_program_options( + const boost::program_options::options_description& options_description, + int argc, _TCHAR* argv[]) { +#else +boost::program_options::variables_map parse_program_options( + const boost::program_options::options_description& options_description, + int argc, char* argv[]) { +#endif + boost::program_options::variables_map values; + boost::program_options::store( + boost::program_options::parse_command_line(argc, argv, options_description), + values); + boost::program_options::notify(values); + return std::move(values); +} + +} // anonymous namespace + +#if defined(WIN32) +int _tmain(int argc, _TCHAR** argv) { +#else +int main(int argc, char* argv[]) { +#endif + try { + auto po_description = build_program_options_description(); + auto po_values = parse_program_options(po_description, argc, argv); + if (po_values.count(help_option_name)) { + std::cout << po_description; + return EXIT_SUCCESS; + } + if (!po_values.count(host_option_name)) { + std::cerr << "host is required\n" << po_description; + return EXIT_FAILURE; + } + auto host = po_values[host_option_name].as(); + auto port = po_values[port_option_name].as(); + auto thread_num = po_values[threads_option_name].as(); + auto duration = po_values[duration_option_name].as(); + + std::random_device random_device; + std::mt19937 rand_engine(random_device()); + std::vector> connection_data; + connection_data.reserve(clients_max); + for (std::size_t i = 0; i < clients_max; ++i) { + connection_data.emplace_back(std::make_unique()); + } + std::vector> services; + services.reserve(thread_num); + for (std::size_t i = 0; i < thread_num; ++i) { + services.push_back(std::make_unique( + to_io_context_concurrency_hint(1))); + } + connection_vector connections; + connections.reserve(clients_max); + for (std::size_t i = 0; i < clients_max; ++i) { + connections.push_back(std::make_unique( + *services[i % thread_num], *connection_data[i], rand_engine)); + } + std::vector threads; + threads.reserve(thread_num); + for (std::size_t i = 0; i < thread_num; ++i) { + boost::asio::io_service& service = *services[i]; + threads.emplace_back([&service] { + boost::asio::io_service::work work_guard(service); + service.run(); + }); + } + std::cout << "Starting tests" << std::endl; + boost::asio::ip::tcp::resolver resolver(*services[0]); + boost::asio::ip::tcp::resolver::query query(host, std::to_string(port)); + auto& connection = *connections[connection_index.fetch_add(1)]; + connection.async_start(resolver.resolve(query), connections); + for (std::size_t i = 0; i < duration; i += print_stats_interval) { + print_stat(); + std::this_thread::sleep_for(std::chrono::seconds(print_stats_interval)); + } + stopped = true; + for (auto& service : services) { + service->stop(); + } + for (auto& thread : threads) { + thread.join(); } print_stat(true); - return 0; + return EXIT_SUCCESS; + } catch (const boost::program_options::error& e) { + std::cerr << "Error reading options: " << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << "Unexpected error: " << e.what() << std::endl; + } catch (...) { + std::cerr << "Unknown error" << std::endl; + } + return EXIT_FAILURE; } diff --git a/client-c++-virtan/test.sh b/client-c++-virtan/test.sh new file mode 100644 index 0000000..8f506bd --- /dev/null +++ b/client-c++-virtan/test.sh @@ -0,0 +1,50 @@ +#!/bin/sh + +set -e + +if [[ "${#}" -lt 2 ]]; then + echo "Usage: ${0} host port [number-of-threads = 24 [number-of-iteration = 16 [client-docker-image]]]" >&2 + exit 1 +fi + +target_host="${1}" +target_port=${2} + +if [[ "${#}" -ge 3 ]]; then + work_threads=${3} +else + work_threads=24 +fi + +if [[ "${#}" -ge 4 ]]; then + run_iterations=${4} +else + run_iterations=16 +fi + +if [[ "${#}" -ge 5 ]]; then + docker_image="${5}" +else + docker_image="client-cpp-virtan" +fi + +max_msgs=0 +best_stats="" + +echo "Running container from ${docker_image} image ${run_iterations} times with command: --host \"${target_host}\" --port \"${target_port}\" --threads \"${work_threads}\"" + +i=0 +while [[ ${i} -lt ${run_iterations} ]] +do + #echo "Running iteration #${i}..." + stats=$(docker run --rm "${docker_image}" --host "${target_host}" --port "${target_port}" --threads "${work_threads}" 2>&1 | grep -A 1 "Final statistics" | grep "Chld:") + #echo "Stats: ${stats}" + msgs=$(echo "${stats}" | sed -r 's/.*Msgs: ([0-9]+)/\1/') + if [[ "${msgs}" -gt ${max_msgs} ]]; then + max_msgs=${msgs} + best_stats="${stats}" + fi + i=$(( ${i} + 1 )) +done + +echo "Best stats: ${best_stats}"