Skip to content

Commit 2676bb4

Browse files
committed
add messaging requester builder.
1 parent 9542648 commit 2676bb4

File tree

13 files changed

+234
-86
lines changed

13 files changed

+234
-86
lines changed

examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ publish = false
88
[dev-dependencies]
99
log = "0.4.8"
1010
env_logger = "0.7.1"
11-
futures = "0.3.4"
11+
futures = "0.3.5"
1212
clap = "2.33.0"
1313

1414
[dev-dependencies.rsocket_rust]

rsocket-benchmark/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ publish = false
77

88
[dev-dependencies]
99
log = "0.4"
10-
futures = "0.3.4"
10+
futures = "0.3.5"
1111
env_logger = "0.7.1"
1212
bytes = "0.5.4"
1313
hex = "0.4.2"
1414
rand = "0.7.3"
15-
serde = "1.0.106"
16-
serde_derive = "1.0.106"
15+
serde = "1.0.110"
16+
serde_derive = "1.0.110"
1717
criterion = "0.3.2"
1818

1919
[dev-dependencies.rsocket_rust]

rsocket-messaging/Cargo.toml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@ edition = "2018"
66

77
[dependencies]
88
log = "0.4.8"
9-
futures = "0.3.4"
9+
futures = "0.3.5"
1010
bytes = "0.5.4"
11-
serde = "1.0.106"
12-
serde_json = "1.0.52"
11+
serde = "1.0.110"
12+
serde_json = "1.0.53"
1313
serde_cbor = "0.11.1"
1414
hex = "0.4.2"
15+
url = "2.1.1"
1516

1617
[dependencies.rsocket_rust]
1718
path = "../rsocket"
18-
features = ["frame"]
19+
features = ["frame"]
20+
21+
[dependencies.rsocket_rust_transport_tcp]
22+
path = "../rsocket-transport-tcp"
23+
24+
[dependencies.rsocket_rust_transport_websocket]
25+
path = "../rsocket-transport-websocket"

rsocket-messaging/src/builder.rs

Lines changed: 0 additions & 48 deletions
This file was deleted.

rsocket-messaging/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
mod builder;
1+
#[macro_use]
2+
extern crate log;
3+
24
mod misc;
35
mod requester;
46

5-
pub use builder::RequesterBuilder;
6-
pub use requester::{RequestSpec, Requester};
7+
pub use requester::{RequestSpec, Requester, RequesterBuilder};

rsocket-messaging/src/requester.rs

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
11
use super::misc::{marshal, unmarshal};
22
use bytes::BytesMut;
33
use rsocket_rust::error::RSocketError;
4-
use rsocket_rust::extension::{CompositeMetadata, MimeType, RoutingMetadata};
4+
use rsocket_rust::extension::{
5+
CompositeMetadata, CompositeMetadataEntry, MimeType, RoutingMetadata,
6+
};
57
use rsocket_rust::prelude::*;
68
use rsocket_rust::utils::Writeable;
9+
use rsocket_rust_transport_tcp::TcpClientTransport;
10+
use rsocket_rust_transport_websocket::WebsocketClientTransport;
711
use serde::{Deserialize, Serialize};
812
use std::collections::LinkedList;
913
use std::error::Error;
14+
use std::net::SocketAddr;
15+
use std::result::Result;
1016
use std::sync::Arc;
17+
use url::Url;
18+
19+
enum TransportKind {
20+
TCP(String, u16),
21+
WS(String),
22+
}
1123

