Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ee8b851
Replaced boost::spsc_queue by a fast SPMC queue
chkothe Nov 5, 2020
cbeec37
Fixed syntax error caught by windows CI build
chkothe Nov 5, 2020
b56f997
Removed unused variable
chkothe Nov 5, 2020
ace6c92
move_or_drop should destroy sample_p on drop
chkothe Nov 6, 2020
bead2f3
Add optimized variant for the common `add_wrap(x, 1)` case
tstenner Jul 7, 2021
559a0b1
Add missing #include
tstenner Jul 7, 2021
2d4c13e
Merge branch 'master' into HEAD
cboulay Sep 3, 2021
b11ebd0
Merge branch 'master' into spmc-queue
cboulay Sep 3, 2021
b044b49
More options in the cpp Receive/Send DataInChunks for testing.
cboulay Sep 3, 2021
85d7239
Further tweak to Chunks examples.
cboulay Sep 4, 2021
8b0f5d2
More options in the cpp Receive/Send DataInChunks for testing.
cboulay Sep 3, 2021
23ea03a
Further tweak to Chunks examples.
cboulay Sep 4, 2021
cfae3f6
Merge branch 'master' into spmc-queue
cboulay Sep 20, 2021
06541b6
Merge branch 'master' of https://github.com/sccn/liblsl
cboulay Sep 20, 2021
b708b78
Merge branch 'master' into spmc-queue
cboulay Sep 20, 2021
ee36a94
Further modified Send/Receive DataInChunks examples for better speed …
cboulay Sep 21, 2021
2475204
Merge branch 'spmc-queue' of https://github.com/sccn/liblsl into spmc…
cboulay Sep 21, 2021
1ae3703
Fix Linux build.
cboulay Sep 21, 2021
8e04e0d
Explicit stream opening in ReceiveDataInChunks
cboulay Sep 21, 2021
f8b1b49
Merge branch 'master' into spmc-queue
cboulay Sep 21, 2021
2a3cd09
Avoid using common words as macro names
tstenner Sep 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 49 additions & 12 deletions examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,63 @@
#include <chrono>
#include <iostream>
#include <lsl_cpp.h>
#include <stdint.h>
#include <thread>


