From b9ce2bdd7f19e406190cc3571e9b170990044e56 Mon Sep 17 00:00:00 2001 From: allnil Date: Thu, 26 Jun 2025 12:19:12 +0100 Subject: [PATCH] feat: make bundle tx struct cloneable --- src/deep_hash.rs | 23 ++++++++++++----------- src/transaction/bundlr.rs | 14 +++++++++++--- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/deep_hash.rs b/src/deep_hash.rs index 492978d..9e5195e 100644 --- a/src/deep_hash.rs +++ b/src/deep_hash.rs @@ -10,15 +10,19 @@ use crate::{ }; use futures::{Stream, TryStream, TryStreamExt}; -pub enum DeepHashChunk<'a> { +pub enum DeepHashChunk { Chunk(Bytes), - Stream(&'a mut Pin> + Send>>), - Chunks(Vec>), + Stream( + std::sync::Arc< + tokio::sync::Mutex> + Send + 'static>>>, + >, + ), + Chunks(Vec), } trait Foo: Stream> + TryStream {} -pub async fn deep_hash(chunk: DeepHashChunk<'_>) -> Result { +pub async fn deep_hash(chunk: DeepHashChunk) -> Result { match chunk { DeepHashChunk::Chunk(b) => { let tag = [BLOB_AS_BUFFER, b.len().to_string().as_bytes()].concat(); @@ -26,14 +30,11 @@ pub async fn deep_hash(chunk: DeepHashChunk<'_>) -> Result { Ok(Bytes::copy_from_slice(&sha384hash(c.into()))) } DeepHashChunk::Stream(s) => { + let mut guard = s.lock().await; + let mut s = guard.as_mut(); let mut hasher = Sha384::new(); let mut length = 0; - while let Some(chunk) = s - .as_mut() - .try_next() - .await - .map_err(|_| BundlrError::NoBytesLeft)? - { + while let Some(chunk) = s.try_next().await.map_err(|_| BundlrError::NoBytesLeft)? { length += chunk.len(); hasher.update(&chunk); } @@ -62,7 +63,7 @@ pub async fn deep_hash(chunk: DeepHashChunk<'_>) -> Result { #[async_recursion] pub async fn deep_hash_chunks( - chunks: &mut Vec>, + chunks: &mut Vec, acc: Bytes, ) -> Result { if chunks.is_empty() { diff --git a/src/transaction/bundlr.rs b/src/transaction/bundlr.rs index 5ba90d0..a758250 100644 --- a/src/transaction/bundlr.rs +++ b/src/transaction/bundlr.rs @@ -15,12 +15,18 @@ use crate::signers::Signer; use crate::tags::{AvroDecode, AvroEncode, Tag}; use crate::utils::read_offset; +#[derive(Clone)] enum Data { None, Bytes(Vec), - Stream(Pin> + Send>>), + Stream( + std::sync::Arc< + tokio::sync::Mutex> + Send + 'static>>>, + >, + ), } +#[derive(Clone)] pub struct BundlrTx { signature_type: SignerMap, signature: Vec, @@ -161,7 +167,9 @@ impl BundlrTx { }; Ok(BundlrTx { - data: Data::Stream(Box::pin(file_stream)), + data: Data::Stream(std::sync::Arc::new(tokio::sync::Mutex::new(Box::pin( + file_stream, + )))), ..bundlr_tx }) } @@ -260,7 +268,7 @@ impl BundlrTx { ])) } Data::Stream(file_stream) => { - let data_chunk = DeepHashChunk::Stream(file_stream); + let data_chunk = DeepHashChunk::Stream(file_stream.clone()); let sig_type = &self.signature_type; let sig_type_bytes = sig_type.as_u16().to_string().as_bytes().to_vec(); deep_hash(DeepHashChunk::Chunks(vec![