Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/condy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "condy/sync_wait.hpp" // IWYU pragma: export
#include "condy/task.hpp" // IWYU pragma: export
#include "condy/version.hpp" // IWYU pragma: export
#include "condy/zcrx.hpp" // IWYU pragma: export

/**
* @brief The main namespace for the Condy library.
Expand Down
33 changes: 33 additions & 0 deletions include/condy/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "condy/concepts.hpp"
#include "condy/condy_uring.hpp"
#include "condy/helpers.hpp"
#include "condy/zcrx.hpp"

namespace condy {

Expand Down Expand Up @@ -701,6 +702,38 @@ inline auto async_recv_multishot(Fd sockfd, Buffer &buf, int flags,
}
#endif

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15

namespace detail {

inline void prep_recv_zc_multishot(io_uring_sqe *sqe, int fd,
uint32_t zcrx_id) {
io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, fd, nullptr, 0, 0);
sqe->ioprio |= IORING_RECV_MULTISHOT;
sqe->zcrx_ifq_idx = zcrx_id;
}

} // namespace detail

// TODO: Consider the function signature later...
template <FdLike Fd, typename MultiShotFunc>
inline auto async_recv_multishot(Fd fd, ZeroCopyRxBufferPool &pool,
[[maybe_unused]] int flags,
MultiShotFunc &&func) {
auto zcrx_id = pool.zcrx_id();
auto prep_func = [=](Ring *ring) {
auto *sqe = ring->get_sqe();
detail::prep_recv_zc_multishot(sqe, fd, zcrx_id);
return sqe;
};
auto op = build_multishot_op_awaiter<
SelectBufferCQEHandler<ZeroCopyRxBufferPool>>(
std::move(prep_func), std::forward<MultiShotFunc>(func), &pool);
return detail::maybe_flag_fixed_fd(std::move(op), fd);
}

#endif

/**
* @brief See io_uring_prep_openat2
*/
Expand Down
63 changes: 63 additions & 0 deletions include/condy/buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <string>
#include <sys/mman.h>
#include <sys/uio.h>
#include <utility>
#include <vector>

namespace condy {
Expand Down Expand Up @@ -190,4 +191,66 @@ inline MutableBuffer buffer(std::span<PodType, N> sp) noexcept {
sp.size() * sizeof(PodType));
}

