Skip to content

Commit 330e75f

Browse files
committed
support fragmentation: concat following frames.
1 parent a8943a3 commit 330e75f

File tree

6 files changed

+226
-136
lines changed

6 files changed

+226
-136
lines changed

rsocket/src/frame/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ pub enum Body {
8484

8585
#[derive(Debug, PartialEq)]
8686
pub struct Frame {
87-
stream_id: u32,
88-
body: Body,
89-
flag: u16,
87+
pub(crate) stream_id: u32,
88+
pub(crate) body: Body,
89+
pub(crate) flag: u16,
9090
}
9191

9292
impl Writeable for Frame {
@@ -167,6 +167,17 @@ impl Frame {
167167
body.map(|it| Frame::new(sid, it, flag))
168168
}
169169

170+
pub fn is_followable(&self) -> bool {
171+
match &self.body {
172+
Body::RequestFNF(_) => true,
173+
Body::RequestResponse(_) => true,
174+
Body::RequestStream(_) => true,
175+
Body::RequestChannel(_) => true,
176+
Body::Payload(_) => true,
177+
_ => false,
178+
}
179+
}
180+
170181
pub fn get_body(self) -> Body {
171182
self.body
172183
}

rsocket/src/transport/fragmentation.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use crate::payload::Payload;
33
use bytes::{Buf, BufMut, Bytes, BytesMut};
44
use std::collections::LinkedList;
55

6+
pub(crate) const MIN_MTU: usize = 64;
7+
68
pub(crate) struct Joiner {
79
inner: LinkedList<Frame>,
810
}
@@ -79,7 +81,7 @@ impl Into<Payload> for Joiner {
7981
let mut bf = BytesMut::new();
8082
let mut bf2 = BytesMut::new();
8183
self.inner.into_iter().for_each(|it: Frame| {
82-
let (d, m) = match it.get_body() {
84+
let (d, m) = match it.body {
8385
Body::RequestResponse(body) => body.split(),
8486
Body::RequestStream(body) => body.split(),
8587
Body::RequestChannel(body) => body.split(),
@@ -110,10 +112,10 @@ impl Into<Payload> for Joiner {
110112
}
111113

112114
impl Joiner {
113-
pub(crate) fn new(first: Frame) -> Joiner {
114-
let mut inner = LinkedList::new();
115-
inner.push_back(first);
116-
Joiner { inner }
115+
pub(crate) fn new() -> Joiner {
116+
Joiner {
117+
inner: LinkedList::new(),
118+
}
117119
}
118120

119121
pub(crate) fn get_stream_id(&self) -> u32 {
@@ -129,13 +131,11 @@ impl Joiner {
129131
}
130132

131133
pub(crate) fn first(&self) -> &Frame {
132-
self.inner.front().unwrap()
134+
self.inner.front().expect("No frames pushed!")
133135
}
134136

135-
pub(crate) fn push(&mut self, next: Frame) -> bool {
136-
let has_follow = (next.get_flag() & frame::FLAG_FOLLOW) != 0;
137+
pub(crate) fn push(&mut self, next: Frame) {
137138
self.inner.push_back(next);
138-
!has_follow
139139
}
140140
}
141141

@@ -153,7 +153,8 @@ mod tests {
153153
.set_data(Bytes::from("(ROOT)"))
154154
.set_metadata(Bytes::from("(ROOT)"))
155155
.build();
156-
let mut joiner = Joiner::new(first);
156+
let mut joiner = Joiner::new();
157+
joiner.push(first);
157158

158159
for i in 0..10 {
159160
let flag = if i == 9 { 0u16 } else { frame::FLAG_FOLLOW };

rsocket/src/transport/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ mod misc;
33
mod socket;
44
mod spi;
55

6-
pub(crate) use fragmentation::{Joiner, Splitter};
6+
pub(crate) use fragmentation::{Joiner, Splitter, MIN_MTU};
77
pub(crate) use socket::DuplexSocket;
88
pub use spi::*;

0 commit comments

Comments
 (0)