Skip to content

Commit 6c0f9a6

Browse files
committed
add open_connection
1 parent f512e78 commit 6c0f9a6

File tree

12 files changed

+194
-25
lines changed

12 files changed

+194
-25
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include)
3333

3434
add_library(asyncio STATIC
3535
${ASYNC_INC}
36-
src/event_loop.cpp)
36+
src/event_loop.cpp include/asyncio/open_connection.h)
3737
target_link_libraries(asyncio PUBLIC fmt::fmt)
3838

3939
add_subdirectory(test)

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ Task C: factorial(4) = 24
100100
- gcc-12
101101

102102
## TODO
103-
- implement result type for code reuse, `variant<monostate, value, exception>`
104-
- implement coroutine backtrace(dump continuation chain)
105-
- implement some io coroutine(socket/read/write/close)
103+
- [x] implement result type for code reuse, `variant<monostate, value, exception>`
104+
- [x] implement coroutine backtrace(dump continuation chain)
105+
- [ ] implement some io coroutine(socket/read/write/close)
106+
- [ ] using libuv as backend
106107

107108
## Reference
108109
- https://github.com/lewissbaker/cppcoro

include/asyncio/event_loop.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class EventLoop: private NonCopyable {
3030
}
3131

3232
bool is_stop() {
33-
return schedule_.empty() && ready_.empty();
33+
return schedule_.empty() && ready_.empty() && selector_.is_stop();
3434
}
3535

3636
template<typename Rep, typename Period>
@@ -61,6 +61,28 @@ class EventLoop: private NonCopyable {
6161
return future.get_result();
6262
}
6363

64+
struct WaitEventAwaiter {
65+
constexpr bool await_ready() const noexcept { return false; }
66+
template<typename Promise>
67+
constexpr void await_suspend(std::coroutine_handle<Promise> handle) noexcept {
68+
auto& promise = handle.promise();
69+
promise.set_state(PromiseState::PENDING);
70+
event_.data = static_cast<Handle*>(&promise);
71+
selector_.register_event(event_);
72+
}
73+
void await_resume() noexcept {
74+
selector_.remove_event(event_);
75+
}
76+
77+
Selector& selector_;
78+
Event event_;
79+
};
80+
81+
[[nodiscard]]
82+
auto wait_event(const Event& event) {
83+
return WaitEventAwaiter(selector_, event);
84+
}
85+
6486
void run_forever();
6587

6688
private:

