Skip to content

Commit cce79e2

Browse files
authored
Merge pull request #135 from sccn/spmc-queue
Replace boost::spsc_queue by a fast SPMC queue
2 parents 65ccfca + 2a3cd09 commit cce79e2

File tree

6 files changed

+347
-83
lines changed

6 files changed

+347
-83
lines changed

examples/ReceiveDataInChunks.cpp

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,63 @@
1+
#include <chrono>
12
#include <iostream>
23
#include <lsl_cpp.h>
34
#include <stdint.h>
5+
#include <thread>
46

57

6-
// define a packed sample struct (here a stereo sample).
7-
#pragma pack(1)
8-
struct stereo_sample {
9-
int16_t l, r;
10-
};
8+
int main(int argc, char **argv) {
9+
std::cout << "ReceiveDataInChunks" << std::endl;
10+
std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl;
1111

12-
int main(int, char *[]) {
1312
try {
1413

14+
std::string name{argc > 1 ? argv[1] : "MyAudioStream"};
15+
int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360;
16+
bool flush = argc > 3;
1517
// resolve the stream of interest & make an inlet
16-
lsl::stream_inlet inlet(lsl::resolve_stream("name", "MyAudioStream").at(0));
18+
lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen);
19+
20+
// Use set_postprocessing to get the timestamps in a common base clock.
21+
// Do not use if this application will record timestamps to disk -- it is better to
22+
// do posthoc synchronization.
23+
inlet.set_postprocessing(lsl::post_ALL);
24+
25+
// Inlet opening is implicit when doing pull_sample or pull_chunk.
26+
// Here we open the stream explicitly because we might be doing
27+
// `flush` only.
28+
inlet.open_stream();
29+
30+
double starttime = lsl::local_clock(), next_display = starttime + 1,
31+
next_reset = starttime + 10;
32+
33+
// and retrieve the chunks
34+
uint64_t k = 0, num_samples = 0;
35+
std::vector<std::vector<int16_t>> result;
36+
auto fetch_interval = std::chrono::milliseconds(20);
37+
auto next_fetch = std::chrono::steady_clock::now() + fetch_interval;
38+
1739

18-
// and retrieve the chunks (note: this can of course also be done with pure std::vectors
19-
// instead of stereo_samples)
2040
while (true) {
21-
std::vector<stereo_sample> result;
22-
if (double timestamp = inlet.pull_chunk_numeric_structs(result))
23-
std::cout << timestamp << std::endl; // only showing the time stamps here
41+
std::this_thread::sleep_until(next_fetch);
42+
if (flush) {
43+
// You almost certainly don't want to use flush. This is here so we
44+
// can test maximum outlet throughput.
45+
num_samples += inlet.flush();
46+
} else {
47+
if (double timestamp = inlet.pull_chunk(result)) num_samples += result.size();
48+
}
49+
k++;
50+
next_fetch += fetch_interval;
51+
if (k % 50 == 0) {
52+
double now = lsl::local_clock();
53+
std::cout << num_samples / (now - starttime) << " samples/sec" << std::endl;
54+
if (now > next_reset) {
55+
std::cout << "Resetting counters..." << std::endl;
56+
starttime = now;
57+
next_reset = now + 10;
58+
num_samples = 0;
59+
}
60+
}
2461
}
2562

2663
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }

examples/SendDataInChunks.cpp

Lines changed: 119 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
#include <cmath>
22
#include <iostream>
3+
#include <map>
4+
#include <cstring>
35
#include <lsl_cpp.h>
46
#include <thread>
7+
#include <algorithm>
8+
#include <random>
9+
#ifndef M_PI
10+
#define M_PI 3.14159265358979323846
11+
#endif
512

613

714
// define a packed sample struct (here: a 16 bit stereo sample).
@@ -10,31 +17,128 @@ struct stereo_sample {
1017
int16_t l, r;
1118
};
1219

20+
struct fake_device {
21+
/*
22+
We create a fake device that will generate data. The inner details are not
23+
so important because typically it will be up to the real data source + SDK
24+
to provide a way to get data.
25+
*/
26+
std::size_t n_channels;
27+
double srate;
28+
int64_t pattern_samples;
29+
int64_t head;
30+
std::vector<int16_t> pattern;
31+
std::chrono::steady_clock::time_point last_time;
32+
33+
fake_device(const int16_t n_channels, const float srate)
34+
: n_channels(n_channels), srate(srate), head(0) {
35+
pattern_samples = (int64_t)(srate - 0.5) + 1; // truncate OK.
36+
37+
// Pre-allocate entire test pattern. The data _could_ be generated on the fly
38+
// for a much smaller memory hit, but we also use this example application
39+
// to test LSL Outlet performance so we want to reduce out-of-LSL CPU
40+
// utilization.
41+
int64_t magnitude = std::numeric_limits<int16_t>::max();
42+
int64_t offset_0 = magnitude / 2;
43+
int64_t offset_step = magnitude / n_channels;
44+
pattern.reserve(pattern_samples * n_channels);
45+
for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) {
46+
for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) {
47+
pattern.emplace_back(
48+
offset_0 + chan_ix * offset_step +
49+
magnitude * static_cast<int16_t>(sin(M_PI * chan_ix * sample_ix / n_channels)));
50+
}
51+
}
52+
last_time = std::chrono::steady_clock::now();
53+
}
54+
55+
std::vector<int16_t> get_data() {
56+
auto now = std::chrono::steady_clock::now();
57+
auto elapsed_nano =
58+
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
59+
std::size_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
60+
std::vector<int16_t> result;
61+
result.resize(elapsed_samples * n_channels);
62+
int64_t ret_samples = get_data(result);
63+
std::vector<int16_t> output(result.begin(), result.begin() + ret_samples);
64+
return output;
65+
}
66+
67+
std::size_t get_data(std::vector<int16_t> &buffer) {
68+
auto now = std::chrono::steady_clock::now();
69+
auto elapsed_nano =
70+
std::chrono::duration_cast<std::chrono::nanoseconds>(now - last_time).count();
71+
int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK.
72+
elapsed_samples = std::min(elapsed_samples, (int64_t)buffer.size());
73+
if (false) {
74+
// The fastest but no patterns.
75+
memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]);
76+
} else {
77+
std::size_t end_sample = head + elapsed_samples;
78+
std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples);
79+
memcpy(&buffer[0], &(pattern[head]), nowrap_samples);
80+
if (end_sample > pattern_samples) {
81+
memcpy(&buffer[nowrap_samples], &(pattern[0]), elapsed_samples - nowrap_samples);
82+
}
83+
}
84+
head = (head + elapsed_samples) % pattern_samples;
85+
last_time += std::chrono::nanoseconds(int64_t(1e9 * elapsed_samples / srate));
86+
return elapsed_samples;
87+
}
88+
};
89+
1390
int main(int argc, char **argv) {
91+
std::cout << "SendDataInChunks" << std::endl;
92+
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
93+
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
94+
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;
95+
1496
std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
15-
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100;
97+
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
98+
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
99+
int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
100+
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
101+
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
102+
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
103+
16104
try {
17-
// make a new stream_info (44.1Khz, 16bit, audio, 2 channels) and open an outlet with it
18-
lsl::stream_info info(name, type, 2, samplingrate, lsl::cf_int16);
19-
lsl::stream_outlet outlet(info);
105+
// Prepare the LSL stream.
106+
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
107+
lsl::stream_outlet outlet(info, 0, max_buffered);
108+
lsl::xml_element desc = info.desc();
109+
desc.append_child_value("manufacturer", "LSL");
110+
lsl::xml_element chns = desc.append_child("channels");
111+
for (int c = 0; c < n_channels; c++) {
112+
lsl::xml_element chn = chns.append_child("channel");
113+
chn.append_child_value("label", "Chan-" + std::to_string(c));
114+
chn.append_child_value("unit", "microvolts");
115+
chn.append_child_value("type", "EEG");
116+
}
117+
118+
// Create a connection to our device.
119+
fake_device my_device(n_channels, (float)samplingrate);
120+
121+
// Prepare buffer to get data from 'device'.
122+
// The buffer should be largery than you think you need. Here we make it twice as large.
123+
std::vector<int16_t> chunk_buffer(2 * chunk_samples * n_channels);
20124

21125
std::cout << "Now sending data..." << std::endl;
126+
127+
// Your device might have its own timer. Or you can decide how often to poll
128+
// your device, as we do here.
22129
auto nextsample = std::chrono::high_resolution_clock::now();
23-
std::vector<stereo_sample> mychunk(info.nominal_srate() / 10);
24-
int phase = 0;
130+
uint64_t sample_counter = 0;
25131
for (unsigned c = 0;; c++) {
26-
// wait a bit and generate a chunk of random data
27-
nextsample += std::chrono::milliseconds(100);
132+
// wait a bit
133+
nextsample += std::chrono::milliseconds(chunk_duration);
28134
std::this_thread::sleep_until(nextsample);
29135

30-
for (stereo_sample &sample : mychunk) {
31-
sample.l = static_cast<int16_t>(100 * sin(phase / 200.));
32-
sample.r = static_cast<int16_t>(120 * sin(phase / 400.));
33-
phase++;
34-
}
136+
// Get data from device
137+
std::size_t returned_samples = my_device.get_data(chunk_buffer);
35138

36-
// send it
37-
outlet.push_chunk_numeric_structs(mychunk);
139+
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
140+
// other push_chunk methods are easier but slightly slower.
141+
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true);
38142
}
39143

40144
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }

src/common.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ extern "C" {
3030
"Please do not compile this with a lslboost version older than 1.45 because the library would otherwise not be protocol-compatible with builds using other versions."
3131
#endif
3232

33+
// compiler hint that the given expression is likely or unlikely
34+
// (e.g., in conditional statements)
35+
#if defined(__clang__) || defined(__GNUC__)
36+
#define LIKELY(x) __builtin_expect(!!(x), 1)
37+
#define UNLIKELY(x) __builtin_expect(!!(x), 0)
38+
#else
39+
#define LIKELY(x) (x)
40+
#define UNLIKELY(x) (x)
41+
#endif
42+
3343
// the highest supported protocol version
3444
// * 100 is the original version, supported by library versions 1.00+
3545
// * 110 is an alternative protocol that improves throughput, supported by library versions 1.10+

src/consumer_queue.cpp

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
#include "consumer_queue.h"
2-
#include "sample.h"
2+
#include "common.h"
33
#include "send_buffer.h"
44
#include <chrono>
55
#include <loguru.hpp>
66
#include <utility>
77

88
using namespace lsl;
99

10-
consumer_queue::consumer_queue(std::size_t max_capacity, send_buffer_p registry)
11-
: registry_(std::move(registry)), buffer_(max_capacity) {
10+
consumer_queue::consumer_queue(std::size_t size, send_buffer_p registry)
11+
: registry_(std::move(registry)), buffer_(new item_t[size]), size_(size),
12+
// largest integer at which we can wrap correctly
13+
wrap_at_(std::numeric_limits<std::size_t>::max() - size -
14+
std::numeric_limits<std::size_t>::max() % size) {
15+
assert(size_ > 1);
16+
for (std::size_t i = 0; i < size_; ++i)
17+
buffer_[i].seq_state.store(i, std::memory_order_release);
18+
write_idx_.store(0, std::memory_order_release);
19+
read_idx_.store(0, std::memory_order_release);
20+
done_sync_.store(false, std::memory_order_release);
1221
if (registry_) registry_->register_consumer(this);
1322
}
1423

@@ -20,41 +29,23 @@ consumer_queue::~consumer_queue() {
2029
"Unexpected error while trying to unregister a consumer queue from its registry: %s",
2130
e.what());
2231
}
23-
}
24-
25-
void consumer_queue::push_sample(const sample_p &sample) {
26-
// push a sample, dropping the oldest sample if the queue ist already full.
27-
// During this operation the producer becomes a second consumer, i.e., a case
28-
// where the underlying spsc queue isn't thread-safe) so the mutex is locked.
29-
std::lock_guard<std::mutex> lk(mut_);
30-
while (!buffer_.push(sample)) {
31-
buffer_.pop();
32-
}
33-
cv_.notify_one();
34-
}
35-
36-
sample_p consumer_queue::pop_sample(double timeout) {
37-
sample_p result;
38-
if (timeout <= 0.0) {
39-
std::lock_guard<std::mutex> lk(mut_);
40-
buffer_.pop(result);
41-
} else {
42-
std::unique_lock<std::mutex> lk(mut_);
43-
if (!buffer_.pop(result)) {
44-
// wait for a new sample until the thread calling push_sample delivers one and sends a
45-
// notification, or until timeout
46-
std::chrono::duration<double> sec(timeout);
47-
cv_.wait_for(lk, sec, [&]{ return this->buffer_.pop(result); });
48-
}
49-
}
50-
return result;
32+
delete[] buffer_;
5133
}
5234

5335
uint32_t consumer_queue::flush() noexcept {
54-
std::lock_guard<std::mutex> lk(mut_);
5536
uint32_t n = 0;
56-
while (buffer_.pop()) n++;
37+
while (try_pop()) n++;
5738
return n;
5839
}
5940

60-
bool consumer_queue::empty() { return buffer_.empty(); }
41+
std::size_t consumer_queue::read_available() const {
42+
std::size_t write_index = write_idx_.load(std::memory_order_acquire);
43+
std::size_t read_index = read_idx_.load(std::memory_order_relaxed);
44+
if (write_index >= read_index) return write_index - read_index;
45+
const std::size_t ret = write_index + size_ - read_index;
46+
return ret;
47+
}
48+
49+
bool consumer_queue::empty() const {
50+
return write_idx_.load(std::memory_order_acquire) == read_idx_.load(std::memory_order_relaxed);
51+
}

0 commit comments

Comments
 (0)