Skip to content
Open
2 changes: 1 addition & 1 deletion docs/site/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ docusaurus_bin.docusaurus_binary(
docusaurus_bin.docusaurus_binary(
name = "start",
# TODO: compute the output dir
args = ["start", "docs/site"],
args = ["start", "docs/site", "--host 0.0.0.0"],
data = glob(
["**/*"],
exclude = ["node_modules/**/*"],
Expand Down
2 changes: 1 addition & 1 deletion docs/site/src/pages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function HomepageHeader() {
export default function Home() {
const { siteConfig } = useDocusaurusContext();
const programStr = simpleProgram;
const getStream = () => getNoisySinSignal(100, 0.0015, 100, 80, 2);
const getStream = () => getNoisySinSignal(20, 0.0015, 100, 80, 2);
return (
<Layout title={`${siteConfig.title}`} description="RtBot">
<HomepageHeader />
Expand Down
4 changes: 2 additions & 2 deletions libs/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ cc_library(
]),
hdrs = glob(["include/**/*.h"]) + [":jsonschema_to_src"],
copts = [
"-O3",
# "-O3",
"-Iexternal/json-schema-validator/src",
"-Ilibs/core/include/rtbot",
# this is needed when building from another workspace
Expand All @@ -48,7 +48,7 @@ cc_library(
)

BASE_LINKOPTS = [
"-O3",
# "-O3",
"-lembind", # Enable embind
"-s MODULARIZE",
"--embind-emit-tsd",
Expand Down
49 changes: 48 additions & 1 deletion libs/api/src/FactoryOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "rtbot/Join.h"
#include "rtbot/Operator.h"
#include "rtbot/Output.h"
#include "rtbot/Pipeline.h"
#include "rtbot/finance/RelativeStrengthIndex.h"
#include "rtbot/std/Add.h"
#include "rtbot/std/And.h"
Expand All @@ -20,6 +21,7 @@
#include "rtbot/std/Difference.h"
#include "rtbot/std/Division.h"
#include "rtbot/std/EqualTo.h"
#include "rtbot/std/FastFourierTransform.h"
#include "rtbot/std/FiniteImpulseResponse.h"
#include "rtbot/std/GreaterThan.h"
#include "rtbot/std/HermiteResampler.h"
Expand All @@ -34,10 +36,11 @@
#include "rtbot/std/Plus.h"
#include "rtbot/std/Power.h"
#include "rtbot/std/Scale.h"
#include "rtbot/std/Sort.h"
#include "rtbot/std/SortIndex.h"
#include "rtbot/std/StandardDeviation.h"
#include "rtbot/std/TimeShift.h"
#include "rtbot/std/Variable.h"
#include "rtbot/Pipeline.h"

using json = nlohmann::json;

Expand Down Expand Up @@ -107,6 +110,22 @@ void from_json(const json& j, FiniteImpulseResponse<T, V>& p) {
p = FiniteImpulseResponse<T, V>(j["id"].get<string>(), j["coeff"].get<vector<V>>());
}

template <class T, class V>
void to_json(json& j, const FastFourrierTransform<T, V>& p) {
j = json{{"type", p.typeName()},
{"id", p.id},
{"N", p.getN()},
{"emitPower", p.getEmitPower()},
{"emitRePart", p.getEmitRePart()},
{"emitImPart", p.getEmitImPart()}};
}

template <class T, class V>
void from_json(const json& j, FastFourrierTransform<T, V>& p) {
p = FastFourrierTransform<T, V>(j["id"].get<string>(), j["N"].get<size_t>(), j["emitPower"].get<bool>(),
j["emitRePart"].get<bool>(), j["emitImPart"].get<bool>());
}

template <class T, class V>
void to_json(json& j, const Join<T, V>& p) {
j = json{{"type", p.typeName()}, {"id", p.id}, {"numPorts", p.getNumDataInputs()}};
Expand Down Expand Up @@ -317,6 +336,32 @@ void from_json(const json& j, Scale<T, V>& p) {
p = Scale<T, V>(j["id"].get<string>(), j["value"].get<V>());
}

template <class T, class V>
void to_json(json& j, const Sort<T, V>& p) {
j = json{{"type", p.typeName()}, {"id", p.id},
{"numInputs", p.getNumInputs()}, {"numOutputs", p.getNumOutputs()},
{"ascending", p.getAscending()}, {"maxInputBufferSize", p.getMaxInputBufferSize()}};
}

template <class T, class V>
void from_json(const json& j, Sort<T, V>& p) {
p = Sort<T, V>(j["id"].get<string>(), j["numInputs"].get<size_t>(), j["numOutputs"].get<size_t>(),
j["ascending"].get<bool>(), j["maxInputBufferSize"].get<size_t>());
}

template <class T, class V>
void to_json(json& j, const SortIndex<T, V>& p) {
j = json{{"type", p.typeName()}, {"id", p.id},
{"numInputs", p.getNumInputs()}, {"numOutputs", p.getNumOutputs()},
{"ascending", p.getAscending()}, {"maxInputBufferSize", p.getMaxInputBufferSize()}};
}

template <class T, class V>
void from_json(const json& j, SortIndex<T, V>& p) {
p = SortIndex<T, V>(j["id"].get<string>(), j["numInputs"].get<size_t>(), j["numOutputs"].get<size_t>(),
j["ascending"].get<bool>(), j["maxInputBufferSize"].get<size_t>());
}

template <class T, class V>
void to_json(json& j, const Power<T, V>& p) {
j = json{{"type", p.typeName()}, {"id", p.id}, {"value", p.getValue()}};
Expand Down Expand Up @@ -416,6 +461,8 @@ FactoryOp::FactoryOp() {
op_registry_add<LessThan<uint64_t, double>, json>();
op_registry_add<EqualTo<uint64_t, double>, json>();
op_registry_add<Scale<uint64_t, double>, json>();
op_registry_add<Sort<uint64_t, double>, json>();
op_registry_add<SortIndex<uint64_t, double>, json>();
op_registry_add<Constant<uint64_t, double>, json>();
op_registry_add<CumulativeSum<uint64_t, double>, json>();
op_registry_add<Count<uint64_t, double>, json>();
Expand Down
2 changes: 1 addition & 1 deletion libs/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ cc_library(
"src/**/*.cpp",
]),
copts = [
"-O3",
# "-O3",
],
hdrs = glob(["include/**/*.h"]),
includes = ["include"],
Expand Down
19 changes: 10 additions & 9 deletions libs/core/include/rtbot/Join.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ Inputs: `i1`...`iN` where N defined by `numPorts`
Outputs: `o1`...`oN` where N defined by `numPorts`

The `Join` operator is used to synchronize two or more incoming message streams into
a single, consistent output.
a single, consistent output. `Join` waits until it can emit a consistent output: whenever
there exist messages in all buffers with common time. In such scenario we say a synchronization
has occur and synchronized messages are emitted.

The `Join` operator holds a message buffer on `i1`, `i2`, ... , `iN` respectively,
it uses the message time field to synchronize the streams and pick 1 message per input port.
If at least one of the message buffer is empty or a message with the expected time can not
be found on one of the buffers then the synchronization will not occur. When synchronization
occurs messages with older timestamps than the synchronization time are discarded since it is
understood that they can not be synchronized in the future. The `Join` operator emits the
synchronized messages through `o1`, `o2`, ... , `oN` respectively after the synchronization takes place.
Internally the `Join` operator holds a message buffer on `i1`, `i2`, ... , `iN` respectively,
it uses the message time field to synchronize the streams and pick 1 message per input port.
If at least one of the message buffer is empty, or if for each one of the buffer there isn't
a message with the same time as the incoming message, synchronization will not occur. When synchronization
occurs messages with older timestamps than the synchronization time are discarded since it is
understood that they can not be synchronized in the future. The `Join` operator emits the
synchronized messages through `o1`, `o2`, ... , `oN` respectively after the synchronization takes place.
If no synchronization occurs then an empty message {} is emitted through `o1`, `o2`, ... , `oN` respectively.

141 changes: 141 additions & 0 deletions libs/std/include/rtbot/std/FastFourierTransform.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include <complex>
#include <valarray>

#include "rtbot/Operator.h"

namespace rtbot {

template <class T, class V>
class FastFourrierTransform : public Operator<T, V> {
public:
FastFourrierTransform(string const& id, size_t N = 7, size_t skip = 127, bool emitPower = true,
bool emitRePart = false, bool emitImPart = false)
: Operator<T, V>(id) {
this->N = N;
this->M = pow(2, N);
this->emitPower = emitPower;
this->emitRePart = emitRePart;
this->emitImPart = emitImPart;
// recall that the N passed in the constructor is used to compute the size of the input buffer
// the actual size of the FFT is 2^N
this->addDataInput("i1", this->M);
// allocate the vector that will contain the FFT
this->a = vector<complex<V>>(this->M);
// declare the output ports based on the parameters passed in the constructor
for (int i = 0; i < this->M; i++) {
this->addOutput("w" + to_string(i + 1));
if (emitPower) this->addOutput("p" + to_string(i + 1));
if (emitRePart) this->addOutput("re" + to_string(i + 1));
if (emitImPart) this->addOutput("im" + to_string(i + 1));
}
}

string typeName() const override { return "FastFourrierTransform"; }

map<string, vector<Message<T, V>>> processData() override {
this->skipCounter++;

if (this->skipCounter < this->skip) return map<string, vector<Message<T, V>>>();

this->skipCounter = 0;

string inputPort;
auto in = this->getDataInputs();
if (in.size() == 1)
inputPort = in.at(0);
else
throw runtime_error(typeName() + " : more than 1 input port found");
map<string, vector<Message<T, V>>> outputMsgs;

auto input = this->dataInputs.find(inputPort)->second;
for (int i = 0; i < this->M; i++) {
this->a[i].real((input.at(i).value));
this->a[i].imag(0);
}

// compute the FFT
fft(this->a);

auto time = this->getDataInputLastMessage(inputPort).time;
for (size_t i = 0; i < this->M; i++) {
if (this->emitRePart) {
Message<T, V> re(time, this->a[i].real());
vector<Message<T, V>> toEmit = {re};
outputMsgs.emplace("re" + to_string(i + 1), toEmit);
}
if (this->emitImPart) {
Message<T, V> im(time, this->a[i].imag());
vector<Message<T, V>> toEmit = {im};
outputMsgs.emplace("im" + to_string(i + 1), toEmit);
}
if (this->emitPower) {
Message<T, V> p(time, pow(this->a[i].real(), 2) + pow(this->a[i].imag(), 2));
vector<Message<T, V>> toEmit = {p};
outputMsgs.emplace("p" + to_string(i + 1), toEmit);
}

Message<T, V> w(time, (i + 1.0) / this->M);
vector<Message<T, V>> toEmit = {w};
outputMsgs.emplace("w" + to_string(i + 1), toEmit);
}

return outputMsgs;
}

size_t getSize() { return this->M; }
size_t getN() { return this->N; }
bool getEmitPower() { return this->emitPower; }
bool getEmitRePart() { return this->emitRePart; }
bool getEmitImPart() { return this->emitImPart; }

private:
size_t skipCounter;
size_t skip;
size_t M;
size_t N;
bool emitPower;
bool emitRePart;
bool emitImPart;
vector<complex<V>> a;

// see https://rosettacode.org/wiki/Fast_Fourier_transform#C.2B.2B
void fft(std::vector<std::complex<V>>& x) {
// DFT
unsigned int N = x.size(), k = N, n;
double thetaT = 3.14159265358979323846264338328L / N;
std::complex<V> phiT = std::complex<V>(cos(thetaT), -sin(thetaT)), Tt;
while (k > 1) {
n = k;
k >>= 1;
phiT = phiT * phiT;
Tt = 1.0L;
for (unsigned int l = 0; l < k; l++) {
for (unsigned int a = l; a < N; a += n) {
unsigned int b = a + k;
std::complex<V> t = x[a] - x[b];
x[a] += x[b];
x[b] = t * Tt;
}
Tt *= phiT;
}
}
// Decimate
unsigned int m = (unsigned int)log2(N);
for (unsigned int a = 0; a < N; a++) {
unsigned int b = a;
// Reverse bits
b = (((b & 0xaaaaaaaa) >> 1) | ((b & 0x55555555) << 1));
b = (((b & 0xcccccccc) >> 2) | ((b & 0x33333333) << 2));
b = (((b & 0xf0f0f0f0) >> 4) | ((b & 0x0f0f0f0f) << 4));
b = (((b & 0xff00ff00) >> 8) | ((b & 0x00ff00ff) << 8));
b = ((b >> 16) | (b << 16)) >> (32 - m);
if (b > a) {
std::complex<V> t = x[a];
x[a] = x[b];
x[b] = t;
}
}
}
};

} // namespace rtbot
56 changes: 56 additions & 0 deletions libs/std/include/rtbot/std/FastFourierTransform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
behavior:
buffered: true
throughput: variable
view:
shape: circle
latex:
template: |
fft
jsonschema:
type: object
properties:
id:
type: string
description: The id of the operator
N:
type: integer
default: 7
minimum: 2
description: Use to compute the fft matrix size, which will be $2^N$
examples: [3, 4, 5]
skip:
type: integer
default: 127
minimum: 0
description: Controls how many messages to skip before the next computation of the fft. This is useful for applications were the fft is not needed to be computed for each received message. Notice that a value different than 0 changes the throughput of the signal.
examples: [127, 255, 100]
emitPower:
type: boolean
description: Indicates whether the power correspondent to the different frequencies should be computed and emitted through the output ports `p1`, ..., `pM`, where $M=2^N$
default: true
examples: [true, false]
emitRePart:
type: boolean
description: Indicates whether the real part of the amplitude correspondent to the different frequencies should be computed and emitted through the output ports `re1`, ..., `reM`, where $M=2^N$
default: true
examples: [true, false]
emitImPart:
type: boolean
description: Indicates whether the imaginary part of the amplitude correspondent to the different frequencies should be computed and emitted through the output ports `im1`, ..., `imM`, where $M=2^N$
default: true
examples: [true, false]
required: ["id"]
---

# FastFourierTransform

Inputs: `i1`
Outputs:

- frequencies `w1`...`wM`
- power `p1`...`pM` if `emitPower` is true
- real part `re1`...`reM` if `emitRePart` is true
- imaginary part `im1`...`imM` if `emitImPart` is true

where `M` is equal to $2^n$.
Loading