Skip to content

Commit 65444d9

Browse files
caiweiwei.cwwjjeffcaii
authored andcommitted
fix: check send error
1 parent 49acec6 commit 65444d9

File tree

10 files changed

+113
-53
lines changed

10 files changed

+113
-53
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ target/
22
**/*.rs.bk
33
Cargo.lock
44
.vscode/
5-
5+
/flamegraph.svg

examples/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ log = "0.4.13"
1010
env_logger = "0.8.2"
1111
futures = "0.3.10"
1212
clap = "2.33.3"
13+
pprof = { version = "0.4", features = ["flamegraph"] }
1314

1415
[dev-dependencies.rsocket_rust]
1516
path = "../rsocket"
@@ -22,7 +23,7 @@ features = ["tls"]
2223
path = "../rsocket-transport-websocket"
2324

2425
[dev-dependencies.tokio]
25-
version = "1.0.1"
26+
version = "1.0.3"
2627
default-features = false
2728
features = ["full"]
2829

examples/qps.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#[macro_use]
22
extern crate log;
33

4+
use std::fs::File;
45
use std::sync::{
56
atomic::{AtomicU32, Ordering},
67
Arc,
@@ -65,6 +66,13 @@ async fn main() -> Result<()> {
6566
.version("0.0.0")
6667
.author("Jeffsky <jjeffcaii@outlook.com>")
6768
.about("An QPS benchmark tool for RSocket.")
69+
.arg(
70+
Arg::with_name("pprof")
71+
.long("pprof")
72+
.required(false)
73+
.takes_value(false)
74+
.help("Enable pprof."),
75+
)
6876
.arg(
6977
Arg::with_name("count")
7078
.short("c")
@@ -100,6 +108,12 @@ async fn main() -> Result<()> {
100108
.value_of("size")
101109
.map(|s| s.parse().expect("Invalid size!"))
102110
.unwrap();
111+
112+
let mut guard = None;
113+
if cli.is_present("pprof") {
114+
guard = Some(pprof::ProfilerGuard::new(100).unwrap());
115+
}
116+
103117
let addr = cli.value_of("URL").unwrap();
104118

105119
let notify = Arc::new(Notify::new());
@@ -145,5 +159,16 @@ async fn main() -> Result<()> {
145159
costs,
146160
1000f64 * (count as f64) / (costs as f64)
147161
);
162+
163+
if let Some(guard) = guard {
164+
match guard.report().build() {
165+
Ok(report) => {
166+
let file = File::create("flamegraph.svg").unwrap();
167+
report.flamegraph(file).unwrap();
168+
}
169+
Err(_) => {}
170+
};
171+
}
172+
148173
Ok(())
149174
}

justfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ fmt:
1313
echo:
1414
@RUST_LOG=release cargo run --release --example echo -- serve tcp://127.0.0.1:7878
1515
bench:
16-
@RUST_LOG=info cargo run --release --example qps -- -c 1000000 -s 1024 tcp://127.0.0.1:7878
16+
@RUST_LOG=info cargo run --release --example qps -- -c 1000000 -s 1024 --pprof tcp://127.0.0.1:7878

rsocket-messaging/src/requester.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,9 @@ impl RequesterBuilder {
155155
Some(TransportKind::TCP(h, p)) => {
156156
let addr: SocketAddr = format!("{}:{}", h, p).parse()?;
157157
let cli = RSocketFactory::connect()
158-
.data_mime_type(data_mime_type.as_ref())
158+
.data_mime_type(data_mime_type)
159159
.setup(setup)
160-
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0.as_ref())
160+
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0)
161161
.transport(TcpClientTransport::from(addr))
162162
.start()
163163
.await?;
@@ -168,9 +168,9 @@ impl RequesterBuilder {
168168
Some(TransportKind::WS(u)) => {
169169
let url = Url::parse(&u)?;
170170
let cli = RSocketFactory::connect()
171-
.data_mime_type(data_mime_type.as_ref())
171+
.data_mime_type(data_mime_type)
172172
.setup(setup)
173-
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0.as_ref())
173+
.metadata_mime_type(MimeType::MESSAGE_X_RSOCKET_COMPOSITE_METADATA_V0)
174174
.transport(WebsocketClientTransport::from(url))
175175
.start()
176176
.await?;

rsocket/src/core/client.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,10 @@ where
5252

5353
pub fn fragment(mut self, mtu: usize) -> Self {
5454
if mtu > 0 && mtu < transport::MIN_MTU {
55-
panic!("invalid fragment mtu: at least {}!", transport::MIN_MTU)
55+
warn!("invalid fragment mtu: at least {}!", transport::MIN_MTU)
56+
} else {
57+
self.mtu = mtu;
5658
}
57-
self.mtu = mtu;
5859
self
5960
}
6061

@@ -82,18 +83,22 @@ where
8283
self
8384
}
8485

85-
pub fn mime_type(mut self, metadata_mime_type: &str, data_mime_type: &str) -> Self {
86+
pub fn mime_type(
87+
mut self,
88+
metadata_mime_type: impl Into<String>,
89+
data_mime_type: impl Into<String>,
90+
) -> Self {
8691
self = self.metadata_mime_type(metadata_mime_type);
8792
self = self.data_mime_type(data_mime_type);
8893
self
8994
}
9095

91-
pub fn data_mime_type(mut self, mime_type: &str) -> Self {
96+
pub fn data_mime_type(mut self, mime_type: impl Into<String>) -> Self {
9297
self.setup = self.setup.set_data_mime_type(mime_type);
9398
self
9499
}
95100

96-
pub fn metadata_mime_type(mut self, mime_type: &str) -> Self {
101+
pub fn metadata_mime_type(mut self, mime_type: impl Into<String>) -> Self {
97102
self.setup = self.setup.set_metadata_mime_type(mime_type);
98103
self
99104
}
@@ -232,7 +237,7 @@ where
232237
}
233238
});
234239

235-
socket.setup(setup).await;
240+
socket.setup(setup).await?;
236241

237242
Ok(Client::new(socket, close_notify, closing))
238243
}

rsocket/src/extension/composite.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,7 @@ impl CompositeMetadata {
119119
let mime_type = if 0x80 & first != 0 {
120120
// Well
121121
let n = first & 0x7F;
122-
match MimeType::parse(n) {
123-
Some(well) => well,
124-
None => {
125-
let err_str = format!("invalid Well-Known MIME type: identifier={:x}", n);
126-
return Err(RSocketError::WithDescription(err_str).into());
127-
}
128-
}
122+
MimeType::WellKnown(n)
129123
} else {
130124
// Bad
131125
let mime_len = (first as usize) + 1;

rsocket/src/extension/mime.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use std::collections::HashMap;
2+
use std::convert::TryInto;
23
use std::fmt;
34

45
use once_cell::sync::Lazy;
56

7+
use crate::error::RSocketError;
8+
use crate::Result;
9+
610
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
711
pub enum MimeType {
812
Normal(String),
@@ -36,19 +40,26 @@ impl MimeType {
3640
Self::Normal(_) => None,
3741
}
3842
}
39-
}
4043

41-
impl Into<String> for MimeType {
42-
fn into(self) -> String {
43-
self.as_ref().to_owned()
44+
pub fn as_str(&self) -> Option<&str> {
45+
match self {
46+
Self::Normal(s) => Some(s.as_ref()),
47+
Self::WellKnown(n) => match U8_TO_STR.get(n) {
48+
Some(v) => Some(v),
49+
None => None,
50+
},
51+
}
4452
}
4553
}
4654

47-
impl AsRef<str> for MimeType {
48-
fn as_ref(&self) -> &str {
55+
impl Into<String> for MimeType {
56+
fn into(self) -> String {
4957
match self {
50-
Self::Normal(s) => &s,
51-
Self::WellKnown(n) => U8_TO_STR.get(n).unwrap(),
58+
Self::Normal(s) => s,
59+
Self::WellKnown(n) => match U8_TO_STR.get(&n) {
60+
Some(v) => v.to_string(),
61+
None => "UNKNOWN".to_string(),
62+
},
5263
}
5364
}
5465
}

rsocket/src/payload/setup.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ impl SetupPayloadBuilder {
8686
self
8787
}
8888

89-
pub fn set_data_mime_type(mut self, mime: &str) -> Self {
90-
self.inner.mime_d = Some(Bytes::from(mime.to_owned()));
89+
pub fn set_data_mime_type(mut self, mime: impl Into<String>) -> Self {
90+
self.inner.mime_d = Some(Bytes::from(mime.into()));
9191
self
9292
}
93-
pub fn set_metadata_mime_type(mut self, mime: &str) -> Self {
94-
self.inner.mime_m = Some(Bytes::from(mime.to_owned()));
93+
pub fn set_metadata_mime_type(mut self, mime: impl Into<String>) -> Self {
94+
self.inner.mime_m = Some(Bytes::from(mime.into()));
9595
self
9696
}
9797

rsocket/src/transport/socket.rs

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl DuplexSocket {
6969
socket
7070
}
7171

72-
pub(crate) async fn setup(&mut self, setup: SetupPayload) {
72+
pub(crate) async fn setup(&mut self, setup: SetupPayload) -> Result<()> {
7373
let mut bu = frame::Setup::builder(0, 0);
7474
if let Some(s) = setup.data_mime_type() {
7575
bu = bu.set_mime_data(s);
@@ -86,7 +86,7 @@ impl DuplexSocket {
8686
if let Some(b) = m {
8787
bu = bu.set_metadata(b);
8888
}
89-
self.tx.send(bu.build()).expect("Send setup failed");
89+
self.tx.send(bu.build()).map_err(|e| e.into())
9090
}
9191

9292
#[inline]
@@ -128,7 +128,9 @@ impl DuplexSocket {
128128
.set_code(error::ERR_REJECT_SETUP)
129129
.set_data(Bytes::from(errmsg))
130130
.build();
131-
self.tx.send(sending).expect("Reject setup failed");
131+
if let Err(_) = self.tx.send(sending) {
132+
error!("Reject setup failed");
133+
}
132134
return;
133135
}
134136
}
@@ -261,13 +263,28 @@ impl DuplexSocket {
261263
self.joiners.remove(&sid);
262264
// pick handler
263265
if let Some((_, handler)) = self.handlers.remove(&sid) {
264-
let desc = input.get_data_utf8().unwrap().to_owned();
266+
let desc = input
267+
.get_data_utf8()
268+
.map(|it| it.to_string())
269+
.unwrap_or_default();
265270
let e = RSocketError::must_new_from_code(input.get_code(), desc);
266271
match handler {
267-
Handler::ReqRR(tx) => tx.send(Err(e.into())).expect("Send RR failed"),
272+
Handler::ReqRR(tx) => {
273+
if let Err(_) = tx.send(Err(e.into())) {
274+
error!("respond with error for REQUEST_RESPONSE failed!");
275+
}
276+
}
268277
Handler::ResRR(_) => unreachable!(),
269-
Handler::ReqRS(tx) => tx.send(Err(e.into())).await.expect("Send RS failed"),
270-
Handler::ReqRC(tx) => tx.send(Err(e.into())).await.expect("Send RC failed"),
278+
Handler::ReqRS(tx) => {
279+
if let Err(_) = tx.send(Err(e.into())).await {
280+
error!("respond with error for REQUEST_STREAM failed!");
281+
};
282+
}
283+
Handler::ReqRC(tx) => {
284+
if let Err(_) = tx.send(Err(e.into())).await {
285+
error!("respond with error for REQUEST_CHANNEL failed!");
286+
}
287+
}
271288
}
272289
}
273290
}
@@ -281,7 +298,9 @@ impl DuplexSocket {
281298
match handler {
282299
Handler::ReqRR(sender) => {
283300
info!("REQUEST_RESPONSE {} cancelled!", sid);
284-
sender.send(e).unwrap();
301+
if let Err(_) = sender.send(e) {
302+
error!("notify cancel for REQUEST_RESPONSE failed: sid={}", sid);
303+
}
285304
}
286305
Handler::ResRR(c) => {
287306
let lefts = c.count_down();
@@ -305,21 +324,26 @@ impl DuplexSocket {
305324
Handler::ReqRR(_) => match o.remove() {
306325
Handler::ReqRR(sender) => {
307326
if flag & Frame::FLAG_NEXT != 0 {
308-
sender.send(Ok(Some(input))).unwrap();
327+
if let Err(_) = sender.send(Ok(Some(input))) {
328+
error!("response successful payload for REQUEST_RESPONSE failed: sid={}",sid);
329+
}
309330
} else {
310-
sender.send(Ok(None)).unwrap();
331+
if let Err(_) = sender.send(Ok(None)) {
332+
error!("response successful payload for REQUEST_RESPONSE failed: sid={}",sid);
333+
}
311334
}
312335
}
313336
_ => unreachable!(),
314337
},
315338
Handler::ResRR(c) => unreachable!(),
316339
Handler::ReqRS(sender) => {
317340
if flag & Frame::FLAG_NEXT != 0 {
318-
sender
319-
.clone()
320-
.send(Ok(input))
321-
.await
322-
.expect("Send payload response failed.");
341+
if let Err(e) = sender.send(Ok(input)).await {
342+
error!(
343+
"response successful payload for REQUEST_STREAM failed: sid={}",
344+
sid
345+
);
346+
}
323347
}
324348
if flag & Frame::FLAG_COMPLETE != 0 {
325349
o.remove();
@@ -328,11 +352,9 @@ impl DuplexSocket {
328352
Handler::ReqRC(sender) => {
329353
// TODO: support channel
330354
if flag & Frame::FLAG_NEXT != 0 {
331-
sender
332-
.clone()
333-
.send(Ok(input))
334-
.await
335-
.expect("Send payload response failed");
355+
if let Err(_) = sender.clone().send(Ok(input)).await {
356+
error!("response successful payload for REQUEST_CHANNEL failed: sid={}",sid);
357+
}
336358
}
337359
if flag & Frame::FLAG_COMPLETE != 0 {
338360
o.remove();
@@ -396,7 +418,9 @@ impl DuplexSocket {
396418
}
397419

398420
// async remove canceller
399-
canceller.send(sid).await.expect("Send canceller failed");
421+
if let Err(_) = canceller.send(sid).await {
422+
error!("Send canceller failed: sid={}", sid);
423+
}
400424

401425
match result {
402426
Ok(Some(res)) => {

0 commit comments

Comments
 (0)