diff --git a/src/event_repository.rs b/src/event_repository.rs index 16dc448..8122477 100644 --- a/src/event_repository.rs +++ b/src/event_repository.rs @@ -27,6 +27,7 @@ pub struct DynamoEventRepository { event_table: String, snapshot_table: String, stream_channel_size: usize, + use_strings: bool, } impl DynamoEventRepository { @@ -58,10 +59,8 @@ impl DynamoEventRepository { /// ``` pub fn with_streaming_channel_size(self, stream_channel_size: usize) -> Self { Self { - client: self.client, - event_table: self.event_table, - snapshot_table: self.snapshot_table, stream_channel_size, + ..self } } /// Configures a `DynamoEventRepository` to use the provided table names. @@ -80,6 +79,14 @@ impl DynamoEventRepository { pub fn with_tables(self, event_table: &str, snapshot_table: &str) -> Self { Self::use_table_names(self.client, event_table, snapshot_table) } + /// Configures a `DynamoEventRepository` to use strings rather than buffers for the payload + /// and metadata fields. + pub fn with_use_strings(self, use_strings: bool) -> Self { + Self { + use_strings, + ..self + } + } fn use_table_names(client: Client, event_table: &str, snapshot_table: &str) -> Self { Self { @@ -87,6 +94,7 @@ impl DynamoEventRepository { event_table: event_table.to_string(), snapshot_table: snapshot_table.to_string(), stream_channel_size: DEFAULT_STREAMING_CHANNEL_SIZE, + use_strings: false, } } @@ -97,7 +105,8 @@ impl DynamoEventRepository { if events.is_empty() { return Ok(()); } - let (transactions, _) = Self::build_event_put_transactions(&self.event_table, events); + let (transactions, _) = + Self::build_event_put_transactions(&self.event_table, events, self.use_strings); commit_transactions(&self.client, transactions).await?; Ok(()) } @@ -105,6 +114,7 @@ impl DynamoEventRepository { fn build_event_put_transactions( table_name: &str, events: &[SerializedEvent], + use_strings: bool, ) -> (Vec, usize) { let mut current_sequence: usize = 0; let mut transactions: Vec = Vec::default(); @@ -117,10 +127,22 @@ impl DynamoEventRepository { let sequence = AttributeValue::N(String::from(&event.sequence.to_string())); let event_version = AttributeValue::S(String::from(&event.event_version)); let event_type = AttributeValue::S(String::from(&event.event_type)); - let payload_blob = serde_json::to_vec(&event.payload).unwrap(); - let payload = AttributeValue::B(Blob::new(payload_blob)); - let metadata_blob = serde_json::to_vec(&event.metadata).unwrap(); - let metadata = AttributeValue::B(Blob::new(metadata_blob)); + + let payload = if use_strings { + let payload_json = serde_json::to_string(&event.payload).unwrap(); + AttributeValue::S(payload_json) + } else { + let payload_blob = serde_json::to_vec(&event.payload).unwrap(); + AttributeValue::B(Blob::new(payload_blob)) + }; + + let metadata = if use_strings { + let metadata_json = serde_json::to_string(&event.metadata).unwrap(); + AttributeValue::S(metadata_json) + } else { + let metadata_blob = serde_json::to_vec(&event.metadata).unwrap(); + AttributeValue::B(Blob::new(metadata_blob)) + }; let put = Put::builder() .table_name(table_name) @@ -192,18 +214,26 @@ impl DynamoEventRepository { aggregate_id: String, current_snapshot: usize, events: &[SerializedEvent], + use_strings: bool, ) -> Result<(), DynamoAggregateError> { let expected_snapshot = current_snapshot - 1; let (mut transactions, current_sequence) = - Self::build_event_put_transactions(&self.event_table, events); + Self::build_event_put_transactions(&self.event_table, events, self.use_strings); let aggregate_type_and_id = AttributeValue::S(format!("{}:{}", A::aggregate_type(), &aggregate_id)); let aggregate_type = AttributeValue::S(A::aggregate_type()); let aggregate_id = AttributeValue::S(aggregate_id); let current_sequence = AttributeValue::N(current_sequence.to_string()); let current_snapshot = AttributeValue::N(current_snapshot.to_string()); - let payload_blob = serde_json::to_vec(&aggregate_payload).unwrap(); - let payload = AttributeValue::B(Blob::new(payload_blob)); + + let payload = if use_strings { + let payload_json = serde_json::to_string(&aggregate_payload).unwrap(); + AttributeValue::S(payload_json) + } else { + let payload_blob = serde_json::to_vec(&aggregate_payload).unwrap(); + AttributeValue::B(Blob::new(payload_blob)) + }; + let expected_snapshot = AttributeValue::N(expected_snapshot.to_string()); transactions.push(TransactWriteItem::builder() .put(Put::builder() @@ -331,8 +361,14 @@ impl PersistedEventRepository for DynamoEventRepository { self.insert_events(events).await?; } Some((aggregate_id, aggregate, current_snapshot)) => { - self.update_snapshot::(aggregate, aggregate_id, current_snapshot, events) - .await?; + self.update_snapshot::( + aggregate, + aggregate_id, + current_snapshot, + events, + self.use_strings, + ) + .await?; } } Ok(()) @@ -562,6 +598,7 @@ mod test { id.clone(), 1, &vec![], + false, ) .await .unwrap(); @@ -593,6 +630,7 @@ mod test { id.clone(), 2, &vec![], + false, ) .await .unwrap(); @@ -625,6 +663,7 @@ mod test { id.clone(), 2, &vec![], + false, ) .await .unwrap_err(); diff --git a/src/helpers.rs b/src/helpers.rs index b511338..2835654 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -46,9 +46,12 @@ pub(crate) fn att_as_value( let attribute = require_attribute(values, attribute_name)?; match attribute.as_b() { Ok(payload_blob) => Ok(serde_json::from_slice(payload_blob.as_ref())?), - Err(_) => Err(DynamoAggregateError::MissingAttribute( - attribute_name.to_string(), - )), + Err(_) => match attribute.as_s() { + Ok(payload_string) => Ok(serde_json::from_str(&payload_string)?), + Err(_) => Err(DynamoAggregateError::MissingAttribute( + attribute_name.to_string(), + )), + }, } } diff --git a/src/view_repository.rs b/src/view_repository.rs index a401a49..a706dc6 100644 --- a/src/view_repository.rs +++ b/src/view_repository.rs @@ -6,13 +6,16 @@ use aws_sdk_dynamodb::types::{AttributeValue, Put, TransactWriteItem}; use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository}; use cqrs_es::{Aggregate, View}; -use crate::helpers::{att_as_number, att_as_value, commit_transactions, load_dynamo_view}; +use crate::helpers::{ + att_as_number, att_as_string, att_as_value, commit_transactions, load_dynamo_view, +}; /// A DynamoDb backed view repository for use in backing a `GenericQuery`. pub struct DynamoViewRepository { _phantom: PhantomData<(V, A)>, view_name: String, client: aws_sdk_dynamodb::client::Client, + use_strings: bool, } impl DynamoViewRepository @@ -40,6 +43,15 @@ where _phantom: Default::default(), view_name: view_name.to_string(), client, + use_strings: false, + } + } + /// Configures a `DynamoViewRepository` to use strings rather than buffers for the payload + /// field. + pub fn with_use_strings(self, use_strings: bool) -> Self { + Self { + use_strings, + ..self } } } @@ -60,8 +72,14 @@ where None => return Ok(None), Some(item) => item, }; - let payload = att_as_value(query_item, "Payload")?; - let view: V = serde_json::from_value(payload)?; + let view: V = if self.use_strings { + let payload = att_as_string(query_item, "Payload")?; + serde_json::from_str(&payload)? + } else { + let payload = att_as_value(query_item, "Payload")?; + serde_json::from_value(payload)? + }; + Ok(Some(view)) } @@ -83,8 +101,15 @@ where Some(item) => item, }; let version = att_as_number(query_item, "ViewVersion")?; - let payload = att_as_value(query_item, "Payload")?; - let view: V = serde_json::from_value(payload)?; + + let view: V = if self.use_strings { + let payload = att_as_string(query_item, "Payload")?; + serde_json::from_str(&payload)? + } else { + let payload = att_as_value(query_item, "Payload")?; + serde_json::from_value(payload)? + }; + let context = ViewContext::new(view_id.to_string(), version as i64); Ok(Some((view, context))) } @@ -93,8 +118,15 @@ where let view_id = AttributeValue::S(String::from(&context.view_instance_id)); let expected_view_version = AttributeValue::N(context.version.to_string()); let view_version = AttributeValue::N((context.version + 1).to_string()); - let payload_blob = serde_json::to_vec(&view).unwrap(); - let payload = AttributeValue::B(Blob::new(payload_blob)); + + let payload = if self.use_strings { + let payload_json = serde_json::to_string(&view).unwrap(); + AttributeValue::S(payload_json) + } else { + let payload_blob = serde_json::to_vec(&view).unwrap(); + AttributeValue::B(Blob::new(payload_blob)) + }; + let transaction = TransactWriteItem::builder() .put(Put::builder() .table_name(&self.view_name)