Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.

Commit 49d766d

Browse files
committed
patch issue concerning clients destroyed before the end of io_service callback execution. Handle subscribe/psubscribe ackowledgement reply. Patch randomly failing test (by using the new ackowledgement feature).
1 parent e439f42 commit 49d766d

File tree

9 files changed

+408
-166
lines changed

9 files changed

+408
-166
lines changed

includes/cpp_redis/network/unix/io_service.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <atomic>
4+
#include <condition_variable>
45
#include <mutex>
56
#include <thread>
67
#include <unordered_map>
@@ -65,6 +66,9 @@ class io_service {
6566
std::vector<char> write_buffer;
6667
std::size_t write_size;
6768
write_callback_t write_callback;
69+
70+
std::atomic_bool callback_running;
71+
std::condition_variable_any callback_notification;
6872
};
6973

7074
private:
@@ -79,9 +83,8 @@ class io_service {
7983
unsigned int init_sets(struct pollfd* fds);
8084
void process_sets(struct pollfd* fds, unsigned int nfds);
8185

82-
typedef std::function<void()> callback_t;
83-
callback_t read_fd(int fd);
84-
callback_t write_fd(int fd);
86+
void read_fd(int fd);
87+
void write_fd(int fd);
8588

8689
private:
8790
//! whether the worker should terminate or not

includes/cpp_redis/redis_subscriber.hpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,38 @@ class redis_subscriber {
2929

3030
//! subscribe - unsubscribe
3131
typedef std::function<void(const std::string&, const std::string&)> subscribe_callback_t;
32-
redis_subscriber& subscribe(const std::string& channel, const subscribe_callback_t& callback);
33-
redis_subscriber& psubscribe(const std::string& pattern, const subscribe_callback_t& callback);
32+
typedef std::function<void(int)> acknowledgement_callback_t;
33+
redis_subscriber& subscribe(const std::string& channel, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback = nullptr);
34+
redis_subscriber& psubscribe(const std::string& pattern, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback = nullptr);
3435
redis_subscriber& unsubscribe(const std::string& channel);
3536
redis_subscriber& punsubscribe(const std::string& pattern);
3637

3738
//! commit pipelined transaction
3839
redis_subscriber& commit(void);
3940

41+
private:
42+
struct callback_holder {
43+
subscribe_callback_t subscribe_callback;
44+
acknowledgement_callback_t acknowledgement_callback;
45+
};
46+
4047
private:
4148
void connection_receive_handler(network::redis_connection&, reply& reply);
4249
void connection_disconnection_handler(network::redis_connection&);
4350

51+
void handle_acknowledgement_reply(const std::vector<reply>& reply);
4452
void handle_subscribe_reply(const std::vector<reply>& reply);
4553
void handle_psubscribe_reply(const std::vector<reply>& reply);
4654

55+
void call_acknowledgement_callback(const std::string& channel, const std::map<std::string, callback_holder>& channels, std::mutex& channels_mtx, int nb_chans);
56+
4757
private:
4858
//! redis connection
4959
network::redis_connection m_client;
5060

5161
//! (p)subscribed channels and their associated channels
52-
std::map<std::string, subscribe_callback_t> m_subscribed_channels;
53-
std::map<std::string, subscribe_callback_t> m_psubscribed_channels;
62+
std::map<std::string, callback_holder> m_subscribed_channels;
63+
std::map<std::string, callback_holder> m_psubscribed_channels;
5464

5565
//! disconnection handler
5666
disconnection_handler_t m_disconnection_handler;

sources/network/redis_connection.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ redis_connection::redis_connection(const std::shared_ptr<io_service>& IO)
1313
}
1414

1515
redis_connection::~redis_connection(void) {
16+
m_client.disconnect();
1617
__CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection destroyed");
1718
}
1819

sources/network/unix/io_service.cpp

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,16 @@ io_service::init_sets(struct pollfd* fds) {
7272
return nfds;
7373
}
7474

75-
io_service::callback_t
75+
void
7676
io_service::read_fd(int fd) {
77-
std::lock_guard<std::recursive_mutex> lock(m_fds_mutex);
77+
std::unique_lock<std::recursive_mutex> lock(m_fds_mutex);
7878

7979
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service non-blocking read available for fd #" + std::to_string(fd));
8080

8181
auto fd_it = m_fds.find(fd);
8282
if (fd_it == m_fds.end()) {
8383
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service does not track fd #" + std::to_string(fd));
84-
return nullptr;
84+
return;
8585
}
8686

8787
auto& buffer = *fd_it->second.read_buffer;
@@ -97,26 +97,30 @@ io_service::read_fd(int fd) {
9797
buffer.resize(original_buffer_size);
9898
fd_it->second.disconnection_handler(*this);
9999
m_fds.erase(fd_it);
100-
101-
return nullptr;
102100
}
103101
else {
104102
buffer.resize(original_buffer_size + nb_bytes_read);
105103

106-
return std::bind(fd_it->second.read_callback, nb_bytes_read);
104+
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service calling read callback for fd #" + std::to_string(fd));
105+
106+
fd_it->second.callback_running = true;
107+
lock.unlock();
108+
fd_it->second.read_callback(nb_bytes_read);
109+
fd_it->second.callback_running = false;
110+
fd_it->second.callback_notification.notify_all();
107111
}
108112
}
109113

110-
io_service::callback_t
114+
void
111115
io_service::write_fd(int fd) {
112-
std::lock_guard<std::recursive_mutex> lock(m_fds_mutex);
116+
std::unique_lock<std::recursive_mutex> lock(m_fds_mutex);
113117

114118
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service non-blocking write available for fd #" + std::to_string(fd));
115119

116120
auto fd_it = m_fds.find(fd);
117121
if (fd_it == m_fds.end()) {
118122
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service does not track fd #" + std::to_string(fd));
119-
return nullptr;
123+
return;
120124
}
121125

122126
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service writing data for fd #" + std::to_string(fd));
@@ -127,11 +131,16 @@ io_service::write_fd(int fd) {
127131
__CPP_REDIS_LOG(error, "cpp_redis::network::io_service write error for fd #" + std::to_string(fd));
128132
fd_it->second.disconnection_handler(*this);
129133
m_fds.erase(fd_it);
134+
}
135+
else {
136+
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service calling write callback for fd #" + std::to_string(fd));
130137

131-
return nullptr;
138+
fd_it->second.callback_running = true;
139+
lock.unlock();
140+
fd_it->second.write_callback(nb_bytes_written);
141+
fd_it->second.callback_running = false;
142+
fd_it->second.callback_notification.notify_all();
132143
}
133-
else
134-
return std::bind(fd_it->second.write_callback, nb_bytes_written);
135144
}
136145

137146
void
@@ -153,20 +162,8 @@ io_service::process_sets(struct pollfd* fds, unsigned int nfds) {
153162
}
154163
}
155164

