Skip to content

Commit e01d3f2

Browse files
authored
Turbopack: perf: Avoid clones in RopeReader (#86708)
We (unsurprisingly) spend a lot of time encoding `Rope`s. Most of this is in `memcpy`ing the data out of the rope (not easy to optimize), but there's some cloning of refcounted `Bytes` containers here that can be easily avoided. Yes, `Bytes`​ is refcounted, but updating refcounts isn't free, and it matters because this is a hot codepath. This is too small to show up on any top-line measurement (<1%) of total time, but should still be an improvement.
1 parent 4c99cf3 commit e01d3f2

File tree

6 files changed

+86
-75
lines changed

6 files changed

+86
-75
lines changed

Cargo.lock

Lines changed: 1 addition & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,6 @@ allsorts = { version = "0.14.0", default-features = false, features = [
373373
"flate2_rust",
374374
] }
375375
anyhow = "1.0.100"
376-
async-compression = { version = "0.3.13", default-features = false, features = [
377-
"gzip",
378-
"tokio",
379-
] }
380376
async-trait = "0.1.64"
381377
bitfield = "0.18.0"
382378
byteorder = "1.5.0"

turbopack/crates/turbo-tasks-fs/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1912,7 +1912,7 @@ impl File {
19121912
}
19131913

19141914
/// Returns a Read/AsyncRead/Stream/Iterator to access the File's contents.
1915-
pub fn read(&self) -> RopeReader {
1915+
pub fn read(&self) -> RopeReader<'_> {
19161916
self.content.read()
19171917
}
19181918
}

turbopack/crates/turbo-tasks-fs/src/rope.rs

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111

1212
use RopeElem::{Local, Shared};
1313
use anyhow::{Context, Result};
14-
use bytes::{Buf, Bytes};
14+
use bytes::Bytes;
1515
use futures::Stream;
1616
use serde::{Deserialize, Deserializer, Serialize, Serializer};
1717
use serde_bytes::ByteBuf;
@@ -103,7 +103,7 @@ impl Rope {
103103
}
104104

105105
/// Returns a [Read]/[AsyncRead]/[Iterator] instance over all bytes.
106-
pub fn read(&self) -> RopeReader {
106+
pub fn read(&self) -> RopeReader<'_> {
107107
RopeReader::new(&self.data, 0)
108108
}
109109

