Skip to content

Commit 69395c8

Browse files
committed
feat: recover programs from binary representation
1 parent af9186a commit 69395c8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+773
-144
lines changed

libs/api/include/rtbot/FactoryOp.h

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ using namespace std;
1616

1717
class FactoryOp {
1818
map<string, Program> programs;
19-
OperatorPayload<uint64_t, double> messageBuffer;
19+
ProgramMessage<uint64_t, double> messageBuffer;
2020

2121
public:
2222
FactoryOp();
@@ -54,8 +54,19 @@ class FactoryOp {
5454

5555
static Op_ptr<uint64_t, double> readOp(string const& json_string);
5656
static string writeOp(Op_ptr<uint64_t, double> const& op);
57+
5758
static Program createProgram(string const& json_string) { return Program(json_string); }
5859

60+
Bytes collect(string const& programId) {
61+
if (this->programs.count(programId) == 0) throw runtime_error("Program " + programId + " was not found");
62+
return programs.at(programId).collect();
63+
}
64+
65+
void restore(string const& programId, Bytes const& bytes) {
66+
if (this->programs.count(programId) == 0) throw runtime_error("Program " + programId + " was not found");
67+
programs.at(programId).restore(bytes);
68+
}
69+
5970
string createProgram(string const& id, string const& json_program);
6071

6172
bool deleteProgram(string const& id) { return (programs.erase(id) == 1) ? true : false; }
@@ -66,7 +77,7 @@ class FactoryOp {
6677
return true;
6778
}
6879

69-
OperatorPayload<uint64_t, double> processMessageBuffer(const string& apId) {
80+
ProgramMessage<uint64_t, double> processMessageBuffer(const string& apId) {
7081
if (this->programs.count(apId) == 0) throw runtime_error("Program " + apId + " was not found");
7182
if (this->messageBuffer.count(apId) > 0) {
7283
if (!this->messageBuffer.at(apId).empty()) {
@@ -79,7 +90,7 @@ class FactoryOp {
7990
return {};
8091
}
8192

82-
OperatorPayload<uint64_t, double> processMessageBufferDebug(const string& apId) {
93+
ProgramMessage<uint64_t, double> processMessageBufferDebug(const string& apId) {
8394
if (this->programs.count(apId) == 0) throw runtime_error("Program " + apId + " was not found");
8495
if (this->messageBuffer.count(apId) > 0) {
8596
if (!this->messageBuffer.at(apId).empty()) {
@@ -107,15 +118,15 @@ class FactoryOp {
107118
return this->programs.at(apId).getProgramOutputFilter();
108119
}
109120

110-
OperatorPayload<uint64_t, double> processMessageMap(string const& apId,
111-
const PortPayload<uint64_t, double>& messagesMap) {
121+
ProgramMessage<uint64_t, double> processMessageMap(string const& apId,
122+
const OperatorMessage<uint64_t, double>& messagesMap) {
112123
auto it = programs.find(apId);
113124
if (it == programs.end()) return {};
114125
return it->second.receive(messagesMap);
115126
}
116127

117-
OperatorPayload<uint64_t, double> processMessageMapDebug(string const& apId,
118-
const PortPayload<uint64_t, double>& messagesMap) {
128+
ProgramMessage<uint64_t, double> processMessageMapDebug(string const& apId,
129+
const OperatorMessage<uint64_t, double>& messagesMap) {
119130
auto it = programs.find(apId);
120131
if (it == programs.end()) return {};
121132
return it->second.receiveDebug(messagesMap);

libs/api/include/rtbot/Pipeline.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ struct Pipeline : public Operator<T, V> {
4646
this->toProcess.insert(inputPort);
4747
}
4848

49-
OperatorPayload<T, V> executeData() override {
50-
OperatorPayload<T, V> opResults;
49+
ProgramMessage<T, V> executeData() override {
50+
ProgramMessage<T, V> opResults;
5151
for (auto id : this->toProcess) {
5252
auto results = inputs.at(id)->executeData();
5353
Operator<T, V>::mergeOutput(opResults, results);
@@ -57,14 +57,14 @@ struct Pipeline : public Operator<T, V> {
5757

5858
// add the prefix of the pipeline: {id, {port,value}} --> {id:port, value}
5959
// transform this to o1, o2 notation
60-
PortPayload<T, V> output;
60+
OperatorMessage<T, V> output;
6161
for (auto [id, op1] : opResults)
6262
for (auto [port, value] : op1)
6363
if (auto it = outputs.find(id + ":" + port); it != outputs.end()) output.emplace(portsMap.at(it->first), value);
6464
return this->emit(output);
6565
}
6666

67-
PortPayload<T, V> processData() override { return {}; } // do nothing
67+
OperatorMessage<T, V> processData() override { return {}; } // do nothing
6868

6969
private:
7070
static auto split2(string const& s, char delim = ':') {

libs/api/include/rtbot/Program.h

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,31 @@ struct Program {
2424
void operator=(Program const&) = delete;
2525
Program(Program&& other) = default;
2626

27+
Bytes collect() {
28+
Bytes bytes;
29+
for (auto it = this->all_op.begin(); it != this->all_op.end(); ++it) {
30+
Bytes opBytes = it->second->collect();
31+
bytes.insert(bytes.end(), opBytes.begin(), opBytes.end());
32+
}
33+
34+
return bytes;
35+
}
36+
37+
void restore(Bytes const& bytes) {
38+
Bytes::const_iterator bytes_it = bytes.begin();
39+
for (auto it = this->all_op.begin(); it != this->all_op.end(); ++it) {
40+
it->second->restore(bytes_it);
41+
}
42+
}
43+
44+
string debug() {
45+
string toReturn;
46+
for (auto it = this->all_op.begin(); it != this->all_op.end(); ++it) {
47+
toReturn += "\n " + it->second->debug("");
48+
}
49+
return toReturn;
50+
}
51+
2752
string getProgramEntryOperatorId() { return entryOperator; }
2853

2954
vector<string> getProgramEntryPorts() {
@@ -37,9 +62,9 @@ struct Program {
3762
map<string, vector<string>> getProgramOutputFilter() { return this->outputFilter; }
3863

3964
/// return a list of output ports from the output operator that emitted: id, output message
40-
OperatorPayload<uint64_t, double> receive(const PortPayload<uint64_t, double>& messagesMap) {
41-
OperatorPayload<uint64_t, double> opResults;
42-
OperatorPayload<uint64_t, double> toReturn;
65+
ProgramMessage<uint64_t, double> receive(const OperatorMessage<uint64_t, double>& messagesMap) {
66+
ProgramMessage<uint64_t, double> opResults;
67+
ProgramMessage<uint64_t, double> toReturn;
4368

4469
opResults = receiveDebug(messagesMap);
4570

@@ -48,7 +73,7 @@ struct Program {
4873
auto outResults = opResults.at(op->first);
4974
for (int i = 0; i < op->second.size(); i++) {
5075
if (outResults.count(op->second.at(i)) > 0) {
51-
PortPayload<uint64_t, double> opReturn;
76+
OperatorMessage<uint64_t, double> opReturn;
5277
opReturn.emplace(op->second.at(i), outResults.at(op->second.at(i)));
5378
if (toReturn.count(op->first) == 0)
5479
toReturn.emplace(op->first, opReturn);
@@ -67,11 +92,11 @@ struct Program {
6792
}
6893

6994
/// return a list of the operator that emitted: id, output message
70-
OperatorPayload<uint64_t, double> receiveDebug(const PortPayload<uint64_t, double>& messagesMap) {
95+
ProgramMessage<uint64_t, double> receiveDebug(const OperatorMessage<uint64_t, double>& messagesMap) {
7196
if (this->all_op.count(this->entryOperator) == 0)
7297
throw runtime_error("Entry operator " + this->entryOperator + " was not found");
7398

74-
OperatorPayload<uint64_t, double> opResults;
99+
ProgramMessage<uint64_t, double> opResults;
75100

76101
for (auto it = messagesMap.begin(); it != messagesMap.end(); ++it) {
77102
for (int j = 0; j < it->second.size(); j++) {

libs/api/include/rtbot/bindings.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@
1212
using namespace std;
1313
using namespace rtbot;
1414

15+
Bytes collect(string const& programId);
16+
// after creating a program, we can restore it with the a collected state
17+
void restore(string const& programId, Bytes const& bytes);
18+
19+
// TODO
20+
/*const ProgramMessage<uint64_t, double>& processMessageMapNative(string const& programId,
21+
const OperatorMessage<uint64_t, double>& messagesMap);
22+
*/
23+
1524
string validate(string const& json_program);
1625
string validateOperator(string const& type, string const& json_op);
1726

@@ -27,8 +36,8 @@ string getProgramEntryOperatorId(const string& programId);
2736
string getProgramEntryPorts(const string& programId);
2837
string getProgramOutputFilter(const string& programId);
2938

30-
string processMessageMap(string const& programId, const PortPayload<uint64_t, double>& messagesMap);
31-
string processMessageMapDebug(string const& programId, const PortPayload<uint64_t, double>& messagesMap);
39+
string processMessageMap(string const& programId, const OperatorMessage<uint64_t, double>& messagesMap);
40+
string processMessageMapDebug(string const& programId, const OperatorMessage<uint64_t, double>& messagesMap);
3241

3342
string processBatch(string const& programId, vector<uint64_t> times, vector<double> values,
3443
vector<string> const& ports);

libs/api/src/bindings.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ void from_json(const json& j, Message<T, V>& p) {
3939

4040
} // namespace rtbot
4141

42-
using namespace rtbot;
42+
Bytes collect(string const& programId) { return factory.collect(programId); }
43+
44+
void restore(string const& programId, Bytes const& bytes) { factory.restore(programId, bytes); }
4345

4446
string validateOperator(string const& type, string const& json_op) {
4547
json_validator validator(nullptr, nlohmann::json_schema::default_string_format_check); // create validator
@@ -134,12 +136,12 @@ string getProgramOutputFilter(const string& programId) {
134136
return nlohmann::json(result).dump();
135137
}
136138

137-
string processMessageMap(const string& programId, const PortPayload<uint64_t, double>& messagesMap) {
139+
string processMessageMap(const string& programId, const OperatorMessage<uint64_t, double>& messagesMap) {
138140
auto result = factory.processMessageMap(programId, messagesMap);
139141
return nlohmann::json(result).dump();
140142
}
141143

142-
string processMessageMapDebug(string const& programId, const PortPayload<uint64_t, double>& messagesMap) {
144+
string processMessageMapDebug(string const& programId, const OperatorMessage<uint64_t, double>& messagesMap) {
143145
auto result = factory.processMessageMapDebug(programId, messagesMap);
144146
return nlohmann::json(result).dump();
145147
}

libs/api/test/test_bollinger_bands_generated.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ TEST_CASE("read pipeline test bollinger_bands") {
4444
v.push_back(end);
4545

4646
for (int i = 0; i < v.size(); i++) {
47-
PortPayload<uint64_t, double> messagesMap;
47+
OperatorMessage<uint64_t, double> messagesMap;
4848
vector<Message<uint64_t, double>> x;
4949
x.push_back(Message<uint64_t, double>(v.at(i), v.at(i) * v.at(i)));
5050
messagesMap.emplace("i1", x);

libs/api/test/test_demultiplexer_pipeline.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ TEST_CASE("read pipeline test demultiplexer") {
2222

2323
// process the data
2424
for (int i = 1; i <= 100; i++) {
25-
PortPayload<uint64_t, double> messagesMap;
25+
OperatorMessage<uint64_t, double> messagesMap;
2626
vector<Message<uint64_t, double>> v;
2727
v.push_back(Message<uint64_t, double>(i, (i < 20) ? 1 : 2));
2828
messagesMap.emplace("i1", v);

libs/api/test/test_json_schema_validation.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#include "rtbot/jsonschema.hpp"
2-
31
#include <catch2/catch.hpp>
42
#include <fstream>
53
#include <iomanip>
@@ -9,6 +7,7 @@
97

108
#include "rtbot/FactoryOp.h"
119
#include "rtbot/bindings.h"
10+
#include "rtbot/jsonschema.hpp"
1211
#include "tools.h"
1312

1413
using namespace rtbot;
@@ -22,10 +21,8 @@ TEST_CASE("jsonschema validation") {
2221
if (!in) throw runtime_error("file not found");
2322
in >> program;
2423
}
25-
cout << "validating program....." << endl;
2624

2725
std::string result = validate(program.dump());
28-
cout << "result " << result << endl;
2926
REQUIRE(nlohmann::json::parse(result)["valid"]);
3027
}
3128
SECTION("validates a valid operator") {
@@ -40,4 +37,3 @@ TEST_CASE("jsonschema validation") {
4037
REQUIRE(!nlohmann::json::parse(result)["valid"]);
4138
}
4239
}
43-

libs/api/test/test_misc_pipeline.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ TEST_CASE("read ppg pipeline") {
2626

2727
// process the data
2828
for (auto i = 0u; i < s.ti.size(); i++) {
29-
PortPayload<uint64_t, double> messagesMap;
29+
OperatorMessage<uint64_t, double> messagesMap;
3030
vector<Message<uint64_t, double>> v;
3131
v.push_back(Message<uint64_t, double>(s.ti[i], s.ppg[i]));
3232
messagesMap.emplace("i1", v);
@@ -44,7 +44,7 @@ TEST_CASE("read ppg pipeline") {
4444
REQUIRE(entryPipe1 == "in1");
4545
// process the data
4646
for (auto i = 0u; i < s.ti.size(); i++) {
47-
PortPayload<uint64_t, double> messagesMap;
47+
OperatorMessage<uint64_t, double> messagesMap;
4848
vector<Message<uint64_t, double>> v;
4949
v.push_back(Message<uint64_t, double>(s.ti[i], s.ppg[i]));
5050
messagesMap.emplace("i1", v);
@@ -70,7 +70,7 @@ TEST_CASE("read pipeline test data basic data") {
7070

7171
// process the data
7272
for (int i = 0; i < 100; i++) {
73-
PortPayload<uint64_t, double> messagesMap;
73+
OperatorMessage<uint64_t, double> messagesMap;
7474
vector<Message<uint64_t, double>> v;
7575
v.push_back(Message<uint64_t, double>(i, i % 5));
7676
messagesMap.emplace("i1", v);
@@ -104,7 +104,7 @@ TEST_CASE("read pipeline test join port") {
104104

105105
// process the data
106106
for (int i = 1; i < 100; i++) {
107-
PortPayload<uint64_t, double> messagesMap;
107+
OperatorMessage<uint64_t, double> messagesMap;
108108
vector<Message<uint64_t, double>> v;
109109
v.push_back(Message<uint64_t, double>(i, i % 5));
110110
messagesMap.emplace("i1", v);
@@ -120,7 +120,5 @@ TEST_CASE("read pipeline test join port") {
120120
REQUIRE(output.find("sc2")->second.find("o1")->second.at(0).value == 3 * (i % 5));
121121
}
122122
}
123-
124-
cout << pipe.getProgram() << endl;
125123
}
126124
}

libs/api/test/test_pipeline_op.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ TEST_CASE("Pipeline operator") {
5050

5151
// pass through the factory
5252
string s = FactoryOp::writeOp(make_unique<Pipeline<>>(pipe));
53-
cout << "Pipeline to json:\n" << s << endl;
5453
auto op = FactoryOp::readOp(s);
5554

5655
// process the data

0 commit comments

Comments
 (0)