Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions src/event_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct DynamoEventRepository {
event_table: String,
snapshot_table: String,
stream_channel_size: usize,
use_strings: bool,
}

impl DynamoEventRepository {
Expand Down Expand Up @@ -58,10 +59,8 @@ impl DynamoEventRepository {
/// ```
pub fn with_streaming_channel_size(self, stream_channel_size: usize) -> Self {
Self {
client: self.client,
event_table: self.event_table,
snapshot_table: self.snapshot_table,
stream_channel_size,
..self
}
}
/// Configures a `DynamoEventRepository` to use the provided table names.
Expand All @@ -80,13 +79,22 @@ impl DynamoEventRepository {
pub fn with_tables(self, event_table: &str, snapshot_table: &str) -> Self {
Self::use_table_names(self.client, event_table, snapshot_table)
}
/// Configures a `DynamoEventRepository` to use strings rather than buffers for the payload
/// and metadata fields.
pub fn with_use_strings(self, use_strings: bool) -> Self {
Self {
use_strings,
..self
}
}

fn use_table_names(client: Client, event_table: &str, snapshot_table: &str) -> Self {
Self {
client,
event_table: event_table.to_string(),
snapshot_table: snapshot_table.to_string(),
stream_channel_size: DEFAULT_STREAMING_CHANNEL_SIZE,
use_strings: false,
}
}

Expand All @@ -97,14 +105,16 @@ impl DynamoEventRepository {
if events.is_empty() {
return Ok(());
}
let (transactions, _) = Self::build_event_put_transactions(&self.event_table, events);
let (transactions, _) =
Self::build_event_put_transactions(&self.event_table, events, self.use_strings);
commit_transactions(&self.client, transactions).await?;
Ok(())
}

fn build_event_put_transactions(
table_name: &str,
events: &[SerializedEvent],
use_strings: bool,
) -> (Vec<TransactWriteItem>, usize) {
let mut current_sequence: usize = 0;
let mut transactions: Vec<TransactWriteItem> = Vec::default();
Expand All @@ -117,10 +127,22 @@ impl DynamoEventRepository {
let sequence = AttributeValue::N(String::from(&event.sequence.to_string()));
let event_version = AttributeValue::S(String::from(&event.event_version));
let event_type = AttributeValue::S(String::from(&event.event_type));
let payload_blob = serde_json::to_vec(&event.payload).unwrap();
let payload = AttributeValue::B(Blob::new(payload_blob));
let metadata_blob = serde_json::to_vec(&event.metadata).unwrap();
let metadata = AttributeValue::B(Blob::new(metadata_blob));

let payload = if use_strings {
let payload_json = serde_json::to_string(&event.payload).unwrap();
AttributeValue::S(payload_json)
} else {
let payload_blob = serde_json::to_vec(&event.payload).unwrap();
AttributeValue::B(Blob::new(payload_blob))
};

let metadata = if use_strings {
let metadata_json = serde_json::to_string(&event.metadata).unwrap();
AttributeValue::S(metadata_json)
} else {
let metadata_blob = serde_json::to_vec(&event.metadata).unwrap();
AttributeValue::B(Blob::new(metadata_blob))
};

let put = Put::builder()
.table_name(table_name)
Expand Down Expand Up @@ -192,18 +214,26 @@ impl DynamoEventRepository {
aggregate_id: String,
current_snapshot: usize,
events: &[SerializedEvent],
use_strings: bool,
) -> Result<(), DynamoAggregateError> {
let expected_snapshot = current_snapshot - 1;
let (mut transactions, current_sequence) =
Self::build_event_put_transactions(&self.event_table, events);
Self::build_event_put_transactions(&self.event_table, events, self.use_strings);
let aggregate_type_and_id =
AttributeValue::S(format!("{}:{}", A::aggregate_type(), &aggregate_id));
let aggregate_type = AttributeValue::S(A::aggregate_type());
let aggregate_id = AttributeValue::S(aggregate_id);
let current_sequence = AttributeValue::N(current_sequence.to_string());
let current_snapshot = AttributeValue::N(current_snapshot.to_string());
let payload_blob = serde_json::to_vec(&aggregate_payload).unwrap();
let payload = AttributeValue::B(Blob::new(payload_blob));

let payload = if use_strings {
let payload_json = serde_json::to_string(&aggregate_payload).unwrap();
AttributeValue::S(payload_json)
} else {
let payload_blob = serde_json::to_vec(&aggregate_payload).unwrap();
AttributeValue::B(Blob::new(payload_blob))
};

let expected_snapshot = AttributeValue::N(expected_snapshot.to_string());
transactions.push(TransactWriteItem::builder()
.put(Put::builder()
Expand Down Expand Up @@ -331,8 +361,14 @@ impl PersistedEventRepository for DynamoEventRepository {
self.insert_events(events).await?;
}
Some((aggregate_id, aggregate, current_snapshot)) => {
self.update_snapshot::<A>(aggregate, aggregate_id, current_snapshot, events)
.await?;
self.update_snapshot::<A>(
aggregate,
aggregate_id,
current_snapshot,
events,
self.use_strings,
)
.await?;
}
}
Ok(())
Expand Down Expand Up @@ -562,6 +598,7 @@ mod test {
id.clone(),
1,
&vec![],
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -593,6 +630,7 @@ mod test {
id.clone(),
2,
&vec![],
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -625,6 +663,7 @@ mod test {
id.clone(),
2,
&vec![],
false,
)
.await
.unwrap_err();
Expand Down
9 changes: 6 additions & 3 deletions src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ pub(crate) fn att_as_value(
let attribute = require_attribute(values, attribute_name)?;
match attribute.as_b() {
Ok(payload_blob) => Ok(serde_json::from_slice(payload_blob.as_ref())?),
Err(_) => Err(DynamoAggregateError::MissingAttribute(
attribute_name.to_string(),
)),
Err(_) => match attribute.as_s() {
Ok(payload_string) => Ok(serde_json::from_str(&payload_string)?),
Err(_) => Err(DynamoAggregateError::MissingAttribute(
attribute_name.to_string(),
)),
},
}
}

Expand Down
46 changes: 39 additions & 7 deletions src/view_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ use aws_sdk_dynamodb::types::{AttributeValue, Put, TransactWriteItem};
use cqrs_es::persist::{PersistenceError, ViewContext, ViewRepository};
use cqrs_es::{Aggregate, View};

use crate::helpers::{att_as_number, att_as_value, commit_transactions, load_dynamo_view};
use crate::helpers::{
att_as_number, att_as_string, att_as_value, commit_transactions, load_dynamo_view,
};

/// A DynamoDb backed view repository for use in backing a `GenericQuery`.
pub struct DynamoViewRepository<V, A> {
_phantom: PhantomData<(V, A)>,
view_name: String,
client: aws_sdk_dynamodb::client::Client,
use_strings: bool,
}

impl<V, A> DynamoViewRepository<V, A>
Expand Down Expand Up @@ -40,6 +43,15 @@ where
_phantom: Default::default(),
view_name: view_name.to_string(),
client,
use_strings: false,
}
}
/// Configures a `DynamoViewRepository` to use strings rather than buffers for the payload
/// field.
pub fn with_use_strings(self, use_strings: bool) -> Self {
Self {
use_strings,
..self
}
}
}
Expand All @@ -60,8 +72,14 @@ where
None => return Ok(None),
Some(item) => item,
};
let payload = att_as_value(query_item, "Payload")?;
let view: V = serde_json::from_value(payload)?;
let view: V = if self.use_strings {
let payload = att_as_string(query_item, "Payload")?;
serde_json::from_str(&payload)?
} else {
let payload = att_as_value(query_item, "Payload")?;
serde_json::from_value(payload)?
};

Ok(Some(view))
}

Expand All @@ -83,8 +101,15 @@ where
Some(item) => item,
};
let version = att_as_number(query_item, "ViewVersion")?;
let payload = att_as_value(query_item, "Payload")?;
let view: V = serde_json::from_value(payload)?;

let view: V = if self.use_strings {
let payload = att_as_string(query_item, "Payload")?;
serde_json::from_str(&payload)?
} else {
let payload = att_as_value(query_item, "Payload")?;
serde_json::from_value(payload)?
};

let context = ViewContext::new(view_id.to_string(), version as i64);
Ok(Some((view, context)))
}
Expand All @@ -93,8 +118,15 @@ where
let view_id = AttributeValue::S(String::from(&context.view_instance_id));
let expected_view_version = AttributeValue::N(context.version.to_string());
let view_version = AttributeValue::N((context.version + 1).to_string());
let payload_blob = serde_json::to_vec(&view).unwrap();
let payload = AttributeValue::B(Blob::new(payload_blob));

let payload = if self.use_strings {
let payload_json = serde_json::to_string(&view).unwrap();
AttributeValue::S(payload_json)
} else {
let payload_blob = serde_json::to_vec(&view).unwrap();
AttributeValue::B(Blob::new(payload_blob))
};

let transaction = TransactWriteItem::builder()
.put(Put::builder()
.table_name(&self.view_name)
Expand Down