Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.

Commit 5fab812

Browse files
committed
add pipelining support
1 parent 50cad5a commit 5fab812

File tree

9 files changed

+101
-21
lines changed

9 files changed

+101
-21
lines changed

README.md

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
# cpp_redis
22
cpp_redis is C++11 Asynchronous Redis Client.
33

4-
Network is based on raw sockets API. Thus, the library is really lightweight.
4+
Network is based on raw sockets API, making the library really lightweight.
5+
Commands pipelining is supported.
56

67
## Requirements
78
* C++11
9+
* Mac or Linux (no support for Windows platforms)
810

911
## Compiling
1012
The library uses `cmake`. In order to build the library, follow these steps:
@@ -52,11 +54,19 @@ Throws redis_error if client is not connected to any server.
5254
#### bool is_connected(void)
5355
Returns whether the client is connected or not.
5456

55-
#### void send(const std::vector<std::string>& redis_cmd, const reply_callback& callback = nullptr)
57+
#### redis_client& send(const std::vector<std::string>& redis_cmd, const reply_callback& callback = nullptr)
5658
Send a command and set the callback which has to be called when the reply has been received.
5759
If `nullptr` is passed as callback, command is executed and no callback will be called.
5860
Reply callback is an `std::function<void(reply&)>`.
5961

62+
The command is not effectively sent immediately, but stored inside an internal buffer until `commit()` is called.
63+
64+
### redis_client& commit(void)
65+
Send all the commands that have been stored by calling `send()` since the last `commit()` call to the redis server.
66+
67+
That is, pipelining is supported in a very simple and efficient way: `client.send(...).send(...).send(...).commit()` will send the 3 commands at once (instead of sending 3 network requests, one for each command, as it would have been done without pipelining).
68+
69+
6070
### Example
6171

6272
```cpp
@@ -88,6 +98,7 @@ main(void) {
8898
client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) {
8999
std::cout << reply.as_string() << std::endl;
90100
});
101+
client.commit();
91102

92103
signal(SIGINT, &sigint_handler);
93104
while (not should_exit);
@@ -114,20 +125,33 @@ Throws redis_error if client is not connected to any server.
114125
#### bool is_connected(void)
115126
Returns whether the client is connected or not.
116127
117-
#### void subscribe(const std::string& channel, const subscribe_callback& callback)
128+
#### redis_subscriber& subscribe(const std::string& channel, const subscribe_callback& callback)
118129
Subscribe to the given channel and call subscribe_callback each time a message is published in this channel.
119130
subscribe_callback is an `std::function<void(const std::string&, const std::string&)>`.
120131
121-
#### void psubscribe(const std::string& pattern, const subscribe_callback& callback)
132+
The command is not effectively sent immediately, but stored inside an internal buffer until `commit()` is called.
133+
134+
#### redis_subscriber& psubscribe(const std::string& pattern, const subscribe_callback& callback)
122135
PSubscribe to the given pattern and call subscribe_callback each time a message is published in a channel matching the pattern.
123136
subscribe_callback is an `std::function<void(const std::string&, const std::string&)>`.
124137
125-
#### void unsubscribe(const std::string& channel)
138+
The command is not effectively sent immediately, but stored inside an internal buffer until `commit()` is called.
139+
140+
#### redis_subscriber& unsubscribe(const std::string& channel)
126141
Unsubscribe from the given channel.
127142
128-
#### void punsubscribe(const std::string& pattern)
143+
The command is not effectively sent immediately, but stored inside an internal buffer until `commit()` is called.
144+
145+
#### redis_subscriber& punsubscribe(const std::string& pattern)
129146
Unsubscribe from the given pattern.
130147
148+
The command is not effectively sent immediately, but stored inside an internal buffer until `commit()` is called.
149+
150+
### redis_subscriber& commit(void)
151+
Send all the commands that have been stored by calling `send()` since the last `commit()` call to the redis server.
152+
153+
That is, pipelining is supported in a very simple and efficient way: `sub.subscribe(...).psubscribe(...).unsubscribe(...).commit()` will send the 3 commands at once (instead of sending 3 network requests, one for each command, as it would have been done without pipelining).
154+
131155
### Example
132156
133157
```cpp
@@ -159,6 +183,7 @@ main(void) {
159183
sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) {
160184
std::cout << "PMESSAGE " << chan << ": " << msg << std::endl;
161185
});
186+
sub.commit();
162187
163188
signal(SIGINT, &sigint_handler);
164189
while (not should_exit);

examples/redis_client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ main(void) {
2626
client.send({"GET", "hello"}, [] (cpp_redis::reply& reply) {
2727
std::cout << reply.as_string() << std::endl;
2828
});
29+
client.commit();
2930

3031
signal(SIGINT, &sigint_handler);
3132
while (not should_exit);

examples/redis_subscriber.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ main(void) {
2626
sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) {
2727
std::cout << "PMESSAGE " << chan << ": " << msg << std::endl;
2828
});
29+
sub.commit();
2930

3031
signal(SIGINT, &sigint_handler);
3132
while (not should_exit);

includes/cpp_redis/network/redis_connection.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ class redis_connection {
3333
bool is_connected(void);
3434

3535
//! send cmd
36-
void send(const std::vector<std::string>& redis_cmd);
36+
redis_connection& send(const std::vector<std::string>& redis_cmd);
37+
38+
//! commit pipelined transaction
39+
redis_connection& commit(void);
3740

3841
private:
3942
//! receive & disconnection handlers
@@ -54,6 +57,12 @@ class redis_connection {
5457

5558
//! reply builder
5659
builders::reply_builder m_builder;
60+
61+
//! internal buffer used for pipelining
62+
std::string m_buffer;
63+
64+
//! protect internal buffer against race conditions
65+
std::mutex m_buffer_mutex;
5766
};
5867

5968
} //! network

