Skip to content

Commit 6cd6f88

Browse files
committed
finish stream, echo_client
1 parent 6c0f9a6 commit 6cd6f88

File tree

5 files changed

+98
-17
lines changed

5 files changed

+98
-17
lines changed

CMakeLists.txt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ set(ASYNCIO_INC
2727
include/asyncio/concept/awaitable.h
2828
include/asyncio/gather.h
2929
include/asyncio/result.h
30-
include/asyncio/callstack.h)
30+
include/asyncio/callstack.h
31+
include/asyncio/open_connection.h
32+
include/asyncio/stream.h
33+
)
3134

3235
include_directories(${CMAKE_SOURCE_DIR}/include)
3336

3437
add_library(asyncio STATIC
3538
${ASYNC_INC}
36-
src/event_loop.cpp include/asyncio/open_connection.h)
39+
src/event_loop.cpp)
3740
target_link_libraries(asyncio PUBLIC fmt::fmt)
3841

3942
add_subdirectory(test)

include/asyncio/open_connection.h

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,17 @@
55
#ifndef ASYNCIO_OPEN_CONNECTION_H
66
#define ASYNCIO_OPEN_CONNECTION_H
77
#include <asyncio/asyncio_ns.h>
8+
#include <asyncio/stream.h>
89
#include <asyncio/selector/event.h>
910
#include <exception>
1011
#include <asyncio/task.h>
1112
#include <fcntl.h>
12-
#include <unistd.h>
1313
#include <sys/types.h>
1414
#include <sys/socket.h>
1515
#include <system_error>
1616
#include <netdb.h>
1717

1818
ASYNCIO_NS_BEGIN
19-
struct Stream: NonCopyable {
20-
Stream(int fd): fd_(fd) {}
21-
Stream(Stream&& other): fd_{std::exchange(other.fd_, -1) } {}
22-
~Stream() { if (fd_ > 0) { close(fd_); } }
23-
private:
24-
int fd_{-1};
25-
};
26-
2719
namespace detail {
2820
Task<bool> connect(int fd, const sockaddr *addr, socklen_t len) noexcept {
2921
int rc = ::connect(fd, addr, len);
@@ -73,8 +65,7 @@ Task<Stream> open_connection(std::string_view ip, uint16_t port) {
7365
continue;
7466
}
7567
if (co_await detail::connect(sockfd, p->ai_addr, p->ai_addrlen) ) {
76-
close(sockfd);
77-
continue;
68+
break;
7869
}
7970
}
8071
if (sockfd == -1) {

include/asyncio/selector/epoll_selector.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ struct EpollSelector {
3232
return result;
3333
}
3434
~EpollSelector() {
35-
if (epfd_ > 0) {
36-
close(epfd_);
37-
}
35+
if (epfd_ > 0) { close(epfd_); }
3836
}
3937
bool is_stop() { return register_event_count_ == 1; }
4038
void register_event(const Event& event) {

include/asyncio/stream.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//
2+
// Created by netcan on 2021/11/30.
3+
//
4+
5+
#ifndef ASYNCIO_STREAM_H
6+
#define ASYNCIO_STREAM_H
7+
#include <asyncio/asyncio_ns.h>
8+
#include <asyncio/noncopyable.h>
9+
#include <asyncio/task.h>
10+
#include <utility>
11+
#include <vector>
12+
#include <unistd.h>
13+
14+
ASYNCIO_NS_BEGIN
15+
struct Stream: NonCopyable {
16+
using Buffer = std::vector<char>;
17+
Stream(int fd): fd_(fd) {}
18+
Stream(Stream&& other): fd_{std::exchange(other.fd_, -1) } {}
19+
~Stream() { close(); }
20+
21+
void close() {
22+
if (fd_ > 0) { ::close(fd_); }
23+
fd_ = -1;
24+
}
25+
26+
Task<Buffer> read(ssize_t sz = -1) {
27+
if (sz < 0) { co_return co_await read_until_eof(); }
28+
29+
Buffer result(sz, 0);
30+
Event ev { .fd = fd_, .events = EPOLLIN };
31+
auto& loop = get_event_loop();
32+
co_await loop.wait_event(ev);
33+
sz = ::read(fd_, result.data(), result.size());
34+
if (sz == -1) {
35+
throw std::system_error(std::make_error_code(static_cast<std::errc>(errno)));
36+
}
37+
result.resize(sz);
38+
co_return result;
39+
}
40+
41+
Task<> write(const Buffer& buf) {
42+
Event ev { .fd = fd_, .events = EPOLLOUT };
43+
auto& loop = get_event_loop();
44+
ssize_t total_write = 0;
45+
while (total_write < buf.size()) {
46+
co_await loop.wait_event(ev);
47+
ssize_t sz = ::write(fd_, buf.data() + total_write, buf.size() - total_write);
48+
if (sz == -1) {
49+
throw std::system_error(std::make_error_code(static_cast<std::errc>(errno)));
50+
}
51+
total_write += sz;
52+
}
53+
}
54+
55+
private:
56+
Task<Buffer> read_until_eof() {
57+
auto& loop = get_event_loop();
58+
59+
Buffer result(chunk_size, 0);
60+
Event ev { .fd = fd_, .events = EPOLLIN };
61+
int current_read = 0;
62+
int total_read = 0;
63+
do {
64+
co_await loop.wait_event(ev);
65+
current_read = ::read(fd_, result.data() + total_read, chunk_size);
66+
if (current_read == -1) {
67+
throw std::system_error(std::make_error_code(static_cast<std::errc>(errno)));
68+
}
69+
if (current_read < chunk_size) { result.resize(total_read + current_read); }
70+
total_read += current_read;
71+
result.resize(total_read + chunk_size);
72+
} while (current_read > 0);
73+
co_return result;
74+
}
75+
private:
76+
int fd_{-1};
77+
constexpr static size_t chunk_size = 4096;
78+
};
79+
ASYNCIO_NS_END
80+
#endif // ASYNCIO_STREAM_H

test/st/echo_client.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,19 @@
44
#include <asyncio/open_connection.h>
55
#include <asyncio/runner.h>
66
using asyncio::Task;
7+
using asyncio::Stream;
78

89
Task<void> tcp_echo_client(std::string_view message) {
910
auto stream = co_await asyncio::open_connection("127.0.0.1", 8888);
10-
co_return;
11+
12+
fmt::print("Send: {}!\n", message);
13+
co_await stream.write(Stream::Buffer(message.begin(), message.end()));
14+
15+
auto data = co_await stream.read(100);
16+
fmt::print("Received: {}!\n", data.data());
17+
18+
fmt::print("Close the connection\n");
19+
stream.close();
1120
}
1221

1322
int main(int argc, char** argv) {

0 commit comments

Comments
 (0)