From 0662c695b1af4ccbd29557d53fcca563a4e7717f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:17:45 +0500 Subject: [PATCH 1/8] cmake preset vcpkg Signed-off-by: turuslan --- CMakePresets.json | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 CMakePresets.json diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 000000000..e0445e306 --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,14 @@ +{ + "version": 2, + "configurePresets": [ + { + "name": "vcpkg", + "generator": "Ninja", + "binaryDir": "${sourceDir}/build", + "cacheVariables": { + "CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake", + "CMAKE_BUILD_TYPE": "Debug" + } + } + ] +} From 5edd59786005573390c8c40bc2a99857f4ee864f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:17:59 +0500 Subject: [PATCH 2/8] update vcpkg baseline Signed-off-by: turuslan --- vcpkg-configuration.json | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 1ad42d251..0073f1a14 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -1,16 +1,9 @@ { "default-registry": { "kind": "git", - "baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa", + "baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba", "repository": "https://github.com/microsoft/vcpkg" }, - "registries": [ - { - "kind": "artifact", - "location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip", - "name": "microsoft" - } - ], "overlay-ports": [ "vcpkg-overlay" ] From 8d733af65d6c4a371beb64430eb854f8de626a9f Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:20:34 +0500 Subject: [PATCH 3/8] simplify read Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 9 +-------- src/transport/quic/engine.cpp | 15 +++++---------- src/transport/quic/stream.cpp | 10 +++++++++- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index f29151d9a..6b73b2049 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -75,14 +75,7 @@ namespace libp2p::transport::lsquic { Engine *engine; lsquic_stream_t *ls_stream; std::weak_ptr stream{}; - /** - * Stream read operation arguments. - */ - struct Reading { - BytesOut out; - std::function)> cb; - }; - std::optional reading{}; + std::optional> reading{}; }; using OnAccept = std::function)>; diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 267a8d761..43b2e3997 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -147,12 +147,12 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - if (auto op = qtils::optionTake(stream_ctx->reading)) { - op->cb(QuicError::STREAM_CLOSED); - } if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -161,14 +161,9 @@ namespace libp2p::transport::lsquic { lsquic_stream_wantread(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto stream_ctx = reinterpret_cast(_stream_ctx); - auto op = qtils::optionTake(stream_ctx->reading).value(); - auto n = lsquic_stream_read(stream, op.out.data(), op.out.size()); - outcome::result r = QuicError::STREAM_CLOSED; - if (n > 0) { - r = n; + if (auto reading = qtils::optionTake(stream_ctx->reading)) { + reading.value()(); } - post(*stream_ctx->engine->io_context_, - [cb{std::move(op.cb)}, r] { cb(r); }); }; lsquic_engine_api api{}; diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 1adb3f527..f200139d9 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -34,7 +34,15 @@ namespace libp2p::connection { } auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size()); if (n == -1 and errno == EWOULDBLOCK) { - stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)}); + stream_ctx_->reading.emplace( + [weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->readSome(out, std::move(cb)); + }); lsquic_stream_wantread(stream_ctx_->ls_stream, 1); return; } From db84f1a7d2f15f114f1c7227d08290a2c434bfba Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:20:55 +0500 Subject: [PATCH 4/8] fix quic version Signed-off-by: turuslan --- src/transport/quic/engine.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 43b2e3997..cadb3d5c2 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_settings settings{}; lsquic_engine_init_settings(&settings, flags); + settings.es_versions = 1 << LSQVER_I001; settings.es_init_max_stream_data_bidi_remote = mux_config.maximum_window_size; settings.es_init_max_stream_data_bidi_local = From e25bdb657d2fe2c5abf6e10b31d291da142206e2 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:22:05 +0500 Subject: [PATCH 5/8] batch defer process Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 4 +++- src/transport/quic/engine.cpp | 15 ++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index 6b73b2049..c60f34d2e 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -111,9 +111,10 @@ namespace libp2p::transport::lsquic { void onAccept(OnAccept cb) { on_accept_ = std::move(cb); } - void process(); + void wantProcess(); private: + void process(); void readLoop(); std::shared_ptr io_context_; @@ -127,6 +128,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + bool want_process_ = false; std::optional connecting_; struct Reading { static constexpr size_t kMaxUdpPacketSize = 64 << 10; diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index cadb3d5c2..38d7492ef 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -257,7 +257,7 @@ namespace libp2p::transport::lsquic { if (auto op = qtils::optionTake(connecting_)) { op->cb(QuicError::CANT_CREATE_CONNECTION); } - process(); + wantProcess(); } outcome::result> Engine::newStream( @@ -277,7 +277,20 @@ namespace libp2p::transport::lsquic { return stream; } + void Engine::wantProcess() { + if (want_process_) { + return; + } + want_process_ = true; + boost::asio::post(*io_context_, [weak_self{weak_from_this()}] { + if (auto self = weak_self.lock()) { + self->process(); + } + }); + } + void Engine::process() { + want_process_ = false; lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { From 404dc0dc7bf22644e2aed0701887f5ecfd1be4f6 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 19 Feb 2026 20:22:20 +0500 Subject: [PATCH 6/8] fix write Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 4 +++ include/libp2p/transport/quic/stream.hpp | 2 ++ src/transport/quic/engine.cpp | 36 ++++++++++++++++++++++++ src/transport/quic/stream.cpp | 24 ++++++++++++++-- 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index c60f34d2e..105bcef2f 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -76,6 +76,8 @@ namespace libp2p::transport::lsquic { lsquic_stream_t *ls_stream; std::weak_ptr stream{}; std::optional> reading{}; + std::optional> writing{}; + bool want_flush = false; }; using OnAccept = std::function)>; @@ -112,6 +114,7 @@ namespace libp2p::transport::lsquic { on_accept_ = std::move(cb); } void wantProcess(); + void wantFlush(StreamCtx *stream_ctx); private: void process(); @@ -128,6 +131,7 @@ namespace libp2p::transport::lsquic { lsquic_engine_t *engine_ = nullptr; OnAccept on_accept_; bool started_ = false; + std::deque> want_flush_; bool want_process_ = false; std::optional connecting_; struct Reading { diff --git a/include/libp2p/transport/quic/stream.hpp b/include/libp2p/transport/quic/stream.hpp index 453a443a8..e45bf2062 100644 --- a/include/libp2p/transport/quic/stream.hpp +++ b/include/libp2p/transport/quic/stream.hpp @@ -19,6 +19,8 @@ namespace libp2p::transport::lsquic { namespace libp2p::connection { class QuicStream : public Stream, public std::enable_shared_from_this { + friend libp2p::transport::lsquic::Engine; + public: QuicStream(std::shared_ptr conn, transport::lsquic::StreamCtx *stream_ctx, diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 38d7492ef..f1e046052 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -154,6 +154,9 @@ namespace libp2p::transport::lsquic { if (auto reading = qtils::optionTake(stream_ctx->reading)) { reading.value()(); } + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) delete stream_ctx; }; @@ -166,6 +169,15 @@ namespace libp2p::transport::lsquic { reading.value()(); } }; + stream_if.on_write = + +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { + lsquic_stream_wantwrite(stream, 0); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto stream_ctx = reinterpret_cast(_stream_ctx); + if (auto writing = qtils::optionTake(stream_ctx->writing)) { + writing.value()(); + } + }; lsquic_engine_api api{}; api.ea_settings = &settings; @@ -289,8 +301,32 @@ namespace libp2p::transport::lsquic { }); } + void Engine::wantFlush(StreamCtx *stream_ctx) { + if (stream_ctx->want_flush) { + return; + } + stream_ctx->want_flush = true; + if (stream_ctx->stream.expired()) { + return; + } + want_flush_.emplace_back(stream_ctx->stream); + wantProcess(); + } + void Engine::process() { want_process_ = false; + auto want_flush = std::exchange(want_flush_, {}); + for (auto &weak_stream : want_flush) { + auto stream = weak_stream.lock(); + if (not stream) { + continue; + } + if (stream->stream_ctx_->ls_stream == nullptr) { + continue; + } + stream->stream_ctx_->want_flush = false; + lsquic_stream_flush(stream->stream_ctx_->ls_stream); + } lsquic_engine_process_conns(engine_); int us = 0; if (not lsquic_engine_earliest_adv_tick(engine_, &us)) { diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index f200139d9..4c923c8d0 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -62,11 +62,31 @@ namespace libp2p::connection { if (not stream_ctx_) { return cb(r); } + if (stream_ctx_->writing) { + throw std::logic_error{"QuicStream::writeSome already in progress"}; + } + // Missing from `lsquic_stream_write` documentation comment. + // Return value 0 means buffer is full. + // Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write` + // callback, before calling `lsquic_stream_write` again. auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size()); - if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) { + if (n == 0) { + stream_ctx_->writing.emplace( + [weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable { + auto self = weak_self.lock(); + if (not self) { + cb(QuicError::STREAM_CLOSED); + return; + } + self->writeSome(in, std::move(cb)); + }); + lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1); + return; + } + if (n > 0) { r = n; + stream_ctx_->engine->wantFlush(stream_ctx_); } - stream_ctx_->engine->process(); deferReadCallback(r, std::move(cb)); } From 51fa1a04f5aa5843070df137e814bf287d81cae9 Mon Sep 17 00:00:00 2001 From: turuslan Date: Wed, 25 Feb 2026 07:32:04 +0500 Subject: [PATCH 7/8] include Signed-off-by: turuslan --- include/libp2p/transport/quic/engine.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index 105bcef2f..3688f2a12 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include From 45daa760409ec794fee9b784fc5dec0e14ac5d11 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 3 Mar 2026 15:22:12 +0500 Subject: [PATCH 8/8] pr comment Signed-off-by: turuslan --- include/libp2p/transport/quic/stream.hpp | 3 ++- src/transport/quic/engine.cpp | 27 +++++++++++++----------- test/libp2p/transport/quic_test.cpp | 19 ++++++++--------- 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/include/libp2p/transport/quic/stream.hpp b/include/libp2p/transport/quic/stream.hpp index e45bf2062..50ee30a89 100644 --- a/include/libp2p/transport/quic/stream.hpp +++ b/include/libp2p/transport/quic/stream.hpp @@ -13,13 +13,14 @@ namespace libp2p::transport { } // namespace libp2p::transport namespace libp2p::transport::lsquic { + class Engine; struct StreamCtx; } // namespace libp2p::transport::lsquic namespace libp2p::connection { class QuicStream : public Stream, public std::enable_shared_from_this { - friend libp2p::transport::lsquic::Engine; + friend class libp2p::transport::lsquic::Engine; public: QuicStream(std::shared_ptr conn, diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index f1e046052..69233be7d 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -59,10 +59,10 @@ namespace libp2p::transport::lsquic { static lsquic_stream_if stream_if{}; stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); auto op = qtils::optionTake(self->connecting_); // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto conn_ctx = new ConnCtx{self, conn, std::move(op)}; + auto *conn_ctx = new ConnCtx{self, conn, std::move(op)}; // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) auto _conn_ctx = reinterpret_cast(conn_ctx); lsquic_conn_set_ctx(conn, _conn_ctx); @@ -73,7 +73,7 @@ namespace libp2p::transport::lsquic { }; stream_if.on_conn_closed = +[](lsquic_conn_t *conn) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + auto *conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); if (auto op = qtils::optionTake(conn_ctx->connecting)) { op->cb(QuicError::CONN_CLOSED); } @@ -86,7 +86,7 @@ namespace libp2p::transport::lsquic { }; stream_if.on_hsk_done = +[](lsquic_conn_t *conn, lsquic_hsk_status status) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + auto *conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); auto self = conn_ctx->engine; auto ok = status == LSQ_HSK_OK or status == LSQ_HSK_RESUMED_OK; auto op = qtils::optionTake(conn_ctx->connecting); @@ -123,12 +123,12 @@ namespace libp2p::transport::lsquic { } }; stream_if.on_new_stream = +[](void *void_self, lsquic_stream_t *stream) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto conn_ctx = reinterpret_cast( + auto *conn_ctx = reinterpret_cast( lsquic_conn_get_ctx(lsquic_stream_conn(stream))); // NOLINTNEXTLINE(cppcoreguidelines-owning-memory) - auto stream_ctx = new StreamCtx{self, stream}; + auto *stream_ctx = new StreamCtx{self, stream}; if (auto conn = conn_ctx->conn.lock()) { auto stream = std::make_shared( conn, stream_ctx, conn_ctx->new_stream.has_value()); @@ -147,7 +147,7 @@ namespace libp2p::transport::lsquic { stream_if.on_close = +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto stream_ctx = reinterpret_cast(_stream_ctx); + auto *stream_ctx = reinterpret_cast(_stream_ctx); if (auto stream = stream_ctx->stream.lock()) { stream->onClose(); } @@ -164,7 +164,7 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { lsquic_stream_wantread(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto stream_ctx = reinterpret_cast(_stream_ctx); + auto *stream_ctx = reinterpret_cast(_stream_ctx); if (auto reading = qtils::optionTake(stream_ctx->reading)) { reading.value()(); } @@ -173,7 +173,7 @@ namespace libp2p::transport::lsquic { +[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) { lsquic_stream_wantwrite(stream, 0); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) - auto stream_ctx = reinterpret_cast(_stream_ctx); + auto *stream_ctx = reinterpret_cast(_stream_ctx); if (auto writing = qtils::optionTake(stream_ctx->writing)) { writing.value()(); } @@ -187,7 +187,7 @@ namespace libp2p::transport::lsquic { api.ea_packets_out = +[](void *void_self, const lsquic_out_spec *out_spec, unsigned n_packets_out) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); // https://github.com/cbodley/nexus/blob/d1d8486f713fd089917331239d755932c7c8ed8e/src/socket.cc#L218 int r = 0; for (auto &spec : std::span{out_spec, n_packets_out}) { @@ -224,7 +224,7 @@ namespace libp2p::transport::lsquic { }; api.ea_packets_out_ctx = this; api.ea_get_ssl_ctx = +[](void *void_self, const sockaddr *) { - auto self = static_cast(void_self); + auto *self = static_cast(void_self); return self->ssl_context_->native_handle(); }; @@ -321,6 +321,9 @@ namespace libp2p::transport::lsquic { if (not stream) { continue; } + if (stream->stream_ctx_ == nullptr) { + continue; + } if (stream->stream_ctx_->ls_stream == nullptr) { continue; } diff --git a/test/libp2p/transport/quic_test.cpp b/test/libp2p/transport/quic_test.cpp index ba81fde46..293092671 100644 --- a/test/libp2p/transport/quic_test.cpp +++ b/test/libp2p/transport/quic_test.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include "testutil/prepare_loggers.hpp" @@ -19,8 +18,6 @@ using libp2p::Multiaddress; using libp2p::StreamAndProtocol; using libp2p::StreamAndProtocolOrError; using libp2p::connection::Stream; -using qtils::byte2str; -using qtils::str2byte; auto makeInjector(std::shared_ptr io) { return libp2p::injector::makeHostInjector< @@ -54,7 +51,9 @@ struct Peer { TEST(Quic, Test) { testutil::prepareLoggers(); std::string protocol = "/test"; - std::string_view req{"request"}, res{"response"}; + const size_t size = 1 << 20; + libp2p::Bytes req(size, 'a'), res(size, 'b'); + auto io = std::make_shared(); auto run = [&] { io->restart(); @@ -94,16 +93,16 @@ TEST(Quic, Test) { } wait_count = 2; - qtils::Bytes req_out(req.size()); - libp2p::write(client.stream, str2byte(req), RW_CB); + libp2p::Bytes req_out(req.size()); + libp2p::write(client.stream, req, RW_CB); libp2p::read(server.stream, req_out, RW_CB); run(); - EXPECT_EQ(byte2str(req_out), req); + EXPECT_EQ(req_out, req); wait_count = 2; - qtils::Bytes res_out(res.size()); + libp2p::Bytes res_out(res.size()); libp2p::read(client.stream, res_out, RW_CB); - libp2p::write(server.stream, str2byte(res), RW_CB); + libp2p::write(server.stream, res, RW_CB); run(); - EXPECT_EQ(byte2str(res_out), res); + EXPECT_EQ(res_out, res); }