156-
for (int fd : fds_to_read) {
157-
auto callback = read_fd(fd);
158-
if (callback) {
159-
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service calling read callback for fd #" + std::to_string(fd));
160-
callback();
161-
}
162-
}
163-
for (int fd : fds_to_write) {
164-
auto callback = write_fd(fd);
165-
if (callback) {
166-
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service calling write callback for fd #" + std::to_string(fd));
167-
callback();
168-
}
169-
}
165+
for (int fd : fds_to_read) { read_fd(fd); }
166+
for (int fd : fds_to_write) { write_fd(fd); }
170167

171168
if (fds[0].revents & POLLIN) {
172169
char buf[1024];
@@ -210,8 +207,22 @@ io_service::track(int fd, const disconnection_handler_t& handler) {
210207

211208
void
212209
io_service::untrack(int fd) {
213-
std::lock_guard<std::recursive_mutex> lock(m_fds_mutex);
214-
m_fds.erase(fd);
210+
std::unique_lock<std::recursive_mutex> lock(m_fds_mutex);
211+
212+
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service requests to untrack fd #" + std::to_string(fd));
213+
214+
auto fd_it = m_fds.find(fd);
215+
if (fd_it == m_fds.end()) {
216+
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service does not track fd #" + std::to_string(fd));
217+
return;
218+
}
219+
220+
if (fd_it->second.callback_running) {
221+
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service waits for callbacks to complete before untracking fd #" + std::to_string(fd));
222+
fd_it->second.callback_notification.wait(lock, [=] { return !fd_it->second.callback_running; });
223+
}
224+
225+
m_fds.erase(fd_it);
215226

216227
__CPP_REDIS_LOG(debug, "cpp_redis::network::io_service now untracks fd #" + std::to_string(fd));
217228
}

sources/redis_client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ redis_client::redis_client(const std::shared_ptr<network::io_service>& IO)
99
}
1010

1111
redis_client::~redis_client(void) {
12+
m_client.disconnect();
1213
__CPP_REDIS_LOG(debug, "cpp_redis::redis_client destroyed");
1314
}
1415

sources/redis_subscriber.cpp

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ redis_subscriber::redis_subscriber(const std::shared_ptr<network::io_service>& I
1010
}
1111

1212
redis_subscriber::~redis_subscriber(void) {
13+
m_client.disconnect();
1314
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber destroyed");
1415
}
1516

@@ -40,23 +41,23 @@ redis_subscriber::is_connected(void) {
4041
}
4142

4243
redis_subscriber&
43-
redis_subscriber::subscribe(const std::string& channel, const subscribe_callback_t& callback) {
44+
redis_subscriber::subscribe(const std::string& channel, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback) {
4445
std::lock_guard<std::mutex> lock(m_subscribed_channels_mutex);
4546

4647
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber attemps to subscribe to channel " + channel);
47-
m_subscribed_channels[channel] = callback;
48+
m_subscribed_channels[channel] = {callback, acknowledgement_callback};
4849
m_client.send({"SUBSCRIBE", channel});
4950
__CPP_REDIS_LOG(info, "cpp_redis::redis_subscriber subscribed to channel " + channel);
5051

5152
return *this;
5253
}
5354

5455
redis_subscriber&
55-
redis_subscriber::psubscribe(const std::string& pattern, const subscribe_callback_t& callback) {
56+
redis_subscriber::psubscribe(const std::string& pattern, const subscribe_callback_t& callback, const acknowledgement_callback_t& acknowledgement_callback) {
5657
std::lock_guard<std::mutex> lock(m_psubscribed_channels_mutex);
5758

5859
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber attemps to psubscribe to channel " + pattern);
59-
m_psubscribed_channels[pattern] = callback;
60+
m_psubscribed_channels[pattern] = {callback, acknowledgement_callback};
6061
m_client.send({"PSUBSCRIBE", pattern});
6162
__CPP_REDIS_LOG(info, "cpp_redis::redis_subscriber psubscribed to channel " + pattern);
6263

@@ -114,6 +115,40 @@ redis_subscriber::commit(void) {
114115
return *this;
115116
}
116117

118+
void
119+
redis_subscriber::call_acknowledgement_callback(const std::string& channel, const std::map<std::string, callback_holder>& channels, std::mutex& channels_mtx, int nb_chans) {
120+
std::lock_guard<std::mutex> lock(channels_mtx);
121+
122+
auto it = channels.find(channel);
123+
if (it == channels.end())
124+
return;
125+
126+
if (it->second.acknowledgement_callback) {
127+
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber executes acknowledgement callback for channel " + channel);
128+
it->second.acknowledgement_callback(nb_chans);
129+
}
130+
}
131+
132+
void
133+
redis_subscriber::handle_acknowledgement_reply(const std::vector<reply>& reply) {
134+
if (reply.size() != 3)
135+
return;
136+
137+
const auto& title = reply[0];
138+
const auto& channel = reply[1];
139+
const auto& nb_chans = reply[2];
140+
141+
if (!title.is_string()
142+
|| !channel.is_string()
143+
|| !nb_chans.is_integer())
144+
return;
145+
146+
if (title.as_string() == "subscribe")
147+
call_acknowledgement_callback(channel.as_string(), m_subscribed_channels, m_subscribed_channels_mutex, nb_chans.as_integer());
148+
else if (title.as_string() == "psubscribe")
149+
call_acknowledgement_callback(channel.as_string(), m_psubscribed_channels, m_psubscribed_channels_mutex, nb_chans.as_integer());
150+
}
151+
117152
void
118153
redis_subscriber::handle_subscribe_reply(const std::vector<reply>& reply) {
119154
if (reply.size() != 3)
@@ -138,7 +173,7 @@ redis_subscriber::handle_subscribe_reply(const std::vector<reply>& reply) {
138173
return;
139174

140175
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber executes subscribe callback for channel " + channel.as_string());
141-
it->second(channel.as_string(), message.as_string());
176+
it->second.subscribe_callback(channel.as_string(), message.as_string());
142177
}
143178

144179
void
@@ -167,7 +202,7 @@ redis_subscriber::handle_psubscribe_reply(const std::vector<reply>& reply) {
167202
return;
168203

169204
__CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber executes psubscribe callback for channel " + channel.as_string());
170-
it->second(channel.as_string(), message.as_string());
205+
it->second.subscribe_callback(channel.as_string(), message.as_string());
171206
}
172207

173208
void
@@ -180,10 +215,13 @@ redis_subscriber::connection_receive_handler(network::redis_connection&, reply&
180215

181216
auto& array = reply.as_array();
182217

183-
//! Array size of 3 -> SUBSCRIBE
218+
//! Array size of 3 -> SUBSCRIBE if array[2] is a string
219+
//! Array size of 3 -> AKNOWLEDGEMENT if array[2] is an integer
184220
//! Array size of 4 -> PSUBSCRIBE
185221
//! Otherwise -> unexepcted reply
186-
if (array.size() == 3)
222+
if (array.size() == 3 && array[2].is_integer())
223+
handle_acknowledgement_reply(array);
224+
else if (array.size() == 3 && array[2].is_string())
187225
handle_subscribe_reply(array);
188226
else if (array.size() == 4)
189227
handle_psubscribe_reply(array);

tests/sources/main.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
#include <gtest/gtest.h>
22

3+
//! For debugging purpose, uncomment
4+
// #include <memory>
5+
// #include <cpp_redis/cpp_redis>
6+
37
int
48
main(int argc, char** argv) {
9+
//! For debugging purpose, uncomment
10+
// cpp_redis::active_logger = std::unique_ptr<cpp_redis::logger>(new cpp_redis::logger(cpp_redis::logger::log_level::debug));
11+
512
::testing::InitGoogleTest(&argc, argv);
13+
614
return RUN_ALL_TESTS();
715
}

tests/sources/spec/redis_client_spec.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,28 +234,40 @@ TEST(RedisClient, MultipleSendPipeline) {
234234

235235
TEST(RedisClient, DisconnectionHandlerWithQuit) {
236236
cpp_redis::redis_client client;
237+
std::condition_variable cv;
237238

238239
std::atomic_bool disconnection_handler_called(false);
239240
client.connect("127.0.0.1", 6379, [&](cpp_redis::redis_client&) {
240241
disconnection_handler_called = true;
242+
cv.notify_all();
241243
});
242244

243245
client.send({"QUIT"});
244246
client.sync_commit();
245-
std::this_thread::sleep_for(std::chrono::seconds(1));
247+
248+
std::mutex mutex;
249+
std::unique_lock<std::mutex> lock(mutex);
250+
cv.wait_for(lock, std::chrono::seconds(2));
251+
246252
EXPECT_TRUE(disconnection_handler_called);
247253
}
248254

249255
TEST(RedisClient, DisconnectionHandlerWithoutQuit) {
250256
cpp_redis::redis_client client;
257+
std::condition_variable cv;
251258

252259
std::atomic_bool disconnection_handler_called(false);
253260
client.connect("127.0.0.1", 6379, [&](cpp_redis::redis_client&) {
254261
disconnection_handler_called = true;
262+
cv.notify_all();
255263
});
256264

257265
client.sync_commit();
258-
std::this_thread::sleep_for(std::chrono::seconds(1));
266+
267+
std::mutex mutex;
268+
std::unique_lock<std::mutex> lock(mutex);
269+
cv.wait_for(lock, std::chrono::seconds(2));
270+
259271
EXPECT_FALSE(disconnection_handler_called);
260272
}
261273

0 commit comments

Comments
 (0)