@@ -688,28 +688,30 @@ impl DeterministicHash for RopeElem {
688688

689689
#[derive(Debug, Default)]
690690
/// Implements the [Read]/[AsyncRead]/[Iterator] trait over a [Rope].
691-
pub struct RopeReader {
692-
/// The Rope's tree is kept as a cloned stack, allowing us to accomplish
693-
/// incremental yielding.
694-
stack: Vec<StackElem>,
691+
pub struct RopeReader<'a> {
692+
/// The Rope's tree is kept as a stack, allowing us to accomplish incremental yielding.
693+
stack: Vec<StackElem<'a>>,
694+
/// An offset in the current buffer, used by the `read` implementation.
695+
offset: usize,
695696
}
696697

697698
/// A StackElem holds the current index into either a Bytes or a shared Rope.
698699
/// When the index reaches the end of the associated data, it is removed and we
699700
/// continue onto the next item in the stack.
700701
#[derive(Debug)]
701-
enum StackElem {
702-
Local(Bytes),
703-
Shared(InnerRope, usize),
702+
enum StackElem<'a> {
703+
Local(&'a Bytes),
704+
Shared(&'a InnerRope, usize),
704705
}
705706

706-
impl RopeReader {
707-
fn new(inner: &InnerRope, index: usize) -> Self {
707+
impl<'a> RopeReader<'a> {
708+
fn new(inner: &'a InnerRope, index: usize) -> Self {
708709
if index >= inner.len() {
709710
Default::default()
710711
} else {
711712
RopeReader {
712-
stack: vec![StackElem::Shared(inner.clone(), index)],
713+
stack: vec![StackElem::Shared(inner, index)],
714+
offset: 0,
713715
}
714716
}
715717
}
@@ -720,30 +722,30 @@ impl RopeReader {
720722
let mut remaining = want;
721723

722724
while remaining > 0 {
723-
let mut bytes = match self.next() {
725+
let bytes = match self.next_internal() {
724726
None => break,
725727
Some(b) => b,
726728
};
727729

728-
let amount = min(bytes.len(), remaining);
730+
let lower = self.offset;
731+
let upper = min(bytes.len(), lower + remaining);
729732

730-
buf.put_slice(&bytes[0..amount]);
733+
buf.put_slice(&bytes[self.offset..upper]);
731734

732-
if amount < bytes.len() {
733-
bytes.advance(amount);
735+
if upper < bytes.len() {
736+
self.offset = upper;
734737
self.stack.push(StackElem::Local(bytes))
738+
} else {
739+
self.offset = 0;
735740
}
736-
remaining -= amount;
741+
remaining -= upper - lower;
737742
}
738743

739744
want - remaining
740745
}
741-
}
742746

743-
impl Iterator for RopeReader {
744-
type Item = Bytes;
745-
746-
fn next(&mut self) -> Option<Self::Item> {
747+
/// Returns the next item in the iterator without modifying `self.offset`.
748+
fn next_internal(&mut self) -> Option<&'a Bytes> {
747749
// Iterates the rope's elements recursively until we find the next Local
748750
// section, returning its Bytes.
749751
loop {
@@ -756,7 +758,7 @@ impl Iterator for RopeReader {
756758
Some(StackElem::Shared(r, i)) => (r, i),
757759
};
758760

759-
let el = inner[index].clone();
761+
let el = &inner[index];
760762
index += 1;
761763
if index < inner.len() {
762764
self.stack.push(StackElem::Shared(inner, index));
@@ -767,13 +769,22 @@ impl Iterator for RopeReader {
767769
}
768770
}
769771

770-
impl Read for RopeReader {
772+
impl<'a> Iterator for RopeReader<'a> {
773+
type Item = &'a Bytes;
774+
775+
fn next(&mut self) -> Option<Self::Item> {
776+
self.offset = 0;
777+
self.next_internal()
778+
}
779+
}
780+
781+
impl Read for RopeReader<'_> {
771782
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
772783
Ok(self.read_internal(buf.len(), &mut ReadBuf::new(buf)))
773784
}
774785
}
775786

776-
impl AsyncRead for RopeReader {
787+
impl AsyncRead for RopeReader<'_> {
777788
fn poll_read(
778789
self: Pin<&mut Self>,
779790
_cx: &mut TaskContext<'_>,
@@ -785,12 +796,12 @@ impl AsyncRead for RopeReader {
785796
}
786797
}
787798

788-
impl BufRead for RopeReader {
799+
impl BufRead for RopeReader<'_> {
789800
/// Never returns an error.
790801
fn fill_buf(&mut self) -> IoResult<&[u8]> {
791802
// Returns the full buffer without coping any data. The same bytes will
792803
// continue to be returned until [consume] is called.
793-
let bytes = match self.next() {
804+
let bytes = match self.next_internal() {
794805
None => return Ok(EMPTY_BUF),
795806
Some(b) => b,
796807
};
@@ -803,37 +814,44 @@ impl BufRead for RopeReader {
803814
unreachable!()
804815
};
805816

806-
Ok(bytes)
817+
Ok(&bytes[self.offset..])
807818
}
808819

809820
fn consume(&mut self, amt: usize) {
810821
if let Some(StackElem::Local(b)) = self.stack.last_mut() {
811-
if amt == b.len() {
822+
// https://doc.rust-lang.org/std/io/trait.BufRead.html#tymethod.consume
823+
debug_assert!(
824+
self.offset + amt <= b.len(),
825+
"It is a logic error if `amount` exceeds the number of unread bytes in the \
826+
internal buffer, which is returned by `fill_buf`."
827+
);
828+
// Consume some amount of bytes from the current Bytes instance, ensuring those bytes
829+
// are not returned on the next call to `fill_buf`.
830+
self.offset += amt;
831+
if self.offset == b.len() {
832+
// whole Bytes instance was consumed
812833
self.stack.pop();
813-
} else {
814-
// Consume some amount of bytes from the current Bytes instance, ensuring
815-
// those bytes are not returned on the next call to [fill_buf].
816-
b.advance(amt);
834+
self.offset = 0;
817835
}
818836
}
819837
}
820838
}
821839

822-
impl Stream for RopeReader {
823-
// The Result<Bytes> item type is required for this to be streamable into a
824-
// [Hyper::Body].
825-
type Item = Result<Bytes>;
840+
impl<'a> Stream for RopeReader<'a> {
841+
/// This is efficiently streamable into a [`Hyper::Body`] if each item is cloned into an owned
842+
/// `Bytes` instance.
843+
type Item = Result<&'a Bytes>;
826844

827-
// Returns a "result" of reading the next shared bytes reference. This
828-
// differs from [Read::read] by not copying any memory.
845+
/// Returns a "result" of reading the next shared bytes reference. This
846+
/// differs from [`Read::read`] by not copying any memory.
829847
fn poll_next(self: Pin<&mut Self>, _cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
830848
let this = self.get_mut();
831849
Poll::Ready(this.next().map(Ok))
832850
}
833851
}
834852

835-
impl From<RopeElem> for StackElem {
836-
fn from(el: RopeElem) -> Self {
853+
impl<'a> From<&'a RopeElem> for StackElem<'a> {
854+
fn from(el: &'a RopeElem) -> Self {
837855
match el {
838856
Local(bytes) => Self::Local(bytes),
839857
Shared(inner) => Self::Shared(inner, 0),

turbopack/crates/turbopack-dev-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ workspace = true
1717

1818
[dependencies]
1919
anyhow = { workspace = true }
20-
async-compression = { workspace = true }
2120
auto-hash-map = { workspace = true }
21+
flate2 = { workspace = true }
2222
futures = { workspace = true }
2323
hyper = { version = "0.14", features = ["full"] }
2424
hyper-tungstenite = "0.9.0"

turbopack/crates/turbopack-dev-server/src/http.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
use std::{io::Read, iter};
2+
13
use anyhow::{Result, anyhow};
24
use auto_hash_map::AutoSet;
3-
use futures::{StreamExt, TryStreamExt};
5+
use flate2::{Compression, bufread::GzEncoder};
6+
use futures::{StreamExt, TryStreamExt, stream};
47
use hyper::{
58
Request, Response,
69
header::{CONTENT_ENCODING, CONTENT_LENGTH, HeaderName},
710
http::HeaderValue,
811
};
912
use mime::Mime;
10-
use tokio_util::io::{ReaderStream, StreamReader};
1113
use turbo_tasks::{
1214
CollectiblesSource, OperationVc, ReadRef, ResolvedVc, TransientInstance, Vc, apply_effects,
1315
util::SharedError,
@@ -175,22 +177,30 @@ pub async fn process_request_with_content_source(
175177
let response = if should_compress {
176178
header_map.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
177179

178-
// Grab ropereader stream, coerce anyhow::Error to std::io::Error
179-
let stream_ext = content.read().into_stream().map_err(std::io::Error::other);
180-
181-
let gzipped_stream =
182-
ReaderStream::new(async_compression::tokio::bufread::GzipEncoder::new(
183-
StreamReader::new(stream_ext),
184-
));
185-
186-
response.body(hyper::Body::wrap_stream(gzipped_stream))?
180+
// Hyper requires an owned reader... We could do this with streaming by cloning
181+
// each `Bytes` and implementing `BufRead` for `Iterator<bytes::Bytes>`, but
182+
// it's not really worth it, just compressing the whole thing up-front is fine.
183+
//
184+
// Use fast compression, since we're likely just tranferring data over
185+
// localhost.
186+
let mut gz_bytes = Vec::new();
187+
GzEncoder::new(content.read(), Compression::fast())
188+
.read_to_end(&mut gz_bytes)
189+
.expect("read of Rope should never fail");
190+
response.body(hyper::Body::wrap_stream(stream::iter(iter::once(
191+
hyper::Result::Ok(gz_bytes),
192+
))))?
187193
} else {
194+
// hyper requires an owned stream, so we must clone the iterator items
195+
// this is relatively cheap: each chunk is a `Bytes`, so `Clone` updates a
196+
// refcount
197+
let owned_chunks: Vec<_> =
198+
content.read().cloned().map(hyper::Result::Ok).collect();
188199
header_map.insert(
189200
CONTENT_LENGTH,
190201
hyper::header::HeaderValue::try_from(content.len().to_string())?,
191202
);
192-
193-
response.body(hyper::Body::wrap_stream(content.read()))?
203+
response.body(hyper::Body::wrap_stream(stream::iter(owned_chunks)))?
194204
};
195205

196206
return Ok((response, side_effects));

0 commit comments

Comments
 (0)