Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 0f2ac2d

Browse files
author
hamidr
committed
Support sub/pub/psub/punsub commands
1 parent 21dd56b commit 0f2ac2d

File tree

1 file changed

+280
-0
lines changed

1 file changed

+280
-0
lines changed

includes/monitor.hpp

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
#pragma once
2+
3+
#include <parser/array_parser.h>
4+
5+
#include <list>
6+
#include <cassert>
7+
8+
namespace async_redis {
9+
namespace redis_impl
10+
{
11+
using std::string;
12+
13+
template<typename InputOutputHandler, typename SocketType, typename ParserPolicy>
14+
class monitor
15+
{
16+
public:
17+
enum EventState {
18+
Sub,
19+
Unsub,
20+
Stream,
21+
Disconnected
22+
};
23+
24+
using parser_t = typename ParserPolicy::parser;
25+
using watcher_cb_t = std::function<void (const string&, parser_t, EventState)>;
26+
27+
monitor(InputOutputHandler &event_loop)
28+
: io_(event_loop)
29+
{
30+
socket_ = std::make_unique<SocketType>(event_loop);
31+
}
32+
33+
template<typename ...Args>
34+
inline void connect(Args... args) {
35+
if (!socket_->is_valid())
36+
socket_ = std::make_unique<SocketType>(io_);
37+
38+
socket_->template async_connect<SocketType>(0, std::forward<Args>(args)...);
39+
}
40+
41+
inline bool is_connected() const
42+
{ return socket_->is_connected(); }
43+
44+
bool is_watching() const
45+
{ return this->is_connected() && is_watching_; }
46+
47+
void disconnect()
48+
{
49+
socket_->close();
50+
51+
pwatchers_.clear();
52+
watchers_.clear();
53+
is_watching_ = false;
54+
}
55+
56+
bool psubscribe(const std::list<string>& channels, watcher_cb_t&& cb)
57+
{
58+
assert(channels.size());
59+
string start_cmd = "psubscribe";
60+
61+
for(auto &ch : channels) {
62+
start_cmd += " " + ch;
63+
pwatchers_.emplace(ch, cb);
64+
}
65+
66+
start_cmd += "\r\n";
67+
return send_and_receive(std::move(start_cmd));
68+
}
69+
70+
bool subscribe(const std::list<string>& channels, watcher_cb_t&& cb)
71+
{
72+
assert(channels.size());
73+
74+
string cmd = "subscribe";
75+
76+
for(auto &ch : channels) {
77+
cmd += " " + ch;
78+
watchers_.emplace(ch, cb);
79+
}
80+
81+
cmd += "\r\n";
82+
return send_and_receive(std::move(cmd));
83+
}
84+
85+
bool unsubscribe(const std::list<string>& channels, watcher_cb_t&& cb)
86+
{
87+
assert(channels.size());
88+
89+
string cmd = "unsubscribe";
90+
for(auto &ch : channels)
91+
cmd += " " + ch;
92+
cmd += "\r\n";
93+
94+
return send_and_receive(std::move(cmd));
95+
}
96+
97+
bool punsubscribe(const std::list<string>& channels, watcher_cb_t&& cb)
98+
{
99+
assert(channels.size());
100+
101+
string cmd = "punsubscribe";
102+
for(auto &ch : channels)
103+
cmd += " " + ch;
104+
cmd += "\r\n";
105+
106+
return send_and_receive(std::move(cmd));
107+
}
108+
109+
private:
110+
bool send_and_receive(string&& data)
111+
{
112+
if (!is_connected())
113+
return false;
114+
115+
socket_->async_write(data, [this](ssize_t sent_chunk_len) {
116+
if (is_watching_)
117+
return;
118+
119+
this->socket_->async_read(
120+
this->data_,
121+
this->max_data_size,
122+
std::bind(
123+
&monitor::stream_received,
124+
this,
125+
std::placeholders::_1
126+
)
127+
);
128+
129+
this->is_watching_ = true;
130+
});
131+
return true;
132+
}
133+
134+
void handle_message_event(parser_t& channel, parser_t& value)
135+
{
136+
const string& ch_key = channel->to_string();
137+
auto itr = watchers_.find(ch_key);
138+
assert(itr != watchers_.end());
139+
itr->second(ch_key, value, EventState::Stream);
140+
}
141+
142+
void handle_subscribe_event(parser_t& channel, parser_t& clients)
143+
{
144+
const string& ch_key = channel->to_string();
145+
auto itr = watchers_.find(ch_key);
146+
assert(itr != watchers_.end());
147+
itr->second(ch_key, clients, EventState::Sub);
148+
}
149+
150+
void handle_psubscribe_event(parser_t& channel, parser_t& clients)
151+
{
152+
const string& ch_key = channel->to_string();
153+
auto itr = pwatchers_.find(ch_key);
154+
assert(itr != pwatchers_.end());
155+
itr->second(ch_key, clients, EventState::Sub);
156+
}
157+
158+
void handle_punsubscribe_event(parser_t& pattern, parser_t& clients)
159+
{
160+
auto p_key = pattern->to_string();
161+
auto itr = pwatchers_.find(p_key);
162+
assert(itr != pwatchers_.end());
163+
itr->second(p_key, clients, EventState::Unsub);
164+
pwatchers_.erase(itr);
165+
}
166+
167+
void handle_unsubscribe_event(parser_t& channel, parser_t& clients)
168+
{
169+
auto ch_key = channel->to_string();
170+
auto itr = watchers_.find(ch_key);
171+
assert(itr != watchers_.end());
172+
itr->second(ch_key, clients, EventState::Unsub);
173+
watchers_.erase(itr);
174+
}
175+
176+
void handle_pmessage_event(parser_t& pattern, parser_t& channel, parser_t& value)
177+
{
178+
auto itr = pwatchers_.find(pattern->to_string());
179+
assert(itr != pwatchers_.end());
180+
itr->second(channel->to_string(), value, EventState::Stream);
181+
}
182+
183+
void handle_event(parser_t&& request)
184+
{
185+
assert(request->type() == async_redis::parser::RespType::Arr);
186+
187+
auto& event = static_cast<async_redis::parser::array_parser&>(*request);
188+
189+
assert(event.size() >= 3);
190+
191+
string type = event.nth(0)->to_string();
192+
193+
if (type == "message")
194+
return handle_message_event(event.nth(1), event.nth(2));
195+
else if (type == "pmessage")
196+
return handle_pmessage_event(event.nth(1), event.nth(2), event.nth(3));
197+
else if (type == "subscribe")
198+
return handle_subscribe_event(event.nth(1), event.nth(2));
199+
else if (type == "unsubscribe")
200+
return handle_unsubscribe_event(event.nth(1), event.nth(2));
201+
else if (type == "psubscribe")
202+
return handle_psubscribe_event(event.nth(1), event.nth(2));
203+
else if (type == "punsubscribe")
204+
return handle_punsubscribe_event(event.nth(1), event.nth(2));
205+
206+
assert(false);
207+
}
208+
209+
void report_disconnect()
210+
{
211+
//Swap it! cause if we call this->disconnect inside the functors
212+
//then it will be freeing the stackframes of functions!
213+
decltype(watchers_) t1, t2;
214+
t1.swap(watchers_);
215+
t2.swap(pwatchers_);
216+
217+
string str;
218+
219+
for(auto &w : t1)
220+
w.second(str, nullptr, EventState::Disconnected);
221+
222+
for(auto &w : t2)
223+
w.second(str, nullptr, EventState::Disconnected);
224+
225+
disconnect();
226+
}
227+
228+
void stream_received(ssize_t len)
229+
{
230+
if (len == 0)
231+
return report_disconnect();
232+
233+
ssize_t acc = 0;
234+
while (acc < len)
235+
{
236+
bool is_finished = false;
237+
acc += ParserPolicy(parser_).append_chunk(this->data_ + acc, len - acc, is_finished);
238+
239+
if (!is_finished)
240+
break;
241+
242+
{ // pass the parser and be done with it
243+
parser_t event;
244+
std::swap(event, parser_);
245+
246+
handle_event(std::move(event));
247+
}
248+
}
249+
250+
// if (!(watchers_.size() || pwatchers_.size()))
251+
if (!watchers_.size() && !pwatchers_.size()) {
252+
is_watching_ = false;
253+
return;
254+
}
255+
256+
this->socket_->async_read(
257+
this->data_,
258+
this->max_data_size,
259+
std::bind(
260+
&monitor::stream_received,
261+
this,
262+
std::placeholders::_1
263+
)
264+
);
265+
}
266+
267+
private:
268+
parser_t parser_;
269+
std::unordered_map<std::string, watcher_cb_t> watchers_;
270+
std::unordered_map<std::string, watcher_cb_t> pwatchers_;
271+
272+
std::unique_ptr<SocketType> socket_;
273+
InputOutputHandler &io_;
274+
enum { max_data_size = 1024 };
275+
char data_[max_data_size];
276+
bool is_watching_ = false;
277+
};
278+
279+
}
280+
}

0 commit comments

Comments
 (0)