namespace detail {

template <typename BufferPool> struct ManagedBuffer : public BufferBase {
public:
ManagedBuffer() = default;
ManagedBuffer(void *data, size_t size, BufferPool *pool)
: data_(data), size_(size), pool_(pool) {}
ManagedBuffer(ManagedBuffer &&other) noexcept
: data_(std::exchange(other.data_, nullptr)),
size_(std::exchange(other.size_, 0)),
pool_(std::exchange(other.pool_, nullptr)) {}
ManagedBuffer &operator=(ManagedBuffer &&other) noexcept {
if (this != &other) {
reset();
data_ = std::exchange(other.data_, nullptr);
size_ = std::exchange(other.size_, 0);
pool_ = std::exchange(other.pool_, nullptr);
}
return *this;
}

~ManagedBuffer() { reset(); }

ManagedBuffer(const ManagedBuffer &) = delete;
ManagedBuffer &operator=(const ManagedBuffer &) = delete;

public:
/**
* @brief Get the data pointer of the buffer
*/
void *data() const noexcept { return data_; }

/** *
* @brief Get the size of the buffer
*/
size_t size() const noexcept { return size_; }

/**
* @brief Reset the buffer, returning it to the pool if owned
*/
void reset() noexcept {
if (pool_ != nullptr) {
pool_->add_buffer_back(data_, size_);
}
data_ = nullptr;
size_ = 0;
pool_ = nullptr;
}

/**
* @brief Check if the buffer owns a buffer from a pool.
*/
bool owns_buffer() const noexcept { return pool_ != nullptr; }

private:
void *data_ = nullptr;
size_t size_ = 0;
BufferPool *pool_ = nullptr;
};

} // namespace detail

} // namespace condy
4 changes: 3 additions & 1 deletion include/condy/cqe_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ struct SimpleCQEHandler {
* result of the operation (the value of `cqe->res`) and the selected buffer,
* whose type is determined by the buffer ring.
*/
template <BufferRingLike Br> class SelectBufferCQEHandler {
template <typename Br>
requires(requires(Br *br, io_uring_cqe *cqe) { br->handle_finish(cqe); })
class SelectBufferCQEHandler {
public:
SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {}

Expand Down
64 changes: 3 additions & 61 deletions include/condy/provided_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,57 +227,7 @@ class BundledProvidedBufferPool;
* @note The lifetime of the provided buffer must not exceed the lifetime of the
* provided buffer pool it is associated with.
*/
struct ProvidedBuffer : public BufferBase {
public:
ProvidedBuffer() = default;
ProvidedBuffer(void *data, size_t size,
detail::BundledProvidedBufferPool *pool)
: data_(data), size_(size), pool_(pool) {}
ProvidedBuffer(ProvidedBuffer &&other) noexcept
: data_(std::exchange(other.data_, nullptr)),
size_(std::exchange(other.size_, 0)),
pool_(std::exchange(other.pool_, nullptr)) {}
ProvidedBuffer &operator=(ProvidedBuffer &&other) noexcept {
if (this != &other) {
reset();
data_ = std::exchange(other.data_, nullptr);
size_ = std::exchange(other.size_, 0);
pool_ = std::exchange(other.pool_, nullptr);
}
return *this;
}

~ProvidedBuffer() { reset(); }

ProvidedBuffer(const ProvidedBuffer &) = delete;
ProvidedBuffer &operator=(const ProvidedBuffer &) = delete;

public:
/**
* @brief Get the data pointer of the provided buffer
*/
void *data() const noexcept { return data_; }

/** *
* @brief Get the size of the provided buffer
*/
size_t size() const noexcept { return size_; }

/**
* @brief Reset the provided buffer, returning it to the pool if owned
*/
void reset() noexcept;

/**
* @brief Check if the provided buffer owns a buffer from a pool.
*/
bool owns_buffer() const noexcept { return pool_ != nullptr; }

private:
void *data_ = nullptr;
size_t size_ = 0;
detail::BundledProvidedBufferPool *pool_ = nullptr;
};
using ProvidedBuffer = detail::ManagedBuffer<detail::BundledProvidedBufferPool>;

namespace detail {

Expand Down Expand Up @@ -397,7 +347,8 @@ class BundledProvidedBufferPool {
return buffers;
}

void add_buffer_back(void *ptr) noexcept {
void add_buffer_back(void *ptr, [[maybe_unused]] size_t size) noexcept {
assert(size <= buffer_size_);
char *base = get_buffers_base_();
assert(ptr >= base);
size_t offset = static_cast<char *>(ptr) - base;
Expand Down Expand Up @@ -437,15 +388,6 @@ class BundledProvidedBufferPool {

} // namespace detail

inline void ProvidedBuffer::reset() noexcept {
if (pool_ != nullptr) {
pool_->add_buffer_back(data_);
}
data_ = nullptr;
size_ = 0;
pool_ = nullptr;
}

/**
* @brief Provided buffer pool.
* @details A provided buffer pool manages a pool of buffers that can be used in
Expand Down
6 changes: 6 additions & 0 deletions include/condy/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,10 @@ std::variant<Ts...> tuple_at(std::tuple<Ts...> &results, size_t idx) {
}
}

template <typename T> inline T align_up(T value, size_t alignment) noexcept {
// alignment must be a power of two
assert(alignment > 0 && (alignment & (alignment - 1)) == 0);
return (value + alignment - 1) & ~(alignment - 1);
}

} // namespace condy
Loading
Loading