Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 53 additions & 40 deletions src/utils/html.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
InstanceMetrics,
utils::report_error,
web::{
page::{
TemplateData,
Expand All @@ -8,6 +9,7 @@ use crate::{
rustdoc::RustdocPage,
},
};
use anyhow::{Context as _, anyhow};
use askama::Template;
use async_stream::stream;
use axum::body::Bytes;
Expand All @@ -18,6 +20,8 @@ use tokio::{io::AsyncRead, task::JoinHandle};
use tokio_util::io::ReaderStream;
use tracing::error;

/// Capacity, in messages, of the `tokio::sync::mpsc` channels that carry input chunks to the
/// HTML rewriter and rewritten chunks back from it.
const CHANNEL_SIZE: usize = 64;

#[derive(thiserror::Error, Debug)]
pub(crate) enum RustdocRewritingError {
#[error("HTML rewriter error: {0}")]
Expand All @@ -38,13 +42,15 @@ pub(crate) fn rewrite_rustdoc_html_stream<R>(
max_allowed_memory_usage: usize,
data: Arc<RustdocPage>,
metrics: Arc<InstanceMetrics>,
) -> impl Stream<Item = Result<Bytes, RustdocRewritingError>>
) -> impl Stream<Item = Result<Bytes, RustdocRewritingError>> + Send + 'static
where
R: AsyncRead + Unpin + 'static,
R: AsyncRead + Unpin + Send + 'static,
{
stream!({
let (input_sender, input_receiver) = std::sync::mpsc::channel::<Option<Vec<u8>>>();
let (result_sender, mut result_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
let (input_sender, mut input_receiver) =
tokio::sync::mpsc::channel::<Option<Bytes>>(CHANNEL_SIZE);
let (result_sender, mut result_receiver) =
tokio::sync::mpsc::channel::<Bytes>(CHANNEL_SIZE);

let join_handle: JoinHandle<anyhow::Result<_>> = tokio::spawn(async move {
// we're using the rendering threadpool to limit CPU usage on the server, and to
Expand Down Expand Up @@ -135,9 +141,12 @@ where
// send each rewritten chunk back to the main rewriter as soon as it comes in.
// this can fail only when the receiver is dropped, in which case
// we exit this thread anyway.
let _ = result_sender.send(Bytes::from(chunk.to_vec()));
let _ = result_sender.blocking_send(Bytes::copy_from_slice(chunk));
});
while let Some(chunk) = input_receiver.recv()? {
while let Some(chunk) = input_receiver
.blocking_recv()
.ok_or_else(|| anyhow!("couldn't receive from input_receiver"))?
{
// receive data from the input receiver.
// `input_receiver` is a tokio channel, which also offers a blocking receive.
// Since we're in a normal background thread, we can use the blocking `.recv`
Expand All @@ -157,17 +166,20 @@ where

let mut reader_stream = ReaderStream::new(&mut reader);
while let Some(chunk) = reader_stream.next().await {
let chunk = chunk.map_err(|err| {
error!(?err, "error while reading from rustdoc HTML reader");
RustdocRewritingError::Other(err.into())
})?;
let chunk = chunk
.context("error while reading from rustdoc HTML reader")
.map_err(|err| {
report_error(&err);
RustdocRewritingError::Other(err)
})?;

if let Err(err) = input_sender.send(Some(chunk.to_vec())) {
error!(
?err,
"error when trying to send chunk to html rewriter thread"
);
yield Err(RustdocRewritingError::Other(err.into()));
if let Err(err) = input_sender
.send(Some(chunk))
.await
.context("error when trying to send chunk to html rewriter thread")
{
report_error(&err);
yield Err(RustdocRewritingError::Other(err));
break;
}

Expand All @@ -176,37 +188,38 @@ where
}
}
// This signals the renderer thread to finalize & exit.
if let Err(err) = input_sender.send(None) {
error!(
?err,
"error when trying to send end signal to html rewriter thread"
);
yield Err(RustdocRewritingError::Other(err.into()));
if let Err(err) = input_sender
.send(None)
.await
.context("error when trying to send end signal to html rewriter thread")
{
report_error(&err);
yield Err(RustdocRewritingError::Other(err));
}
while let Some(bytes) = result_receiver.recv().await {
yield Ok(bytes);
}

join_handle.await.expect("Task panicked").map_err(|e| {
error!(
?e,
memory_limit = max_allowed_memory_usage,
"error while rewriting rustdoc HTML"
);
// our `render_in_threadpool` call, and therefore the async tokio task, returns an
// `anyhow::Result`. In most cases the error comes from the `HtmlRewriter` as a
// `RewritingError`, which we extract again here. All other cases remain an
// `anyhow::Error`.
match e.downcast::<RewritingError>() {
Ok(e) => {
if matches!(e, RewritingError::MemoryLimitExceeded(_)) {
metrics.html_rewrite_ooms.inc();
join_handle
.await
.context("task join failed")?
.context("error while rewriting rustdoc HTML")
.map_err(|e| {
report_error(&e);
// our `render_in_threadpool` call, and therefore the async tokio task, returns an
// `anyhow::Result`. In most cases the error comes from the `HtmlRewriter` as a
// `RewritingError`, which we extract again here. All other cases remain an
// `anyhow::Error`.
match e.downcast::<RewritingError>() {
Ok(e) => {
if matches!(e, RewritingError::MemoryLimitExceeded(_)) {
metrics.html_rewrite_ooms.inc();
}
RustdocRewritingError::RewritingError(e)
}
RustdocRewritingError::RewritingError(e)
Err(e) => RustdocRewritingError::Other(e),
}
Err(e) => RustdocRewritingError::Other(e),
}
})?;
})?;
})
}

Expand Down
Loading