Skip to content
This repository was archived by the owner on Sep 9, 2025. It is now read-only.

Commit 49203d3

Browse files
author
hamidr
committed
Separation of signatures from impls
1 parent debb11a commit 49203d3

18 files changed

+372
-510
lines changed

CMakeLists.txt

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,30 @@ include_directories("/usr/local/include")
2828
link_directories("/usr/lib")
2929
link_directories("/usr/local/lib")
3030

31-
add_library(event_loop
31+
32+
add_library(event_loop
3233
${PROJECT_SOURCE_DIR}/event_loop/event_loop_ev.cpp)
3334

34-
add_library(parser
35+
add_library(network
36+
${PROJECT_SOURCE_DIR}/network/async_socket.cpp
37+
${PROJECT_SOURCE_DIR}/network/unix_socket.cpp
38+
${PROJECT_SOURCE_DIR}/network/tcp_socket.cpp)
39+
40+
add_library(parser
41+
${PROJECT_SOURCE_DIR}/parser/base_resp_parser.cpp
3542
${PROJECT_SOURCE_DIR}/parser/array_parser.cpp
3643
${PROJECT_SOURCE_DIR}/parser/bulk_string_parser.cpp
3744
${PROJECT_SOURCE_DIR}/parser/error_parser.cpp
3845
${PROJECT_SOURCE_DIR}/parser/number_parser.cpp
39-
${PROJECT_SOURCE_DIR}/parser/redis_response.cpp
4046
${PROJECT_SOURCE_DIR}/parser/simple_string_parser.cpp)
4147

48+
49+
## Compiler flags
50+
if(CMAKE_COMPILER_IS_GNUCXX)
51+
set(CMAKE_CXX_FLAGS "-O2") ## Optimize
52+
set(CMAKE_EXE_LINKER_FLAGS "-s") ## Strip binary
53+
endif()
54+
4255
target_link_libraries(event_loop ev)
4356

4457
install(TARGETS event_loop
@@ -52,5 +65,5 @@ install(TARGETS parser
5265

5366
install(DIRECTORY ${PROJECT_INCLUDE_DIR}/ DESTINATION /usr/local/include)
5467

55-
add_executable (test ${CMAKE_SOURCE_DIR}/test/main.cpp)
56-
target_link_libraries(test parser event_loop ev)
68+
add_executable (a1.out ${CMAKE_SOURCE_DIR}/test/main.cpp)
69+
target_link_libraries(a1.out parser event_loop network)

includes/connection.hpp

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include <memory>
66
#include <tuple>
77

8-
#include <parser/redis_response.h>
8+
#include <parser/base_resp_parser.h>
99
#include <network/tcp_socket.hpp>
1010
#include <network/unix_socket.hpp>
1111

@@ -18,7 +18,7 @@ namespace async_redis
1818
using unix_socket = network::unix_socket;
1919

2020
public:
21-
using parser_t = parser::redis_response::parser;
21+
using parser_t = parser::base_resp_parser::parser;
2222
using reply_cb_t = std::function<void (parser_t)>;
2323

2424
connection(event_loop::event_loop_ev& event_loop)
@@ -49,19 +49,23 @@ namespace async_redis
4949

5050
void disconnect() {
5151
socket_->close();
52+
//TODO: check the policy! Should we free queue or retry again?
5253
decltype(req_queue_) free_me;
5354
free_me.swap(req_queue_);
5455
}
5556

5657
bool pipelined_send(std::string&& pipelined_cmds, std::vector<reply_cb_t>&& callbacks)
5758
{
59+
if (!is_connected())
60+
return false;
61+
5862
return
5963
socket_->async_write(pipelined_cmds, [this, cbs = std::move(callbacks)](ssize_t sent_chunk_len) {
6064
if (sent_chunk_len == 0)
6165
return disconnect();
6266

6367
if (!req_queue_.size() && cbs.size())
64-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
68+
do_read();
6569

6670
for(auto &&cb : cbs)
6771
req_queue_.emplace(std::move(cb), nullptr);
@@ -70,6 +74,9 @@ namespace async_redis
7074

7175
bool send(const std::string&& command, const reply_cb_t& reply_cb)
7276
{
77+
if (!is_connected())
78+
return false;
79+
7380
bool read_it = !req_queue_.size();
7481
req_queue_.emplace(reply_cb, nullptr);
7582

@@ -79,11 +86,16 @@ namespace async_redis
7986
return disconnect();
8087

8188
if (read_it)
82-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
89+
do_read();
8390
});
8491
}
8592

86-
protected:
93+
private:
94+
inline
95+
void do_read() {
96+
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
97+
}
98+
8799
void reply_received(ssize_t len)
88100
{
89101
if (len == 0)
@@ -98,7 +110,7 @@ namespace async_redis
98110
auto &parser = std::get<1>(request);
99111

100112
bool is_finished = false;
101-
acc += parser::redis_response::append_chunk(parser, data_ + acc, len - acc, is_finished);
113+
acc += parser::base_resp_parser::append_chunk(parser, data_ + acc, len - acc, is_finished);
102114

103115
if (!is_finished)
104116
break;
@@ -108,7 +120,7 @@ namespace async_redis
108120
}
109121

110122
if (req_queue_.size())
111-
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
123+
do_read();
112124
}
113125

114126
private:

includes/monitor.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ namespace async_redis
2121
Disconnected
2222
};
2323

24-
using parser_t = parser::redis_response::parser;
24+
using parser_t = parser::base_resp_parser::parser;
2525
using watcher_cb_t = std::function<void (const string&, parser_t, EventState)>;
2626

2727
monitor(event_loop::event_loop_ev &event_loop)
@@ -240,7 +240,7 @@ namespace async_redis
240240
while (acc < len)
241241
{
242242
bool is_finished = false;
243-
acc += parser::redis_response::append_chunk(parser_, data_ + acc, len - acc, is_finished);
243+
acc += parser::base_resp_parser::append_chunk(parser_, data_ + acc, len - acc, is_finished);
244244

245245
if (!is_finished)
246246
break;

includes/network/async_socket.hpp

Lines changed: 18 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
11
#pragma once
22

3-
43
#include <sys/socket.h>
5-
#include <sys/un.h>
6-
#include <errno.h>
7-
#include <sys/fcntl.h> // fcntl
8-
#include <unistd.h> // close
9-
#include <netinet/in.h>
10-
#include <arpa/inet.h>
114

125
#include <string>
136
#include <event_loop/event_loop_ev.h>
147

158
namespace async_redis {
169
namespace network
1710
{
18-
using socket_t = struct sockaddr;
1911
using std::string;
2012

2113
class socket_excetion : std::exception {};
@@ -26,120 +18,32 @@ namespace async_redis {
2618
class async_socket
2719
{
2820
public:
21+
using socket_t = struct sockaddr;
2922

3023
using socket_identifier_t = event_loop::event_loop_ev::socket_identifier_t;
3124
using recv_cb_t = std::function<void (ssize_t)>;
3225
using ready_cb_t = std::function<void (ssize_t)>;
3326
using connect_handler_t = std::function<void (bool)>;
3427

35-
async_socket(event_loop::event_loop_ev& io)
36-
: io_(io)
37-
{ }
38-
39-
~async_socket() {
40-
close();
41-
}
42-
43-
inline bool is_valid() {
44-
return fd_ != -1;
45-
}
46-
47-
inline ssize_t send(const string& data) {
48-
return send(data.data(), data.size());
49-
}
50-
51-
inline ssize_t send(const char *data, size_t len) {
52-
return ::send(fd_, data, len, 0);
53-
}
54-
55-
inline ssize_t receive(char *data, size_t len) {
56-
return ::recv(fd_, data, len, 0);
57-
}
58-
59-
inline bool listen(int backlog = 0) {
60-
return ::listen(fd_, backlog) == 0;
61-
}
62-
63-
inline int accept() {
64-
return ::accept(fd_, nullptr, nullptr);
65-
}
66-
67-
bool close()
68-
{
69-
if (!is_connected_)
70-
return true;
71-
72-
if(id_)
73-
io_.unwatch(id_);
74-
75-
auto res = ::close(fd_) == 0;
76-
is_connected_ = false;
77-
fd_ = -1;
78-
return res;
79-
}
80-
81-
bool async_write(const string& data, const ready_cb_t& cb)
82-
{
83-
if (!is_connected() || !data.size())
84-
return false;
85-
86-
io_.async_write(id_, [this, data, cb]() -> void {
87-
auto sent_chunk = send(data);
88-
89-
if(sent_chunk == 0)
90-
close();
91-
92-
if (sent_chunk < data.size() && sent_chunk != -1) {
93-
async_write(data.substr(sent_chunk, data.size()), cb);
94-
return;
95-
}
96-
97-
cb(sent_chunk);
98-
});
99-
100-
return true;
101-
}
28+
async_socket(event_loop::event_loop_ev& io);
10229

103-
bool async_read(char *buffer, int max_len, const recv_cb_t& cb)
104-
{
105-
if (!is_connected())
106-
return false;
30+
~async_socket();
10731

108-
io_.async_read(id_, [&, buffer, max_len, cb]() -> void {
109-
auto l = receive(buffer, max_len);
110-
if (l == 0)
111-
close();
32+
bool is_valid();
33+
ssize_t send(const string& data);
34+
ssize_t send(const char *data, size_t len);
35+
ssize_t receive(char *data, size_t len);
36+
bool listen(int backlog = 0);
37+
int accept();
38+
bool close();
39+
bool async_write(const string& data, const ready_cb_t& cb);
40+
bool async_read(char *buffer, int max_len, const recv_cb_t& cb);
41+
void async_accept(const std::function<void(std::shared_ptr<async_socket>)>& cb);
11242

113-
cb(l);
114-
});
115-
116-
return true;
117-
}
118-
119-
void async_accept(const std::function<void(std::shared_ptr<async_socket>)>& cb)
120-
{
121-
return io_.async_read(id_, [this, cb]() {
122-
int fd = this->accept();
123-
auto s = std::make_shared<async_socket>(io_);
124-
s->set_fd_socket(fd);
125-
cb(s);
126-
this->async_accept(cb);
127-
});
128-
}
129-
130-
inline
131-
bool is_connected() const {
132-
return is_connected_;
133-
}
43+
bool is_connected() const;
13444

13545
protected:
136-
void set_fd_socket(int fd)
137-
{
138-
fd_ = fd;
139-
is_connected_ = true;
140-
141-
id_ = io_.watch(fd_);
142-
}
46+
void set_fd_socket(int fd);
14347

14448
template <typename SocketType, typename... Args>
14549
void async_connect(int timeout, connect_handler_t handler, Args... args)
@@ -156,33 +60,9 @@ namespace async_redis {
15660
});
15761
}
15862

159-
160-
void create_socket(int domain) {
161-
if (-1 == (fd_ = socket(domain, SOCK_STREAM, 0)))
162-
throw connect_socket_exception();
163-
164-
if (-1 == fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL) | O_NONBLOCK))
165-
throw nonblocking_socket_exception();
166-
167-
id_ = io_.watch(fd_);
168-
}
169-
170-
//TODO: well i guess retry with create_socket in these functions
171-
int connect_to(socket_t* socket_addr, int len) {
172-
int ret = ::connect(fd_, socket_addr, len);
173-
if (!ret)
174-
is_connected_ = true;
175-
176-
return ret;
177-
}
178-
179-
int bind_to(socket_t* socket_addr, int len) {
180-
int b = ::bind(fd_, socket_addr, len);
181-
if (!b)
182-
is_connected_ = true;
183-
184-
return b;
185-
}
63+
void create_socket(int domain);
64+
int connect_to(socket_t* socket_addr, int len);
65+
int bind_to(socket_t* socket_addr, int len);
18666

18767
private:
18868
bool is_connected_ = false;

includes/network/tcp_socket.hpp

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,42 +7,12 @@ namespace async_redis {
77
{
88
class tcp_socket : public async_socket
99
{
10-
1110
public:
12-
tcp_socket(event_loop::event_loop_ev& io)
13-
: async_socket(io)
14-
{
15-
this->create_socket(AF_INET);
16-
}
17-
18-
inline
19-
void async_connect(const string& ip, int port, connect_handler_t handler)
20-
{
21-
async_socket::template async_connect<tcp_socket>(0, handler, ip, port);
22-
}
23-
24-
bool bind(const string& host, int port)
25-
{
26-
struct sockaddr_in addr = {0};
27-
addr.sin_family = AF_INET;
28-
addr.sin_port = ::htons(port);
29-
addr.sin_addr.s_addr = inet_addr(host.data());
30-
31-
32-
return this->bind_to((socket_t *)&addr, sizeof(addr)) == 0;
33-
}
34-
35-
int connect(const string& host, int port)
36-
{
37-
//TODO:
38-
// setsockopt (fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
39-
struct sockaddr_in addr = {0};
40-
addr.sin_family = AF_INET;
41-
addr.sin_port = ::htons(port);
42-
addr.sin_addr.s_addr = inet_addr(host.data());
11+
tcp_socket(event_loop::event_loop_ev& io);
4312

44-
return this->connect_to((socket_t *)&addr, sizeof(addr));
45-
}
13+
void async_connect(const string& ip, int port, connect_handler_t handler);
14+
bool bind(const string& host, int port);
15+
int connect(const string& host, int port);
4616
};
4717
}
4818
}

0 commit comments

Comments
 (0)