diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 000000000..e0445e306 --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,14 @@ +{ + "version": 2, + "configurePresets": [ + { + "name": "vcpkg", + "generator": "Ninja", + "binaryDir": "${sourceDir}/build", + "cacheVariables": { + "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake", + "CMAKE_BUILD_TYPE": "Debug" + } + } + ] +} diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index f29151d9a..3688f2a12 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -75,14 +76,9 @@ namespace libp2p::transport::lsquic { Engine *engine; lsquic_stream_t *ls_stream; std::weak_ptr stream{}; - /** - * Stream read operation arguments. - */ - struct Reading { - BytesOut out; - std::function)> cb; - }; - std::optional reading{}; + std::optional> reading{}; + std::optional> writing{}; + bool want_flush = false; }; using OnAccept = std::function)>; @@ -118,9 +114,11 @@ namespace libp2p::transport::lsquic { void onAccept(OnAccept cb) { on_accept_ = std::move(cb); } - void process(); + void wantProcess(); + void wantFlush(StreamCtx *stream_ctx); private: + void process(); void readLoop(); std::shared_ptr io_context_; @@ -134,6 +132,8 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + std::deque> want_flush_; + bool want_process_ = false; std::optional connecting_; struct Reading { static constexpr size_t kMaxUdpPacketSize = 64 << 10; diff --git a/include/libp2p/transport/quic/stream.hpp b/include/libp2p/transport/quic/stream.hpp index 453a443a8..50ee30a89 100644 --- a/include/libp2p/transport/quic/stream.hpp +++ b/include/libp2p/transport/quic/stream.hpp @@ -13,12 +13,15 @@ namespace libp2p::transport { } // namespace libp2p::transport namespace libp2p::transport::lsquic { + class Engine; struct StreamCtx; } // namespace libp2p::transport::lsquic namespace libp2p::connection { class QuicStream : public Stream, public std::enable_shared_from_this { + friend class libp2p::transport::lsquic::Engine; + public: QuicStream(std::shared_ptr conn, transport::lsquic::StreamCtx *stream_ctx, diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 267a8d761..69233be7d 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_settings settings{}; lsquic_engine_init_settings(&settings, flags); + settings.es_versions = 1 << LSQVER_I001; settings.es_init_max_stream_data_bidi_remote = mux_config.maximum_window_size; settings.es_init_max_stream_data_bidi_local = @@ -58,10 +59,10 @@ namespace libp2p::transport::lsquic { static lsquic_stream_if stream_if{}; stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); auto op = qtils::optionTake(self->connecting_); // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto conn_ctx = new ConnCtx{self, conn, std::move(op)}; + auto *conn_ctx = new ConnCtx{self, conn, std::move(op)}; // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto _conn_ctx = reinterpret_cast(conn_ctx); lsquic_conn_set_ctx(conn, _conn_ctx); @@ -72,7 +73,7 @@ namespace libp2p::transport::lsquic { }; stream_if.on_conn_closed = +[](lsquic_conn_t *conn) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + auto *conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); if (auto op = qtils::optionTake(conn_ctx->connecting)) { op->cb(QuicError::CONN_CLOSED); } @@ -85,7 +86,7 @@ namespace libp2p::transport::lsquic { }; stream_if.on_hsk_done = +[](lsquic_conn_t *conn, lsquic_hsk_status status) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + auto *conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); auto self = conn_ctx->engine; auto ok = status == LSQ_HSK_OK or status == LSQ_HSK_RESUMED_OK; auto op = qtils::optionTake(conn_ctx->connecting); @@ -122,12 +123,12 @@ namespace libp2p::transport::lsquic { } }; stream_if.on_new_stream = +[](void *void_self, lsquic_stream_t *stream) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast( + auto *conn_ctx = reinterpret_cast( lsquic_conn_get_ctx(lsquic_stream_conn(stream))); // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto stream_ctx = new StreamCtx{self, stream}; + auto *stream_ctx = new StreamCtx{self, stream}; if (auto conn = conn_ctx->conn.lock()) { auto stream = std::make_shared( conn, stream_ctx, conn_ctx->new_stream.has_value()); @@ -146,13 +147,16 @@ namespace libp2p::transport::lsquic { stream_if.on_close = +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto stream_ctx = reinterpret_cast(_stream_ctx); - if (auto op = qtils::optionTake(stream_ctx->reading)) { - op->cb(QuicError::STREAM_CLOSED); - } + auto *stream_ctx = reinterpret_cast(_stream_ctx); if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -160,15 +164,19 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { lsquic_stream_wantread(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto stream_ctx = reinterpret_cast(_stream_ctx); - auto op = qtils::optionTake(stream_ctx->reading).value(); - auto n = lsquic_stream_read(stream, op.out.data(), op.out.size()); - outcome::result r = QuicError::STREAM_CLOSED; - if (n > 0) { - r = n; + auto *stream_ctx = reinterpret_cast(_stream_ctx); + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } + }; + stream_if.on_write = + +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { + lsquic_stream_wantwrite(stream, 0); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto *stream_ctx = reinterpret_cast(_stream_ctx); + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); } - post(*stream_ctx->engine->io_context_, - [cb{std::move(op.cb)}, r] { cb(r); }); }; lsquic_engine_api api{}; @@ -179,7 +187,7 @@ namespace libp2p::transport::lsquic { api.ea_packets_out = +[](void *void_self, const lsquic_out_spec *out_spec, unsigned n_packets_out) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); // https://github.com/cbodley/nexus/blob/d1d8486f713fd089917331239d755932c7c8ed8e/src/socket.cc#L218 int r = 0; for (auto &spec : std::span{out_spec, n_packets_out}) { @@ -216,7 +224,7 @@ namespace libp2p::transport::lsquic { }; api.ea_packets_out_ctx = this; api.ea_get_ssl_ctx = +[](void *void_self, const sockaddr *) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); return self->ssl_context_->native_handle(); }; @@ -261,7 +269,7 @@ namespace libp2p::transport::lsquic { if (auto op = qtils::optionTake(connecting_)) { op->cb(QuicError::CANT_CREATE_CONNECTION); } - process(); + wantProcess(); } outcome::result> Engine::newStream( @@ -281,7 +289,47 @@ namespace libp2p::transport::lsquic { return stream; } + void Engine::wantProcess() { + if (want_process_) { + return; + } + want_process_ = true; + boost::asio::post(*io_context_, [weak_self{weak_from_this()}] { + if (auto self = weak_self.lock()) { + self->process(); + } + }); + } + + void Engine::wantFlush(StreamCtx *stream_ctx) { + if (stream_ctx->want_flush) { + return; + } + stream_ctx->want_flush = true; + if (stream_ctx->stream.expired()) { + return; + } + want_flush_.emplace_back(stream_ctx->stream); + wantProcess(); + } + void Engine::process() { + want_process_ = false; + auto want_flush = std::exchange(want_flush_, {}); + for (auto &weak_stream : want_flush) { + auto stream = weak_stream.lock(); + if (not stream) { + continue; + } + if (stream->stream_ctx_ == nullptr) { + continue; + } + if (stream->stream_ctx_->ls_stream == nullptr) { + continue; + } + stream->stream_ctx_->want_flush = false; + lsquic_stream_flush(stream->stream_ctx_->ls_stream); + } lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 1adb3f527..4c923c8d0 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -34,7 +34,15 @@ namespace libp2p::connection { } auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size()); if (n == -1 and errno == EWOULDBLOCK) { - stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)}); + stream_ctx_->reading.emplace( + [weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->readSome(out, std::move(cb)); + }); lsquic_stream_wantread(stream_ctx_->ls_stream, 1); return; } @@ -54,11 +62,31 @@ namespace libp2p::connection { if (not stream_ctx_) { return cb(r); } + if (stream_ctx_->writing) { + throw std::logic_error{"QuicStream::writeSome already in progress"}; + } + // Missing from `lsquic_stream_write` documentation comment. + // Return value 0 means buffer is full. + // Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write` + // callback, before calling `lsquic_stream_write` again. auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); - if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) { + if (n == 0) { + stream_ctx_->writing.emplace( + [weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->writeSome(in, std::move(cb)); + }); + lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1); + return; + } + if (n > 0) { r = n; + stream_ctx_->engine->wantFlush(stream_ctx_); } - stream_ctx_->engine->process(); deferReadCallback(r, std::move(cb)); } diff --git a/test/libp2p/transport/quic_test.cpp b/test/libp2p/transport/quic_test.cpp index ba81fde46..293092671 100644 --- a/test/libp2p/transport/quic_test.cpp +++ b/test/libp2p/transport/quic_test.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include "testutil/prepare_loggers.hpp" @@ -19,8 +18,6 @@ using libp2p::Multiaddress; using libp2p::StreamAndProtocol; using libp2p::StreamAndProtocolOrError; using libp2p::connection::Stream; -using qtils::byte2str; -using qtils::str2byte; auto makeInjector(std::shared_ptr io) { return libp2p::injector::makeHostInjector< @@ -54,7 +51,9 @@ struct Peer { TEST(Quic, Test) { testutil::prepareLoggers(); std::string protocol = "/test"; - std::string_view req{"request"}, res{"response"}; + const size_t size = 1 << 20; + libp2p::Bytes req(size, 'a'), res(size, 'b'); + auto io = std::make_shared(); auto run = [&] { io->restart(); @@ -94,16 +93,16 @@ TEST(Quic, Test) { } wait_count = 2; - qtils::Bytes req_out(req.size()); - libp2p::write(client.stream, str2byte(req), RW_CB); + libp2p::Bytes req_out(req.size()); + libp2p::write(client.stream, req, RW_CB); libp2p::read(server.stream, req_out, RW_CB); run(); - EXPECT_EQ(byte2str(req_out), req); + EXPECT_EQ(req_out, req); wait_count = 2; - qtils::Bytes res_out(res.size()); + libp2p::Bytes res_out(res.size()); libp2p::read(client.stream, res_out, RW_CB); - libp2p::write(server.stream, str2byte(res), RW_CB); + libp2p::write(server.stream, res, RW_CB); run(); - EXPECT_EQ(byte2str(res_out), res); + EXPECT_EQ(res_out, res); } diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 1ad42d251..0073f1a14 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -1,16 +1,9 @@ { "default-registry": { "kind": "git", - "baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa", + "baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba", "repository": "https://github.com/microsoft/vcpkg" }, - "registries": [ - { - "kind": "artifact", - "location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip", - "name": "microsoft" - } - ], "overlay-ports": [ "vcpkg-overlay" ]