1224
pub struct Requester {
1325
rsocket: Arc<Box<dyn RSocket>>,
@@ -20,6 +32,159 @@ pub struct RequestSpec {
2032
metadatas: LinkedList<(MimeType, Vec<u8>)>,
2133
}
2234

35+
pub struct RequesterBuilder {
36+
data_mime_type: Option<MimeType>,
37+
route: Option<String>,
38+
metadata: Vec<CompositeMetadataEntry>,
39+
data: Option<Vec<u8>>,
40+
tp: Option<TransportKind>,
41+
}
42+
43+
impl Default for RequesterBuilder {
44+
fn default() -> Self {
45+
Self {
46+
data_mime_type: None,
47+
route: None,
48+
metadata: Default::default(),
49+
data: None,
50+
tp: None,
51+
}
52+
}
53+
}
54+
55+
impl RequesterBuilder {
56+
pub fn data_mime_type<I>(mut self, mime_type: I) -> Self
57+
where
58+
I: Into<MimeType>,
59+
{
60+
self.data_mime_type = Some(mime_type.into());
61+
self
62+
}
63+
64+
pub fn setup_route<I>(mut self, route: I) -> Self
65+
where
66+
I: Into<String>,
67+
{
68+
self.route = Some(route.into());
69+
self
70+
}
71+
72+
pub fn setup_data<D>(mut self, data: &D) -> Self
73+
where
74+
D: Sized + Serialize,
75+
{
76+
// TODO: lazy set
77+
let mut bf = BytesMut::new();
78+
let result = match &self.data_mime_type {
79+
Some(m) => marshal(m, &mut bf, data),
80+
None => marshal(&MimeType::APPLICATION_JSON, &mut bf, data),
81+
};
82+
match result {
83+
Ok(()) => {
84+
self.data = Some(bf.to_vec());
85+
}
86+
Err(e) => {
87+
error!("marshal failed: {:?}", e);
88+
}
89+
}
90+
self
91+
}
92+
93+
pub fn setup_metadata<M, T>(mut self, metadata: &M, mime_type: T) -> Self
94+
where
95+
M: Sized + Serialize,
96+
T: Into<MimeType>,
97+
{
98+
// TODO: lazy set
99+
let mut bf = BytesMut::new();
100+
let mime_type = mime_type.into();
101+
match marshal(&mime_type, &mut bf, metadata) {
102+
Ok(()) => {
103+
let entry = CompositeMetadataEntry::new(mime_type, bf.freeze());
104+
self.metadata.push(entry);
105+
}
106+
Err(e) => error!("marshal failed: {:?}", e),
107+
}
108+
self
109+
}
110+
111+
pub fn connect_tcp<A>(mut self, host: A, port: u16) -> Self
112+
where
113+
A: Into<String>,
114+
{
115+
self.tp = Some(TransportKind::TCP(host.into(), port));
116+
self
117+
}
118+
119+
pub fn connect_websocket<I>(mut self, url: I) -> Self
120+
where
121+
I: Into<String>,
122+
{
123+
self.tp = Some(TransportKind::WS(url.into()));
124+
self
125+
}
126+
127+
pub async fn build(self) -> Result<Requester, Box<dyn Error + Send + Sync>> {
128+
let data_mime_type = self.data_mime_type.unwrap_or(MimeType::APPLICATION_JSON);
129+
130+
let mut added = 0usize;
131+
let mut composite_builder = CompositeMetadata::builder();
132+
133+
if let Some(s) = self.route {
134+
let routing = RoutingMetadata::builder().push(s).build();
135+
composite_builder =
136+
composite_builder.push(MimeType::MESSAGE_X_RSOCKET_ROUTING_V0, routing.bytes());
137+
added += 1;
138+
}
139+
for it in self.metadata.into_iter() {
140+
composite_builder = composite_builder.push_entry(it);
141+
added += 1;
142+
}
143+
144+
let mut payload_builder = Payload::builder();
145+
146+
if added > 0 {
147+
payload_builder = payload_builder.set_metadata(composite_builder.build());
148+
}
149+
150+
if let Some(raw) = self.data {
151+
payload_builder = payload_builder.set_data(raw);
152+
}
153+
154+
let setup = payload_builder.build();
155+
156+
match self.tp {
157+
Some(TransportKind::TCP(h, p)) => {
158+
let addr: SocketAddr = format!("{}:{}", h, p).parse()?;
159+
let cli = RSocketFactory::connect()
160+
.data_mime_type(data_mime_type.as_ref())
161+
.setup(setup)
162+
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0.as_ref())
163+
.transport(TcpClientTransport::from(addr))
164+
.start()
165+
.await?;
166+
let rsocket: Box<dyn RSocket> = Box::new(cli);
167+
let requester = Requester::from(rsocket);
168+
Ok(requester)
169+
}
170+
Some(TransportKind::WS(u)) => {
171+
let url = Url::parse(&u)?;
172+
let cli = RSocketFactory::connect()
173+
.data_mime_type(data_mime_type.as_ref())
174+
.setup(setup)
175+
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0.as_ref())
176+
.transport(WebsocketClientTransport::from(url))
177+
.start()
178+
.await?;
179+
let rsocket: Box<dyn RSocket> = Box::new(cli);
180+
let requester = Requester::from(rsocket);
181+
Ok(requester)
182+
}
183+
None => Err("Missing transport!".into()),
184+
}
185+
}
186+
}
187+
23188
impl From<Box<dyn RSocket>> for Requester {
24189
fn from(rsocket: Box<dyn RSocket>) -> Requester {
25190
Requester {
@@ -29,6 +194,10 @@ impl From<Box<dyn RSocket>> for Requester {
29194
}
30195

31196
impl Requester {
197+
pub fn builder() -> RequesterBuilder {
198+
RequesterBuilder::default()
199+
}
200+
32201
pub fn route(&self, route: &str) -> RequestSpec {
33202
let routing = RoutingMetadata::builder().push_str(route).build();
34203
let mut buf = BytesMut::new();

rsocket-test/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ publish = false
77

88
[dev-dependencies]
99
log = "0.4"
10-
futures = "0.3.4"
10+
futures = "0.3.5"
1111
env_logger = "0.7.1"
1212
bytes = "0.5.4"
1313
hex = "0.4.2"
1414
rand = "0.7.3"
15-
serde = "1.0.106"
16-
serde_derive = "1.0.106"
15+
serde = "1.0.110"
16+
serde_derive = "1.0.110"
1717

1818
[dev-dependencies.rsocket_rust]
1919
path = "../rsocket"

rsocket-test/tests/test_messaging.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,21 @@ extern crate log;
33
#[macro_use]
44
extern crate serde_derive;
55

6-
use rsocket_rust::prelude::*;
6+
use rsocket_rust::extension::MimeType;
77
use rsocket_rust_messaging::*;
8-
use rsocket_rust_transport_tcp::TcpClientTransport;
8+
9+
fn init() {
10+
let _ = env_logger::builder()
11+
.format_timestamp_millis()
12+
.is_test(true)
13+
.try_init();
14+
}
15+
16+
#[derive(Serialize, Deserialize, Debug)]
17+
pub struct Token {
18+
app: String,
19+
access: String,
20+
}
921

1022
#[derive(Serialize, Deserialize, Debug)]
1123
pub struct Student {
@@ -25,17 +37,19 @@ pub struct Response<T> {
2537
#[test]
2638
#[ignore]
2739
async fn test_messaging() {
28-
let rsocket = RSocketFactory::connect()
29-
.transport(TcpClientTransport::from("tcp://127.0.0.1:7878"))
30-
.data_mime_type("application/json")
31-
.metadata_mime_type("message/x.rsocket.composite-metadata.v0")
32-
.start()
40+
init();
41+
let token = Token {
42+
app: "xxx".to_owned(),
43+
access: "yyy".to_owned(),
44+
};
45+
let requester = Requester::builder()
46+
.setup_metadata(&token, MimeType::APPLICATION_JSON)
47+
.setup_data(&token)
48+
.connect_tcp("127.0.0.1", 7878)
49+
.build()
3350
.await
3451
.expect("Connect failed!");
3552

36-
let rsocket: Box<dyn RSocket> = Box::new(rsocket);
37-
let requester = Requester::from(rsocket);
38-
3953
let post = Student {
4054
id: 1234,
4155
name: "Jeffsky".to_owned(),
@@ -52,5 +66,5 @@ async fn test_messaging() {
5266
.block()
5367
.expect("Retrieve failed!")
5468
.expect("Empty result!");
55-
println!("------> RESPONSE: {:?}", res);
69+
info!("------> RESPONSE: {:?}", res);
5670
}

rsocket-transport-tcp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ description = "TCP RSocket transport implementation."
1111

1212
[dependencies]
1313
log = "0.4.8"
14-
futures = "0.3.4"
14+
futures = "0.3.5"
1515
bytes = "0.5.4"
1616

1717
[dependencies.rsocket_rust]

0 commit comments

Comments
 (0)