Skip to content

Commit 215d042

Browse files
committed
feat(std): add constant resampler operator
1 parent 9d0a777 commit 215d042

File tree

3 files changed

+386
-0
lines changed

3 files changed

+386
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#ifndef CONSTANTRESAMPLER_H
2+
#define CONSTANTRESAMPLER_H
3+
4+
#include "rtbot/Operator.h"
5+
6+
namespace rtbot {
7+
8+
using namespace std;
9+
10+
template <class T, class V>
11+
struct ConstantResampler : public Operator<T, V> {
12+
T dt; // Time interval between emissions
13+
T nextEmit; // Next time point to emit at
14+
bool initiated; // Whether we've received our first message
15+
V lastValue; // Last value received, used for causal consistency
16+
17+
ConstantResampler() = default;
18+
19+
ConstantResampler(string const& id, T interval) : Operator<T, V>(id) {
20+
if (interval <= 0) throw runtime_error(typeName() + ": time interval must be positive");
21+
22+
this->dt = interval;
23+
this->initiated = false;
24+
this->addDataInput("i1", 1);
25+
this->addOutput("o1");
26+
}
27+
28+
virtual Bytes collect() {
29+
Bytes bytes = Operator<T, V>::collect();
30+
31+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&dt),
32+
reinterpret_cast<const unsigned char*>(&dt) + sizeof(dt));
33+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&nextEmit),
34+
reinterpret_cast<const unsigned char*>(&nextEmit) + sizeof(nextEmit));
35+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&initiated),
36+
reinterpret_cast<const unsigned char*>(&initiated) + sizeof(initiated));
37+
bytes.insert(bytes.end(), reinterpret_cast<const unsigned char*>(&lastValue),
38+
reinterpret_cast<const unsigned char*>(&lastValue) + sizeof(lastValue));
39+
40+
return bytes;
41+
}
42+
43+
virtual void restore(Bytes::const_iterator& it) {
44+
Operator<T, V>::restore(it);
45+
46+
dt = *reinterpret_cast<const T*>(&(*it));
47+
it += sizeof(dt);
48+
nextEmit = *reinterpret_cast<const T*>(&(*it));
49+
it += sizeof(nextEmit);
50+
initiated = *reinterpret_cast<const bool*>(&(*it));
51+
it += sizeof(initiated);
52+
lastValue = *reinterpret_cast<const V*>(&(*it));
53+
it += sizeof(lastValue);
54+
}
55+
56+
string typeName() const override { return "ConstantResampler"; }
57+
58+
OperatorMessage<T, V> processData() override {
59+
string inputPort;
60+
auto in = this->getDataInputs();
61+
if (in.size() == 1)
62+
inputPort = in.at(0);
63+
else
64+
throw runtime_error(typeName() + " : more than 1 input port found");
65+
66+
Message<T, V> msg = this->getDataInputLastMessage(inputPort);
67+
68+
// Initialize on first message
69+
if (!initiated) {
70+
nextEmit = msg.time + dt;
71+
lastValue = msg.value;
72+
initiated = true;
73+
return {};
74+
}
75+
76+
OperatorMessage<T, V> outputMsgs;
77+
PortMessage<T, V> toEmit;
78+
79+
// If message time is past next emit time, we need to emit at all grid points
80+
// up to (but not including) the current message time
81+
while (nextEmit < msg.time) {
82+
Message<T, V> out;
83+
out.time = nextEmit;
84+
out.value = lastValue; // Use previous value for causal consistency
85+
toEmit.push_back(out);
86+
nextEmit += dt;
87+
}
88+
89+
// If current message is exactly on the grid, emit with current value
90+
if (nextEmit == msg.time) {
91+
Message<T, V> out;
92+
out.time = msg.time;
93+
out.value = msg.value; // Use current value for grid-aligned messages
94+
toEmit.push_back(out);
95+
nextEmit += dt;
96+
}
97+
98+
// If we emitted anything, add it to output messages
99+
if (!toEmit.empty()) {
100+
outputMsgs.emplace("o1", toEmit);
101+
}
102+
103+
// Update last value for future emissions
104+
lastValue = msg.value;
105+
106+
return outputMsgs;
107+
}
108+
109+
T getInterval() const { return dt; }
110+
T getNextEmissionTime() const { return nextEmit; }
111+
bool isInitiated() const { return initiated; }
112+
V getLastValue() const { return lastValue; }
113+
114+
private:
115+
void reset() { initiated = false; }
116+
};
117+
118+
} // namespace rtbot
119+
120+
#endif // CONSTANTRESAMPLER_H
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
---
2+
behavior:
3+
buffered: true
4+
throughput: constant
5+
view:
6+
shape: circle
7+
latex:
8+
template: |
9+
\Delta t = {{interval}}
10+
jsonschema:
11+
type: object
12+
properties:
13+
id:
14+
type: string
15+
description: The id of the operator
16+
interval:
17+
type: integer
18+
examples:
19+
- 5
20+
description: The time interval between emissions.
21+
minimum: 1
22+
required: ["id", "interval"]
23+
---
24+
25+
# ConstantResampler
26+
27+
Inputs: `i1`
28+
Outputs: `o1`
29+
30+
The `ConstantResampler` operator transforms a variable throughput input signal into a constant throughput output signal by emitting values at fixed time intervals. The operator maintains causal consistency while handling grid-aligned messages with special precision.
31+
32+
## Behavior
33+
34+
### Initialization
35+
36+
When receiving its first message at time t₀, the operator:
37+
38+
- Stores the initial value
39+
- Sets up first emission point at t₀ + dt
40+
- Does not emit
41+
42+
### Regular Operation
43+
44+
For subsequent messages, the operator follows these rules:
45+
46+
1. **Grid-Aligned Messages**:
47+
48+
- If a message arrives exactly at a grid point (t = nextEmit)
49+
- Emits immediately with the current message's value
50+
- Updates nextEmit to t + dt
51+
52+
2. **Between Grid Points**:
53+
54+
- If message time is between grid points
55+
- Stores the value for future emissions
56+
- No immediate emission
57+
58+
3. **Past Grid Points**:
59+
60+
- If message time is past one or more grid points
61+
- Emits at each missed grid point using the last known value
62+
- Maintains causal consistency by using values known at each emission time
63+
64+
4. **Large Gaps**:
65+
- For large time gaps between messages
66+
- Emits at all intermediate grid points
67+
- Uses the last known value before each emission point
68+
69+
### Key Characteristics
70+
71+
- Lazy evaluation: only emits when receiving messages
72+
- Maintains constant time intervals between emissions
73+
- Preserves causal consistency
74+
- Special handling for grid-aligned messages
75+
- Buffers only one message (the most recent value)
76+
- State is maintained between calls and can be serialized/restored
77+
78+
## Examples
79+
80+
### Basic Grid Alignment
81+
82+
```
83+
Input: dt = 5
84+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 6
85+
t=3: Receive(3, 30.0) -> No emission
86+
t=6: Receive(6, 60.0) -> Emit(6, 60.0), nextEmit = 11
87+
t=8: Receive(8, 80.0) -> No emission
88+
```
89+
90+
### Large Gap with Multiple Emissions
91+
92+
```
93+
Input: dt = 3
94+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 4
95+
t=9: Receive(9, 90.0) -> Emit sequence:
96+
- (4, 10.0) [using last known value]
97+
- (7, 10.0) [using last known value]
98+
nextEmit = 10
99+
```
100+
101+
### Mixed Grid-Aligned and Non-Aligned
102+
103+
```
104+
Input: dt = 5
105+
t=1: Receive(1, 10.0) -> No emission, nextEmit = 6
106+
t=6: Receive(6, 60.0) -> Emit(6, 60.0) [grid-aligned]
107+
t=13: Receive(13, 130.0) -> Emit(11, 60.0) [using previous value]
108+
```
109+
110+
## Error Handling
111+
112+
- Throws if dt ≤ 0
113+
- Throws if input port count is incorrect
114+
115+
## State Management
116+
117+
The operator maintains:
118+
119+
- Current interval (dt)
120+
- Next emission time
121+
- Last received value
122+
- Initialization status
123+
124+
All state can be serialized and restored for system persistence.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#include <catch2/catch.hpp>
2+
3+
#include "rtbot/std/ConstantResampler.h"
4+
5+
using namespace rtbot;
6+
using namespace std;
7+
8+
TEST_CASE("ConstantResampler") {
9+
SECTION("Constructor validation") {
10+
REQUIRE_NOTHROW(ConstantResampler<uint64_t, double>("test", 5));
11+
REQUIRE_THROWS(ConstantResampler<uint64_t, double>("test", 0));
12+
}
13+
14+
SECTION("Basic resampling with causal consistency") {
15+
auto resampler = ConstantResampler<uint64_t, double>("test", 3);
16+
17+
// First message - should not emit
18+
resampler.receiveData(Message<uint64_t, double>(1, 10.0));
19+
auto result1 = resampler.executeData();
20+
REQUIRE(result1.empty());
21+
REQUIRE(resampler.getNextEmissionTime() == 4);
22+
REQUIRE(resampler.getLastValue() == 10.0);
23+
24+
// Message at emission time - should emit with previous value
25+
resampler.receiveData(Message<uint64_t, double>(5, 50.0));
26+
auto result2 = resampler.executeData();
27+
REQUIRE(!result2.empty());
28+
auto& emissions = result2.find("test")->second.find("o1")->second;
29+
REQUIRE(emissions[0].time == 4);
30+
REQUIRE(emissions[0].value == 10.0); // Should use previous value
31+
REQUIRE(resampler.getLastValue() == 50.0); // But update last value
32+
}
33+
34+
SECTION("Multiple grid points between messages") {
35+
auto resampler = ConstantResampler<uint64_t, double>("test", 2);
36+
37+
// Initialize
38+
resampler.receiveData(Message<uint64_t, double>(1, 10.0));
39+
auto result1 = resampler.executeData();
40+
REQUIRE(result1.empty());
41+
REQUIRE(resampler.getNextEmissionTime() == 3);
42+
43+
// Send message with large gap - should emit at all grid points
44+
resampler.receiveData(Message<uint64_t, double>(8, 80.0));
45+
auto result2 = resampler.executeData();
46+
REQUIRE(!result2.empty());
47+
48+
auto& emissions = result2.find("test")->second.find("o1")->second;
49+
REQUIRE(emissions.size() == 3); // Should emit at t=3,5,7
50+
51+
// All emissions should use the previous value (10.0)
52+
REQUIRE(emissions[0].time == 3);
53+
REQUIRE(emissions[0].value == 10.0);
54+
REQUIRE(emissions[1].time == 5);
55+
REQUIRE(emissions[1].value == 10.0);
56+
REQUIRE(emissions[2].time == 7);
57+
REQUIRE(emissions[2].value == 10.0);
58+
59+
// Last value should be updated
60+
REQUIRE(resampler.getLastValue() == 80.0);
61+
REQUIRE(resampler.getNextEmissionTime() == 9);
62+
}
63+
64+
SECTION("State serialization with lastValue") {
65+
auto resampler1 = ConstantResampler<uint64_t, double>("test", 3);
66+
67+
// Initialize and process some data
68+
resampler1.receiveData(Message<uint64_t, double>(1, 10.0));
69+
resampler1.executeData();
70+
71+
// Serialize state
72+
Bytes state = resampler1.collect();
73+
74+
// Create new resampler and restore state
75+
auto resampler2 = ConstantResampler<uint64_t, double>("test", 3);
76+
Bytes::const_iterator it = state.begin();
77+
resampler2.restore(it);
78+
79+
// Verify both resamplers have same state including lastValue
80+
REQUIRE(resampler1.getInterval() == resampler2.getInterval());
81+
REQUIRE(resampler1.getNextEmissionTime() == resampler2.getNextEmissionTime());
82+
REQUIRE(resampler1.isInitiated() == resampler2.isInitiated());
83+
REQUIRE(resampler1.getLastValue() == resampler2.getLastValue());
84+
}
85+
86+
SECTION("Rapid message sequence") {
87+
auto resampler = ConstantResampler<uint64_t, double>("test", 5);
88+
89+
resampler.receiveData(Message<uint64_t, double>(1, 10.0));
90+
auto result1 = resampler.executeData();
91+
REQUIRE(result1.empty());
92+
93+
// Rapid sequence of messages before next emit time
94+
resampler.receiveData(Message<uint64_t, double>(2, 20.0));
95+
resampler.receiveData(Message<uint64_t, double>(3, 30.0));
96+
resampler.receiveData(Message<uint64_t, double>(4, 40.0));
97+
auto result2 = resampler.executeData();
98+
REQUIRE(result2.empty());
99+
100+
// Message at emit time should use last value before emit time
101+
resampler.receiveData(Message<uint64_t, double>(7, 70.0));
102+
auto result3 = resampler.executeData();
103+
REQUIRE(!result3.empty());
104+
auto& emissions = result3.find("test")->second.find("o1")->second;
105+
REQUIRE(emissions[0].time == 6);
106+
REQUIRE(emissions[0].value == 40.0); // Should use value from t=4
107+
}
108+
109+
SECTION("Grid-aligned messages") {
110+
auto resampler = ConstantResampler<uint64_t, double>("test", 5);
111+
112+
// Initialize
113+
resampler.receiveData(Message<uint64_t, double>(1, 10.0));
114+
auto result1 = resampler.executeData();
115+
REQUIRE(result1.empty());
116+
REQUIRE(resampler.getNextEmissionTime() == 6);
117+
118+
// Send messages before grid point
119+
resampler.receiveData(Message<uint64_t, double>(3, 30.0));
120+
resampler.receiveData(Message<uint64_t, double>(4, 40.0));
121+
auto result2 = resampler.executeData();
122+
REQUIRE(result2.empty());
123+
124+
// Send message exactly at grid point
125+
resampler.receiveData(Message<uint64_t, double>(6, 60.0));
126+
auto result3 = resampler.executeData();
127+
REQUIRE(!result3.empty());
128+
auto& emissions = result3.find("test")->second.find("o1")->second;
129+
REQUIRE(emissions[0].time == 6);
130+
REQUIRE(emissions[0].value == 60.0); // Should use current value, not previous
131+
REQUIRE(resampler.getNextEmissionTime() == 11);
132+
133+
// Send message past grid point
134+
resampler.receiveData(Message<uint64_t, double>(13, 130.0));
135+
auto result4 = resampler.executeData();
136+
REQUIRE(!result4.empty());
137+
auto& emissions2 = result4.find("test")->second.find("o1")->second;
138+
REQUIRE(emissions2.size() == 1);
139+
REQUIRE(emissions2[0].time == 11);
140+
REQUIRE(emissions2[0].value == 60.0); // Should use previous value
141+
}
142+
}

0 commit comments

Comments
 (0)