includes/cpp_redis/redis_client.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ class redis_client {
3030

3131
//! send cmd
3232
typedef std::function<void(reply&)> reply_callback_t;
33-
void send(const std::vector<std::string>& redis_cmd, const reply_callback_t& callback = nullptr);
33+
redis_client& send(const std::vector<std::string>& redis_cmd, const reply_callback_t& callback = nullptr);
34+
35+
//! commit pipelined transaction
36+
redis_client& commit(void);
3437

3538
private:
3639
//! receive & disconnection handlers

includes/cpp_redis/redis_subscriber.hpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ class redis_subscriber {
3030

3131
//! subscribe - unsubscribe
3232
typedef std::function<void(const std::string&, const std::string&)> subscribe_callback_t;
33-
void subscribe(const std::string& channel, const subscribe_callback_t& callback);
34-
void psubscribe(const std::string& pattern, const subscribe_callback_t& callback);
35-
void unsubscribe(const std::string& channel);
36-
void punsubscribe(const std::string& pattern);
33+
redis_subscriber& subscribe(const std::string& channel, const subscribe_callback_t& callback);
34+
redis_subscriber& psubscribe(const std::string& pattern, const subscribe_callback_t& callback);
35+
redis_subscriber& unsubscribe(const std::string& channel);
36+
redis_subscriber& punsubscribe(const std::string& pattern);
37+
38+
//! commit pipelined transaction
39+
redis_subscriber& commit(void);
3740

3841
private:
3942
void connection_receive_handler(network::redis_connection&, reply& reply);

sources/network/redis_connection.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,22 @@ redis_connection::build_command(const std::vector<std::string>& redis_cmd) {
4747
return cmd;
4848
}
4949

50-
void
50+
redis_connection&
5151
redis_connection::send(const std::vector<std::string>& redis_cmd) {
52-
m_client.send(build_command(redis_cmd));
52+
std::lock_guard<std::mutex> lock(m_buffer_mutex);
53+
m_buffer += build_command(redis_cmd);
54+
55+
return *this;
56+
}
57+
58+
//! commit pipelined transaction
59+
redis_connection&
60+
redis_connection::commit(void) {
61+
std::lock_guard<std::mutex> lock(m_buffer_mutex);
62+
m_client.send(m_buffer);
63+
m_buffer.clear();
64+
65+
return *this;
5366
}
5467

5568
bool

sources/redis_client.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,22 @@ redis_client::is_connected(void) {
2929
return m_client.is_connected();
3030
}
3131

32-
void
32+
redis_client&
3333
redis_client::send(const std::vector<std::string>& redis_cmd, const reply_callback_t& callback) {
3434
m_client.send(redis_cmd);
3535

3636
std::lock_guard<std::mutex> lock(m_callbacks_mutex);
3737
m_callbacks.push(callback);
38+
39+
return *this;
40+
}
41+
42+
//! commit pipelined transaction
43+
redis_client&
44+
redis_client::commit(void) {
45+
m_client.commit();
46+
47+
return *this;
3848
}
3949

4050
void

sources/redis_subscriber.cpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,44 +30,59 @@ redis_subscriber::is_connected(void) {
3030
return m_client.is_connected();
3131
}
3232

33-
void
33+
redis_subscriber&
3434
redis_subscriber::subscribe(const std::string& channel, const subscribe_callback_t& callback) {
3535
std::lock_guard<std::mutex> lock(m_subscribed_channels_mutex);
3636

3737
m_subscribed_channels[channel] = callback;
3838
m_client.send({ "SUBSCRIBE", channel });
39+
40+
return *this;
3941
}
4042

41-
void
43+
redis_subscriber&
4244
redis_subscriber::psubscribe(const std::string& pattern, const subscribe_callback_t& callback) {
4345
std::lock_guard<std::mutex> lock(m_psubscribed_channels_mutex);
4446

4547
m_psubscribed_channels[pattern] = callback;
4648
m_client.send({ "PSUBSCRIBE", pattern });
49+
50+
return *this;
4751
}
4852

49-
void
53+
redis_subscriber&
5054
redis_subscriber::unsubscribe(const std::string& channel) {
5155
std::lock_guard<std::mutex> lock(m_subscribed_channels_mutex);
5256

5357
auto it = m_subscribed_channels.find(channel);
5458
if (it == m_subscribed_channels.end())
55-
return ;
59+
return *this;
5660

5761
m_client.send({ "UNSUBSCRIBE", channel });
5862
m_subscribed_channels.erase(it);
63+
64+
return *this;
5965
}
6066

61-
void
67+
redis_subscriber&
6268
redis_subscriber::punsubscribe(const std::string& pattern) {
6369
std::lock_guard<std::mutex> lock(m_psubscribed_channels_mutex);
6470

6571
auto it = m_psubscribed_channels.find(pattern);
6672
if (it == m_psubscribed_channels.end())
67-
return ;
73+
return *this;
6874

6975
m_client.send({ "PUNSUBSCRIBE", pattern });
7076
m_psubscribed_channels.erase(it);
77+
78+
return *this;
79+
}
80+
81+
redis_subscriber&
82+
redis_subscriber::commit(void) {
83+
m_client.commit();
84+
85+
return *this;
7186
}
7287

7388
void

0 commit comments

Comments
 (0)