Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CMakePresets.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"version": 2,
"configurePresets": [
{
"name": "vcpkg",
"generator": "Ninja",
"binaryDir": "${sourceDir}/build",
"cacheVariables": {
"CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake",
"CMAKE_BUILD_TYPE": "Debug"
}
}
]
}
18 changes: 9 additions & 9 deletions include/libp2p/transport/quic/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <lsquic.h>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <deque>
#include <libp2p/multi/multiaddress.hpp>
#include <libp2p/peer/peer_id.hpp>
#include <memory>
Expand Down Expand Up @@ -75,14 +76,9 @@ namespace libp2p::transport::lsquic {
Engine *engine;
lsquic_stream_t *ls_stream;
std::weak_ptr<QuicStream> stream{};
/**
* Stream read operation arguments.
*/
struct Reading {
BytesOut out;
std::function<void(outcome::result<size_t>)> cb;
};
std::optional<Reading> reading{};
std::optional<std::function<void()>> reading{};
std::optional<std::function<void()>> writing{};
bool want_flush = false;
};

using OnAccept = std::function<void(std::shared_ptr<QuicConnection>)>;
Expand Down Expand Up @@ -118,9 +114,11 @@ namespace libp2p::transport::lsquic {
void onAccept(OnAccept cb) {
on_accept_ = std::move(cb);
}
void process();
void wantProcess();
void wantFlush(StreamCtx *stream_ctx);

private:
void process();
void readLoop();

std::shared_ptr<boost::asio::io_context> io_context_;
Expand All @@ -134,6 +132,8 @@ namespace libp2p::transport::lsquic {
lsquic_engine_t *engine_ = nullptr;
OnAccept on_accept_;
bool started_ = false;
std::deque<std::weak_ptr<connection::QuicStream>> want_flush_;
bool want_process_ = false;
std::optional<Connecting> connecting_;
struct Reading {
static constexpr size_t kMaxUdpPacketSize = 64 << 10;
Expand Down
3 changes: 3 additions & 0 deletions include/libp2p/transport/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ namespace libp2p::transport {
} // namespace libp2p::transport

namespace libp2p::transport::lsquic {
class Engine;
struct StreamCtx;
} // namespace libp2p::transport::lsquic

namespace libp2p::connection {
class QuicStream : public Stream,
public std::enable_shared_from_this<QuicStream> {
friend class libp2p::transport::lsquic::Engine;

public:
QuicStream(std::shared_ptr<transport::QuicConnection> conn,
transport::lsquic::StreamCtx *stream_ctx,
Expand Down
92 changes: 70 additions & 22 deletions src/transport/quic/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic {

lsquic_engine_settings settings{};
lsquic_engine_init_settings(&settings, flags);
settings.es_versions = 1 << LSQVER_I001;
settings.es_init_max_stream_data_bidi_remote =
mux_config.maximum_window_size;
settings.es_init_max_stream_data_bidi_local =
Expand All @@ -58,10 +59,10 @@ namespace libp2p::transport::lsquic {

static lsquic_stream_if stream_if{};
stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) {
auto self = static_cast<Engine *>(void_self);
auto *self = static_cast<Engine *>(void_self);
auto op = qtils::optionTake(self->connecting_);
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
auto conn_ctx = new ConnCtx{self, conn, std::move(op)};
auto *conn_ctx = new ConnCtx{self, conn, std::move(op)};
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto _conn_ctx = reinterpret_cast<lsquic_conn_ctx_t *>(conn_ctx);
lsquic_conn_set_ctx(conn, _conn_ctx);
Expand All @@ -72,7 +73,7 @@ namespace libp2p::transport::lsquic {
};
stream_if.on_conn_closed = +[](lsquic_conn_t *conn) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
auto *conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
if (auto op = qtils::optionTake(conn_ctx->connecting)) {
op->cb(QuicError::CONN_CLOSED);
}
Expand All @@ -85,7 +86,7 @@ namespace libp2p::transport::lsquic {
};
stream_if.on_hsk_done = +[](lsquic_conn_t *conn, lsquic_hsk_status status) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
auto *conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
auto self = conn_ctx->engine;
auto ok = status == LSQ_HSK_OK or status == LSQ_HSK_RESUMED_OK;
auto op = qtils::optionTake(conn_ctx->connecting);
Expand Down Expand Up @@ -122,12 +123,12 @@ namespace libp2p::transport::lsquic {
}
};
stream_if.on_new_stream = +[](void *void_self, lsquic_stream_t *stream) {
auto self = static_cast<Engine *>(void_self);
auto *self = static_cast<Engine *>(void_self);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto conn_ctx = reinterpret_cast<ConnCtx *>(
auto *conn_ctx = reinterpret_cast<ConnCtx *>(
lsquic_conn_get_ctx(lsquic_stream_conn(stream)));
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
auto stream_ctx = new StreamCtx{self, stream};
auto *stream_ctx = new StreamCtx{self, stream};
if (auto conn = conn_ctx->conn.lock()) {
auto stream = std::make_shared<QuicStream>(
conn, stream_ctx, conn_ctx->new_stream.has_value());
Expand All @@ -146,29 +147,36 @@ namespace libp2p::transport::lsquic {
stream_if.on_close =
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto op = qtils::optionTake(stream_ctx->reading)) {
op->cb(QuicError::STREAM_CLOSED);
}
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto stream = stream_ctx->stream.lock()) {
stream->onClose();
}
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
reading.value()();
}
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
writing.value()();
}
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
delete stream_ctx;
};
stream_if.on_read =
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
lsquic_stream_wantread(stream, 0);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
auto op = qtils::optionTake(stream_ctx->reading).value();
auto n = lsquic_stream_read(stream, op.out.data(), op.out.size());
outcome::result<size_t> r = QuicError::STREAM_CLOSED;
if (n > 0) {
r = n;
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
reading.value()();
}
};
stream_if.on_write =
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
lsquic_stream_wantwrite(stream, 0);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
writing.value()();
}
post(*stream_ctx->engine->io_context_,
[cb{std::move(op.cb)}, r] { cb(r); });
};

lsquic_engine_api api{};
Expand All @@ -179,7 +187,7 @@ namespace libp2p::transport::lsquic {
api.ea_packets_out = +[](void *void_self,
const lsquic_out_spec *out_spec,
unsigned n_packets_out) {
auto self = static_cast<Engine *>(void_self);
auto *self = static_cast<Engine *>(void_self);
// https://github.com/cbodley/nexus/blob/d1d8486f713fd089917331239d755932c7c8ed8e/src/socket.cc#L218
int r = 0;
for (auto &spec : std::span{out_spec, n_packets_out}) {
Expand Down Expand Up @@ -216,7 +224,7 @@ namespace libp2p::transport::lsquic {
};
api.ea_packets_out_ctx = this;
api.ea_get_ssl_ctx = +[](void *void_self, const sockaddr *) {
auto self = static_cast<Engine *>(void_self);
auto *self = static_cast<Engine *>(void_self);
return self->ssl_context_->native_handle();
};

Expand Down Expand Up @@ -261,7 +269,7 @@ namespace libp2p::transport::lsquic {
if (auto op = qtils::optionTake(connecting_)) {
op->cb(QuicError::CANT_CREATE_CONNECTION);
}
process();
wantProcess();
}

outcome::result<std::shared_ptr<QuicStream>> Engine::newStream(
Expand All @@ -281,7 +289,47 @@ namespace libp2p::transport::lsquic {
return stream;
}

void Engine::wantProcess() {
if (want_process_) {
return;
}
want_process_ = true;
boost::asio::post(*io_context_, [weak_self{weak_from_this()}] {
if (auto self = weak_self.lock()) {
self->process();
}
});
}

void Engine::wantFlush(StreamCtx *stream_ctx) {
if (stream_ctx->want_flush) {
return;
}
stream_ctx->want_flush = true;
if (stream_ctx->stream.expired()) {
return;
}
want_flush_.emplace_back(stream_ctx->stream);
wantProcess();
}

void Engine::process() {
want_process_ = false;
auto want_flush = std::exchange(want_flush_, {});
for (auto &weak_stream : want_flush) {
auto stream = weak_stream.lock();
if (not stream) {
continue;
}
if (stream->stream_ctx_ == nullptr) {
continue;
}
if (stream->stream_ctx_->ls_stream == nullptr) {
continue;
}
stream->stream_ctx_->want_flush = false;
lsquic_stream_flush(stream->stream_ctx_->ls_stream);
}
lsquic_engine_process_conns(engine_);
int us = 0;
if (not lsquic_engine_earliest_adv_tick(engine_, &us)) {
Expand Down
34 changes: 31 additions & 3 deletions src/transport/quic/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ namespace libp2p::connection {
}
auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size());
if (n == -1 and errno == EWOULDBLOCK) {
stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)});
stream_ctx_->reading.emplace(
[weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable {
auto self = weak_self.lock();
if (not self) {
cb(QuicError::STREAM_CLOSED);
return;
}
self->readSome(out, std::move(cb));
});
lsquic_stream_wantread(stream_ctx_->ls_stream, 1);
return;
}
Expand All @@ -54,11 +62,31 @@ namespace libp2p::connection {
if (not stream_ctx_) {
return cb(r);
}
if (stream_ctx_->writing) {
throw std::logic_error{"QuicStream::writeSome already in progress"};
}
// Missing from `lsquic_stream_write` documentation comment.
// Return value 0 means buffer is full.
// Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write`
// callback, before calling `lsquic_stream_write` again.
auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size());
if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) {
if (n == 0) {
stream_ctx_->writing.emplace(
[weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable {
auto self = weak_self.lock();
if (not self) {
cb(QuicError::STREAM_CLOSED);
return;
}
self->writeSome(in, std::move(cb));
});
lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1);
return;
}
if (n > 0) {
r = n;
stream_ctx_->engine->wantFlush(stream_ctx_);
}
stream_ctx_->engine->process();
deferReadCallback(r, std::move(cb));
}

Expand Down
19 changes: 9 additions & 10 deletions test/libp2p/transport/quic_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <libp2p/basic/read.hpp>
#include <libp2p/basic/write.hpp>
#include <libp2p/injector/host_injector.hpp>
#include <qtils/bytestr.hpp>

#include "testutil/prepare_loggers.hpp"

Expand All @@ -19,8 +18,6 @@ using libp2p::Multiaddress;
using libp2p::StreamAndProtocol;
using libp2p::StreamAndProtocolOrError;
using libp2p::connection::Stream;
using qtils::byte2str;
using qtils::str2byte;

auto makeInjector(std::shared_ptr<io_context> io) {
return libp2p::injector::makeHostInjector<
Expand Down Expand Up @@ -54,7 +51,9 @@ struct Peer {
TEST(Quic, Test) {
testutil::prepareLoggers();
std::string protocol = "/test";
std::string_view req{"request"}, res{"response"};
const size_t size = 1 << 20;
libp2p::Bytes req(size, 'a'), res(size, 'b');

auto io = std::make_shared<io_context>();
auto run = [&] {
io->restart();
Expand Down Expand Up @@ -94,16 +93,16 @@ TEST(Quic, Test) {
}

wait_count = 2;
qtils::Bytes req_out(req.size());
libp2p::write(client.stream, str2byte(req), RW_CB);
libp2p::Bytes req_out(req.size());
libp2p::write(client.stream, req, RW_CB);
libp2p::read(server.stream, req_out, RW_CB);
run();
EXPECT_EQ(byte2str(req_out), req);
EXPECT_EQ(req_out, req);

wait_count = 2;
qtils::Bytes res_out(res.size());
libp2p::Bytes res_out(res.size());
libp2p::read(client.stream, res_out, RW_CB);
libp2p::write(server.stream, str2byte(res), RW_CB);
libp2p::write(server.stream, res, RW_CB);
run();
EXPECT_EQ(byte2str(res_out), res);
EXPECT_EQ(res_out, res);
}
9 changes: 1 addition & 8 deletions vcpkg-configuration.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
{
"default-registry": {
"kind": "git",
"baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa",
"baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba",
"repository": "https://github.com/microsoft/vcpkg"
},
"registries": [
{
"kind": "artifact",
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
"name": "microsoft"
}
],
"overlay-ports": [
"vcpkg-overlay"
]
Expand Down
Loading