-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgateway.cpp
More file actions
217 lines (203 loc) · 7.7 KB
/
gateway.cpp
File metadata and controls
217 lines (203 loc) · 7.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#include "discord.hpp"
#include "utils.hpp"
#include "gateway.hpp"
#include "bot.hpp"
#define PAYLOAD_DEBUG 1
namespace backend {
// using default settings
gateway::gateway(Bot *const bot, bool compress, encoding enc) : Connection(bot), workers(NUM_THREADS), rate_sem{rate_limit} {
// set the websocket call back and close handlers
client.set_message_handler([this](const websocket_incoming_message &msg){ this->handle_callback(msg); });
client.set_close_handler([this](const wss_close_status close_status, const utility::string_t &reason, const std::error_code &error) {
std::cout << "error " << error.value() << " " << static_cast<int>(close_status) << ":" << reason << std::endl;
this->close();
});
}
/**
* @brief: run the connection
*
* A call to this function is blocking and this funciton should be called via a seperate thread
*/
void gateway::run() {
nlohmann::json dump;
// set the new connections status
bot->status = NEW;
bot->up_to_date = true;
// connect to the gateway, expect a HELLO packet right after
client.connect(uri).then([](){});
// future to grab exception by the heartbeat thread
// TODO for now, only the exception from the heartbeat thread is caught
std::promise<void> p;
std::future<void> f = p.get_future();
while(bot->status != ACTIVE); // wait until we receive the ready event for us to actually start heartbeating
// TODO if disconnected, signal all other threads from other files
while(bot->status != TERMINATE) { // continually try to maintain a connection
try {
// start all workers, for now, rate limiting is turned off because low activity is expected
boost::asio::post(workers, [&]{ this->heartbeat(p); }); // sends heartbeats at hearbeat intervals
//boost::asio::post(workers, [this] { this->manage_resources(); }); // resets the rate limit
//boost::asio::post(workers, [this] { this->manage_events(); }); // sends events at rate limit
workers.join();
f.get(); // throw exception from the heartbeat thread
} catch(const std::runtime_error &err) { // disconnected, send a resume payload
std::cout << "disconnected!" << std::endl;
bot->up_to_date = false;
reconnect();
} catch(const std::exception &e) { // unexpected exception, time to leave
std::cout << __FILE__ << __LINE__ << ": " << e.what() << std::endl;
throw std::runtime_error("Unexpected exception during heartbeat");
}
}
}
/**
* @brief:
*/
void gateway::reconnect() {
client.connect(uri).then([]() {});
send_payload(package({discord::RESUME}));
}
/**
* @brief: sends the payload via websocket
*
* @param[in]: payload reference that will be sent
*/
void gateway::send_payload(const nlohmann::json &payload) {
websocket_outgoing_message out_msg;
out_msg.set_utf8_message(payload.dump(4));
#if PAYLOAD_DEBUG == 1
std::cout << "Sending to " << client.uri().to_string() << std::endl << payload.dump(4) << std::endl;
#else
#endif
client_lock.lock();
client.send(out_msg).then([](){});
client_lock.unlock();
return;
}
/**
* @brief: client callback handler
*
* Handles incoming payloads by using the provided opcode as key.
*
* @param[in]: incoming message. Use to extract json
*/
void gateway::handle_callback(const websocket_incoming_message &msg) {
std::string utf8_msg = msg.extract_string().get();
auto parsed_json = utf8_msg.empty() ? throw "oh no" : nlohmann::json::parse(utf8_msg);
#if PAYLOAD_DEBUG == 1
std::cout << "message: " << parsed_json.dump(4) << std::endl;
#else
#endif
discord::payload payload_msg = unpack(parsed_json);
if(bot->up_to_date == true) { // if this is the most recent event, then we are ok with sending it
try {
(this->*(this->handlers[payload_msg.op]))(payload_msg);
} catch(const std::exception &e) { // TODO handle this
std::cout << __FILE__ << __LINE__ << ": " << e.what() << std::endl;
close();
exit(2);
}
} else { // we should record all events that have occured until we are resumed
while(payload_msg.t != "RESUMED") {
}
bot->up_to_date = true;
}
}
/**
* @brief: sends a heartbeat payload at heartbeat intervals
*
* Discord API requires that a heartbeat payload be sent at heartbeat_interval intervals.
* The number of heartbeat is kept in a variable heartbeat_ticks which is incremented everytime a
* heartbeat is sent and decrement when a HEARTBEAT_ACK is received, therefore it should always be
* 0 at the start of sending the heartbeat;
*
* @bug: how to make sure that the heartbeat thread and the other threads share clients properly?
* @bug: concurrency issues with heartbeat?
*/
void gateway::heartbeat(std::promise<void> &p) {
// FIXME not sure what to get for sequence data
try {
while(bot->status == ACTIVE) {
// serialize the main thread which can be sending heartbeats in response
heartbeat_lock.lock();
if(heartbeat_ticks != 0) {
bot->status = DISCONNECTED;
heartbeat_lock.unlock();
throw std::runtime_error("did not receive heartbeat ack");
}
heartbeat_ticks++; // increment the ticks to show that sent out a heartbeat
// heartbeat_interval is maximum amount of time and there's no punishment for
// heartbeating, so send it a little earlier
auto x = std::chrono::steady_clock::now() +
std::chrono::milliseconds(heartbeat_interval);
send_payload(
{
{ "op", discord::HEARTBEAT },
{ "d", last_sequence_data },
});
heartbeat_lock.unlock();
std::this_thread::sleep_until(x);
}
return;
} catch(const std::exception &err) {
p.set_exception(std::current_exception());
return;
}
}
/**
* @brief: manage sending of events
*
* While the connection is active, ths function manages the sending of events while
* implementing rate limiting. This thread uses a semaphore that is decremented everytime
* an event is sent to server. The semaphore will block at 0 which means that the connection
* is reaching its event rate limit. The function will block until the semaphore is reset by
* the resource manager thread.
*/
void gateway::manage_events() {
try {
while(bot->status == ACTIVE) {
event_q_lock.lock();
if(event_q.empty()) {
event_q_lock.unlock();
continue;
}
discord::payload p = event_q.front();
send_payload(package(p));
event_q.pop();
event_q_lock.unlock();
rate_sem.wait();
}
return;
} catch(const std::exception &e) {
std::cout << __FILE__ << __LINE__ << ": " << e.what() << std::endl;
}
}
/**
* @brief: ends the connection with the websocket
*
* Declares the state dead which forces heartbeating to stop and join the main thread.
* Once all threads have been joined, the client closes.
*/
void gateway::close() {
bot->status = TERMINATE;
workers.join(); // peaceful exit
client.close().then([](){});
}
/**
* @brief: manages the resources of the connection
*
* For now, all this thread does is reset the wait limit of the event semaphore.
*/
void gateway::manage_resources() {
try {
while(bot->status == ACTIVE) {
// reset the semaphore every minute
auto x = std::chrono::steady_clock::now() + std::chrono::seconds(60);
std::this_thread::sleep_until(x);
rate_sem.reset();
}
return;
} catch(const std::exception &e) {
std::cout << __FILE__ << __LINE__ << ": " << e.what() << std::endl;
}
}
}