include/asyncio/open_connection.h

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//
2+
// Created by netcan on 2021/11/29.
3+
//
4+
5+
#ifndef ASYNCIO_OPEN_CONNECTION_H
6+
#define ASYNCIO_OPEN_CONNECTION_H
7+
#include <asyncio/asyncio_ns.h>
8+
#include <asyncio/selector/event.h>
9+
#include <exception>
10+
#include <asyncio/task.h>
11+
#include <fcntl.h>
12+
#include <unistd.h>
13+
#include <sys/types.h>
14+
#include <sys/socket.h>
15+
#include <system_error>
16+
#include <netdb.h>
17+
18+
ASYNCIO_NS_BEGIN
19+
struct Stream: NonCopyable {
20+
Stream(int fd): fd_(fd) {}
21+
Stream(Stream&& other): fd_{std::exchange(other.fd_, -1) } {}
22+
~Stream() { if (fd_ > 0) { close(fd_); } }
23+
private:
24+
int fd_{-1};
25+
};
26+
27+
namespace detail {
28+
Task<bool> connect(int fd, const sockaddr *addr, socklen_t len) noexcept {
29+
int rc = ::connect(fd, addr, len);
30+
if (rc == 0) { co_return true; }
31+
if (rc < 0 && errno != EINPROGRESS) {
32+
throw std::system_error(std::make_error_code(static_cast<std::errc>(errno)));
33+
}
34+
Event ev { .fd = fd, .events = EPOLLOUT };
35+
auto& loop = get_event_loop();
36+
co_await loop.wait_event(ev);
37+
38+
int result{0};
39+
socklen_t result_len = sizeof(result);
40+
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
41+
// error, fail somehow, close socket
42+
co_return false;
43+
}
44+
co_return result == 0;
45+
}
46+
47+
struct AddrInfoRAII {
48+
AddrInfoRAII(addrinfo* info): info_(info) { }
49+
~AddrInfoRAII() { freeaddrinfo(info_); }
50+
private:
51+
addrinfo* info_{nullptr};
52+
};
53+
}
54+
55+
Task<Stream> open_connection(std::string_view ip, uint16_t port) {
56+
addrinfo hints {
57+
.ai_family = AF_UNSPEC,
58+
.ai_socktype = SOCK_STREAM,
59+
};
60+
addrinfo *server_info {nullptr};
61+
auto service = std::to_string(port);
62+
// TODO: getaddrinfo is a blocking api
63+
if (int rv = getaddrinfo(ip.data(), service.c_str(), &hints, &server_info);
64+
rv != 0) {
65+
throw std::system_error(std::make_error_code(std::errc::address_not_available));
66+
}
67+
detail::AddrInfoRAII _i(server_info);
68+
69+
int sockfd = -1;
70+
for (auto p = server_info; p != nullptr; p = p->ai_next) {
71+
sockfd = -1;
72+
if ( (sockfd = socket(p->ai_family, p->ai_socktype | SOCK_NONBLOCK, p->ai_protocol)) == -1) {
73+
continue;
74+
}
75+
if (co_await detail::connect(sockfd, p->ai_addr, p->ai_addrlen) ) {
76+
close(sockfd);
77+
continue;
78+
}
79+
}
80+
if (sockfd == -1) {
81+
throw std::system_error(std::make_error_code(std::errc::address_not_available));
82+
}
83+
84+
co_return Stream {sockfd};
85+
}
86+
87+
ASYNCIO_NS_END
88+
89+
#endif // ASYNCIO_OPEN_CONNECTION_H

include/asyncio/result.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct Result {
2525
return set_value(std::forward<R>(value));
2626
}
2727

28-
constexpr T& result() & {
28+
constexpr T result() & {
2929
if (auto exception = std::get_if<std::exception_ptr>(&result_)) {
3030
std::rethrow_exception(*exception);
3131
}
@@ -34,7 +34,7 @@ struct Result {
3434
}
3535
throw NoResultError{};
3636
}
37-
constexpr T&& result() && {
37+
constexpr T result() && {
3838
if (auto exception = std::get_if<std::exception_ptr>(&result_)) {
3939
std::rethrow_exception(*exception);
4040
}

include/asyncio/selector/epoll_selector.h

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,42 @@ struct EpollSelector {
1717
perror("epoll_create1");
1818
throw;
1919
}
20-
events_.resize(1);
2120
}
22-
std::vector<Event> select(size_t timeout /* ms */) {
21+
std::vector<Event> select(int timeout /* ms */) {
2322
errno = 0;
24-
int ndfs = epoll_wait(epfd_, events_.data(), events_.size(), timeout);
25-
// TODO: fill the event list
26-
std::vector<Event> events;
27-
return events;
23+
std::vector<epoll_event> events;
24+
events.resize(register_event_count_);
25+
int ndfs = epoll_wait(epfd_, events.data(), register_event_count_, timeout);
26+
std::vector<Event> result;
27+
for (size_t i = 0; i < ndfs; ++i) {
28+
result.emplace_back(Event {
29+
.data = events[i].data.ptr
30+
});
31+
}
32+
return result;
2833
}
2934
~EpollSelector() {
3035
if (epfd_ > 0) {
3136
close(epfd_);
3237
}
3338
}
39+
bool is_stop() { return register_event_count_ == 1; }
40+
void register_event(const Event& event) {
41+
epoll_event ev{ .events = event.events, .data {.ptr = event.data } };
42+
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, event.fd, &ev) == 0) {
43+
++register_event_count_;
44+
}
45+
}
46+
47+
void remove_event(const Event& event) {
48+
epoll_event ev{ .events = event.events };
49+
if (epoll_ctl(epfd_, EPOLL_CTL_DEL, event.fd, &ev) == 0) {
50+
--register_event_count_;
51+
}
52+
}
3453
private:
3554
int epfd_;
36-
std::vector<epoll_event> events_;
55+
int register_event_count_ {1};
3756
};
3857
ASYNCIO_NS_END
3958
#endif // ASYNCIO_EPOLL_SELECTOR_H