// define a packed sample struct (here a stereo sample).
#pragma pack(1)
struct stereo_sample {
int16_t l, r;
};
int main(int argc, char **argv) {
std::cout << "ReceiveDataInChunks" << std::endl;
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;

int main(int, char *[]) {
try {

std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
lsl::stream_inlet inlet(lsl::resolve_stream("name", "MyAudioStream").at(0));
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
// do posthoc synchronization.
inlet.set_postprocessing(lsl::post_ALL);

// Inlet opening is implicit when doing pull_sample or pull_chunk.
// Here we open the stream explicitly because we might be doing
// `flush` only.
inlet.open_stream();

double starttime = lsl::local_clock(), next_display = starttime + 1,
next_reset = starttime + 10;

// and retrieve the chunks
uint64_t k = 0, num_samples = 0;
std::vector<std::vector<int16_t>> result;
auto fetch_interval = std::chrono::milliseconds(20);
auto next_fetch = std::chrono::steady_clock::now() + fetch_interval;


// and retrieve the chunks (note: this can of course also be done with pure std::vectors
// instead of stereo_samples)
while (true) {
std::vector<stereo_sample> result;
if (double timestamp = inlet.pull_chunk_numeric_structs(result))
std::cout << timestamp << std::endl; // only showing the time stamps here
std::this_thread::sleep_until(next_fetch);
if (flush) {
// You almost certainly don't want to use flush. This is here so we
// can test maximum outlet throughput.
num_samples += inlet.flush();
} else {
if (double timestamp = inlet.pull_chunk(result)) num_samples += result.size();
}
k++;
next_fetch += fetch_interval;
if (k % 50 == 0) {
double now = lsl::local_clock();
std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl;
if (now > next_reset) {
std::cout << "Resetting counters..." << std::endl;
starttime = now;
next_reset = now + 10;
num_samples = 0;
}
}
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
Expand Down
134 changes: 119 additions & 15 deletions examples/SendDataInChunks.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#include <cmath>
#include <iostream>
#include <map>
#include <cstring>
#include <lsl_cpp.h>
#include <thread>
#include <algorithm>
#include <random>
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif


// define a packed sample struct (here: a 16 bit stereo sample).
Expand All @@ -10,31 +17,128 @@ struct stereo_sample {
int16_t l, r;
};

struct fake_device {
/*
We create a fake device that will generate data. The inner details are not
so important because typically it will be up to the real data source + SDK
to provide a way to get data.
*/
std::size_t n_channels;
double srate;
int64_t pattern_samples;
int64_t head;
std::vector<int16_t> pattern;
std::chrono::steady_clock::time_point last_time;

fake_device(const int16_t n_channels, const float srate)
: n_channels(n_channels), srate(srate), head(0) {
pattern_samples = (int64_t)(srate - 0.5) + 1; // truncate OK.

// Pre-allocate entire test pattern. The data _could_ be generated on the fly
// for a much smaller memory hit, but we also use this example application
// to test LSL Outlet performance so we want to reduce out-of-LSL CPU
// utilization.
int64_t magnitude = std::numeric_limits<int16_t>::max();
int64_t offset_0 = magnitude / 2;
int64_t offset_step = magnitude / n_channels;
pattern.reserve(pattern_samples * n_channels);
for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) {
for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) {
pattern.emplace_back(
offset_0 + chan_ix * offset_step +
magnitude * static_cast<int16_t>(sin(M_PI * chan_ix * sample_ix / n_channels)));
}
}
last_time = std::chrono::steady_clock::now();
}

std::vector<int16_t> get_data() {
auto now = std::chrono::steady_clock::now();
auto elapsed_nano =
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
std::size_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
std::vector<int16_t> result;
result.resize(elapsed_samples * n_channels);
int64_t ret_samples = get_data(result);
std::vector<int16_t> output(result.begin(), result.begin() + ret_samples);
return output;
}

std::size_t get_data(std::vector<int16_t> &buffer) {
auto now = std::chrono::steady_clock::now();
auto elapsed_nano =
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
elapsed_samples = std::min(elapsed_samples, (int64_t)buffer.size());
if (false) {
// The fastest but no patterns.
memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
} else {
std::size_t end_sample = head + elapsed_samples;
std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples);
memcpy(&buffer[0], &(pattern[head]), nowrap_samples);
if (end_sample > pattern_samples) {
memcpy(&buffer[nowrap_samples], &(pattern[0]), elapsed_samples - nowrap_samples);
}
}
head = (head + elapsed_samples) % pattern_samples;
last_time += std::chrono::nanoseconds(int64_t(1e9 * elapsed_samples / srate));
return elapsed_samples;
}
};

int main(int argc, char **argv) {
std::cout << "SendDataInChunks" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;

std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100;
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk

try {
// make a new stream_info (44.1Khz, 16bit, audio, 2 channels) and open an outlet with it
lsl::stream_info info(name, type, 2, samplingrate, lsl::cf_int16);
lsl::stream_outlet outlet(info);
// Prepare the LSL stream.
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
lsl::stream_outlet outlet(info, 0, max_buffered);
lsl::xml_element desc = info.desc();
desc.append_child_value("manufacturer", "LSL");
lsl::xml_element chns = desc.append_child("channels");
for (int c = 0; c < n_channels; c++) {
lsl::xml_element chn = chns.append_child("channel");
chn.append_child_value("label", "Chan-" + std::to_string(c));
chn.append_child_value("unit", "microvolts");
chn.append_child_value("type", "EEG");
}

// Create a connection to our device.
fake_device my_device(n_channels, (float)samplingrate);

// Prepare buffer to get data from 'device'.
// The buffer should be largery than you think you need. Here we make it twice as large.
std::vector<int16_t> chunk_buffer(2 * chunk_samples * n_channels);

std::cout << "Now sending data..." << std::endl;

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
auto nextsample = std::chrono::high_resolution_clock::now();
std::vector<stereo_sample> mychunk(info.nominal_srate() / 10);
int phase = 0;
uint64_t sample_counter = 0;
for (unsigned c = 0;; c++) {
// wait a bit and generate a chunk of random data
nextsample += std::chrono::milliseconds(100);
// wait a bit
nextsample += std::chrono::milliseconds(chunk_duration);
std::this_thread::sleep_until(nextsample);

for (stereo_sample &sample : mychunk) {
sample.l = static_cast<int16_t>(100 * sin(phase / 200.));
sample.r = static_cast<int16_t>(120 * sin(phase / 400.));
phase++;
}
// Get data from device
std::size_t returned_samples = my_device.get_data(chunk_buffer);

// send it
outlet.push_chunk_numeric_structs(mychunk);
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
// other push_chunk methods are easier but slightly slower.
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true);
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
Expand Down
10 changes: 10 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ extern "C" {
"Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions."
#endif

// compiler hint that the given expression is likely or unlikely
// (e.g., in conditional statements)
#if defined(__clang__) || defined(__GNUC__)
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
#else
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif

// the highest supported protocol version
// * 100 is the original version, supported by library versions 1.00+
// * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+
Expand Down
59 changes: 25 additions & 34 deletions src/consumer_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
#include "consumer_queue.h"
#include "sample.h"
#include "common.h"
#include "send_buffer.h"
#include <chrono>
#include <loguru.hpp>
#include <utility>

using namespace lsl;

consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry)
: registry_(std::move(registry)), buffer_(max_capacity) {
consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry)
: registry_(std::move(registry)), buffer_(new item_t[size]), size_(size),
// largest integer at which we can wrap correctly
wrap_at_(std::numeric_limits<std::size_t>::max() - size -
std::numeric_limits<std::size_t>::max() % size) {
assert(size_ > 1);
for (std::size_t i = 0; i < size_; ++i)
buffer_[i].seq_state.store(i, std::memory_order_release);
write_idx_.store(0, std::memory_order_release);
read_idx_.store(0, std::memory_order_release);
done_sync_.store(false, std::memory_order_release);
if (registry_) registry_->register_consumer(this);
}

Expand All @@ -20,41 +29,23 @@ consumer_queue::~consumer_queue() {
"Unexpected error while trying to unregister a consumer queue from its registry: %s",
e.what());
}
}

void consumer_queue::push_sample(const sample_p &sample) {
// push a sample, dropping the oldest sample if the queue ist already full.
// During this operation the producer becomes a second consumer, i.e., a case
// where the underlying spsc queue isn't thread-safe) so the mutex is locked.
std::lock_guard<std::mutex> lk(mut_);
while (!buffer_.push(sample)) {
buffer_.pop();
}
cv_.notify_one();
}

sample_p consumer_queue::pop_sample(double timeout) {
sample_p result;
if (timeout <= 0.0) {
std::lock_guard<std::mutex> lk(mut_);
buffer_.pop(result);
} else {
std::unique_lock<std::mutex> lk(mut_);
if (!buffer_.pop(result)) {
// wait for a new sample until the thread calling push_sample delivers one and sends a
// notification, or until timeout
std::chrono::duration<double> sec(timeout);
cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); });
}
}
return result;
delete[] buffer_;
}

uint32_t consumer_queue::flush() noexcept {
std::lock_guard<std::mutex> lk(mut_);
uint32_t n = 0;
while (buffer_.pop()) n++;
while (try_pop()) n++;
return n;
}

bool consumer_queue::empty() { return buffer_.empty(); }
std::size_t consumer_queue::read_available() const {
std::size_t write_index = write_idx_.load(std::memory_order_acquire);
std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
if (write_index >= read_index) return write_index - read_index;
const std::size_t ret = write_index + size_ - read_index;
return ret;
}

bool consumer_queue::empty() const {
return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed);
}
Loading