Skip to content

Commit ca8de2e

Browse files
committed
finsh start_server/echo_server
1 parent 6cd6f88 commit ca8de2e

File tree

9 files changed

+216
-18
lines changed

9 files changed

+216
-18
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ set(ASYNCIO_INC
3030
include/asyncio/callstack.h
3131
include/asyncio/open_connection.h
3232
include/asyncio/stream.h
33+
include/asyncio/start_server.h
34+
include/asyncio/addrinfo_guard.h
3335
)
3436

3537
include_directories(${CMAKE_SOURCE_DIR}/include)

include/asyncio/addrinfo_guard.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//
2+
// Created by netcan on 2021/11/30.
3+
//
4+
5+
#ifndef ASYNCIO_ADDRINFO_GUARD_H
6+
#define ASYNCIO_ADDRINFO_GUARD_H
7+
#include <asyncio/asyncio_ns.h>
8+
#include <netdb.h>
9+
10+
ASYNCIO_NS_BEGIN
11+
struct AddrInfoGuard {
12+
AddrInfoGuard(addrinfo* info): info_(info) { }
13+
~AddrInfoGuard() { freeaddrinfo(info_); }
14+
private:
15+
addrinfo* info_{nullptr};
16+
};
17+
ASYNCIO_NS_END
18+
19+
#endif // ASYNCIO_ADDRINFO_GUARD_H

include/asyncio/open_connection.h

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
#define ASYNCIO_OPEN_CONNECTION_H
77
#include <asyncio/asyncio_ns.h>
88
#include <asyncio/stream.h>
9+
#include <asyncio/addrinfo_guard.h>
910
#include <asyncio/selector/event.h>
1011
#include <exception>
1112
#include <asyncio/task.h>
1213
#include <fcntl.h>
1314
#include <sys/types.h>
1415
#include <sys/socket.h>
1516
#include <system_error>
16-
#include <netdb.h>
1717

1818
ASYNCIO_NS_BEGIN
1919
namespace detail {
@@ -36,37 +36,29 @@ Task<bool> connect(int fd, const sockaddr *addr, socklen_t len) noexcept {
3636
co_return result == 0;
3737
}
3838

39-
struct AddrInfoRAII {
40-
AddrInfoRAII(addrinfo* info): info_(info) { }
41-
~AddrInfoRAII() { freeaddrinfo(info_); }
42-
private:
43-
addrinfo* info_{nullptr};
44-
};
4539
}
4640

