Skip to content

Commit da48565

Browse files
authored
Merge pull request #68 from rtbot-dev/RB-249
RB-249 Add ConstantResampler and Function operators
2 parents 9d0a777 + 0627d3d commit da48565

File tree

7 files changed

+716
-0
lines changed

7 files changed

+716
-0
lines changed

libs/api/src/FactoryOp.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,34 @@ void from_json(const json& j, Pipeline<T, V>& p) {
399399
p = Pipeline<T, V>(j.at("id").get<string>(), j.at("prog").dump());
400400
}
401401

402+
template <class T, class V>
403+
void to_json(json& j, const ConstantResampler<T, V>& p) {
404+
j = json{{"type", p.typeName()}, {"id", p.id}, {"interval", p.getInterval()}};
405+
}
406+
407+
template <class T, class V>
408+
void from_json(const json& j, ConstantResampler<T, V>& p) {
409+
p = ConstantResampler<T, V>(j["id"].get<string>(), j["interval"].get<T>());
410+
}
411+
412+
template <class T, class V>
413+
void to_json(json& j, const Function<T, V>& p) {
414+
vector<vector<V>> points_array;
415+
for (const auto& point : p.getPoints()) {
416+
points_array.push_back({point.first, point.second});
417+
}
418+
j = json{{"type", p.typeName()}, {"id", p.id}, {"points", points_array}, {"type", p.getInterpolationType()}};
419+
}
420+
421+
template <class T, class V>
422+
void from_json(const json& j, Function<T, V>& p) {
423+
vector<pair<V, V>> points;
424+
for (const auto& point : j["points"]) {
425+
points.push_back({point[0].get<V>(), point[1].get<V>()});
426+
}
427+
p = Function<T, V>(j["id"].get<string>(), points, j.value("type", "linear"));
428+
}
429+
402430
/* Operators serialization - deserialization - end */
403431

