11use crate :: {
22 InstanceMetrics ,
3+ utils:: report_error,
34 web:: {
45 page:: {
56 TemplateData ,
@@ -8,6 +9,7 @@ use crate::{
89 rustdoc:: RustdocPage ,
910 } ,
1011} ;
12+ use anyhow:: { Context as _, anyhow} ;
1113use askama:: Template ;
1214use async_stream:: stream;
1315use axum:: body:: Bytes ;
@@ -18,6 +20,8 @@ use tokio::{io::AsyncRead, task::JoinHandle};
1820use tokio_util:: io:: ReaderStream ;
1921use tracing:: error;
2022
/// Capacity (in messages) of each bounded tokio mpsc channel used below: the one
/// carrying input chunks to the rewriter task and the one carrying rewritten
/// bytes back to the stream.
23+ const CHANNEL_SIZE : usize = 64 ;
24+
2125#[ derive( thiserror:: Error , Debug ) ]
2226pub ( crate ) enum RustdocRewritingError {
2327 #[ error( "HTML rewriter error: {0}" ) ]
@@ -38,13 +42,15 @@ pub(crate) fn rewrite_rustdoc_html_stream<R>(
3842 max_allowed_memory_usage : usize ,
3943 data : Arc < RustdocPage > ,
4044 metrics : Arc < InstanceMetrics > ,
41- ) -> impl Stream < Item = Result < Bytes , RustdocRewritingError > >
45+ ) -> impl Stream < Item = Result < Bytes , RustdocRewritingError > > + Send + ' static
4246where
43- R : AsyncRead + Unpin + ' static ,
47+ R : AsyncRead + Unpin + Send + ' static ,
4448{
4549 stream ! ( {
46- let ( input_sender, input_receiver) = std:: sync:: mpsc:: channel:: <Option <Vec <u8 >>>( ) ;
47- let ( result_sender, mut result_receiver) = tokio:: sync:: mpsc:: unbounded_channel:: <Bytes >( ) ;
50+ let ( input_sender, mut input_receiver) =
51+ tokio:: sync:: mpsc:: channel:: <Option <Bytes >>( CHANNEL_SIZE ) ;
52+ let ( result_sender, mut result_receiver) =
53+ tokio:: sync:: mpsc:: channel:: <Bytes >( CHANNEL_SIZE ) ;
4854
4955 let join_handle: JoinHandle <anyhow:: Result <_>> = tokio:: spawn( async move {
5056 // we're using the rendering threadpool to limit CPU usage on the server, and to
@@ -135,9 +141,12 @@ where
135141 // send the result back to the main rewriter as it comes in.
136142 // this blocks while the channel is full and can fail only when the
137143 // receiver is dropped, in which case we exit this thread anyway.
138- let _ = result_sender. send ( Bytes :: from ( chunk. to_vec ( ) ) ) ;
144+ let _ = result_sender. blocking_send ( Bytes :: copy_from_slice ( chunk) ) ;
139145 } ) ;
140- while let Some ( chunk) = input_receiver. recv( ) ? {
146+ while let Some ( chunk) = input_receiver
147+ . blocking_recv( )
148+ . ok_or_else( || anyhow!( "couldn't receive from input_receiver" ) ) ?
149+ {
141150 // receive data from the input receiver.
142151 // `input_receiver` is a tokio mpsc receiver, which is async by default.
143152 // Since we're in a normal background thread, we can use its `.blocking_recv`
@@ -157,17 +166,20 @@ where
157166
158167 let mut reader_stream = ReaderStream :: new( & mut reader) ;
159168 while let Some ( chunk) = reader_stream. next( ) . await {
160- let chunk = chunk. map_err( |err| {
161- error!( ?err, "error while reading from rustdoc HTML reader" ) ;
162- RustdocRewritingError :: Other ( err. into( ) )
163- } ) ?;
169+ let chunk = chunk
170+ . context( "error while reading from rustdoc HTML reader" )
171+ . map_err( |err| {
172+ report_error( & err) ;
173+ RustdocRewritingError :: Other ( err)
174+ } ) ?;
164175
165- if let Err ( err) = input_sender. send( Some ( chunk. to_vec( ) ) ) {
166- error!(
167- ?err,
168- "error when trying to send chunk to html rewriter thread"
169- ) ;
170- yield Err ( RustdocRewritingError :: Other ( err. into( ) ) ) ;
176+ if let Err ( err) = input_sender
177+ . send( Some ( chunk) )
178+ . await
179+ . context( "error when trying to send chunk to html rewriter thread" )
180+ {
181+ report_error( & err) ;
182+ yield Err ( RustdocRewritingError :: Other ( err) ) ;
171183 break ;
172184 }
173185
@@ -176,37 +188,38 @@ where
176188 }
177189 }
178190 // This signals the renderer thread to finalize & exit.
179- if let Err ( err) = input_sender. send( None ) {
180- error!(
181- ?err,
182- "error when trying to send end signal to html rewriter thread"
183- ) ;
184- yield Err ( RustdocRewritingError :: Other ( err. into( ) ) ) ;
191+ if let Err ( err) = input_sender
192+ . send( None )
193+ . await
194+ . context( "error when trying to send end signal to html rewriter thread" )
195+ {
196+ report_error( & err) ;
197+ yield Err ( RustdocRewritingError :: Other ( err) ) ;
185198 }
186199 while let Some ( bytes) = result_receiver. recv( ) . await {
187200 yield Ok ( bytes) ;
188201 }
189202
190- join_handle. await . expect( "Task panicked" ) . map_err( |e| {
191- error!(
192- ?e,
193- memory_limit = max_allowed_memory_usage,
194- "error while rewriting rustdoc HTML"
195- ) ;
196- // our `render_in_threadpool` and so the async tokio task return an `anyhow::Result`.
197- // In most cases this will be an error from the `HtmlRewriter`, which we'll get as a
198- // `RewritingError` which we extract here again. The other cases remain an
199- // `anyhow::Error`.
200- match e. downcast:: <RewritingError >( ) {
201- Ok ( e) => {
202- if matches!( e, RewritingError :: MemoryLimitExceeded ( _) ) {
203- metrics. html_rewrite_ooms. inc( ) ;
203+ join_handle
204+ . await
205+ . context( "task join failed" ) ?
206+ . context( "error while rewriting rustdoc HTML" )
207+ . map_err( |e| {
208+ report_error( & e) ;
209+ // our `render_in_threadpool` and so the async tokio task return an `anyhow::Result`.
210+ // In most cases this will be an error from the `HtmlRewriter`, which we'll get as a
211+ // `RewritingError` which we extract here again. The other cases remain an
212+ // `anyhow::Error`.
213+ match e. downcast:: <RewritingError >( ) {
214+ Ok ( e) => {
215+ if matches!( e, RewritingError :: MemoryLimitExceeded ( _) ) {
216+ metrics. html_rewrite_ooms. inc( ) ;
217+ }
218+ RustdocRewritingError :: RewritingError ( e)
204219 }
205- RustdocRewritingError :: RewritingError ( e)
220+ Err ( e ) => RustdocRewritingError :: Other ( e) ,
206221 }
207- Err ( e) => RustdocRewritingError :: Other ( e) ,
208- }
209- } ) ?;
222+ } ) ?;
210223 } )
211224}
212225
0 commit comments