Skip to content

Commit bacf9a5

Browse files
committed
fix big bugs for Payload
1 parent abe6a32 commit bacf9a5

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

examples/echo.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use clap::{App, Arg, SubCommand};
55
use rsocket_rust::prelude::*;
66
use rsocket_rust_transport_tcp::{TcpClientTransport, TcpServerTransport};
77
use std::error::Error;
8+
use std::fs;
89

910
enum RequestMode {
1011
FNF,
@@ -154,7 +155,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
154155
.expect("Connect failed!");
155156
let mut bu = Payload::builder();
156157
if let Some(data) = flags.value_of("input") {
157-
bu = bu.set_data_utf8(data);
158+
if data.starts_with("@") {
159+
let file_content =
160+
fs::read_to_string(&data[1..].to_owned()).expect("Read file failed.");
161+
bu = bu.set_data_utf8(&file_content);
162+
} else {
163+
bu = bu.set_data_utf8(data);
164+
}
158165
}
159166
let req = bu.build();
160167

rsocket/src/payload/normal.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ impl From<(&'static str, &'static str)> for Payload {
122122

123123
impl From<frame::Payload> for Payload {
124124
fn from(input: frame::Payload) -> Payload {
125-
Payload::new(input.get_data().clone(), input.get_data().clone())
125+
let (d, m) = input.split();
126+
Payload::new(d, m)
126127
}
127128
}
128129

rsocket/src/transport/socket.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ where
3232
handlers: Arc<Mutex<HashMap<u32, Handler>>>,
3333
canceller: Tx<u32>,
3434
splitter: Option<Splitter>,
35+
joiners: Arc<Mutex<HashMap<u32, Joiner>>>,
3536
}
3637

3738
#[derive(Clone)]
@@ -66,6 +67,7 @@ where
6667
canceller: canceller_tx,
6768
responder: Responder::new(),
6869
handlers: Arc::new(Mutex::new(HashMap::new())),
70+
joiners: Arc::new(Mutex::new(HashMap::new())),
6971
splitter,
7072
};
7173

@@ -188,6 +190,29 @@ where
188190
}
189191
}
190192

193+
async fn join_frame(&self, input: Frame) {
194+
let sid = input.get_stream_id();
195+
let mut joiners = self.joiners.lock().await;
196+
match (*joiners).remove(&sid) {
197+
Some(joiner) => {
198+
// joiner.push(input);
199+
}
200+
None => {}
201+
}
202+
}
203+
204+
async fn try_join(&self, input: Frame) {
205+
let follow = input.get_flag() & frame::FLAG_FOLLOW != 0;
206+
match input.get_frame_type() {
207+
frame::TYPE_REQUEST_RESPONSE => if follow {},
208+
frame::TYPE_REQUEST_STREAM => {}
209+
frame::TYPE_REQUEST_FNF => {}
210+
frame::TYPE_REQUEST_CHANNEL => {}
211+
frame::TYPE_PAYLOAD => {}
212+
_ => {}
213+
}
214+
}
215+
191216
#[inline]
192217
async fn on_error(&self, sid: u32, flag: u16, input: frame::Error) {
193218
// pick handler

0 commit comments

Comments
 (0)