404432
Bytes FactoryOp::serialize(string const& programId) {
@@ -463,6 +491,8 @@ FactoryOp::FactoryOp() {
463491
op_registry_add<Pipeline<uint64_t, double>, json>();
464492
op_registry_add<GreaterThanStream<uint64_t, double>, json>();
465493
op_registry_add<LessThanStream<uint64_t, double>, json>();
494+
op_registry_add<Function<uint64_t, double>, json>();
495+
op_registry_add<ConstantResampler<uint64_t, double>, json>();
466496
}
467497

468498
static FactoryOp factory;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#ifndef CONSTANTRESAMPLER_H
2+
#define CONSTANTRESAMPLER_H
3+
4+
#include "rtbot/Operator.h"
5+
6+
namespace rtbot {
7+
8+
using namespace std;
9+
10+
template <class T, class V>
11+
struct ConstantResampler : public Operator<T, V> {
12+
T dt; // Time interval between emissions
13+
T nextEmit; // Next time point to emit at
14+
bool initiated; // Whether we've received our first message
15+
V lastValue; // Last value received, used for causal consistency
16+
17+
ConstantResampler() = default;
18+
19+
ConstantResampler(string const& id, T interval) : Operator<T, V>(id) {
20+
if (interval <= 0) throw runtime_error(typeName() + ": time interval must be positive");
21+
22+
this->dt = interval;
23+
this->initiated = false;
24+
this->addDataInput("i1", 1);
25+
this->addOutput("o1");
26+
}
27+
28+
virtual Bytes collect() {
29+
Bytes bytes = Operator<T, V>::collect();
30+
31+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&dt),
32+
reinterpret_cast<const unsigned char*>(&dt) + sizeof(dt));
33+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&nextEmit),
34+
reinterpret_cast<const unsigned char*>(&nextEmit) + sizeof(nextEmit));
35+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&initiated),
36+
reinterpret_cast<const unsigned char*>(&initiated) + sizeof(initiated));
37+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&lastValue),
38+
reinterpret_cast<const unsigned char*>(&lastValue) + sizeof(lastValue));
39+
40+
return bytes;
41+
}
42+
43+
virtual void restore(Bytes::const_iterator& it) {
44+
Operator<T, V>::restore(it);
45+
46+
dt = *reinterpret_cast<const T*>(&(*it));
47+
it += sizeof(dt);
48+
nextEmit = *reinterpret_cast<const T*>(&(*it));
49+
it += sizeof(nextEmit);
50+
initiated = *reinterpret_cast<const bool*>(&(*it));
51+
it += sizeof(initiated);
52+
lastValue = *reinterpret_cast<const V*>(&(*it));
53+
it += sizeof(lastValue);
54+
}
55+
56+
string typeName() const override { return "ConstantResampler"; }
57+
58+
OperatorMessage<T, V> processData() override {
59+
string inputPort;
60+
auto in = this->getDataInputs();
61+
if (in.size() == 1)
62+
inputPort = in.at(0);
63+
else
64+
throw runtime_error(typeName() + " : more than 1 input port found");
65+
66+
Message<T, V> msg = this->getDataInputLastMessage(inputPort);
67+
68+
// Initialize on first message
69+
if (!initiated) {
70+
nextEmit = msg.time + dt;
71+
lastValue = msg.value;
72+
initiated = true;
73+
return {};
74+
}
75+
76+
OperatorMessage<T, V> outputMsgs;
77+
PortMessage<T, V> toEmit;
78+
79+
// If message time is past next emit time, we need to emit at all grid points
80+
// up to (but not including) the current message time
81+
while (nextEmit < msg.time) {
82+
Message<T, V> out;
83+
out.time = nextEmit;
84+
out.value = lastValue; // Use previous value for causal consistency
85+
toEmit.push_back(out);
86+
nextEmit += dt;
87+
}
88+
89+
// If current message is exactly on the grid, emit with current value
90+
if (nextEmit == msg.time) {
91+
Message<T, V> out;
92+
out.time = msg.time;
93+
out.value = msg.value; // Use current value for grid-aligned messages
94+
toEmit.push_back(out);
95+
nextEmit += dt;
96+
}
97+
98+
// If we emitted anything, add it to output messages
99+
if (!toEmit.empty()) {
100+
outputMsgs.emplace("o1", toEmit);
101+
}
102+
103+
// Update last value for future emissions
104+
lastValue = msg.value;
105+
106+
return outputMsgs;
107+
}
108+
109+
T getInterval() const { return dt; }
110+
T getNextEmissionTime() const { return nextEmit; }
111+
bool isInitiated() const { return initiated; }
112+
V getLastValue() const { return lastValue; }
113+
114+
private:
115+
void reset() { initiated = false; }
116+
};
117+
118+
} // namespace rtbot
119+
120+
#endif // CONSTANTRESAMPLER_H
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
---
2+
behavior:
3+
buffered: true
4+
throughput: constant
5+
view:
6+
shape: circle
7+
latex:
8+
template: |
9+
\Delta t = {{interval}}
10+
jsonschema:
11+
type: object
12+
properties:
13+
id:
14+
type: string
15+
description: The id of the operator
16+
interval:
17+
type: integer
18+
examples:
19+
- 5
20+
description: The time interval between emissions.
21+
minimum: 1
22+
required: ["id", "interval"]
23+
---
24+
25+
# ConstantResampler
26+
27+
Inputs: `i1`
28+
Outputs: `o1`
29+
30+
The `ConstantResampler` operator transforms a variable throughput input signal into a constant throughput output signal by emitting values at fixed time intervals. The operator maintains causal consistency while handling grid-aligned messages with special precision.
31+
32+
## Behavior
33+
34+
### Initialization
35+
36+
When receiving its first message at time t₀, the operator:
37+
38+
- Stores the initial value
39+
- Sets up first emission point at t₀ + dt
40+
- Does not emit
41+
42+
### Regular Operation
43+
44+
For subsequent messages, the operator follows these rules:
45+
46+
1. **Grid-Aligned Messages**:
47+
48+
- If a message arrives exactly at a grid point (t = nextEmit)
49+
- Emits immediately with the current message's value
50+
- Updates nextEmit to t + dt
51+
52+
2. **Between Grid Points**:
53+
54+
- If message time is between grid points
55+
- Stores the value for future emissions
56+
- No immediate emission
57+
58+
3. **Past Grid Points**:
59+
60+
- If message time is past one or more grid points
61+
- Emits at each missed grid point using the last known value
62+
- Maintains causal consistency by using values known at each emission time
63+
64+
4. **Large Gaps**:
65+
- For large time gaps between messages
66+
- Emits at all intermediate grid points
67+
- Uses the last known value before each emission point
68+
69+
### Key Characteristics
70+
71+
- Lazy evaluation: only emits when receiving messages
72+
- Maintains constant time intervals between emissions
73+
- Preserves causal consistency
74+
- Special handling for grid-aligned messages
75+
- Buffers only one message (the most recent value)
76+
- State is maintained between calls and can be serialized/restored
77+
78+
## Examples
79+
80+
### Basic Grid Alignment
81+
82+
```
83+
Input: dt = 5
84+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 6
85+
t=3: Receive(3, 30.0) -> No emission
86+
t=6: Receive(6, 60.0) -> Emit(6, 60.0), nextEmit = 11
87+
t=8: Receive(8, 80.0) -> No emission
88+
```
89+
90+
### Large Gap with Multiple Emissions
91+
92+
```
93+
Input: dt = 3
94+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 4
95+
t=9: Receive(9, 90.0) -> Emit sequence:
96+
- (4, 10.0) [using last known value]
97+
- (7, 10.0) [using last known value]
98+
nextEmit = 10
99+
```
100+
101+
### Mixed Grid-Aligned and Non-Aligned
102+
103+
```
104+
Input: dt = 5
105+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 6
106+
t=6: Receive(6, 60.0) -> Emit(6, 60.0) [grid-aligned]
107+
t=13: Receive(13, 130.0) -> Emit(11, 60.0) [using previous value]
108+
```
109+
110+
## Error Handling
111+
112+
- Throws if dt ≤ 0
113+
- Throws if input port count is incorrect
114+
115+
## State Management
116+
117+
The operator maintains:
118+
119+
- Current interval (dt)
120+
- Next emission time
121+
- Last received value
122+
- Initialization status
123+
124+
All state can be serialized and restored for system persistence.

0 commit comments

Comments
 (0)