From ee8b8517bab6bab3296721451a62a44f86d12d7d Mon Sep 17 00:00:00 2001 From: Christian Kothe Date: Thu, 5 Nov 2020 12:47:31 -0800 Subject: [PATCH 01/14] Replaced boost::spsc_queue by a fast SPMC queue - pop_sample is now lockfree again, except if buffer full or empty and timeout is used - queue is much less of a bottleneck now --- src/common.h | 28 ++++++++ src/consumer_queue.cpp | 59 +++++++--------- src/consumer_queue.h | 151 +++++++++++++++++++++++++++++++++++------ 3 files changed, 182 insertions(+), 56 deletions(-) diff --git a/src/common.h b/src/common.h index 1a3d997bf..080f0aacb 100644 --- a/src/common.h +++ b/src/common.h @@ -30,6 +30,34 @@ extern "C" { "Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions." #endif +// size of a cache line +#if defined(__s390__) || defined(__s390x__) +#define CACHELINE_BYTES 256 +#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__) +#define CACHELINE_BYTES 128 +#else +#define CACHELINE_BYTES 64 +#endif + +// force-inline the given function, if possible +#if defined(__clang__) || defined(__GNUC__) +#define FORCEINLINE __attribute__((always_inline)) +#elif defined _MSC_VER +#define FORCEINLINE __forceinline +#else +#define FORCEINLINE inline +#endif + +// compiler hint that the given expression is likely or unlikely +// (e.g., in conditional statements) +#if defined(__clang__) || defined(__GNUC__) +#define LIKELY(x) __builtin_expect(!!(x), 1) +#define UNLIKELY(x) __builtin_expect(!!(x), 0) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif + // the highest supported protocol version // * 100 is the original version, supported by library versions 1.00+ // * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+ diff --git a/src/consumer_queue.cpp b/src/consumer_queue.cpp index 6869de5ec..53d13891f 100644 --- a/src/consumer_queue.cpp +++ 
b/src/consumer_queue.cpp @@ -1,5 +1,5 @@ #include "consumer_queue.h" -#include "sample.h" +#include "common.h" #include "send_buffer.h" #include #include @@ -7,8 +7,17 @@ using namespace lsl; -consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry) - : registry_(std::move(registry)), buffer_(max_capacity) { +consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry) + : registry_(std::move(registry)), buffer_(new item_t[size]), size_(size), + // largest integer at which we can wrap correctly + wrap_at_(std::numeric_limits::max() - size - + std::numeric_limits::max() % size) { + assert(size_ > 1); + for (std::size_t i = 0; i < size_; ++i) + buffer_[i].seq_state.store(i, std::memory_order_release); + write_idx_.store(0, std::memory_order_release); + read_idx_.store(0, std::memory_order_release); + done_sync_.store(false, std::memory_order_release); if (registry_) registry_->register_consumer(this); } @@ -20,41 +29,23 @@ consumer_queue::~consumer_queue() { "Unexpected error while trying to unregister a consumer queue from its registry: %s", e.what()); } -} - -void consumer_queue::push_sample(const sample_p &sample) { - // push a sample, dropping the oldest sample if the queue ist already full. - // During this operation the producer becomes a second consumer, i.e., a case - // where the underlying spsc queue isn't thread-safe) so the mutex is locked. 
- std::lock_guard lk(mut_); - while (!buffer_.push(sample)) { - buffer_.pop(); - } - cv_.notify_one(); -} - -sample_p consumer_queue::pop_sample(double timeout) { - sample_p result; - if (timeout <= 0.0) { - std::lock_guard lk(mut_); - buffer_.pop(result); - } else { - std::unique_lock lk(mut_); - if (!buffer_.pop(result)) { - // wait for a new sample until the thread calling push_sample delivers one and sends a - // notification, or until timeout - std::chrono::duration sec(timeout); - cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); }); - } - } - return result; + delete[] buffer_; } uint32_t consumer_queue::flush() noexcept { - std::lock_guard lk(mut_); uint32_t n = 0; - while (buffer_.pop()) n++; + while (try_pop()) n++; return n; } -bool consumer_queue::empty() { return buffer_.empty(); } +std::size_t consumer_queue::read_available() const { + std::size_t write_index = write_idx_.load(std::memory_order_acquire); + std::size_t read_index = read_idx_.load(std::memory_order_relaxed); + if (write_index >= read_index) return write_index - read_index; + const std::size_t ret = write_index + size_ - read_index; + return ret; +} + +bool consumer_queue::empty() const { + return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed); +} diff --git a/src/consumer_queue.h b/src/consumer_queue.h index a84b26b2f..3e8d2694c 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -2,60 +2,167 @@ #define CONSUMER_QUEUE_H #include "common.h" -#include "forward.h" -#include +#include "sample.h" +#include #include #include +#include namespace lsl { /** - * A thread-safe producer-consumer queue of unread samples. + * A thread-safe producer/consumer queue of unread samples. * - * Erases the oldest samples if max capacity is exceeded. Implemented as a circular buffer. + * Erases the oldest samples if max capacity is exceeded. Implemented as a ring buffer (wait-free + * unless the buffer is full or empty). 
*/ class consumer_queue { - using buffer_type = lslboost::lockfree::spsc_queue; - public: /** * Create a new queue with a given capacity. - * @param max_capacity The maximum number of samples that can be held by the queue. Beyond that, + * @param size The maximum number of samples that can be held by the queue. Beyond that, * the oldest samples are dropped. * @param registry Optionally a pointer to a registration facility, for multiple-reader * arrangements. */ - consumer_queue(std::size_t max_capacity, send_buffer_p registry = send_buffer_p()); + explicit consumer_queue(std::size_t size, send_buffer_p registry = send_buffer_p()); /// Destructor. Unregisters from the send buffer, if any. ~consumer_queue(); - /// Push a new sample onto the queue. - void push_sample(const sample_p &sample); + /** + * Push a new sample onto the queue. Can only be called by one thread (single-producer). + * This deletes the oldest sample if the max capacity is exceeded. + */ + template void push_sample(T &&sample) { + while (!try_push(std::forward(sample))) { + // buffer full, drop oldest sample + if (not done_sync_.load(std::memory_order_acquire)) { + // synchronizes-with store to done_sync_ in ctor + std::atomic_thread_fence(std::memory_order_acquire); + done_sync_.store(true, std::memory_order_release); + } + try_pop(); + } + { + // ensure that notify_one doesn't happen in between try_pop and wait_for + std::lock_guard lk(mut_); + cv_.notify_one(); + } + } /** - * Pop a sample from the queue. - * Blocks if empty. + * Pop a sample from the queue. Can be called by multiple threads (multi-consumer). + * Blocks if empty and if a nonzero timeout is used. * @param timeout Timeout for the blocking, in seconds. If expired, an empty sample is returned. 
*/ - sample_p pop_sample(double timeout = FOREVER); + sample_p pop_sample(double timeout = FOREVER) { + sample_p result; + bool success = try_pop(result); + if (!success && timeout > 0.0) { + // only acquire mutex if we have to do a blocking wait with timeout + std::chrono::duration sec(timeout); + std::unique_lock lk(mut_); + if (!try_pop(result)) cv_.wait_for(lk, sec, [&] { return this->try_pop(result); }); + } + return result; + } - /// Number of available samples - std::size_t read_available() const { return buffer_.read_available(); } + /// Number of available samples. This is approximate unless called by the thread calling the + /// pop_sample(). + std::size_t read_available() const; - /// Flush the queue, return the number of dropped samples + /// Flush the queue, return the number of dropped samples. uint32_t flush() noexcept; - /// Check whether the buffer is empty. - bool empty(); + /// Check whether the buffer is empty. This is approximate unless called by the thread calling + /// the pop_sample(). + bool empty() const; consumer_queue(const consumer_queue&) = delete; + consumer_queue(consumer_queue &&) = delete; consumer_queue& operator=(const consumer_queue&) = delete; + consumer_queue &operator=(consumer_queue &&) = delete; private: - send_buffer_p registry_; // optional consumer registry - buffer_type buffer_; // the sample buffer - std::mutex mut_; // mutex for cond var (also to protect queue at buffer overflow) - std::condition_variable cv_; // to allow for blocking wait by consumer + // an item stored in the queue + struct item_t { + std::atomic seq_state; + sample_p value; + }; + + // Push a new element to the queue. + // Returns true if successful or false if queue full. 
+ template bool try_push(T &&sample) { + std::size_t write_index = write_idx_.load(std::memory_order_acquire); + std::size_t next_idx = add_wrap(write_index, 1); + item_t &item = buffer_[write_index % size_]; + if (UNLIKELY(write_index != item.seq_state.load(std::memory_order_acquire))) + return false; // item currently occupied, queue full + write_idx_.store(next_idx, std::memory_order_release); + copy_or_move(item.value, std::forward(sample)); + item.seq_state.store(next_idx, std::memory_order_release); + return true; + } + + // Pop an element from the queue (can be called with zero or one argument). Returns true if + // successful or false if queue is empty. Uses the same method as Vyukov's bounded MPMC queue. + template bool try_pop(T &...result) { + item_t *item; + std::size_t read_index = read_idx_.load(std::memory_order_relaxed); + for (;;) { + item = &buffer_[read_index % size_]; + const std::size_t seq_state = item->seq_state.load(std::memory_order_acquire); + const std::size_t next_idx = add_wrap(read_index, 1); + // check if the item is ok to pop + if (LIKELY(seq_state == next_idx)) { + // yes, try to claim slot using CAS + if (LIKELY(read_idx_.compare_exchange_weak( + read_index, next_idx, std::memory_order_relaxed))) + break; + } else if (LIKELY(seq_state == read_index)) + return false; // queue empty + else + // we're behind or ahead of another pop, try again + read_index = read_idx_.load(std::memory_order_relaxed); + } + move_or_drop(item->value, result...); + // mark item as free for next pass + item->seq_state.store(add_wrap(read_index, size_), std::memory_order_release); + return true; + } + + // helper to either copy or move a value, depending on whether it's an rvalue ref + inline static void copy_or_move(sample_p &dst, const sample_p &src) { dst = src; } + inline static void copy_or_move(sample_p &dst, sample_p &&src) { dst = std::move(src); } + // helper to either move or drop a value, depending on whether a dst argument is given + inline 
static void move_or_drop(sample_p & /*unused*/) {} + inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); } + // helper to add a delta to the given index and wrap correctly + FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const { + const std::size_t xp = x + delta; + return xp >= wrap_at_ ? xp - wrap_at_ : xp; + } + + /// optional consumer registry + send_buffer_p registry_; + /// the sample buffer + item_t *buffer_; + /// max number of elements in the queue + const std::size_t size_; + /// threshold at which to wrap read/write indices + const std::size_t wrap_at_; + // whether we have performed a sync on the data stored by the constructor + alignas(CACHELINE_BYTES) std::atomic done_sync_; + /// current write position + alignas(CACHELINE_BYTES) std::atomic write_idx_; + /// current read position + alignas(CACHELINE_BYTES) std::atomic read_idx_; + /// whether a pop() is waiting + alignas(CACHELINE_BYTES) std::atomic has_waiter_; + /// for use with the condition variable + std::mutex mut_; + /// condition for waiting with timeout + std::condition_variable cv_; }; } // namespace lsl From cbeec376fe5ee3ee7ceb7d30adecf78c421abae3 Mon Sep 17 00:00:00 2001 From: Christian Kothe Date: Thu, 5 Nov 2020 13:44:45 -0800 Subject: [PATCH 02/14] Fixed syntax error caught by windows CI build --- src/consumer_queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 3e8d2694c..be6c41575 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -36,7 +36,7 @@ class consumer_queue { template void push_sample(T &&sample) { while (!try_push(std::forward(sample))) { // buffer full, drop oldest sample - if (not done_sync_.load(std::memory_order_acquire)) { + if (!done_sync_.load(std::memory_order_acquire)) { // synchronizes-with store to done_sync_ in ctor std::atomic_thread_fence(std::memory_order_acquire); done_sync_.store(true, std::memory_order_release); From 
b56f9975da8dc1b07ccba75b441270100821a4a4 Mon Sep 17 00:00:00 2001 From: Christian Kothe Date: Thu, 5 Nov 2020 13:56:49 -0800 Subject: [PATCH 03/14] Removed unused variable --- src/consumer_queue.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/consumer_queue.h b/src/consumer_queue.h index be6c41575..cfdef0f26 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -157,8 +157,6 @@ class consumer_queue { alignas(CACHELINE_BYTES) std::atomic write_idx_; /// current read position alignas(CACHELINE_BYTES) std::atomic read_idx_; - /// whether a pop() is waiting - alignas(CACHELINE_BYTES) std::atomic has_waiter_; /// for use with the condition variable std::mutex mut_; /// condition for waiting with timeout From ace6c921011f272d05f9d8c6f61281f06d8851f1 Mon Sep 17 00:00:00 2001 From: Christian Kothe Date: Fri, 6 Nov 2020 12:50:51 -0800 Subject: [PATCH 04/14] move_or_drop should destroy sample_p on drop --- src/consumer_queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer_queue.h b/src/consumer_queue.h index cfdef0f26..c72eeba3b 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -135,7 +135,7 @@ class consumer_queue { inline static void copy_or_move(sample_p &dst, const sample_p &src) { dst = src; } inline static void copy_or_move(sample_p &dst, sample_p &&src) { dst = std::move(src); } // helper to either move or drop a value, depending on whether a dst argument is given - inline static void move_or_drop(sample_p & /*unused*/) {} + inline static void move_or_drop(sample_p &src) { src.~sample_p(); } inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); } // helper to add a delta to the given index and wrap correctly FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const { From bead2f33caad9664573bbe7945f96fe31ce8af19 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 7 Jul 2021 15:26:40 +0200 Subject: [PATCH 05/14] Add optimized variant for the common 
`add_wrap(x, 1)` case --- src/consumer_queue.h | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/consumer_queue.h b/src/consumer_queue.h index c72eeba3b..61b993e22 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -94,7 +94,7 @@ class consumer_queue { // Returns true if successful or false if queue full. template bool try_push(T &&sample) { std::size_t write_index = write_idx_.load(std::memory_order_acquire); - std::size_t next_idx = add_wrap(write_index, 1); + std::size_t next_idx = add1_wrap(write_index); item_t &item = buffer_[write_index % size_]; if (UNLIKELY(write_index != item.seq_state.load(std::memory_order_acquire))) return false; // item currently occupied, queue full @@ -112,7 +112,7 @@ class consumer_queue { for (;;) { item = &buffer_[read_index % size_]; const std::size_t seq_state = item->seq_state.load(std::memory_order_acquire); - const std::size_t next_idx = add_wrap(read_index, 1); + const std::size_t next_idx = add1_wrap(read_index); // check if the item is ok to pop if (LIKELY(seq_state == next_idx)) { // yes, try to claim slot using CAS @@ -137,12 +137,18 @@ class consumer_queue { // helper to either move or drop a value, depending on whether a dst argument is given inline static void move_or_drop(sample_p &src) { src.~sample_p(); } inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); } - // helper to add a delta to the given index and wrap correctly + + /// helper to add a delta to the given index and wrap correctly FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const { const std::size_t xp = x + delta; return xp >= wrap_at_ ? xp - wrap_at_ : xp; } + /// helper to increment the given index, wrapping it if necessary + inline std::size_t add1_wrap(std::size_t x) const { + return ++x == wrap_at_ ? 
0 : x; + } + /// optional consumer registry send_buffer_p registry_; /// the sample buffer From 559a0b10e577b11ded327845e43b3fb1dbfd70c4 Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 7 Jul 2021 17:22:13 +0200 Subject: [PATCH 06/14] Add missing #include --- src/send_buffer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/send_buffer.cpp b/src/send_buffer.cpp index 4977fcb22..e28231102 100644 --- a/src/send_buffer.cpp +++ b/src/send_buffer.cpp @@ -1,5 +1,6 @@ #include "send_buffer.h" #include "consumer_queue.h" +#include #include #include From b044b4910ed647b0d2a4a70d8faa9b2275db1b9a Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 3 Sep 2021 17:47:49 -0400 Subject: [PATCH 07/14] More options in the cpp Receive/Send DataInChunks for testing. --- examples/ReceiveDataInChunks.cpp | 35 ++++++++------ examples/SendDataInChunks.cpp | 79 ++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 5ff2fd678..336385cb5 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -3,24 +3,33 @@ #include -// define a packed sample struct (here a stereo sample). -#pragma pack(1) -struct stereo_sample { - int16_t l, r; -}; - -int main(int, char *[]) { +int main(int argc, char **argv) { try { + std::string name{argc > 1 ? 
argv[1] : "MyAudioStream"}; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", "MyAudioStream").at(0)); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0)); + + inlet.flush(); - // and retrieve the chunks (note: this can of course also be done with pure std::vectors - // instead of stereo_samples) + double starttime = lsl::local_clock(), next_display = starttime + 1; + + // and retrieve the chunks + uint64_t k = 0, num_samples = 0; while (true) { - std::vector result; - if (double timestamp = inlet.pull_chunk_numeric_structs(result)) - std::cout << timestamp << std::endl; // only showing the time stamps here + std::vector < std::vector > result; + if (double timestamp = inlet.pull_chunk(result)) + num_samples += result.size(); + k++; + + // display code + if (k % 50 == 0) { + double now = lsl::local_clock(); + if (now > next_display) { + std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; + next_display = now + 1; + } + } } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 2ffce9a4b..7c55d27f4 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -1,7 +1,10 @@ #include #include +#include #include #include +#include +#include // define a packed sample struct (here: a 16 bit stereo sample). @@ -10,31 +13,79 @@ struct stereo_sample { int16_t l, r; }; +// fill buffer with data from device -- Normally your device SDK would provide such a function. Here we use a random number generator. 
+void get_data_from_device(std::vector> buffer, uint64_t &sample_counter) { + static std::uniform_int_distribution distribution( + std::numeric_limits::min(), std::numeric_limits::max()); + static std::default_random_engine generator; + + if (buffer[0].size() == 2) { + // If there are only 2 channels then we'll do a sine wave, pretending this is an audio device. + for (auto &frame : buffer) { + frame[0] = static_cast(100 * sin(sample_counter / 200.)); + frame[1] = static_cast(120 * sin(sample_counter / 400.)); + sample_counter++; + } + } + else { + for (auto &frame : buffer) { + for (std::size_t chan_idx = 0; chan_idx < frame.size(); ++chan_idx) { + frame[chan_idx] = distribution(generator); + } + sample_counter++; + } + } +} + int main(int argc, char **argv) { + std::cout << "SendDataInChunks" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; + std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; + int n_channels = argc > 4 ? std::stol(argv[4]) : 2; + int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. 
+ int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk + try { - // make a new stream_info (44.1Khz, 16bit, audio, 2 channels) and open an outlet with it - lsl::stream_info info(name, type, 2, samplingrate, lsl::cf_int16); - lsl::stream_outlet outlet(info); + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16); + lsl::stream_outlet outlet(info, 0, max_buffered); + lsl::xml_element desc = info.desc(); + desc.append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = desc.append_child("channels"); + for (int c = 0; c < n_channels; c++) { + lsl::xml_element chn = chns.append_child("channel"); + chn.append_child_value("label", "Chan-" + std::to_string(c)); + chn.append_child_value("unit", "microvolts"); + chn.append_child_value("type", "EEG"); + } + + // Prepare buffer to get data from 'device' + std::vector> chunk_buffer( + chunk_samples, + std::vector(n_channels)); std::cout << "Now sending data..." << std::endl; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. 
auto nextsample = std::chrono::high_resolution_clock::now(); - std::vector mychunk(info.nominal_srate() / 10); - int phase = 0; + uint64_t sample_counter = 0; for (unsigned c = 0;; c++) { - // wait a bit and generate a chunk of random data - nextsample += std::chrono::milliseconds(100); + + // wait a bit + nextsample += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(nextsample); - for (stereo_sample &sample : mychunk) { - sample.l = static_cast(100 * sin(phase / 200.)); - sample.r = static_cast(120 * sin(phase / 400.)); - phase++; - } + // Get data from device + get_data_from_device(chunk_buffer, sample_counter); - // send it - outlet.push_chunk_numeric_structs(mychunk); + // send it to the outlet + outlet.push_chunk(chunk_buffer); } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } From 85d7239485dd90af74777a6b9376a4bd2f6ae2ba Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 3 Sep 2021 22:18:49 -0400 Subject: [PATCH 08/14] Further tweak to Chunks examples. --- examples/ReceiveDataInChunks.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 336385cb5..210138ac6 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -4,15 +4,20 @@ int main(int argc, char **argv) { + std::cout << "ReceiveDataInChunks" << std::endl; + std::cout << "ReceiveDataInChunks StreamName max_buflen" << std::endl; + try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; + int32_t max_buflen = argc > 2 ? 
std::stol(argv[2]) : 360; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0)); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); inlet.flush(); - double starttime = lsl::local_clock(), next_display = starttime + 1; + double starttime = lsl::local_clock(), next_display = starttime + 1, + next_reset = starttime + 10; // and retrieve the chunks uint64_t k = 0, num_samples = 0; @@ -29,7 +34,13 @@ int main(int argc, char **argv) { std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; next_display = now + 1; } + if (now > next_reset) { std::cout << "Resetting counters..." << std::endl; + starttime = now; + next_reset = now + 10; + num_samples = 0; + } } + } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } From 8b0f5d246d1b47246ffd0cb22eaaa8865ef6e579 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 3 Sep 2021 17:47:49 -0400 Subject: [PATCH 09/14] More options in the cpp Receive/Send DataInChunks for testing. --- examples/ReceiveDataInChunks.cpp | 35 ++++++++------ examples/SendDataInChunks.cpp | 79 ++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 5ff2fd678..336385cb5 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -3,24 +3,33 @@ #include -// define a packed sample struct (here a stereo sample). -#pragma pack(1) -struct stereo_sample { - int16_t l, r; -}; - -int main(int, char *[]) { +int main(int argc, char **argv) { try { + std::string name{argc > 1 ? 
argv[1] : "MyAudioStream"}; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", "MyAudioStream").at(0)); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0)); + + inlet.flush(); - // and retrieve the chunks (note: this can of course also be done with pure std::vectors - // instead of stereo_samples) + double starttime = lsl::local_clock(), next_display = starttime + 1; + + // and retrieve the chunks + uint64_t k = 0, num_samples = 0; while (true) { - std::vector result; - if (double timestamp = inlet.pull_chunk_numeric_structs(result)) - std::cout << timestamp << std::endl; // only showing the time stamps here + std::vector < std::vector > result; + if (double timestamp = inlet.pull_chunk(result)) + num_samples += result.size(); + k++; + + // display code + if (k % 50 == 0) { + double now = lsl::local_clock(); + if (now > next_display) { + std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; + next_display = now + 1; + } + } } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 2ffce9a4b..7c55d27f4 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -1,7 +1,10 @@ #include #include +#include #include #include +#include +#include // define a packed sample struct (here: a 16 bit stereo sample). @@ -10,31 +13,79 @@ struct stereo_sample { int16_t l, r; }; +// fill buffer with data from device -- Normally your device SDK would provide such a function. Here we use a random number generator. 
+void get_data_from_device(std::vector> buffer, uint64_t &sample_counter) { + static std::uniform_int_distribution distribution( + std::numeric_limits::min(), std::numeric_limits::max()); + static std::default_random_engine generator; + + if (buffer[0].size() == 2) { + // If there are only 2 channels then we'll do a sine wave, pretending this is an audio device. + for (auto &frame : buffer) { + frame[0] = static_cast(100 * sin(sample_counter / 200.)); + frame[1] = static_cast(120 * sin(sample_counter / 400.)); + sample_counter++; + } + } + else { + for (auto &frame : buffer) { + for (std::size_t chan_idx = 0; chan_idx < frame.size(); ++chan_idx) { + frame[chan_idx] = distribution(generator); + } + sample_counter++; + } + } +} + int main(int argc, char **argv) { + std::cout << "SendDataInChunks" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; + std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; + int n_channels = argc > 4 ? std::stol(argv[4]) : 2; + int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. 
+ int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk + try { - // make a new stream_info (44.1Khz, 16bit, audio, 2 channels) and open an outlet with it - lsl::stream_info info(name, type, 2, samplingrate, lsl::cf_int16); - lsl::stream_outlet outlet(info); + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16); + lsl::stream_outlet outlet(info, 0, max_buffered); + lsl::xml_element desc = info.desc(); + desc.append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = desc.append_child("channels"); + for (int c = 0; c < n_channels; c++) { + lsl::xml_element chn = chns.append_child("channel"); + chn.append_child_value("label", "Chan-" + std::to_string(c)); + chn.append_child_value("unit", "microvolts"); + chn.append_child_value("type", "EEG"); + } + + // Prepare buffer to get data from 'device' + std::vector> chunk_buffer( + chunk_samples, + std::vector(n_channels)); std::cout << "Now sending data..." << std::endl; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. 
auto nextsample = std::chrono::high_resolution_clock::now(); - std::vector mychunk(info.nominal_srate() / 10); - int phase = 0; + uint64_t sample_counter = 0; for (unsigned c = 0;; c++) { - // wait a bit and generate a chunk of random data - nextsample += std::chrono::milliseconds(100); + + // wait a bit + nextsample += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(nextsample); - for (stereo_sample &sample : mychunk) { - sample.l = static_cast(100 * sin(phase / 200.)); - sample.r = static_cast(120 * sin(phase / 400.)); - phase++; - } + // Get data from device + get_data_from_device(chunk_buffer, sample_counter); - // send it - outlet.push_chunk_numeric_structs(mychunk); + // send it to the outlet + outlet.push_chunk(chunk_buffer); } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } From 23ea03adf345876b4ef2f25ed27b3077359cca41 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 3 Sep 2021 22:18:49 -0400 Subject: [PATCH 10/14] Further tweak to Chunks examples. --- examples/ReceiveDataInChunks.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 336385cb5..210138ac6 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -4,15 +4,20 @@ int main(int argc, char **argv) { + std::cout << "ReceiveDataInChunks" << std::endl; + std::cout << "ReceiveDataInChunks StreamName max_buflen" << std::endl; + try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; + int32_t max_buflen = argc > 2 ? 
std::stol(argv[2]) : 360; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0)); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); inlet.flush(); - double starttime = lsl::local_clock(), next_display = starttime + 1; + double starttime = lsl::local_clock(), next_display = starttime + 1, + next_reset = starttime + 10; // and retrieve the chunks uint64_t k = 0, num_samples = 0; @@ -29,7 +34,13 @@ int main(int argc, char **argv) { std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; next_display = now + 1; } + if (now > next_reset) { std::cout << "Resetting counters..." << std::endl; + starttime = now; + next_reset = now + 10; + num_samples = 0; + } } + } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } From ee36a9479f16c66b7007ff34a0fdfe3577179921 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Mon, 20 Sep 2021 22:37:59 -0400 Subject: [PATCH 11/14] Further modified Send/Receive DataInChunks examples for better speed profiling. --- examples/ReceiveDataInChunks.cpp | 33 ++++++---- examples/SendDataInChunks.cpp | 108 +++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 40 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 210138ac6..afd03fc26 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -1,16 +1,19 @@ +#include #include #include #include +#include int main(int argc, char **argv) { std::cout << "ReceiveDataInChunks" << std::endl; - std::cout << "ReceiveDataInChunks StreamName max_buflen" << std::endl; + std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl; try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; int32_t max_buflen = argc > 2 ? 
std::stol(argv[2]) : 360; + bool flush = argc > 3; // resolve the stream of interest & make an inlet lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); @@ -21,26 +24,32 @@ int main(int argc, char **argv) { // and retrieve the chunks uint64_t k = 0, num_samples = 0; + std::vector> result; + auto fetch_interval = std::chrono::milliseconds(20); + auto next_fetch = std::chrono::steady_clock::now() + fetch_interval; + + while (true) { - std::vector < std::vector > result; - if (double timestamp = inlet.pull_chunk(result)) - num_samples += result.size(); + std::this_thread::sleep_until(next_fetch); + if (flush) { + // You almost certainly don't want to use flush. This is here so we + // can test maximum outlet throughput. + num_samples += inlet.flush(); + } else { + if (double timestamp = inlet.pull_chunk(result)) num_samples += result.size(); + } k++; - - // display code + next_fetch += fetch_interval; if (k % 50 == 0) { double now = lsl::local_clock(); - if (now > next_display) { - std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; - next_display = now + 1; - } - if (now > next_reset) { std::cout << "Resetting counters..." << std::endl; + std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl; + if (now > next_reset) { + std::cout << "Resetting counters..." << std::endl; starttime = now; next_reset = now + 10; num_samples = 0; } } - } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 7c55d27f4..a274c76f0 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -5,6 +5,9 @@ #include #include #include +#ifndef M_PI +#define M_PI 3.14159265358979323846 +#endif // define a packed sample struct (here: a 16 bit stereo sample). 
@@ -13,29 +16,75 @@ struct stereo_sample { int16_t l, r; }; -// fill buffer with data from device -- Normally your device SDK would provide such a function. Here we use a random number generator. -void get_data_from_device(std::vector> buffer, uint64_t &sample_counter) { - static std::uniform_int_distribution distribution( - std::numeric_limits::min(), std::numeric_limits::max()); - static std::default_random_engine generator; - - if (buffer[0].size() == 2) { - // If there are only 2 channels then we'll do a sine wave, pretending this is an audio device. - for (auto &frame : buffer) { - frame[0] = static_cast(100 * sin(sample_counter / 200.)); - frame[1] = static_cast(120 * sin(sample_counter / 400.)); - sample_counter++; +struct fake_device { + /* + We create a fake device that will generate data. The inner details are not + so important because typically it will be up to the real data source + SDK + to provide a way to get data. + */ + std::size_t n_channels; + double srate; + int64_t pattern_samples; + int64_t head; + std::vector pattern; + std::chrono::steady_clock::time_point last_time; + + fake_device(const int16_t n_channels, const float srate) + : n_channels(n_channels), srate(srate), head(0) { + pattern_samples = (int64_t)(srate - 0.5) + 1; // truncate OK. + + // Pre-allocate entire test pattern. The data _could_ be generated on the fly + // for a much smaller memory hit, but we also use this example application + // to test LSL Outlet performance so we want to reduce out-of-LSL CPU + // utilization. 
+ int64_t magnitude = std::numeric_limits::max(); + int64_t offset_0 = magnitude / 2; + int64_t offset_step = magnitude / n_channels; + pattern.reserve(pattern_samples * n_channels); + for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) { + for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) { + pattern.emplace_back( + offset_0 + chan_ix * offset_step + + magnitude * static_cast(sin(M_PI * chan_ix * sample_ix / n_channels))); + } } + last_time = std::chrono::high_resolution_clock::now(); + } + + std::vector get_data() { + auto now = std::chrono::steady_clock::now(); + auto elapsed_nano = + std::chrono::duration_cast(now - last_time).count(); + std::size_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK. + std::vector result; + result.resize(elapsed_samples * n_channels); + int64_t ret_samples = get_data(result); + std::vector output(result.begin(), result.begin() + ret_samples); + return output; } - else { - for (auto &frame : buffer) { - for (std::size_t chan_idx = 0; chan_idx < frame.size(); ++chan_idx) { - frame[chan_idx] = distribution(generator); + + std::size_t get_data(std::vector &buffer) { + auto now = std::chrono::steady_clock::now(); + auto elapsed_nano = + std::chrono::duration_cast(now - last_time).count(); + int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK. + elapsed_samples = std::min(elapsed_samples, (int64_t)buffer.size()); + if (false) { + // The fastest but no patterns. 
+ memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]); + } else { + std::size_t end_sample = head + elapsed_samples; + std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples); + memcpy(&buffer[0], &(pattern[head]), nowrap_samples); + if (end_sample > pattern_samples) { + memcpy(&buffer[nowrap_samples], &(pattern[0]), elapsed_samples - nowrap_samples); } - sample_counter++; } + head = (head + elapsed_samples) % pattern_samples; + last_time += std::chrono::nanoseconds(int64_t(1e9 * elapsed_samples / srate)); + return elapsed_samples; } -} +}; int main(int argc, char **argv) { std::cout << "SendDataInChunks" << std::endl; @@ -44,14 +93,15 @@ int main(int argc, char **argv) { std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; - int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; - int n_channels = argc > 4 ? std::stol(argv[4]) : 2; + int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device. + int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from the device. int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360; int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk try { + // Prepare the LSL stream. 
+ lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16); lsl::stream_outlet outlet(info, 0, max_buffered); lsl::xml_element desc = info.desc(); @@ -64,10 +114,12 @@ int main(int argc, char **argv) { chn.append_child_value("type", "EEG"); } - // Prepare buffer to get data from 'device' - std::vector> chunk_buffer( - chunk_samples, - std::vector(n_channels)); + // Create a connection to our device. + fake_device my_device(n_channels, (float)samplingrate); + + // Prepare buffer to get data from 'device'. + // The buffer should be larger than you think you need. Here we make it twice as large. + std::vector chunk_buffer(2 * chunk_samples * n_channels); std::cout << "Now sending data..." << std::endl; @@ -76,16 +128,16 @@ int main(int argc, char **argv) { auto nextsample = std::chrono::high_resolution_clock::now(); uint64_t sample_counter = 0; for (unsigned c = 0;; c++) { - // wait a bit nextsample += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(nextsample); // Get data from device - get_data_from_device(chunk_buffer, sample_counter); + std::size_t returned_samples = my_device.get_data(chunk_buffer); - // send it to the outlet - outlet.push_chunk(chunk_buffer); + // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. + // other push_chunk methods are easier but slightly slower. + outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true); } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } From 1ae37030a7780ca1c891f16108af7b0eef27185b Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Tue, 21 Sep 2021 00:01:03 -0400 Subject: [PATCH 12/14] Fix Linux build. 
--- examples/SendDataInChunks.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index a274c76f0..6e2747f43 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -48,7 +49,7 @@ struct fake_device { magnitude * static_cast(sin(M_PI * chan_ix * sample_ix / n_channels))); } } - last_time = std::chrono::high_resolution_clock::now(); + last_time = std::chrono::steady_clock::now(); } std::vector get_data() { From 8e04e0daf8ca1881b4e88abd66852bfb8cc833d8 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Tue, 21 Sep 2021 06:39:07 -0400 Subject: [PATCH 13/14] Explicit stream opening in ReceiveDataInChunks --- examples/ReceiveDataInChunks.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index afd03fc26..ec2f91e23 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -17,7 +17,15 @@ int main(int argc, char **argv) { // resolve the stream of interest & make an inlet lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); - inlet.flush(); + // Use set_postprocessing to get the timestamps in a common base clock. + // Do not use if this application will record timestamps to disk -- it is better to + // do posthoc synchronization. + inlet.set_postprocessing(lsl::post_ALL); + + // Inlet opening is implicit when doing pull_sample or pull_chunk. + // Here we open the stream explicitly because we might be doing + // `flush` only. 
+ inlet.open_stream(); double starttime = lsl::local_clock(), next_display = starttime + 1, next_reset = starttime + 10; From 2a3cd09aea8b908a78e27dc3a2e7a527e308abfe Mon Sep 17 00:00:00 2001 From: Tristan Stenner Date: Wed, 22 Sep 2021 20:00:38 +0200 Subject: [PATCH 14/14] Avoid using common words as macro names --- src/common.h | 18 ------------------ src/consumer_queue.h | 14 ++++++++++++-- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/common.h b/src/common.h index 8b1b74854..5d6614b3f 100644 --- a/src/common.h +++ b/src/common.h @@ -30,24 +30,6 @@ extern "C" { "Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions." #endif -// size of a cache line -#if defined(__s390__) || defined(__s390x__) -#define CACHELINE_BYTES 256 -#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__) -#define CACHELINE_BYTES 128 -#else -#define CACHELINE_BYTES 64 -#endif - -// force-inline the given function, if possible -#if defined(__clang__) || defined(__GNUC__) -#define FORCEINLINE __attribute__((always_inline)) -#elif defined _MSC_VER -#define FORCEINLINE __forceinline -#else -#define FORCEINLINE inline -#endif - // compiler hint that the given expression is likely or unlikely // (e.g., in conditional statements) #if defined(__clang__) || defined(__GNUC__) diff --git a/src/consumer_queue.h b/src/consumer_queue.h index 61b993e22..0f83ae7bd 100644 --- a/src/consumer_queue.h +++ b/src/consumer_queue.h @@ -9,6 +9,16 @@ #include namespace lsl { + +// size of a cache line +#if defined(__s390__) || defined(__s390x__) +constexpr int CACHELINE_BYTES = 256; +#elif defined(powerpc) || defined(__powerpc__) || defined(__ppc__) +constexpr int CACHELINE_BYTES = 128; +#else +constexpr int CACHELINE_BYTES = 64; +#endif + /** * A thread-safe producer/consumer queue of unread samples. 
* @@ -139,13 +149,13 @@ class consumer_queue { inline static void move_or_drop(sample_p &src, sample_p &dst) { dst = std::move(src); } /// helper to add a delta to the given index and wrap correctly - FORCEINLINE std::size_t add_wrap(std::size_t x, std::size_t delta) const { + inline std::size_t add_wrap(std::size_t x, std::size_t delta) const noexcept { const std::size_t xp = x + delta; return xp >= wrap_at_ ? xp - wrap_at_ : xp; } /// helper to increment the given index, wrapping it if necessary - inline std::size_t add1_wrap(std::size_t x) const { + inline std::size_t add1_wrap(std::size_t x) const noexcept { return ++x == wrap_at_ ? 0 : x; }