diff --git a/src/utils/html.rs b/src/utils/html.rs index bae9d19c2..2b55b977a 100644 --- a/src/utils/html.rs +++ b/src/utils/html.rs @@ -1,5 +1,6 @@ use crate::{ InstanceMetrics, + utils::report_error, web::{ page::{ TemplateData, @@ -8,6 +9,7 @@ use crate::{ rustdoc::RustdocPage, }, }; +use anyhow::{Context as _, anyhow}; use askama::Template; use async_stream::stream; use axum::body::Bytes; @@ -18,6 +20,8 @@ use tokio::{io::AsyncRead, task::JoinHandle}; use tokio_util::io::ReaderStream; use tracing::error; +const CHANNEL_SIZE: usize = 64; + #[derive(thiserror::Error, Debug)] pub(crate) enum RustdocRewritingError { #[error("HTML rewriter error: {0}")] @@ -38,13 +42,15 @@ pub(crate) fn rewrite_rustdoc_html_stream( max_allowed_memory_usage: usize, data: Arc, metrics: Arc, -) -> impl Stream> +) -> impl Stream> + Send + 'static where - R: AsyncRead + Unpin + 'static, + R: AsyncRead + Unpin + Send + 'static, { stream!({ - let (input_sender, input_receiver) = std::sync::mpsc::channel::>>(); - let (result_sender, mut result_receiver) = tokio::sync::mpsc::unbounded_channel::(); + let (input_sender, mut input_receiver) = + tokio::sync::mpsc::channel::>(CHANNEL_SIZE); + let (result_sender, mut result_receiver) = + tokio::sync::mpsc::channel::(CHANNEL_SIZE); let join_handle: JoinHandle> = tokio::spawn(async move { // we're using the rendering threadpool to limit CPU usage on the server, and to @@ -135,9 +141,12 @@ where // send the result back to the main rewriter when its coming in. // this can fail only when the receiver is dropped, in which case // we exit this thread anyways. - let _ = result_sender.send(Bytes::from(chunk.to_vec())); + let _ = result_sender.blocking_send(Bytes::copy_from_slice(chunk)); }); - while let Some(chunk) = input_receiver.recv()? { + while let Some(chunk) = input_receiver + .blocking_recv() + .ok_or_else(|| anyhow!("couldn't receive from input_receiver"))? + { // receive data from the input receiver. // `input_receiver` is a non-async one. // Since we're in a normal background thread, we can use the blocking `.recv` @@ -157,17 +166,20 @@ where let mut reader_stream = ReaderStream::new(&mut reader); while let Some(chunk) = reader_stream.next().await { - let chunk = chunk.map_err(|err| { - error!(?err, "error while reading from rustdoc HTML reader"); - RustdocRewritingError::Other(err.into()) - })?; + let chunk = chunk + .context("error while reading from rustdoc HTML reader") + .map_err(|err| { + report_error(&err); + RustdocRewritingError::Other(err) + })?; - if let Err(err) = input_sender.send(Some(chunk.to_vec())) { - error!( - ?err, - "error when trying to send chunk to html rewriter thread" - ); - yield Err(RustdocRewritingError::Other(err.into())); + if let Err(err) = input_sender + .send(Some(chunk)) + .await + .context("error when trying to send chunk to html rewriter thread") + { + report_error(&err); + yield Err(RustdocRewritingError::Other(err)); break; } @@ -176,37 +188,38 @@ where } } // This signals the renderer thread to finalize & exit. - if let Err(err) = input_sender.send(None) { - error!( - ?err, - "error when trying to send end signal to html rewriter thread" - ); - yield Err(RustdocRewritingError::Other(err.into())); + if let Err(err) = input_sender + .send(None) + .await + .context("error when trying to send end signal to html rewriter thread") + { + report_error(&err); + yield Err(RustdocRewritingError::Other(err)); } while let Some(bytes) = result_receiver.recv().await { yield Ok(bytes); } - join_handle.await.expect("Task panicked").map_err(|e| { - error!( - ?e, - memory_limit = max_allowed_memory_usage, - "error while rewriting rustdoc HTML" - ); - // our `render_in_threadpool` and so the async tokio task return an `anyhow::Result`. - // In most cases this will be an error from the `HtmlRewriter`, which we'll get as a - // `RewritingError` which we extract here again. The other cases remain an - // `anyhow::Error`. - match e.downcast::() { - Ok(e) => { - if matches!(e, RewritingError::MemoryLimitExceeded(_)) { - metrics.html_rewrite_ooms.inc(); + join_handle + .await + .context("task join failed")? + .context("error while rewriting rustdoc HTML") + .map_err(|e| { + report_error(&e); + // our `render_in_threadpool` and so the async tokio task return an `anyhow::Result`. + // In most cases this will be an error from the `HtmlRewriter`, which we'll get as a + // `RewritingError` which we extract here again. The other cases remain an + // `anyhow::Error`. + match e.downcast::() { + Ok(e) => { + if matches!(e, RewritingError::MemoryLimitExceeded(_)) { + metrics.html_rewrite_ooms.inc(); + } + RustdocRewritingError::RewritingError(e) } - RustdocRewritingError::RewritingError(e) + Err(e) => RustdocRewritingError::Other(e), } - Err(e) => RustdocRewritingError::Other(e), - } - })?; + })?; }) }