26 changes: 20 additions & 6 deletions crates/bevy_asset/src/meta.rs
@@ -3,10 +3,13 @@ use alloc::{
string::{String, ToString},
vec::Vec,
};
use futures_lite::AsyncReadExt;

use crate::{
loader::AssetLoader, processor::Process, Asset, AssetPath, DeserializeMetaError,
VisitAssetDependencies,
io::{AssetReaderError, Reader},
loader::AssetLoader,
processor::Process,
Asset, AssetPath, DeserializeMetaError, VisitAssetDependencies,
};
use downcast_rs::{impl_downcast, Downcast};
use ron::ser::PrettyConfig;
@@ -204,7 +207,7 @@ impl AssetLoader for () {
type Error = std::io::Error;
async fn load(
&self,
_reader: &mut dyn crate::io::Reader,
_reader: &mut dyn Reader,
_settings: &Self::Settings,
_load_context: &mut crate::LoadContext<'_>,
) -> Result<Self::Asset, Self::Error> {
@@ -241,11 +244,22 @@ pub(crate) fn loader_settings_meta_transform<S: Settings>(
pub type AssetHash = [u8; 32];

/// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump.
pub(crate) fn get_asset_hash(meta_bytes: &[u8], asset_bytes: &[u8]) -> AssetHash {
pub(crate) async fn get_asset_hash(
meta_bytes: &[u8],
asset_reader: &mut impl Reader,
) -> Result<AssetHash, AssetReaderError> {
let mut hasher = blake3::Hasher::new();
hasher.update(meta_bytes);
hasher.update(asset_bytes);
*hasher.finalize().as_bytes()
let mut buffer = [0; blake3::CHUNK_LEN];
loop {
let bytes_read = asset_reader.read(&mut buffer).await?;
hasher.update(&buffer[..bytes_read]);
if bytes_read < buffer.len() {
// This means we've reached EOF, so we're done consuming asset bytes.
break;
}
}
Ok(*hasher.finalize().as_bytes())
}

/// NOTE: changing the hashing logic here is a _breaking change_ that requires a [`META_FORMAT_VERSION`] bump.
53 changes: 31 additions & 22 deletions crates/bevy_asset/src/processor/mod.rs
@@ -64,7 +64,7 @@ use bevy_platform::{
};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures_lite::{AsyncWriteExt, StreamExt};
use futures_util::{select_biased, FutureExt};
use std::{
path::{Path, PathBuf},
@@ -966,9 +966,6 @@ impl AssetProcessor {
err,
};

// Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
Ok(meta_bytes) => {
let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
@@ -1023,19 +1020,14 @@ impl AssetProcessor {

let processed_writer = source.processed_writer()?;

let mut asset_bytes = Vec::new();
byte_reader
.read_to_end(&mut asset_bytes)
.await
.map_err(|e| ProcessError::AssetReaderError {
path: asset_path.clone(),
err: AssetReaderError::Io(e.into()),
})?;

// PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
// The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
// Hard to say which is worse
let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
let new_hash = {
// Create a reader just for computing the hash. Keep this scoped here so that we drop it
// as soon as the hash is computed.
let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;
get_asset_hash(&meta_bytes, &mut reader_for_hash)
.await
.map_err(reader_err)?
};
let mut new_processed_info = ProcessedInfo {
hash: new_hash,
full_hash: new_hash,
@@ -1066,6 +1058,16 @@ impl AssetProcessor {
}
}

// Create a reader just for the actual process. Note: this means that we're performing two
// reads of the same file (but we avoid having to load the whole file into memory). For
// some sources (like local file systems), this is not a big deal, but for others, like an
// HTTP asset source, this could mean an entire additional download (if the asset source
// doesn't do any caching). In practice, most sources being processed are likely to be
// local, and processing in general is a publish-time operation, so it's not likely to be
// too big a deal. If, in the future, we decide we want to avoid this repeated read, we
// could "ask" the asset source whether it prefers avoiding repeated reads.
let mut reader_for_process = reader.read(path).await.map_err(reader_err)?;

// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
// See ProcessedAssetInfo::file_transaction_lock docs for more info
let _transaction_lock = {
@@ -1081,8 +1083,12 @@
if let Some(processor) = processor {
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
let mut processed_meta = {
let mut context =
ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
let mut context = ProcessContext::new(
self,
asset_path,
reader_for_process,
&mut new_processed_info,
);
processor
.process(&mut context, source_meta, &mut *writer)
.await?
@@ -1112,10 +1118,13 @@
.await
.map_err(writer_err)?;
} else {
processed_writer
.write_bytes(path, &asset_bytes)
let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
futures_lite::io::copy(&mut reader_for_process, &mut writer)
.await
.map_err(writer_err)?;
.map_err(|err| ProcessError::AssetWriterError {
path: asset_path.clone_owned(),
err: err.into(),
})?;
*source_meta.processed_info_mut() = Some(new_processed_info.clone());
let meta_bytes = source_meta.serialize();
processed_writer
24 changes: 15 additions & 9 deletions crates/bevy_asset/src/processor/process.rs
@@ -1,7 +1,7 @@
use crate::{
io::{
AssetReaderError, AssetWriterError, MissingAssetWriterError,
MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, SliceReader, Writer,
MissingProcessedAssetReaderError, MissingProcessedAssetWriterError, Reader, Writer,
},
meta::{AssetAction, AssetMeta, AssetMetaDyn, ProcessDependencyInfo, ProcessedInfo, Settings},
processor::AssetProcessor,
@@ -280,20 +280,20 @@ pub struct ProcessContext<'a> {
/// [`AssetServer`]: crate::server::AssetServer
processor: &'a AssetProcessor,
path: &'a AssetPath<'static>,
asset_bytes: &'a [u8],
reader: Box<dyn Reader + 'a>,
}

impl<'a> ProcessContext<'a> {
pub(crate) fn new(
processor: &'a AssetProcessor,
path: &'a AssetPath<'static>,
asset_bytes: &'a [u8],
reader: Box<dyn Reader + 'a>,
new_processed_info: &'a mut ProcessedInfo,
) -> Self {
Self {
processor,
path,
asset_bytes,
reader,
new_processed_info,
}
}
@@ -309,9 +309,15 @@ impl<'a> ProcessContext<'a> {
let server = &self.processor.server;
let loader_name = core::any::type_name::<L>();
let loader = server.get_asset_loader_with_type_name(loader_name).await?;
let mut reader = SliceReader::new(self.asset_bytes);
let loaded_asset = server
.load_with_meta_loader_and_reader(self.path, &meta, &*loader, &mut reader, false, true)
.load_with_meta_loader_and_reader(
self.path,
&meta,
&*loader,
&mut self.reader,
false,
true,
)
.await?;
for (path, full_hash) in &loaded_asset.loader_dependencies {
self.new_processed_info
@@ -330,9 +336,9 @@
self.path
}

/// The source bytes of the asset being processed.
/// The reader for the asset being processed.
#[inline]
pub fn asset_bytes(&self) -> &[u8] {
self.asset_bytes
pub fn asset_reader(&mut self) -> &mut dyn Reader {
&mut self.reader
}
}
30 changes: 30 additions & 0 deletions release-content/migration-guides/process_trait_changes.md
@@ -0,0 +1,30 @@
---
title: Changes to the `Process` trait in `bevy_asset`.
pull_requests: [21925]
---

`ProcessContext` no longer includes `asset_bytes`; it has been replaced by `asset_reader`. To
maintain the previous behavior in a `Process` implementation, you can read all the bytes into
memory. If you previously did:

```rust
// Inside `impl Process for Type`
let bytes = context.asset_bytes();
// Use bytes here!
```

then it should now be:

```rust
// Inside `impl Process for Type`
let reader = context.asset_reader();
let mut bytes = vec![];
reader
    .read_to_end(&mut bytes)
    .await
    .map_err(|err| ProcessError::AssetReaderError {
        path: context.path().clone_owned(),
        err: err.into(),
    })?;
// Use bytes here!
```

Review thread on `let reader = context.asset_reader();`:

**Contributor:** Can we add a nice helper for this?

**Contributor Author:** I intentionally don't want to. We should be encouraging users to engage with the "buffered" API of reading and writing, to reduce how much memory we're using. Today a lot of our loaders and sources just read everything into memory, and that can really limit how much data we can process. Besides, this is exactly how users should be doing it in loaders anyway, so this is no different. This migration is just weird because the previous behavior of having all the bytes is weird.
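If your processor can consume the asset incrementally, the buffered pattern the author describes above avoids holding the whole file in memory at all. Here is a minimal sketch of that pattern, assuming `futures_lite::AsyncReadExt` is in scope; the 4096-byte chunk size and the `process_chunk` helper are illustrative, not part of the API:

```rust
// Inside `impl Process for Type`
// Capture the path up front: `asset_reader()` borrows `context` mutably for as
// long as `reader` lives, so we can't call `context.path()` inside the loop.
let path = context.path().clone_owned();
let reader = context.asset_reader();
let mut buffer = [0u8; 4096];
loop {
    let bytes_read = reader
        .read(&mut buffer)
        .await
        .map_err(|err| ProcessError::AssetReaderError {
            path: path.clone(),
            err: err.into(),
        })?;
    if bytes_read == 0 {
        // EOF: the whole asset has been consumed.
        break;
    }
    // Hand each chunk to your own incremental logic (hypothetical helper).
    process_chunk(&buffer[..bytes_read]);
}
```

This mirrors how the new `get_asset_hash` consumes its reader in `blake3::CHUNK_LEN`-sized chunks rather than hashing one large in-memory buffer.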