include/asyncio/selector/event.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ ASYNCIO_NS_BEGIN
1111
struct Event {
1212
int fd;
1313
uint32_t events;
14+
void* data {nullptr};
1415
};
1516
ASYNCIO_NS_END
1617

include/asyncio/task.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,8 @@ struct Task: private NonCopyable {
4242
return handle_.promise().result();
4343
}
4444

45-
struct Awaiter {
45+
struct AwaiterBase {
4646
constexpr bool await_ready() { return self_coro_.done(); }
47-
R await_resume() const {
48-
return self_coro_.promise().result();
49-
}
5047
template<typename Promise>
5148
void await_suspend(std::coroutine_handle<Promise> resumer) const noexcept {
5249
assert(! self_coro_.promise().continuation_);
@@ -60,7 +57,21 @@ struct Task: private NonCopyable {
6057
}
6158
coro_handle self_coro_ {};
6259
};
63-
auto operator co_await() const noexcept {
60+
auto operator co_await() const & noexcept {
61+
struct Awaiter: AwaiterBase {
62+
decltype(auto) await_resume() const {
63+
return AwaiterBase::self_coro_.promise().result();
64+
}
65+
};
66+
return Awaiter {handle_};
67+
}
68+
69+
auto operator co_await() const && noexcept {
70+
struct Awaiter: AwaiterBase {
71+
decltype(auto) await_resume() const {
72+
return std::move(AwaiterBase::self_coro_.promise()).result();
73+
}
74+
};
6475
return Awaiter {handle_};
6576
}
6677

src/event_loop.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Created by netcan on 2021/09/07.
33
//
44
#include <chrono>
5+
#include <optional>
56
#include <asyncio/event_loop.h>
67

78
namespace ranges = std::ranges;
@@ -19,7 +20,7 @@ void EventLoop::run_forever() {
1920
}
2021

2122
void EventLoop::run_once() {
22-
MSDuration timeout{0};
23+
std::optional<MSDuration> timeout;
2324
// Remove delayed calls that were cancelled from head of queue.
2425
while (! schedule_.empty()) {
2526
auto&& [when, handle] = schedule_[0];
@@ -32,13 +33,18 @@ void EventLoop::run_once() {
3233
}
3334
}
3435

35-
if (ready_.empty() && ! schedule_.empty()) {
36+
if (! ready_.empty()) {
37+
timeout.emplace(0);
38+
} else if (! schedule_.empty()) {
3639
auto&& [when, _] = schedule_[0];
3740
timeout = std::max(when - time(), MSDuration(0));
3841
}
3942

40-
auto event_lists = selector_.select(timeout.count());
41-
// TODO: handle event_lists
43+
auto event_lists = selector_.select(timeout.has_value() ? timeout->count() : -1);
44+
for (auto&& event: event_lists) {
45+
Handle* continuation_ = reinterpret_cast<Handle*>(event.data);
46+
ready_.emplace(continuation_);
47+
}
4248

4349
auto end_time = time();
4450
while (! schedule_.empty()) {

test/st/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
add_executable(hello_world hello_world.cpp)
2-
target_link_libraries(hello_world asyncio)
2+
add_executable(echo_client echo_client.cpp)
3+
target_link_libraries(hello_world asyncio)
4+
target_link_libraries(echo_client asyncio)

0 commit comments

Comments
 (0)