4741
Task<Stream> open_connection(std::string_view ip, uint16_t port) {
48-
addrinfo hints {
49-
.ai_family = AF_UNSPEC,
50-
.ai_socktype = SOCK_STREAM,
51-
};
42+
addrinfo hints { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM };
5243
addrinfo *server_info {nullptr};
5344
auto service = std::to_string(port);
5445
// TODO: getaddrinfo is a blocking api
5546
if (int rv = getaddrinfo(ip.data(), service.c_str(), &hints, &server_info);
5647
rv != 0) {
5748
throw std::system_error(std::make_error_code(std::errc::address_not_available));
5849
}
59-
detail::AddrInfoRAII _i(server_info);
50+
AddrInfoGuard _i(server_info);
6051

6152
int sockfd = -1;
6253
for (auto p = server_info; p != nullptr; p = p->ai_next) {
63-
sockfd = -1;
6454
if ( (sockfd = socket(p->ai_family, p->ai_socktype | SOCK_NONBLOCK, p->ai_protocol)) == -1) {
6555
continue;
6656
}
67-
if (co_await detail::connect(sockfd, p->ai_addr, p->ai_addrlen) ) {
57+
if (co_await detail::connect(sockfd, p->ai_addr, p->ai_addrlen)) {
6858
break;
6959
}
60+
close(sockfd);
61+
sockfd = -1;
7062
}
7163
if (sockfd == -1) {
7264
throw std::system_error(std::make_error_code(std::errc::address_not_available));

include/asyncio/start_server.h

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
//
2+
// Created by netcan on 2021/11/30.
3+
//
4+
5+
#ifndef ASYNCIO_START_SERVER_H
6+
#define ASYNCIO_START_SERVER_H
7+
#include <asyncio/asyncio_ns.h>
8+
#include <asyncio/stream.h>
9+
#include <asyncio/addrinfo_guard.h>
10+
#include <asyncio/schedule_task.h>
11+
#include <list>
12+
#include <sys/types.h>
13+
14+
ASYNCIO_NS_BEGIN
15+
namespace concepts {
16+
template<typename CONNECT_CB>
17+
concept ConnectCb = requires(CONNECT_CB cb) {
18+
{ cb(std::declval<Stream>()) } -> concepts::Awaitable;
19+
};
20+
}
21+
22+
constexpr static size_t max_connect_count = 16;
23+
24+
template<concepts::ConnectCb CONNECT_CB>
25+
struct Server: NonCopyable {
26+
Server(CONNECT_CB cb, int fd): connect_cb_(cb), fd_(fd) {}
27+
Server(Server&& other): connect_cb_(other.connect_cb_),
28+
fd_{std::exchange(other.fd_, -1) } {}
29+
~Server() { close(); }
30+
31+
Task<void> serve_forever() {
32+
Event ev { .fd = fd_, .events = EPOLLIN };
33+
auto& loop = get_event_loop();
34+
std::list<Task<>> connected;
35+
while (true) {
36+
co_await loop.wait_event(ev);
37+
sockaddr_storage remoteaddr{};
38+
socklen_t addrlen = sizeof(remoteaddr);
39+
int clientfd = ::accept(fd_, reinterpret_cast<sockaddr*>(&remoteaddr), &addrlen);
40+
if (clientfd == -1) { continue; }
41+
connected.emplace_back(schedule_task(connect_cb_(Stream{clientfd, remoteaddr})));
42+
// garbage collect
43+
clean_up_connected(connected);
44+
}
45+
}
46+
47+
private:
48+
void clean_up_connected(std::list<Task<>>& connected) {
49+
if (connected.size() < 100) [[likely]] { return; }
50+
for (auto iter = connected.begin(); iter != connected.end(); ) {
51+
if (iter->done()) {
52+
iter = connected.erase(iter);
53+
} else {
54+
++iter;
55+
}
56+
}
57+
}
58+
59+
private:
60+
void close() {
61+
if (fd_ > 0) { ::close(fd_); }
62+
fd_ = -1;
63+
}
64+
65+
private:
66+
[[no_unique_address]] CONNECT_CB connect_cb_;
67+
int fd_{-1};
68+
};
69+
70+
template<concepts::ConnectCb CONNECT_CB>
71+
Task<Server<CONNECT_CB>> start_server(CONNECT_CB cb, std::string_view ip, uint16_t port) {
72+
addrinfo hints { .ai_family = AF_UNSPEC, .ai_socktype = SOCK_STREAM };
73+
addrinfo *server_info {nullptr};
74+
auto service = std::to_string(port);
75+
// TODO: getaddrinfo is a blocking api
76+
if (int rv = getaddrinfo(ip.data(), service.c_str(), &hints, &server_info);
77+
rv != 0) {
78+
throw std::system_error(std::make_error_code(std::errc::address_not_available));
79+
}
80+
AddrInfoGuard _i(server_info);
81+
82+
int serverfd = -1;
83+
for (auto p = server_info; p != nullptr; p = p->ai_next) {
84+
if ( (serverfd = socket(p->ai_family, p->ai_socktype | SOCK_NONBLOCK, p->ai_protocol)) == -1) {
85+
continue;
86+
}
87+
int yes = 1;
88+
// lose the pesky "address already in use" error message
89+
setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
90+
if ( bind(serverfd, p->ai_addr, p->ai_addrlen) == 0) {
91+
break;
92+
}
93+
close(serverfd);
94+
serverfd = -1;
95+
}
96+
if (serverfd == -1) {
97+
throw std::system_error(std::make_error_code(std::errc::address_not_available));
98+
}
99+
100+
if (listen(serverfd, max_connect_count) == -1) {
101+
throw std::system_error(std::make_error_code(static_cast<std::errc>(errno)));
102+
}
103+
104+
co_return Server{cb, serverfd};
105+
}
106+
107+
ASYNCIO_NS_END
108+
109+
#endif // ASYNCIO_START_SERVER_H

include/asyncio/stream.h

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,23 @@
77
#include <asyncio/asyncio_ns.h>
88
#include <asyncio/noncopyable.h>
99
#include <asyncio/task.h>
10+
#include <netdb.h>
1011
#include <utility>
1112
#include <vector>
1213
#include <unistd.h>
1314

1415
ASYNCIO_NS_BEGIN
1516
struct Stream: NonCopyable {
1617
using Buffer = std::vector<char>;
17-
Stream(int fd): fd_(fd) {}
18-
Stream(Stream&& other): fd_{std::exchange(other.fd_, -1) } {}
18+
Stream(int fd): fd_(fd) {
19+
if (fd_ > 0) {
20+
socklen_t addrlen = sizeof(sock_info_);
21+
getsockname(fd_, reinterpret_cast<sockaddr*>(&sock_info_), &addrlen);
22+
}
23+
}
24+
Stream(int fd, const sockaddr_storage& sockinfo): fd_(fd), sock_info_(sockinfo) { }
25+
Stream(Stream&& other): fd_{std::exchange(other.fd_, -1) },
26+
sock_info_(other.sock_info_) { }
1927
~Stream() { close(); }
2028

2129
void close() {
@@ -51,6 +59,9 @@ struct Stream: NonCopyable {
5159
total_write += sz;
5260
}
5361
}
62+
const sockaddr_storage& get_sock_info() const {
63+
return sock_info_;
64+
}
5465

5566
private:
5667
Task<Buffer> read_until_eof() {
@@ -74,7 +85,25 @@ struct Stream: NonCopyable {
7485
}
7586
private:
7687
int fd_{-1};
88+
sockaddr_storage sock_info_{};
7789
constexpr static size_t chunk_size = 4096;
7890
};
91+
92+
93+
inline const void *get_in_addr(const sockaddr *sa) {
94+
if (sa->sa_family == AF_INET) {
95+
return &reinterpret_cast<const sockaddr_in*>(sa)->sin_addr;
96+
}
97+
return &reinterpret_cast<const sockaddr_in6*>(sa)->sin6_addr;
98+
}
99+
100+
uint16_t get_in_port(const sockaddr *sa) {
101+
if (sa->sa_family == AF_INET) {
102+
return ntohs(reinterpret_cast<const sockaddr_in*>(sa)->sin_port);
103+
}
104+
105+
return ntohs(reinterpret_cast<const sockaddr_in6*>(sa)->sin6_port);
106+
}
107+
79108
ASYNCIO_NS_END
80109
#endif // ASYNCIO_STREAM_H

include/asyncio/task.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ struct Task: private NonCopyable {
141141
};
142142

143143
bool valid() const { return handle_ != nullptr; }
144+
bool done() const { return handle_.done(); }
144145
explicit Task(coro_handle h) noexcept: handle_(h) {}
145146
private:
146147
coro_handle handle_;

test/st/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
add_executable(hello_world hello_world.cpp)
22
add_executable(echo_client echo_client.cpp)
3+
add_executable(echo_server echo_server.cpp)
4+
35
target_link_libraries(hello_world asyncio)
46
target_link_libraries(echo_client asyncio)
7+
target_link_libraries(echo_server asyncio)

test/st/echo_client.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ using asyncio::Stream;
99
Task<void> tcp_echo_client(std::string_view message) {
1010
auto stream = co_await asyncio::open_connection("127.0.0.1", 8888);
1111

12-
fmt::print("Send: {}!\n", message);
12+
fmt::print("Send: '{}'\n", message);
1313
co_await stream.write(Stream::Buffer(message.begin(), message.end()));
1414

1515
auto data = co_await stream.read(100);
16-
fmt::print("Received: {}!\n", data.data());
16+
fmt::print("Received: '{}'\n", data.data());
1717

1818
fmt::print("Close the connection\n");
1919
stream.close();

test/st/echo_server.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
// Created by netcan on 2021/11/30.
3+
//
4+
5+
#include <asyncio/runner.h>
6+
#include <asyncio/start_server.h>
7+
#include <arpa/inet.h>
8+
9+
using asyncio::Task;
10+
using asyncio::Stream;
11+
using asyncio::get_in_addr;
12+
using asyncio::get_in_port;
13+
14+
Task<void> handle_echo(Stream stream) {
15+
auto& sockinfo = stream.get_sock_info();
16+
auto sa = reinterpret_cast<const sockaddr*>(&sockinfo);
17+
char addr[INET6_ADDRSTRLEN] {};
18+
19+
auto data = co_await stream.read(100);
20+
fmt::print("Received: '{}' from '{}:{}'\n", data.data(),
21+
inet_ntop(sockinfo.ss_family, get_in_addr(sa), addr, sizeof addr),
22+
get_in_port(sa));
23+
24+
fmt::print("Send: '{}'\n", data.data());
25+
co_await stream.write(data);
26+
27+
fmt::print("Close the connection\n");
28+
stream.close();
29+
}
30+
31+
Task<void> amain() {
32+
auto server = co_await asyncio::start_server(
33+
handle_echo, "127.0.0.1", 8888);
34+
35+
fmt::print("Serving on 127.0.0.1:8888\n");
36+
37+
co_await server.serve_forever();
38+
}
39+
40+
int main() {
41+
asyncio::run(amain());
42+
return 0;
43+
}

0 commit comments

Comments
 (0)