Skip to content

Commit 75de0e0

Browse files
committed
Avoid reading the whole asset into memory for asset processing.
1 parent c7f607a commit 75de0e0

File tree

3 files changed

+65
-36
lines changed

3 files changed

+65
-36
lines changed

crates/bevy_asset/src/meta.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ use alloc::{
33
string::{String, ToString},
44
vec::Vec,
55
};
6+
use futures_lite::AsyncReadExt;
67

78
use crate::{
8-
loader::AssetLoader, processor::Process, Asset, AssetPath, DeserializeMetaError,
9-
VisitAssetDependencies,
9+
io::{AssetReaderError, Reader},
10+
loader::AssetLoader,
11+
processor::Process,
12+
Asset, AssetPath, DeserializeMetaError, VisitAssetDependencies,
1013
};
1114
use downcast_rs::{impl_downcast, Downcast};
1215
use ron::ser::PrettyConfig;
@@ -204,7 +207,7 @@ impl AssetLoader for () {
204207
type Error = std::io::Error;
205208
async fn load(
206209
&self,
207-
_reader: &mut dyn crate::io::Reader,
210+
_reader: &mut dyn Reader,
208211
_settings: &Self::Settings,
209212
_load_context: &mut crate::LoadContext<'_>,
210213
) -> Result<Self::Asset, Self::Error> {
@@ -241,11 +244,22 @@ pub(crate) fn loader_settings_meta_transform<S: Settings>(
241244
pub type AssetHash = [u8; 32];
242245

243246
/// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump.
244-
pub(crate) fn get_asset_hash(meta_bytes: &[u8], asset_bytes: &[u8]) -> AssetHash {
247+
pub(crate) async fn get_asset_hash(
248+
meta_bytes: &[u8],
249+
asset_reader: &mut impl Reader,
250+
) -> Result<AssetHash, AssetReaderError> {
245251
let mut hasher = blake3::Hasher::new();
246252
hasher.update(meta_bytes);
247-
hasher.update(asset_bytes);
248-
*hasher.finalize().as_bytes()
253+
let mut buffer = [0; blake3::CHUNK_LEN];
254+
loop {
255+
let bytes_read = asset_reader.read(&mut buffer).await?;
256+
if bytes_read == 0 {
257+
// This means we've reached EOF, so we're done consume asset bytes.
258+
break;
259+
}
260+
hasher.update(&buffer[..bytes_read]);
261+
}
262+
Ok(*hasher.finalize().as_bytes())
249263
}
250264

251265
/// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump.

crates/bevy_asset/src/processor/mod.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use bevy_platform::{
6464
};
6565
use bevy_tasks::IoTaskPool;
6666
use futures_io::ErrorKind;
67-
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
67+
use futures_lite::{AsyncWriteExt, StreamExt};
6868
use futures_util::{select_biased, FutureExt};
6969
use std::{
7070
path::{Path, PathBuf},
@@ -966,9 +966,6 @@ impl AssetProcessor {
966966
err,
967967
};
968968

969-
// Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
970-
let mut byte_reader = reader.read(path).await.map_err(reader_err)?;
971-
972969
let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
973970
Ok(meta_bytes) => {
974971
let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
@@ -1023,19 +1020,14 @@ impl AssetProcessor {
10231020

10241021
let processed_writer = source.processed_writer()?;
10251022

1026-
let mut asset_bytes = Vec::new();
1027-
byte_reader
1028-
.read_to_end(&mut asset_bytes)
1029-
.await
1030-
.map_err(|e| ProcessError::AssetReaderError {
1031-
path: asset_path.clone(),
1032-
err: AssetReaderError::Io(e.into()),
1033-
})?;
1034-
1035-
// PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
1036-
// The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
1037-
// Hard to say which is worse
1038-
let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
1023+
let new_hash = {
1024+
// Create a reader just for computing the hash. Keep this scoped here so that we drop it
1025+
// as soon as the hash is computed.
1026+
let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;
1027+
get_asset_hash(&meta_bytes, &mut reader_for_hash)
1028+
.await
1029+
.map_err(reader_err)?
1030+
};
10391031
let mut new_processed_info = ProcessedInfo {
10401032
hash: new_hash,
10411033
full_hash: new_hash,
@@ -1066,6 +1058,16 @@ impl AssetProcessor {
10661058
}
10671059
}
10681060

1061+
// Create a reader just for the actual process. Note: this means that we're performing two
1062+
// reads for the same file (but we avoid having to load the whole file into memory). For
1063+
// some sources (like local file systems), this is not a big deal, but for other sources
1064+
// like an HTTP asset sources, this could be an entire additional download (if the asset
1065+
// source doesn't do any caching). In practice, most sources being processed are likely to
1066+
// be local, and processing in general is a publish-time operation, so it's not likely to be
1067+
// too big a deal. If in the future, we decide we want to avoid this repeated read, we could
1068+
// "ask" the asset source if it prefers avoiding repeated reads or not.
1069+
let mut reader_for_process = reader.read(path).await.map_err(reader_err)?;
1070+
10691071
// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
10701072
// See ProcessedAssetInfo::file_transaction_lock docs for more info
10711073
let _transaction_lock = {
@@ -1081,8 +1083,12 @@ impl AssetProcessor {
10811083
if let Some(processor) = processor {
10821084
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
10831085
let mut processed_meta = {
1084-
let mut context =
1085-
ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
1086+
let mut context = ProcessContext::new(
1087+
self,
1088+
asset_path,
1089+
reader_for_process,
1090+
&mut new_processed_info,
1091+
);
10861092
processor
10871093
.process(&mut context, source_meta, &mut *writer)
10881094
.await?
@@ -1112,10 +1118,13 @@ impl AssetProcessor {
11121118
.await
11131119
.map_err(writer_err)?;
11141120
} else {
1115-
processed_writer
1116-
.write_bytes(path, &asset_bytes)
1121+
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
1122+
futures_lite::io::copy(&mut reader_for_process, &mut writer)
11171123
.await
1118-
.map_err(writer_err)?;
1124+
.map_err(|err| ProcessError::AssetWriterError {
1125+
path: asset_path.clone_owned(),
1126+
err: err.into(),
1127+
})?;
11191128
*source_meta.processed_info_mut() = Some(new_processed_info.clone());
11201129
let meta_bytes = source_meta.serialize();
11211130
processed_writer

crates/bevy_asset/src/processor/process.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
io::{
33
AssetReaderError, AssetWriterError, MissingAssetWriterError,
4-
MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, SliceReader, Writer,
4+
MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, Reader, Writer,
55
},
66
meta::{AssetAction, AssetMeta, AssetMetaDyn, ProcessDependencyInfo, ProcessedInfo, Settings},
77
processor::AssetProcessor,
@@ -280,20 +280,20 @@ pub struct ProcessContext<'a> {
280280
/// [`AssetServer`]: crate::server::AssetServer
281281
processor: &'a AssetProcessor,
282282
path: &'a AssetPath<'static>,
283-
asset_bytes: &'a [u8],
283+
reader: Box<dyn Reader + 'a>,
284284
}
285285

286286
impl<'a> ProcessContext<'a> {
287287
pub(crate) fn new(
288288
processor: &'a AssetProcessor,
289289
path: &'a AssetPath<'static>,
290-
asset_bytes: &'a [u8],
290+
reader: Box<dyn Reader + 'a>,
291291
new_processed_info: &'a mut ProcessedInfo,
292292
) -> Self {
293293
Self {
294294
processor,
295295
path,
296-
asset_bytes,
296+
reader,
297297
new_processed_info,
298298
}
299299
}
@@ -309,9 +309,15 @@ impl<'a> ProcessContext<'a> {
309309
let server = &self.processor.server;
310310
let loader_name = core::any::type_name::<L>();
311311
let loader = server.get_asset_loader_with_type_name(loader_name).await?;
312-
let mut reader = SliceReader::new(self.asset_bytes);
313312
let loaded_asset = server
314-
.load_with_meta_loader_and_reader(self.path, &meta, &*loader, &mut reader, false, true)
313+
.load_with_meta_loader_and_reader(
314+
self.path,
315+
&meta,
316+
&*loader,
317+
&mut self.reader,
318+
false,
319+
true,
320+
)
315321
.await?;
316322
for (path, full_hash) in &loaded_asset.loader_dependencies {
317323
self.new_processed_info
@@ -332,7 +338,7 @@ impl<'a> ProcessContext<'a> {
332338

333339
/// The source bytes of the asset being processed.
334340
#[inline]
335-
pub fn asset_bytes(&self) -> &[u8] {
336-
self.asset_bytes
341+
pub fn asset_reader(&mut self) -> &mut dyn Reader {
342+
&mut self.reader
337343
}
338344
}

0 commit comments

Comments
 (0)