Skip to content

Commit abe6a32

Browse files
committed
add some fragmentation codes.
1 parent 97ebd9a commit abe6a32

File tree

13 files changed

+779
-106
lines changed

13 files changed

+779
-106
lines changed

examples/echo.rs

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ use rsocket_rust::prelude::*;
66
use rsocket_rust_transport_tcp::{TcpClientTransport, TcpServerTransport};
77
use std::error::Error;
88

9+
enum RequestMode {
10+
FNF,
11+
REQUEST,
12+
STREAM,
13+
CHANNEL,
14+
}
15+
916
#[tokio::main]
1017
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1118
env_logger::builder().format_timestamp_millis().init();
@@ -17,6 +24,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1724
.subcommand(
1825
SubCommand::with_name("serve")
1926
.about("serve an echo server")
27+
.arg(
28+
Arg::with_name("mtu")
29+
.long("mtu")
30+
.required(false)
31+
.takes_value(true)
32+
.help("Fragment mtu."),
33+
)
2034
.arg(
2135
Arg::with_name("URL")
2236
.required(true)
@@ -35,6 +49,41 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3549
.takes_value(true)
3650
.help("Input payload data."),
3751
)
52+
.arg(
53+
Arg::with_name("mtu")
54+
.long("mtu")
55+
.required(false)
56+
.takes_value(true)
57+
.help("Fragment mtu."),
58+
)
59+
.arg(
60+
Arg::with_name("request")
61+
.long("request")
62+
.required(false)
63+
.takes_value(false)
64+
.help("request_response mode."),
65+
)
66+
.arg(
67+
Arg::with_name("channel")
68+
.long("channel")
69+
.required(false)
70+
.takes_value(false)
71+
.help("request_channel mode."),
72+
)
73+
.arg(
74+
Arg::with_name("stream")
75+
.long("stream")
76+
.required(false)
77+
.takes_value(false)
78+
.help("request_stream mode."),
79+
)
80+
.arg(
81+
Arg::with_name("fnf")
82+
.long("fnf")
83+
.required(false)
84+
.takes_value(false)
85+
.help("fire_and_forget mode."),
86+
)
3887
.arg(
3988
Arg::with_name("URL")
4089
.required(true)
@@ -52,8 +101,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
52101
match cli.subcommand() {
53102
("serve", Some(flags)) => {
54103
let addr = flags.value_of("URL").expect("Missing URL");
104+
let mtu: usize = flags
105+
.value_of("mtu")
106+
.map(|it| it.parse().expect("Invalid mtu string!"))
107+
.unwrap_or(0);
55108
RSocketFactory::receive()
56109
.transport(TcpServerTransport::from(addr))
110+
.fragment(mtu)
57111
.acceptor(|setup, _socket| {
58112
info!("accept setup: {:?}", setup);
59113
Ok(Box::new(EchoRSocket))
@@ -65,8 +119,35 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
65119
.await
66120
}
67121
("connect", Some(flags)) => {
122+
let mut modes: Vec<RequestMode> = vec![];
123+
124+
if flags.is_present("stream") {
125+
modes.push(RequestMode::STREAM);
126+
}
127+
if flags.is_present("fnf") {
128+
modes.push(RequestMode::FNF);
129+
}
130+
if flags.is_present("channel") {
131+
modes.push(RequestMode::CHANNEL);
132+
}
133+
134+
if flags.is_present("request") {
135+
modes.push(RequestMode::REQUEST);
136+
}
137+
138+
if modes.len() > 1 {
139+
error!("duplicated request mode: use one of --fnf/--request/--stream/--channel.");
140+
return Ok(());
141+
}
142+
143+
let mtu: usize = flags
144+
.value_of("mtu")
145+
.map(|it| it.parse().expect("Invalid mtu string!"))
146+
.unwrap_or(0);
147+
68148
let addr = flags.value_of("URL").expect("Missing URL");
69149
let cli = RSocketFactory::connect()
150+
.fragment(mtu)
70151
.transport(TcpClientTransport::from(addr))
71152
.start()
72153
.await
@@ -76,8 +157,44 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
76157
bu = bu.set_data_utf8(data);
77158
}
78159
let req = bu.build();
79-
let res = cli.request_response(req).await.expect("Request failed!");
80-
info!("request success: {:?}", res);
160+
161+
match modes.pop().unwrap_or(RequestMode::REQUEST) {
162+
RequestMode::FNF => {
163+
cli.fire_and_forget(req).await;
164+
}
165+
RequestMode::STREAM => {
166+
let mut results = cli.request_stream(req);
167+
loop {
168+
match results.next().await {
169+
Some(Ok(v)) => info!("{:?}", v),
170+
Some(Err(e)) => {
171+
error!("STREAM_RESPONSE FAILED: {:?}", e);
172+
break;
173+
}
174+
None => break,
175+
}
176+
}
177+
}
178+
RequestMode::CHANNEL => {
179+
let mut results =
180+
cli.request_channel(Box::pin(futures::stream::iter(vec![Ok(req)])));
181+
loop {
182+
match results.next().await {
183+
Some(Ok(v)) => info!("{:?}", v),
184+
Some(Err(e)) => {
185+
error!("CHANNEL_RESPONSE FAILED: {:?}", e);
186+
break;
187+
}
188+
None => break,
189+
}
190+
}
191+
}
192+
RequestMode::REQUEST => {
193+
let res = cli.request_response(req).await.expect("Request failed!");
194+
info!("{:?}", res);
195+
}
196+
}
197+
81198
Ok(())
82199
}
83200
_ => Ok(()),

rsocket/src/frame/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub const TYPE_RESUME_OK: u16 = 0x0E;
6262

6363
pub const REQUEST_MAX: u32 = 0x7FFF_FFFF; // 2147483647
6464

65-
const LEN_HEADER: usize = 6;
65+
pub(crate) const LEN_HEADER: usize = 6;
6666

6767
#[derive(Debug, PartialEq)]
6868
pub enum Body {

rsocket/src/frame/payload.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ impl PayloadBuilder {
2626
}
2727
}
2828

29+
pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
30+
self.value.data = data_and_metadata.0;
31+
match data_and_metadata.1 {
32+
Some(m) => {
33+
self.value.metadata = Some(m);
34+
self.flag |= FLAG_METADATA;
35+
}
36+
None => {
37+
self.value.metadata = None;
38+
self.flag &= !FLAG_METADATA;
39+
}
40+
}
41+
self
42+
}
43+
2944
pub fn set_data(mut self, data: Bytes) -> Self {
3045
self.value.data = Some(data);
3146
self

rsocket/src/frame/request_channel.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,21 @@ impl RequestChannelBuilder {
3737
self
3838
}
3939

40+
pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
41+
self.value.data = data_and_metadata.0;
42+
match data_and_metadata.1 {
43+
Some(m) => {
44+
self.value.metadata = Some(m);
45+
self.flag |= FLAG_METADATA;
46+
}
47+
None => {
48+
self.value.metadata = None;
49+
self.flag &= !FLAG_METADATA;
50+
}
51+
}
52+
self
53+
}
54+
4055
pub fn set_metadata(mut self, metadata: Bytes) -> Self {
4156
self.value.metadata = Some(metadata);
4257
self.flag |= FLAG_METADATA;

rsocket/src/frame/request_fnf.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,21 @@ impl RequestFNFBuilder {
4040
self.value.data = Some(data);
4141
self
4242
}
43+
44+
pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
45+
self.value.data = data_and_metadata.0;
46+
match data_and_metadata.1 {
47+
Some(m) => {
48+
self.value.metadata = Some(m);
49+
self.flag |= FLAG_METADATA;
50+
}
51+
None => {
52+
self.value.metadata = None;
53+
self.flag &= !FLAG_METADATA;
54+
}
55+
}
56+
self
57+
}
4358
}
4459

4560
impl RequestFNF {

rsocket/src/frame/request_response.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ impl RequestResponseBuilder {
2626
}
2727
}
2828

29+
pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
30+
self.value.data = data_and_metadata.0;
31+
match data_and_metadata.1 {
32+
Some(m) => {
33+
self.value.metadata = Some(m);
34+
self.flag |= FLAG_METADATA;
35+
}
36+
None => {
37+
self.value.metadata = None;
38+
self.flag &= !FLAG_METADATA;
39+
}
40+
}
41+
self
42+
}
43+
2944
pub fn set_metadata(mut self, metadata: Bytes) -> Self {
3045
self.value.metadata = Some(metadata);
3146
self.flag |= FLAG_METADATA;

rsocket/src/frame/request_stream.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,21 @@ impl RequestStreamBuilder {
2525
self
2626
}
2727

28+
pub fn set_all(mut self, data_and_metadata: (Option<Bytes>, Option<Bytes>)) -> Self {
29+
self.value.data = data_and_metadata.0;
30+
match data_and_metadata.1 {
31+
Some(m) => {
32+
self.value.metadata = Some(m);
33+
self.flag |= FLAG_METADATA;
34+
}
35+
None => {
36+
self.value.metadata = None;
37+
self.flag &= !FLAG_METADATA;
38+
}
39+
}
40+
self
41+
}
42+
2843
pub fn set_data(mut self, data: Bytes) -> Self {
2944
self.value.data = Some(data);
3045
self

rsocket/src/payload/normal.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl PayloadBuilder {
5151
}
5252

5353
impl Payload {
54-
fn new(data: Option<Bytes>, metadata: Option<Bytes>) -> Payload {
54+
pub(crate) fn new(data: Option<Bytes>, metadata: Option<Bytes>) -> Payload {
5555
Payload {
5656
d: data,
5757
m: metadata,
@@ -82,6 +82,21 @@ impl Payload {
8282
.map(|raw| String::from_utf8(raw.to_vec()).unwrap())
8383
}
8484

85+
pub fn is_empty(&self) -> bool {
86+
self.len() == 0
87+
}
88+
89+
pub fn len(&self) -> usize {
90+
let mut n = 0;
91+
if let Some(it) = &self.m {
92+
n += it.len();
93+
}
94+
if let Some(it) = &self.d {
95+
n += it.len();
96+
}
97+
n
98+
}
99+
85100
pub fn split(self) -> (Option<Bytes>, Option<Bytes>) {
86101
(self.d, self.m)
87102
}

0 commit comments

Comments
 (0)