diff --git a/bin/all-o-stasis/src/main.rs b/bin/all-o-stasis/src/main.rs index 44847a7..71b2f6f 100644 --- a/bin/all-o-stasis/src/main.rs +++ b/bin/all-o-stasis/src/main.rs @@ -12,7 +12,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod passport; mod routes; -mod session; mod storage; mod types; mod word_list; @@ -31,13 +30,13 @@ struct AppState { // The kinds of errors we can hit in our application. #[derive(Debug)] -enum AppError { +pub enum AppError { // Ot operations fail Ot(OtError), // firestore db errors Firestore(FirestoreError), // query error - Query(String), + Query(String), // TODO split and more meaningful name // unable to parse json content into type ParseError(String), // No session found diff --git a/bin/all-o-stasis/src/passport.rs b/bin/all-o-stasis/src/passport.rs index 88189f4..152a968 100644 --- a/bin/all-o-stasis/src/passport.rs +++ b/bin/all-o-stasis/src/passport.rs @@ -17,11 +17,8 @@ use serde::{Deserialize, Serialize}; use crate::{ AppError, AppState, - storage::{ - ACCOUNTS_VIEW_COLLECTION, SESSIONS_COLLECTION, apply_object_updates, create_object, - lookup_latest_snapshot, save_session, - }, - types::{Account, AccountRole, ObjectType}, + storage::apply_object_updates, + types::{Account, AccountRole, AccountsView, Object, ObjectType, Snapshot}, word_list::make_security_code, }; @@ -121,6 +118,16 @@ mod maileroo { pub type SessionId = String; +pub(crate) async fn author_from_session( + state: &AppState, + gym: &String, + session_id: &Cookie<'static>, +) -> Result { + let session_id = session_id.value().to_owned(); + let session = Session::lookup(state, gym, session_id).await?; + Ok(session.obj_id) +} + #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub(crate) struct Session { @@ -143,6 +150,72 @@ impl fmt::Display for Session { } } +impl Session { + const COLLECTION: &str = "sessions"; + + pub async fn lookup( + state: &AppState, + gym: &String, + object_id: ObjectId, + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + state + .db + .fluent() + .select() + .by_id_in(Self::COLLECTION) + .parent(&parent_path) + .obj() + .one(&object_id) + .await? + .ok_or(AppError::NoSession()) + } + + pub async fn store( + &self, + state: &AppState, + gym: &String, + session_id: &str, + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + let p: Option = state + .db + .fluent() + .update() + .in_col(Session::COLLECTION) + .document_id(session_id) + .parent(&parent_path) + .object(self) + .execute() + .await?; + + match p { + Some(p) => { + tracing::debug!("storing session: {p}"); + Ok(p) + } + None => { + tracing::warn!("failed to update session: {self} (no such object exists"); + Err(AppError::NoSession()) + } + } + } + + pub async fn delete(state: &AppState, gym: &String, session_id: &str) -> Result<(), AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + state + .db + .fluent() + .delete() + .from(Self::COLLECTION) + .parent(&parent_path) + .document_id(session_id) + .execute() + .await?; + Ok(()) + } +} + #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct Passport { @@ -241,28 +314,10 @@ async fn create_passport( Path(gym): Path, Json(payload): axum::extract::Json, ) -> Result, AppError> { - let parent_path = state.db.parent_path("gyms", gym.clone())?; - // 1. Lookup account by email. If no such account exists, create a new one - let account_stream: BoxStream> = state - .db - .fluent() - .select() - .from(ACCOUNTS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([q - .field(path_camel_case!(Account::email)) - .eq(payload.email.clone())]) - }) - .limit(1) - .obj() - .stream_query_with_errors() - .await?; - - let accounts: Vec = account_stream.try_collect().await?; - let maybe_account_id: Result = match accounts.first() { - Some(account) => Ok(account.id.clone().expect("object has no id")), + let account = AccountsView::with_email(&state, gym.clone(), payload.email.clone()).await?; + let maybe_account_id: Result = match account { + Some(account) => Ok(account.id.clone().expect("existing accounts have an id")), None => { let account = Account { id: None, @@ -272,15 +327,10 @@ async fn create_passport( name: None, }; let value = serde_json::to_value(account).expect("serialising account"); - let obj = create_object( - &state, - &gym, - String::from(""), // TODO fine? - ObjectType::Account, - &value, - ) - .await?; - + // TODO: author? root? + let obj = + Object::from_value(&state, &gym, String::from(""), ObjectType::Account, &value) + .await?; Ok(obj.id.clone()) } }; @@ -299,7 +349,7 @@ async fn create_passport( validity: PassportValidity::Unconfirmed, }; let value = serde_json::to_value(passport).expect("serialising passport"); - let obj = create_object(&state, &gym, account_id, ObjectType::Passport, &value).await?; + let obj = Object::from_value(&state, &gym, account_id, ObjectType::Passport, &value).await?; let passport_id = obj.id.clone(); @@ -327,7 +377,7 @@ async fn confirm_passport( Query(pport): Query, jar: CookieJar, ) -> Result { - let snapshot = lookup_latest_snapshot(&state, &gym, &pport.passport_id.clone()).await?; + let snapshot = Snapshot::lookup_latest(&state, &gym, &pport.passport_id.clone()).await?; let passport: Passport = serde_json::from_value(snapshot.content).or(Err( AppError::ParseError("failed to parse object into Passport".to_string()), ))?; @@ -336,17 +386,13 @@ async fn confirm_passport( Err(AppError::NotAuthorized()) } else { // create a new session for the account in the Passport object - let session = save_session( - &state, - &gym, - &Session { - id: None, - obj_id: passport.account_id, - created_at: None, - last_accessed_at: chrono::offset::Utc::now(), - }, - &new_id(80), - ) + let session = Session { + id: None, + obj_id: passport.account_id, + created_at: None, + last_accessed_at: chrono::offset::Utc::now(), + } + .store(&state, &gym, &new_id(80)) .await?; // mark as valid @@ -361,7 +407,6 @@ async fn confirm_passport( snapshot.revision_id, String::from(""), // TODO fine? [op].to_vec(), - false, ) .await?; @@ -396,7 +441,7 @@ async fn await_passport_confirmation( jar: CookieJar, ) -> Result { let (account_id, revision_id) = loop { - let snapshot = lookup_latest_snapshot(&state, &gym, &pport.passport_id.clone()).await?; + let snapshot = Snapshot::lookup_latest(&state, &gym, &pport.passport_id.clone()).await?; let passport: Passport = serde_json::from_value(snapshot.content).or(Err( AppError::ParseError("failed to parse object into Passport".to_string()), ))?; @@ -426,7 +471,6 @@ async fn await_passport_confirmation( revision_id, String::from(""), // TODO fine? [op].to_vec(), - false, ) .await?; @@ -436,7 +480,7 @@ async fn await_passport_confirmation( .db .fluent() .select() - .from(SESSIONS_COLLECTION) + .from(Session::COLLECTION) .parent(&parent_path) .filter(|q| { q.for_all([q diff --git a/bin/all-o-stasis/src/routes.rs b/bin/all-o-stasis/src/routes.rs index c2f30e4..de0d2e7 100644 --- a/bin/all-o-stasis/src/routes.rs +++ b/bin/all-o-stasis/src/routes.rs @@ -7,7 +7,7 @@ mod api; mod collection; mod stats; -pub use api::{LookupObjectResponse, PatchObjectResponse}; +pub use api::PatchObjectResponse; mod built_info { include!(concat!(env!("OUT_DIR"), "/built.rs")); diff --git a/bin/all-o-stasis/src/routes/api.rs b/bin/all-o-stasis/src/routes/api.rs index 6e107dd..d0924b5 100644 --- a/bin/all-o-stasis/src/routes/api.rs +++ b/bin/all-o-stasis/src/routes/api.rs @@ -1,12 +1,8 @@ use std::net::SocketAddr; -use crate::passport::Session; -use crate::session::{account_role, author_from_session}; -use crate::storage::{ - BOULDERS_VIEW_COLLECTION, OBJECTS_COLLECTION, PATCHES_COLLECTION, SESSIONS_COLLECTION, - apply_object_updates, create_object, lookup_object_, -}; -use crate::types::{AccountRole, Boulder, Object, ObjectDoc, ObjectType, Patch}; +use crate::passport::{Session, author_from_session}; +use crate::storage::apply_object_updates; +use crate::types::{AccountRole, AccountsView, Boulder, Object, ObjectType, Patch, Snapshot}; use crate::ws::handle_socket; use crate::{AppError, AppState}; use axum::{ @@ -21,9 +17,6 @@ use axum_extra::extract::{CookieJar, cookie::Cookie}; use axum_extra::headers::UserAgent; use chrono::{DateTime, Utc}; use cookie::time::Duration; -use firestore::{FirestoreResult, path_camel_case}; -use futures::TryStreamExt; -use futures::stream::BoxStream; use otp::{ObjectId, Operation, RevId}; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; @@ -57,6 +50,22 @@ pub struct LookupObjectResponse { pub content: Value, } +impl LookupObjectResponse { + pub async fn build(state: &AppState, gym: &String, id: ObjectId) -> Result { + let obj = Object::lookup(state, gym, &id).await?; + let snapshot = Snapshot::lookup_latest(state, gym, &id.clone()).await?; + + Ok(LookupObjectResponse { + id, + ot_type: obj.object_type, + created_at: obj.created_at, + created_by: obj.created_by, + revision_id: snapshot.revision_id, + content: snapshot.content, + }) + } +} + #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct PatchObjectBody { @@ -93,53 +102,13 @@ struct LookupSessionResponse { obj_id: ObjectId, } -async fn object_type( - state: &AppState, - gym: &String, - object_id: ObjectId, -) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_doc: Option = state - .db - .fluent() - .select() - .by_id_in(OBJECTS_COLLECTION) - .parent(&parent_path) - .obj() - .one(&object_id) - .await?; - - if let Some(doc) = object_doc { - let object: Object = doc - .try_into() - .map_err(|e| AppError::Query(format!("lookup_object_type: {e}")))?; - Ok(object.object_type) - } else { - Err(AppError::NotAuthorized()) - } -} - -async fn lookup_boulder( +async fn account_role( state: &AppState, gym: &String, object_id: &ObjectId, -) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; - let boulder: Option = state - .db - .fluent() - .select() - .by_id_in(BOULDERS_VIEW_COLLECTION) - .parent(&parent_path) - .obj() - .one(&object_id) - .await?; - - if let Some(boulder) = boulder { - Ok(boulder) - } else { - Err(AppError::NotAuthorized()) - } +) -> Result { + let account = AccountsView::with_id(state, gym, object_id.clone()).await?; + Ok(account.role) } pub fn routes() -> Router { @@ -155,29 +124,20 @@ pub fn routes() -> Router { .route("/{gym}/feed", any(feed)) } +// TODO maybe impl FromRequestParts into a state/path context async fn delete_session( State(state): State, Path(gym): Path, jar: CookieJar, ) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; let session_id = jar .get("session") .ok_or(AppError::NoSession())? .value() .to_owned(); + Session::delete(&state, &gym, &session_id).await?; - state - .db - .fluent() - .delete() - .from(SESSIONS_COLLECTION) - .parent(&parent_path) - .document_id(&session_id) - .execute() - .await?; - - let cookie = Cookie::build(("session", session_id.clone())) + let cookie = Cookie::build(("session", session_id)) .path("/") .max_age(Duration::seconds(0)) .secure(true) @@ -191,23 +151,13 @@ async fn lookup_session( Path(gym): Path, jar: CookieJar, ) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; // TODO NoSession correct here? let session_id = jar .get("session") .ok_or(AppError::NoSession())? .value() .to_owned(); - let session: Session = state - .db - .fluent() - .select() - .by_id_in(SESSIONS_COLLECTION) - .parent(&parent_path) - .obj() - .one(&session_id) - .await? - .ok_or(AppError::NoSession())?; + let session = Session::lookup(&state, &gym, session_id.clone()).await?; let cookie = Cookie::build(("session", session_id.clone())) .path("/") @@ -230,7 +180,7 @@ async fn new_object( jar: CookieJar, Json(payload): axum::extract::Json, ) -> Result, AppError> { - let session_id = jar.get("session"); + let session_id = jar.get("session").ok_or(AppError::NoSession())?; let created_by = author_from_session(&state, &gym, session_id).await?; // unauthorized users should be able to create accounts @@ -249,10 +199,10 @@ async fn new_object( let ot_type = payload.ot_type; let content = payload.content; // changing this to also add the object to the view - let obj = create_object(&state, &gym, created_by, ot_type.clone(), &content).await?; + let obj = Object::from_value(&state, &gym, created_by, ot_type.clone(), &content).await?; Ok(Json(CreateObjectResponse { - id: obj.id.clone(), + id: obj.id, ot_type, content, })) @@ -263,14 +213,14 @@ async fn lookup_object( Path((gym, id)): Path<(String, String)>, jar: CookieJar, ) -> Result, AppError> { - let response = lookup_object_(&state, &gym, id).await?; + let response = Json(LookupObjectResponse::build(&state, &gym, id).await?); // anyone can lookup boulders if response.ot_type == ObjectType::Boulder { return Ok(response); } - let session_id = jar.get("session"); + let session_id = jar.get("session").ok_or(AppError::NoSession())?; let created_by = author_from_session(&state, &gym, session_id).await?; // otherwise just object the owner owns @@ -293,7 +243,7 @@ async fn patch_object( jar: CookieJar, Json(payload): axum::extract::Json, ) -> Result, AppError> { - let session_id = jar.get("session"); + let session_id = jar.get("session").ok_or(AppError::NoSession())?; let created_by = author_from_session(&state, &gym, session_id).await?; // users cant patch atm @@ -303,8 +253,8 @@ async fn patch_object( return Err(AppError::NotAuthorized()); } - let ot_type = object_type(&state, &gym, id.clone()).await?; - match ot_type { + let object = Object::lookup(&state, &gym, &id).await?; + match object.object_type { ObjectType::Account => { if role == AccountRole::Setter { // only admins can change the role of an Account @@ -324,7 +274,7 @@ async fn patch_object( } } ObjectType::Boulder => { - let boulder = lookup_boulder(&state, &gym, &id).await?; + let boulder = Boulder::lookup(&state, &gym, &id).await?; if boulder.is_draft > 0 { // drafts can be edited by any admin/setter } else { @@ -354,7 +304,6 @@ async fn patch_object( payload.revision_id, created_by, payload.operations, - false, ) .await?; @@ -365,32 +314,7 @@ async fn lookup_patch( State(state): State, Path((gym, id, rev_id)): Path<(String, String, i64)>, ) -> Result, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let patch_stream: BoxStream> = state - .db - .fluent() - .select() - .from(PATCHES_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Patch::object_id)).eq(id.clone()), - q.field(path_camel_case!(Patch::revision_id)).eq(rev_id), - ]) - }) - .limit(1) - .obj() - .stream_query_with_errors() - .await?; - - let mut patches: Vec = patch_stream.try_collect().await?; - if patches.len() != 1 { - return Err(AppError::Query(format!( - "lookup_patch found {} patches, expecting only 1", - patches.len() - ))); - } - let patch = patches.pop().unwrap(); + let patch = Patch::lookup(&state, &gym, &id, rev_id).await?; Ok(Json(patch)) } diff --git a/bin/all-o-stasis/src/routes/collection.rs b/bin/all-o-stasis/src/routes/collection.rs index 3efd39b..a0e0924 100644 --- a/bin/all-o-stasis/src/routes/collection.rs +++ b/bin/all-o-stasis/src/routes/collection.rs @@ -5,16 +5,12 @@ use axum::{ extract::{Path, State}, }; use axum_extra::extract::CookieJar; -use firestore::{FirestoreQueryDirection, FirestoreResult, path_camel_case}; -use futures::TryStreamExt; -use futures::stream::BoxStream; use otp::ObjectId; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use crate::session::author_from_session; -use crate::storage::{ACCOUNTS_VIEW_COLLECTION, BOULDERS_VIEW_COLLECTION, lookup_latest_snapshot}; -use crate::types::{Account, AccountRole, Boulder}; +use crate::passport::author_from_session; +use crate::types::{Account, AccountsView, BouldersView, Snapshot}; use crate::{AppError, AppState}; #[derive(Serialize, Deserialize, Debug)] @@ -46,7 +42,7 @@ async fn public_profile( State(state): State, Path((gym, id)): Path<(String, String)>, ) -> Result, AppError> { - let snapshot = lookup_latest_snapshot(&state, &gym, &id).await?; + let snapshot = Snapshot::lookup_latest(&state, &gym, &id).await?; let account: Account = serde_json::from_value(snapshot.content).or(Err( AppError::ParseError("failed to parse object".to_string()), ))?; @@ -71,30 +67,9 @@ async fn active_boulders( State(state): State, Path(gym): Path, ) -> Result>, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(BOULDERS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Boulder::removed)).eq(0), - q.field(path_camel_case!(Boulder::is_draft)).eq(0), - ]) - }) - .order_by([( - path_camel_case!(Boulder::set_date), - FirestoreQueryDirection::Descending, - )]) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let boulders = BouldersView::active(&state, &gym).await?; Ok(Json( - as_vec + boulders .into_iter() .map(|b| b.id.expect("object in view has no id")) // TODO no panic .collect(), @@ -105,27 +80,7 @@ async fn draft_boulders( State(state): State, Path(gym): Path, ) -> Result>, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - // XXX we used to have a separate collection for draft boulders but never used it in the (old) - // code. Here we choose to follow the old implementation and do not add a collection for draft - // boulders. - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(BOULDERS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Boulder::removed)).eq(0), - q.field(path_camel_case!(Boulder::is_draft)).neq(0), - ]) - }) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let as_vec = BouldersView::drafts(&state, &gym).await?; Ok(Json( as_vec .into_iter() @@ -139,26 +94,14 @@ async fn own_boulders( Path(gym): Path, jar: CookieJar, ) -> Result>, AppError> { - let session_id = jar.get("session"); + let session_id = jar.get("session").ok_or(AppError::NoSession())?; let own = author_from_session(&state, &gym, session_id).await?; // TODO not sure if it is okay to return NotAuthorized // if own == ROOT_OBJ_ID { // return Ok(Json(Vec::new())); // } - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(BOULDERS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| q.for_all([q.field(path_camel_case!(Boulder::id)).eq(own.to_owned())])) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let as_vec = BouldersView::with_id(&state, &gym, own).await?; Ok(Json( as_vec .into_iter() @@ -171,18 +114,7 @@ async fn accounts( State(state): State, Path(gym): Path, ) -> Result>, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(ACCOUNTS_VIEW_COLLECTION) - .parent(&parent_path) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let as_vec = AccountsView::all(&state, &gym).await?; Ok(Json( as_vec .into_iter() @@ -195,23 +127,7 @@ async fn admin_accounts( State(state): State, Path(gym): Path, ) -> Result>, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(ACCOUNTS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([q - .field(path_camel_case!(Account::role)) - .neq(AccountRole::User)]) - }) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let as_vec = AccountsView::admins(&state, &gym).await?; Ok(Json( as_vec .into_iter() diff --git a/bin/all-o-stasis/src/routes/stats.rs b/bin/all-o-stasis/src/routes/stats.rs index e0e5a3c..ecbb832 100644 --- a/bin/all-o-stasis/src/routes/stats.rs +++ b/bin/all-o-stasis/src/routes/stats.rs @@ -3,13 +3,9 @@ use axum::extract::{Path, State}; use axum::response::Json; use axum::routing::get; use chrono::DateTime; -use firestore::{FirestoreResult, path_camel_case}; -use futures::TryStreamExt; -use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; -use crate::storage::BOULDERS_VIEW_COLLECTION; -use crate::types::Boulder; +use crate::types::BouldersView; use crate::{AppError, AppState}; #[derive(Serialize, Deserialize, Debug)] @@ -35,20 +31,7 @@ async fn stats_boulders( State(state): State, Path(gym): Path, ) -> Result>, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - // TODO this is too expensive: we read all records to compute the stats - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(BOULDERS_VIEW_COLLECTION) - .parent(&parent_path) - .filter(|q| q.for_all([q.field(path_camel_case!(Boulder::is_draft)).eq(0)])) - .obj() - .stream_query_with_errors() - .await?; - - let as_vec: Vec = object_stream.try_collect().await?; + let as_vec = BouldersView::stats(&state, &gym).await?; let stats: Vec = as_vec .into_iter() .map(|b| BoulderStat { diff --git a/bin/all-o-stasis/src/session.rs b/bin/all-o-stasis/src/session.rs deleted file mode 100644 index 28bfc9f..0000000 --- a/bin/all-o-stasis/src/session.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::passport::Session; -use crate::storage::{ACCOUNTS_VIEW_COLLECTION, SESSIONS_COLLECTION}; -use crate::types::{Account, AccountRole}; -use crate::{AppError, AppState}; -use axum_extra::extract::cookie::Cookie; -use otp::ObjectId; - -// pub(crate) async fn author_from_session( -// state: &AppState, -// gym: &String, -// session_id: Option<&Cookie<'static>>, -// ) -> Result, AppError> { -// let session_id = if let Some(session_id) = session_id { -// session_id.value().to_owned() -// } else { -// return Ok(None); -// // FIXME why do we allow this? -// // return Ok(String::from("")); -// }; -// -// let parent_path = state.db.parent_path("gyms", gym)?; -// let session: Option = state -// .db -// .fluent() -// .select() -// .by_id_in(SESSIONS_COLLECTION) -// .parent(&parent_path) -// .obj() -// .one(&session_id) -// .await?; -// -// if let Some(session) = session { -// Ok(Some(session.obj_id)) -// } else { -// Err(AppError::NotAuthorized()) -// } -// } - -pub(crate) async fn author_from_session( - state: &AppState, - gym: &String, - session_id: Option<&Cookie<'static>>, -) -> Result { - let session_id = if let Some(session_id) = session_id { - session_id.value().to_owned() - } else { - // FIXME why do we allow this? - // should be option? - return Ok(String::from("")); - }; - - let parent_path = state.db.parent_path("gyms", gym)?; - let session: Option = state - .db - .fluent() - .select() - .by_id_in(SESSIONS_COLLECTION) - .parent(&parent_path) - .obj() - .one(&session_id) - .await?; - - if let Some(session) = session { - Ok(session.obj_id) - } else { - Err(AppError::NotAuthorized()) - } -} - -pub(crate) async fn account_role( - state: &AppState, - gym: &String, - object_id: &ObjectId, -) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; - let account: Option = state - .db - .fluent() - .select() - .by_id_in(ACCOUNTS_VIEW_COLLECTION) - .parent(&parent_path) - .obj() - .one(object_id) - .await?; - - if let Some(account) = account { - Ok(account.role) - } else { - Err(AppError::NotAuthorized()) - } -} - -// pub(crate) async fn account_role( -// state: &AppState, -// gym: &String, -// object_id: ObjectId, -// ) -> Result { -// let parent_path = state.db.parent_path("gyms", gym)?; -// if let Some(object_id) = object_id { -// let account: Option = state -// .db -// .fluent() -// .select() -// .by_id_in(ACCOUNTS_VIEW_COLLECTION) -// .parent(&parent_path) -// .obj() -// .one(object_id) -// .await?; -// -// if let Some(account) = account { -// Ok(account.role) -// } else { -// Err(AppError::NotAuthorized()) -// } -// } else { -// return Ok(AccountRole::User); -// } -// } diff --git a/bin/all-o-stasis/src/storage.rs b/bin/all-o-stasis/src/storage.rs index 70300c3..ec4e344 100644 --- a/bin/all-o-stasis/src/storage.rs +++ b/bin/all-o-stasis/src/storage.rs @@ -1,162 +1,37 @@ use crate::{ - passport::Session, - routes::{LookupObjectResponse, PatchObjectResponse}, - types::{Account, Boulder, Object, ObjectDoc, ObjectType, Patch, Snapshot}, - {AppError, AppState}, + AppError, AppState, + routes::PatchObjectResponse, + types::{AccountsView, BouldersView, Object, ObjectType, Patch, Snapshot}, }; use axum::Json; -use firestore::{FirestoreQueryDirection, FirestoreResult, path_camel_case}; -use futures::TryStreamExt; -use futures::stream::BoxStream; -use otp::{ObjectId, Operation, RevId, ZERO_REV_ID, rebase}; -use serde_json::{Value, from_value}; +use otp::{ObjectId, Operation, RevId, rebase}; +use serde_json::Value; -pub const ACCOUNTS_VIEW_COLLECTION: &str = "accounts_view"; -pub const BOULDERS_VIEW_COLLECTION: &str = "boulders_view"; -pub const OBJECTS_COLLECTION: &str = "objects"; -pub const PATCHES_COLLECTION: &str = "patches"; -pub const SESSIONS_COLLECTION: &str = "sessions"; -pub const SNAPSHOTS_COLLECTION: &str = "snapshots"; - -macro_rules! store { - ($state:expr, $gym:expr, $entity:expr, $collection:expr) => {{ - let parent_path = $state.db.parent_path("gyms", $gym)?; - let result = $state - .db - .fluent() - .insert() - .into($collection) - .generate_document_id() - .parent(&parent_path) - .object($entity) - .execute() - .await?; - - match &result { - Some(r) => tracing::debug!("storing: {r}"), - None => tracing::warn!("failed to store: {}", $entity), - } - - result - }}; -} - -// TODO only diff here is that we provide an id and update -pub(crate) async fn save_session( - state: &AppState, - gym: &String, - session: &Session, - session_id: &str, -) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; - let p: Option = state - .db - .fluent() - .update() - .in_col(SESSIONS_COLLECTION) - .document_id(session_id) - .parent(&parent_path) - .object(session) - .execute() - .await?; - - match p { - Some(p) => { - tracing::debug!("storing session: {p}"); - Ok(p) - } - None => { - tracing::warn!("failed to update session: {session} (no such object exists"); - Err(AppError::NoSession()) - } - } +struct SaveOp { + patch: Patch, + snapshot: Snapshot, } -pub(crate) async fn create_object( +pub(crate) async fn update_view( state: &AppState, gym: &String, - author_id: ObjectId, - object_type: ObjectType, - value: &Value, -) -> Result { - let obj_doc = ObjectDoc::new(object_type); - let obj_doc: Option = store!(state, gym, &obj_doc, OBJECTS_COLLECTION); - let obj_doc = obj_doc.ok_or(AppError::Query( - "create_object: failed to create object".to_string(), - ))?; - - let obj: Object = obj_doc - .try_into() - .map_err(|e| AppError::Query(format!("create_object: {e}")))?; - - let patch = Patch::new(obj.id.clone(), author_id, value); - let patch: Option = store!(state, gym, &patch, PATCHES_COLLECTION); - let _ = patch.ok_or(AppError::Query( - "create_object: failed to store patch".to_string(), - ))?; - - update_view(state, gym, &obj.id, value).await?; - - Ok(obj) + object_id: &ObjectId, + content: &Value, +) -> Result<(), AppError> { + let obj = Object::lookup(state, gym, object_id).await?; + update_view_typed(state, gym, object_id, &obj.object_type, content).await } -pub(crate) async fn update_view( +pub(crate) async fn update_view_typed( state: &AppState, gym: &String, object_id: &ObjectId, + object_type: &ObjectType, content: &Value, ) -> Result<(), AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - - // lookup object to find out what type it is - let obj: ObjectDoc = state - .db - .fluent() - .select() - .by_id_in(OBJECTS_COLLECTION) - .parent(&parent_path) - .obj() - .one(&object_id) - .await? - .ok_or(AppError::Query(format!( - "update_view: failed to update view for {object_id}" - )))?; - - let obj: Object = obj - .try_into() - .map_err(|e| AppError::Query(format!("update_view: {e}")))?; - - match obj.object_type { - ObjectType::Account => { - let account = from_value::(content.clone()) - .map_err(|e| AppError::ParseError(format!("{e} in: {content}")))?; - - let _: Option = state - .db - .fluent() - .update() - .in_col(ACCOUNTS_VIEW_COLLECTION) - .document_id(object_id.clone()) - .parent(&parent_path) - .object(&account) - .execute() - .await?; - } - ObjectType::Boulder => { - let boulder = from_value::(content.clone()) - .map_err(|e| AppError::ParseError(format!("{e} in: {content}")))?; - - let _: Option = state - .db - .fluent() - .update() - .in_col(BOULDERS_VIEW_COLLECTION) - .document_id(object_id.clone()) - .parent(&parent_path) - .object(&boulder) - .execute() - .await?; - } + match object_type { + ObjectType::Account => AccountsView::store(state, gym, object_id, content).await?, + ObjectType::Boulder => BouldersView::store(state, gym, object_id, content).await?, ObjectType::Passport => { // no view table } @@ -165,379 +40,96 @@ pub(crate) async fn update_view( Ok(()) } -/// generic object lookup in `gym` with `id` -pub(crate) async fn lookup_object_( - state: &AppState, - gym: &String, - id: ObjectId, -) -> Result, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let obj: ObjectDoc = state - .db - .fluent() - .select() - .by_id_in(OBJECTS_COLLECTION) - .parent(&parent_path) - .obj() - .one(&id) - .await? - .ok_or(AppError::Query(format!( - "lookup_object: failed to get object {id}" - )))?; - - let obj: Object = obj - .try_into() - .map_err(|e| AppError::Query(format!("lookup_object: {e}")))?; - - tracing::debug!("looking up last snapshot for obj={id}"); - let snapshot = lookup_latest_snapshot(state, gym, &id.clone()).await?; - - Ok(Json(LookupObjectResponse { - id, - ot_type: obj.object_type, - created_at: obj.created_at, - created_by: obj.created_by, - revision_id: snapshot.revision_id, - content: snapshot.content, - })) -} - -pub(crate) async fn lookup_latest_snapshot( - state: &AppState, - gym: &String, - obj_id: &ObjectId, -) -> Result { - // same as lookup_snapshot but not with upper bound - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(SNAPSHOTS_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Snapshot::object_id)).eq(obj_id), - q.field(path_camel_case!(Snapshot::revision_id)) - .greater_than_or_equal(ZERO_REV_ID), - ]) - }) - .limit(1) - .order_by([( - path_camel_case!(Snapshot::revision_id), - FirestoreQueryDirection::Descending, - )]) - .obj() - .stream_query_with_errors() - .await?; - - let snapshots: Vec = object_stream.try_collect().await?; - let latest_snapshot: Snapshot = match snapshots.first() { - Some(snapshot) => { - tracing::debug!("found {snapshot}"); - snapshot.clone() - } - None => { - tracing::debug!("no snapshot found"); - // XXX we could already create the first snapshot on object creation? - let snapshot = Snapshot::new(obj_id.clone()); - let _: Option = store!(state, gym, &snapshot, SNAPSHOTS_COLLECTION); - snapshot - } - }; - - // get all patches which we need to apply on top of the snapshot to - // arrive at the desired revision - let patches = patches_after_revision(state, gym, obj_id, latest_snapshot.revision_id).await?; - - // apply those patches to the snapshot - apply_patches(&latest_snapshot, &patches) -} - -/// get or create a snapshot between low and high (inclusive) -async fn lookup_snapshot_between( - state: &AppState, - gym: &String, - obj_id: &ObjectId, - low: RevId, - high: RevId, -) -> Result { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(SNAPSHOTS_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Snapshot::object_id)).eq(obj_id), - q.field(path_camel_case!(Snapshot::revision_id)) - .greater_than_or_equal(low), - q.field(path_camel_case!(Snapshot::revision_id)) - .less_than_or_equal(high), - ]) - }) - .limit(1) - .order_by([( - path_camel_case!(Snapshot::revision_id), - FirestoreQueryDirection::Descending, - )]) - .obj() - .stream_query_with_errors() - .await?; - - let snapshots: Vec = object_stream.try_collect().await?; - tracing::debug!( - "snapshots ({low} <= s <= {high}): {} snapshots, obj={obj_id}", - snapshots.len(), - ); - match snapshots.first() { - Some(snapshot) => Ok(snapshot.clone()), - None => { - // TODO we could already create the first snapshot on object creation? - // TODO why is initial snapshot rev = -1? - let snapshot = Snapshot::new(obj_id.clone()); - let _: Option = store!(state, gym, &snapshot, SNAPSHOTS_COLLECTION); - Ok(snapshot) - } - } -} - -async fn lookup_snapshot( - state: &AppState, - gym: &String, - obj_id: &ObjectId, - rev_id: RevId, // inclusive -) -> Result { - let latest_snapshot = lookup_snapshot_between(state, gym, obj_id, ZERO_REV_ID, rev_id).await?; - - // get all patches which we need to apply on top of the snapshot to - // arrive at the desired revision - let patches: Vec = - patches_after_revision(state, gym, obj_id, latest_snapshot.revision_id) - .await? - .into_iter() - .filter(|p| p.revision_id <= rev_id) - .collect(); - - // apply those patches to the snapshot - apply_patches(&latest_snapshot, &patches) -} - -async fn patches_after_revision( - state: &AppState, - gym: &String, - obj_id: &ObjectId, - rev_id: RevId, -) -> Result, AppError> { - let parent_path = state.db.parent_path("gyms", gym)?; - let object_stream: BoxStream> = state - .db - .fluent() - .select() - .from(PATCHES_COLLECTION) - .parent(&parent_path) - .filter(|q| { - q.for_all([ - q.field(path_camel_case!(Patch::object_id)).eq(obj_id), - q.field(path_camel_case!(Patch::revision_id)) - .greater_than(rev_id), - ]) - }) - .order_by([( - path_camel_case!(Snapshot::revision_id), - FirestoreQueryDirection::Ascending, - )]) - .obj() - .stream_query_with_errors() - .await?; - - let patches: Vec = object_stream.try_collect().await?; - tracing::debug!( - "patches after rev ({rev_id}): {}, obj = {obj_id}", - patches.len() - ); - Ok(patches) -} - -fn apply_patch_to_snapshot(snapshot: &Snapshot, patch: &Patch) -> Result { - let s = Snapshot { - object_id: snapshot.object_id.to_owned(), - revision_id: patch.revision_id, - content: patch.operation.apply_to(snapshot.content.clone())?, - }; - tracing::debug!("applying patch={patch} to {snapshot} results in snapshot={s}"); - Ok(s) -} - -fn apply_patches(snapshot: &Snapshot, patches: &Vec) -> Result { - let mut s = snapshot.clone(); - for patch in patches { - s = apply_patch_to_snapshot(&s, patch)?; - } - // Ok(patches.iter().fold(snapshot.clone(), |snapshot, patch| { - // apply_patch_to_snapshot(&snapshot, &patch)? - // })) - - Ok(s) -} - pub async fn apply_object_updates( state: &AppState, gym: &String, obj_id: ObjectId, - rev_id: RevId, // TODO this is what? first is 0? + rev_id: RevId, author: ObjectId, operations: Vec, - skip_validation: bool, ) -> Result, AppError> { - // first check that the object exists. We'll need its metadata later - // let id = base_id(&obj_id); - // the 'Snapshot' against which the submitted operations were created // this only contains patches until base_snapshot.revision_id - tracing::debug!("looking up base_snapshot@rev{rev_id}"); - let base_snapshot = lookup_snapshot(state, gym, &obj_id, rev_id).await?; - tracing::debug!("base_snapshot={base_snapshot}"); + let base_snapshot = Snapshot::lookup(state, gym, &obj_id, rev_id).await?; // if there are any patches which the client doesn't know about we need // to let her know - // TODO cant we have patches that are not applied above but are now missing? - let previous_patches = patches_after_revision(state, gym, &obj_id, rev_id).await?; - let latest_snapshot = apply_patches(&base_snapshot, &previous_patches)?; - - let mut patches = Vec::::new(); - for op in operations { - let patch = save_operation( - state, - gym, - obj_id.clone(), - author.clone(), - (base_snapshot.content).clone(), - &latest_snapshot, - &previous_patches, - op, - !skip_validation, - ) - .await; // TODO await all? does not matter that much probably? - - match patch { - Ok(Some(val)) => patches.push(val), - Ok(None) => (), // TODO push nones? - Err(e) => return Err(e), + let previous_patches = Patch::after_revision(state, gym, &obj_id, rev_id).await?; + let latest_snapshot = base_snapshot.apply_patches(&previous_patches)?; + + let (patches, num_processed, snapshot) = { + let mut patches = Vec::::new(); + let mut num_processed = 0; + let mut snapshot = latest_snapshot; + for op in operations { + match save_operation( + state, + gym, + author.clone(), + (base_snapshot.content).clone(), + &snapshot, + previous_patches.iter().chain(patches.iter()), + op, + ) + .await + { + Err(e) => return Err(e), + Ok(saved) => { + num_processed += 1; + if let Some(saved) = saved { + patches.push(saved.patch); + snapshot = saved.snapshot; + } + } + } } - } + (patches, num_processed, snapshot) + }; - // TODO update boulder/account view here? to make queries possible? - // so in Avers we had generic Views that provided this interface - // - // viewObjectTransformer :: obj -> Avers (Maybe a) - // (here this would just be serde trying to parse the Json) - // - // and concrete types implemented this transform to store concrete queriable - // data in the database: - // - // FIXME why using validate here? validation and view update is the same? - // unless novalidate $ do - // FIXME this is the wrong snapshot - we dont return the one with the op applied - // update_boulder_view(state, gym, &latest_snapshot).await?; + update_view(state, gym, &snapshot.object_id, &snapshot.content).await?; Ok(Json(PatchObjectResponse::new( previous_patches, - patches.len(), + num_processed, patches, ))) - - // FIXME async in closure - can we separate this out? we only need async for actually storing - // the patch and snapshot in the database? - // let patches = operations.iter().map(|&op| { - // save_operation( - // &state, - // &gym, - // obj_id.clone(), - // author.clone(), - // (base_snapshot.content).clone(), - // &latest_snapshot, - // previous_patches.clone(), - // op, - // !skip_validation, - // ) - // }); - // - // let concret_patches = patches.await?; - // let ps = concret_patches - // .filter_map(|p| match p { - // Ok(Some(val)) => Some(val), - // Ok(None) => None, - // Err(_e) => None, // Some(Err(e)), FIXME handle err? - // }) - // .collect::>(); } -/// try rebase and then apply the operation to get a new snapshot (or return the old) -#[allow(clippy::too_many_arguments)] +/// Rebase and then apply the operation to the snapshot to get a new snapshot +/// Returns `None` if the rebasing fails or applying the (rebased) operation yields the same +/// snapshot. async fn save_operation( state: &AppState, gym: &String, - object_id: ObjectId, author_id: ObjectId, base_content: Value, snapshot: &Snapshot, - previous_patches: &[Patch], + previous_patches: impl Iterator, op: Operation, - validate: bool, -) -> Result, AppError> { - let new_op = match rebase( - base_content, - op, - previous_patches.iter().map(|p| &p.operation), - ) { - Ok(Some(new_op)) => new_op, +) -> Result, AppError> { + let rebased_op = match rebase(base_content, op, previous_patches.map(|p| &p.operation)) { + Ok(Some(rebased_op)) => rebased_op, Ok(None) => { - tracing::warn!("rebase had a conflicting patch"); + // TODO better error, log op, base_content + tracing::warn!("rebase failed due to a conflict"); return Ok(None); } Err(e) => { tracing::error!("rebase failed with error: {e}"); + // TODO error? or skip? return Ok(None); } }; - // tracing::debug!("save_operation: {snapshot}, op={new_op}"); - // FIXME clone? - let new_content = new_op.apply_to(snapshot.content.to_owned())?; - if new_content == snapshot.content { - tracing::debug!("skipping save operation: content did not change"); - return Ok(None); - } - if validate { - // TODO: validateWithType psObjectType newContent + match snapshot.new_revision(author_id, rebased_op)? { + None => Ok(None), + Some((new_snapshot, patch)) => { + let s = new_snapshot.store(state, gym).await?; + let p = patch.store(state, gym).await?; + Ok(Some(SaveOp { + patch: p, + snapshot: s, + })) + } } - - let rev_id = snapshot.revision_id + 1; - // now we know that the patch can be applied cleanly, so we can store both - let new_snapshot = Snapshot { - object_id: snapshot.object_id.to_owned(), - revision_id: rev_id, - content: new_content, - }; - let s: Option = store!(state, gym, &new_snapshot, SNAPSHOTS_COLLECTION); - s.ok_or(AppError::Query("storing snapshot failed".to_string()))?; - - // FIXME moved to here but we should probably only do that for the final snapshot? - update_view(state, gym, &new_snapshot.object_id, &new_snapshot.content).await?; - - let patch = Patch { - object_id, - revision_id: rev_id, - author_id, - created_at: None, - operation: new_op.to_owned(), - }; - let p: Option = store!(state, gym, &patch, PATCHES_COLLECTION); - p.ok_or(AppError::Query("storing patch failed".to_string()))?; - - // TODO maybe await here? or return futures? - Ok(Some(patch)) } diff --git a/bin/all-o-stasis/src/types.rs b/bin/all-o-stasis/src/types.rs deleted file mode 100644 index 2653eb3..0000000 --- a/bin/all-o-stasis/src/types.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::fmt; - -use chrono::{DateTime, Utc}; -use otp::{ObjectId, Operation, RevId}; -use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; - -// TODO implement Arbitrary for types - -#[derive(Serialize, Deserialize, Clone, PartialEq)] -#[serde(rename_all = "camelCase")] -pub enum AccountRole { - User, - Setter, - Admin, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Account { - #[serde(alias = "_firestore_id")] - pub id: Option, - // TODO this is not used - remove - pub login: String, - pub role: AccountRole, - pub email: String, - #[serde(with = "firestore::serialize_as_null")] - pub name: Option, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Boulder { - #[serde(alias = "_firestore_id")] - pub id: Option, - pub setter: Vec, - pub sector: String, - pub grade: String, - grade_nr: u32, - /// set date as epoch timestamp in millis - pub set_date: usize, - // #[serde(with = "firestore::serialize_as_null")] - // pub removed: Option, - /// removed date as epoch timestamp in millis, 0 means not removed yet - pub removed: usize, - // #[serde(with = "firestore::serialize_as_null")] - // pub is_draft: Option, - pub is_draft: usize, - name: String, - // name: Option, -} - -impl fmt::Display for Boulder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Boulder: {}", - serde_json::to_string_pretty(self).expect("serialisation should not fail") - ) - } -} - -impl Boulder { - pub fn in_setter(&self, setter: &ObjectId) -> bool { - self.setter.contains(setter) - } -} - -// Object storage representation - used for Firestore serialization -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct ObjectDoc { - #[serde(alias = "_firestore_id")] - pub id: Option, - #[serde(alias = "_firestore_created")] - pub created_at: Option>, - pub object_type: ObjectType, - pub created_by: ObjectId, - pub deleted: Option, -} - -impl ObjectDoc { - pub fn new(object_type: ObjectType) -> Self { - Self { - id: None, - object_type, - created_at: None, - created_by: otp::ROOT_OBJ_ID.to_owned(), - deleted: None, - } - } -} - -impl fmt::Display for ObjectDoc { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.id { - None => write!(f, "ObjectDoc: no id {}", self.object_type), - Some(id) => write!(f, "ObjectDoc: {id} {}", self.object_type), - } - } -} - -pub struct Object { - pub id: ObjectId, - pub created_at: DateTime, - pub object_type: ObjectType, - pub created_by: ObjectId, - #[allow(dead_code)] - pub deleted: bool, -} - -impl TryFrom for Object { - type Error = &'static str; - - fn try_from(doc: ObjectDoc) -> Result { - Ok(Object { - id: doc.id.ok_or("Object missing id")?, - created_at: doc.created_at.ok_or("Object missing created_at")?, - object_type: doc.object_type, - created_by: doc.created_by, - deleted: doc.deleted.unwrap_or(false), - }) - } -} - -impl fmt::Display for Object { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Object: {} {}", self.id, self.object_type) - } -} - -// here we fix types to those instead of doing a generic str to type "cast" -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -#[serde(rename_all = "camelCase")] -pub enum ObjectType { - Account, - Boulder, - Passport, -} - -impl fmt::Display for ObjectType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ObjectType::Account => write!(f, "type=account"), - ObjectType::Boulder => write!(f, "type=boulder"), - ObjectType::Passport => write!(f, "type=passport"), - } - } -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Patch { - pub object_id: ObjectId, - pub revision_id: RevId, - pub author_id: ObjectId, - #[serde(alias = "_firestore_created")] - pub created_at: Option>, //Option, - pub operation: Operation, -} - -impl fmt::Display for Patch { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Patch: {}@{} ops={}", - self.object_id, self.revision_id, self.operation - ) - } -} - -impl Patch { - pub fn new(object_id: ObjectId, author_id: String, value: &Value) -> Self { - let op = Operation::new_set(otp::ROOT_PATH.to_owned(), value.to_owned()); - Self { - object_id, - revision_id: otp::ZERO_REV_ID, - author_id, - created_at: None, - operation: op, - } - } -} - -#[derive(Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Snapshot { - pub object_id: ObjectId, - pub revision_id: RevId, - pub content: Value, -} - -impl Snapshot { - pub fn new(object_id: ObjectId) -> Self { - Self { - object_id, - // FIXME why is this not ZERO_REV_ID? - revision_id: -1, - content: json!({}), - } - } -} - -impl fmt::Display for Snapshot { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Snapshot: {}@{} content={}", - self.object_id, self.revision_id, self.content - ) - } -} diff --git a/bin/all-o-stasis/src/types/mod.rs b/bin/all-o-stasis/src/types/mod.rs new file mode 100644 index 0000000..dc4dc5a --- /dev/null +++ b/bin/all-o-stasis/src/types/mod.rs @@ -0,0 +1,402 @@ +use std::fmt; + +use firestore::{FirestoreQueryDirection, FirestoreResult, path_camel_case}; +use futures::TryStreamExt; +use futures::stream::BoxStream; +use otp::ObjectId; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, from_value}; + +use crate::{AppError, AppState}; + +pub mod object; +pub mod patch; +pub mod snapshot; + +pub use object::Object; +pub use patch::Patch; +pub use snapshot::Snapshot; + +macro_rules! store { + ($state:expr, $gym:expr, $entity:expr, $collection:expr) => {{ + let parent_path = $state.db.parent_path("gyms", $gym)?; + let result = $state + .db + .fluent() + .insert() + .into($collection) + .generate_document_id() + .parent(&parent_path) + .object($entity) + .execute() + .await?; + + match &result { + Some(r) => tracing::debug!("storing: {r}"), + None => tracing::warn!("failed to store: {}", $entity), + } + + result + }}; +} +pub(crate) use store; + +#[derive(Serialize, Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum AccountRole { + User, + Setter, + Admin, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum ObjectType { + Account, + Boulder, + Passport, +} + +impl fmt::Display for ObjectType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ObjectType::Account => write!(f, "type=account"), + ObjectType::Boulder => write!(f, "type=boulder"), + ObjectType::Passport => write!(f, "type=passport"), + } + } +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Account { + #[serde(alias = "_firestore_id")] + pub id: Option, + // TODO this is not used - remove + pub login: String, + pub role: AccountRole, + pub email: String, + #[serde(with = "firestore::serialize_as_null")] + pub name: Option, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Boulder { + #[serde(alias = "_firestore_id")] + pub id: Option, + pub setter: Vec, + pub sector: String, + pub grade: String, + grade_nr: u32, + /// set date as epoch timestamp in millis + pub set_date: usize, + // #[serde(with = "firestore::serialize_as_null")] + // pub removed: Option, + /// removed date as epoch timestamp in millis, 0 means not removed yet + pub removed: usize, + // #[serde(with = "firestore::serialize_as_null")] + // pub is_draft: Option, + pub is_draft: usize, + name: String, + // name: Option, +} + +impl fmt::Display for Boulder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Boulder: {}", + serde_json::to_string_pretty(self).expect("serialisation should not fail") + ) + } +} + +impl Boulder { + pub fn in_setter(&self, setter: &ObjectId) -> bool { + self.setter.contains(setter) + } + + pub async fn lookup( + state: &AppState, + gym: &String, + object_id: &ObjectId, + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + state + .db + .fluent() + .select() + .by_id_in(BouldersView::COLLECTION) + .parent(&parent_path) + .obj() + .one(&object_id) + .await? + .ok_or(AppError::Query(format!( + "lookup_boulder: failed to get boulder {object_id}" + ))) + } +} + +pub struct AccountsView {} + +impl AccountsView { + const COLLECTION: &str = "accounts_view"; + + pub async fn store( + state: &AppState, + gym: &String, + object_id: &ObjectId, + content: &Value, + ) -> Result<(), AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let account = from_value::(content.clone()) + .map_err(|e| AppError::ParseError(format!("{e} in: {content}")))?; + + let _: Option = state + .db + .fluent() + .update() + .in_col(Self::COLLECTION) + .document_id(object_id.clone()) + .parent(parent_path) + .object(&account) + .execute() + .await?; + + Ok(()) + } + + pub async fn all(state: &AppState, gym: &String) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .obj() + .stream_query_with_errors() + .await?; + let as_vec = object_stream.try_collect().await?; + Ok(as_vec) + } + + pub async fn admins(state: &AppState, gym: &String) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(AccountsView::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([q + .field(path_camel_case!(Account::role)) + .neq(AccountRole::User)]) + }) + .obj() + .stream_query_with_errors() + .await?; + let as_vec = object_stream.try_collect().await?; + Ok(as_vec) + } + + pub async fn with_email( + state: &AppState, + gym: String, + email: String, + ) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym.clone())?; + + let account_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| q.for_all([q.field(path_camel_case!(Account::email)).eq(email.clone())])) + .limit(1) + .obj() + .stream_query_with_errors() + .await?; + + let mut accounts: Vec = account_stream.try_collect().await?; + Ok(accounts.pop()) + } + + pub async fn with_id( + state: &AppState, + gym: &String, + object_id: ObjectId, + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + state + .db + .fluent() + .select() + .by_id_in(Self::COLLECTION) + .parent(&parent_path) + .obj() + .one(object_id.clone()) + .await? + .ok_or(AppError::Query(format!( + "lookup accounts view: failed to get object {object_id}" + ))) + } +} + +pub struct BouldersView {} + +impl BouldersView { + const COLLECTION: &str = "boulders_view"; + + pub async fn store( + state: &AppState, + gym: &String, + object_id: &ObjectId, + content: &Value, + ) -> Result<(), AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let boulder = from_value::(content.clone()) + .map_err(|e| AppError::ParseError(format!("{e} in: {content}")))?; + + let _: Option = state + .db + .fluent() + .update() + .in_col(Self::COLLECTION) + .document_id(object_id.clone()) + .parent(parent_path) + .object(&boulder) + .execute() + .await?; + + Ok(()) + } + + pub async fn active(state: &AppState, gym: &String) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([ + q.field(path_camel_case!(Boulder::removed)).eq(0), + q.field(path_camel_case!(Boulder::is_draft)).eq(0), + ]) + }) + .order_by([( + path_camel_case!(Boulder::set_date), + FirestoreQueryDirection::Descending, + )]) + .obj() + .stream_query_with_errors() + .await?; + + let boulders: Vec = object_stream.try_collect().await?; + Ok(boulders) + } + + pub async fn with_id( + state: &AppState, + gym: &String, + object_id: ObjectId, + ) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([q + .field(path_camel_case!(Boulder::id)) + .eq(object_id.to_owned())]) + }) + .obj() + .stream_query_with_errors() + .await?; + + let as_vec: Vec = object_stream.try_collect().await?; + Ok(as_vec) + } + + pub async fn drafts(state: &AppState, gym: &String) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + // XXX we used to have a separate collection for draft boulders but never used it in the (old) + // code. Here we choose to follow the old implementation and do not add a collection for draft + // boulders. + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([ + q.field(path_camel_case!(Boulder::removed)).eq(0), + q.field(path_camel_case!(Boulder::is_draft)).neq(0), + ]) + }) + .obj() + .stream_query_with_errors() + .await?; + + let as_vec: Vec = object_stream.try_collect().await?; + Ok(as_vec) + } + + pub async fn stats(state: &AppState, gym: &String) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + // TODO this is too expensive: we read all records to compute the stats + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| q.for_all([q.field(path_camel_case!(Boulder::is_draft)).eq(0)])) + .obj() + .stream_query_with_errors() + .await?; + + let as_vec: Vec = object_stream.try_collect().await?; + Ok(as_vec) + } + + // pub async fn collect( + // state: &AppState, + // gym: &String, + // removed: Option, + // is_draft: Option, + // ) -> Result, AppError> { + // let parent_path = state.db.parent_path("gyms", gym)?; + // let object_stream: BoxStream> = state + // .db + // .fluent() + // .select() + // .from(Self::COLLECTION) + // .parent(&parent_path) + // .filter(|q| { + // q.for_all( + // [ + // removed.map(|r| q.field(path_camel_case!(Boulder::removed)).eq(r)), + // is_draft.map(|d| q.field(path_camel_case!(Boulder::is_draft)).eq(d)), + // ] + // .into_iter() + // .flatten(), + // ) + // }) + // .obj() + // .stream_query_with_errors() + // .await?; + // + // let as_vec: Vec = object_stream.try_collect().await?; + // Ok(as_vec) + // } +} diff --git a/bin/all-o-stasis/src/types/object.rs b/bin/all-o-stasis/src/types/object.rs new file mode 100644 index 0000000..b24bc95 --- /dev/null +++ b/bin/all-o-stasis/src/types/object.rs @@ -0,0 +1,142 @@ +use std::fmt; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{ + AppError, AppState, + storage::update_view_typed, + types::{ObjectType, Patch, store}, +}; +use otp::ObjectId; + +// Object storage representation - used for Firestore serialization +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ObjectDoc { + #[serde(alias = "_firestore_id")] + id: Option, + #[serde(alias = "_firestore_created")] + created_at: Option>, + object_type: ObjectType, + created_by: ObjectId, + deleted: Option, +} + +impl ObjectDoc { + const COLLECTION: &str = "objects"; + + fn new(object_type: ObjectType) -> Self { + Self { + id: None, + object_type, + created_at: None, + created_by: otp::ROOT_OBJ_ID.to_owned(), + deleted: None, + } + } + + async fn lookup(state: &AppState, gym: &String, object_id: ObjectId) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + state + .db + .fluent() + .select() + .by_id_in(ObjectDoc::COLLECTION) + .parent(&parent_path) + .obj() + .one(&object_id) + .await? + .ok_or(AppError::Query(format!( + "lookup_object: failed to get object {object_id}" + ))) + } + + pub async fn store(&self, state: &AppState, gym: &String) -> Result { + let s: Option = store!(state, gym, self, Self::COLLECTION); + s.ok_or(AppError::Query("storing object failed".to_string())) + } +} + +impl fmt::Display for ObjectDoc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.id { + None => write!(f, "ObjectDoc: no id {}", self.object_type), + Some(id) => write!(f, "ObjectDoc: {id} {}", self.object_type), + } + } +} + +pub struct Object { + pub id: ObjectId, + pub created_at: DateTime, + pub object_type: ObjectType, + pub created_by: ObjectId, + #[allow(dead_code)] + pub deleted: bool, +} + +impl TryFrom for Object { + type Error = AppError; + + fn try_from(doc: ObjectDoc) -> Result { + Ok(Object { + id: doc + .id + .ok_or(AppError::Query("object doc is missing an id".to_string()))?, + created_at: doc.created_at.ok_or(AppError::Query( + "object doc is missing created_at".to_string(), + ))?, + object_type: doc.object_type, + created_by: doc.created_by, + deleted: doc.deleted.unwrap_or(false), + }) + } +} + +impl Object { + pub async fn new( + state: &AppState, + gym: &String, + object_type: &ObjectType, + ) -> Result { + let obj_doc = ObjectDoc::new(object_type.clone()) + .store(state, gym) + .await?; + let obj: Object = obj_doc.try_into()?; + Ok(obj) + } + + pub async fn lookup( + state: &AppState, + gym: &String, + object_id: &ObjectId, + ) -> Result { + let obj_doc = ObjectDoc::lookup(state, gym, object_id.clone()).await?; + let obj: Object = obj_doc.try_into()?; + Ok(obj) + } + + pub async fn from_value( + state: &AppState, + gym: &String, + author_id: String, + object_type: ObjectType, + value: &Value, + ) -> Result { + let obj = Object::new(state, gym, &object_type).await?; + let _ = Patch::new(obj.id.clone(), author_id, value) + .store(state, gym) + .await?; + update_view_typed(state, gym, &obj.id, &object_type, value).await?; + + Ok(obj) + } +} + +impl fmt::Display for Object { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Object: {} {}", self.id, self.object_type) + } +} diff --git a/bin/all-o-stasis/src/types/patch.rs b/bin/all-o-stasis/src/types/patch.rs new file mode 100644 index 0000000..10bea89 --- /dev/null +++ b/bin/all-o-stasis/src/types/patch.rs @@ -0,0 +1,197 @@ +use std::collections::hash_map::DefaultHasher; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::net::SocketAddr; + +use chrono::{DateTime, Utc}; +use firestore::{ + FirestoreDb, FirestoreListener, FirestoreListenerTarget, FirestoreMemListenStateStorage, + ParentPathBuilder, +}; +use firestore::{FirestoreQueryDirection, FirestoreResult, path_camel_case}; +use futures::{TryStreamExt, stream::BoxStream}; +use otp::{ObjectId, Operation, RevId}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::types::store; +use crate::{AppError, AppState, types::Snapshot}; + +fn hash_addr(addr: &SocketAddr) -> u64 { + let mut hasher = DefaultHasher::new(); + // TODO hash addr.ip()? + + match addr { + SocketAddr::V4(v4) => { + v4.ip().octets().hash(&mut hasher); + v4.port().hash(&mut hasher); + } + SocketAddr::V6(v6) => { + v6.ip().octets().hash(&mut hasher); + v6.port().hash(&mut hasher); + } + } + + hasher.finish() +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Patch { + pub object_id: ObjectId, + pub revision_id: RevId, + pub author_id: ObjectId, + #[serde(alias = "_firestore_created")] + pub created_at: Option>, //Option, + pub operation: Operation, +} + +impl fmt::Display for Patch { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Patch: {}@{} ops={}", + self.object_id, self.revision_id, self.operation + ) + } +} + +impl Patch { + const COLLECTION: &str = "patches"; + + pub fn new(object_id: ObjectId, author_id: String, value: &Value) -> Self { + let op = Operation::new_set(otp::ROOT_PATH.to_owned(), value.to_owned()); + Self { + object_id, + revision_id: otp::ZERO_REV_ID, + author_id, + created_at: None, + operation: op, + } + } + + pub fn new_revision( + revision_id: RevId, + object_id: ObjectId, + author_id: String, + operation: Operation, + ) -> Self { + Self { + object_id, + revision_id, + author_id, + created_at: None, + operation, + } + } + + pub async fn store(&self, state: &AppState, gym: &String) -> Result { + let s: Option = store!(state, gym, self, Self::COLLECTION); + s.ok_or(AppError::Query("storing patch failed".to_string())) + } + + /// lookup a patch with rev_id + pub async fn lookup( + state: &AppState, + gym: &String, + object_id: &ObjectId, + rev_id: RevId, // inclusive + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + let patch_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([ + q.field(path_camel_case!(Patch::object_id)) + .eq(object_id.clone()), + q.field(path_camel_case!(Patch::revision_id)).eq(rev_id), + ]) + }) + .limit(1) + .obj() + .stream_query_with_errors() + .await?; + + let mut patches: Vec = patch_stream.try_collect().await?; + if patches.len() != 1 { + return Err(AppError::Query(format!( + "lookup_patch found {} patches, expecting only 1", + patches.len() + ))); + } + let patch = patches.pop().unwrap(); + Ok(patch) + } + + /// get all patches for an object with revision id > rev_id + pub async fn after_revision( + state: &AppState, + gym: &String, + obj_id: &ObjectId, + rev_id: RevId, + ) -> Result, AppError> { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Patch::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all([ + q.field(path_camel_case!(Patch::object_id)).eq(obj_id), + q.field(path_camel_case!(Patch::revision_id)) + .greater_than(rev_id), + ]) + }) + .order_by([( + path_camel_case!(Snapshot::revision_id), + FirestoreQueryDirection::Ascending, + )]) + .obj() + .stream_query_with_errors() + .await?; + + let patches: Vec = object_stream.try_collect().await?; + tracing::debug!( + "patches after rev ({rev_id}): {}, obj = {obj_id}", + patches.len() + ); + Ok(patches) + } + + pub async fn listener( + state: &AppState, + parent_path: &ParentPathBuilder, + who: SocketAddr, + ) -> Option> { + let client_id = hash_addr(&who) as u32; + let listener_id: FirestoreListenerTarget = FirestoreListenerTarget::new(client_id); + tracing::debug!("connection {who} gets firestore listener id: {client_id:?}"); + + // now start streaming patches using firestore listeners: https://github.com/abdolence/firestore-rs/blob/master/examples/listen-changes.rs + let mut listener = match state + .db + .create_listener(FirestoreMemListenStateStorage::new()) + .await + { + Ok(l) => l, + Err(..) => return None, + }; + + let _ = state + .db + .fluent() + .select() + .from(Patch::COLLECTION) + .parent(parent_path) + .listen() + .add_target(listener_id, &mut listener); + + Some(listener) + } +} diff --git a/bin/all-o-stasis/src/types/snapshot.rs b/bin/all-o-stasis/src/types/snapshot.rs new file mode 100644 index 0000000..af5e2fb --- /dev/null +++ b/bin/all-o-stasis/src/types/snapshot.rs @@ -0,0 +1,190 @@ +use std::fmt; + +use firestore::{FirestoreQueryDirection, FirestoreResult, path_camel_case}; +use futures::{TryStreamExt, stream::BoxStream}; +use otp::{ObjectId, Operation, OtError, RevId, ZERO_REV_ID}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; + +use crate::{ + AppError, AppState, + types::{patch::Patch, store}, +}; + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Snapshot { + pub object_id: ObjectId, + pub revision_id: RevId, + pub content: Value, +} + +impl fmt::Display for Snapshot { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Snapshot: {}@{} content={}", + self.object_id, self.revision_id, self.content + ) + } +} + +impl Snapshot { + const COLLECTION: &str = "snapshots"; + + // TODO why is this not ZERO_REV_ID? + /// create a new empty snapshot with revision id -1 + pub fn new(object_id: ObjectId) -> Self { + Self { + object_id, + revision_id: -1, + content: json!({}), + } + } + + /// apply the operation to the snapshot and create a new revision returning the new snapshot + /// and the patch + pub fn new_revision( + &self, + author_id: ObjectId, + operation: Operation, + ) -> Result, OtError> { + let content = operation.apply_to(self.content.to_owned())?; + if content == self.content { + tracing::debug!("skipping save operation: content did not change"); + return Ok(None); + } + + let revision_id = self.revision_id + 1; + let patch = Patch::new_revision(revision_id, self.object_id.clone(), author_id, operation); + Ok(Some(( + Self { + object_id: self.object_id.clone(), + revision_id, + content, + }, + patch, + ))) + } + + fn apply_patch(&self, patch: &Patch) -> Result { + Ok(Self { + object_id: self.object_id.to_owned(), + revision_id: patch.revision_id, + content: patch.operation.apply_to(self.content.clone())?, + }) + } + + /// return a new snapshot with all patches applied + pub fn apply_patches(&self, patches: &Vec) -> Result { + let mut s = self.clone(); + for patch in patches { + s = s.apply_patch(patch)?; + } + + Ok(s) + } + + pub async fn store(&self, state: &AppState, gym: &String) -> Result { + let s: Option = store!(state, gym, self, Self::COLLECTION); + s.ok_or(AppError::Query("storing snapshot failed".to_string())) + } + + /// lookup a snapshot with rev_id or lower and apply patches with revision <= rev_id if necessary + pub async fn lookup( + state: &AppState, + gym: &String, + obj_id: &ObjectId, + rev_id: RevId, // inclusive + ) -> Result { + let latest_snapshot = + Self::lookup_between(state, gym, obj_id, (ZERO_REV_ID, Some(rev_id))).await?; + + // get all patches which we need to apply on top of the snapshot to + // arrive at the desired revision + let patches: Vec = + Patch::after_revision(state, gym, obj_id, latest_snapshot.revision_id) + .await? + .into_iter() + .filter(|p| p.revision_id <= rev_id) + .collect(); + + // apply those patches to the snapshot + latest_snapshot.apply_patches(&patches) + } + + /// get latest available snapshot with object_id or create a new snapshot. apply unapplied + /// patches to get to the latest possible revision. + pub async fn lookup_latest( + state: &AppState, + gym: &String, + object_id: &ObjectId, + ) -> Result { + let latest_snapshot = + Snapshot::lookup_between(state, gym, object_id, (ZERO_REV_ID, None)).await?; + + // get all patches which we need to apply on top of the snapshot to + // arrive at the desired revision + let patches = + Patch::after_revision(state, gym, object_id, latest_snapshot.revision_id).await?; + + // apply those patches to the snapshot + latest_snapshot.apply_patches(&patches) + } + + /// get or create a latest snapshot between low and high (inclusive) + async fn lookup_between( + state: &AppState, + gym: &String, + object_id: &ObjectId, + range: (RevId, Option), + ) -> Result { + let parent_path = state.db.parent_path("gyms", gym)?; + let object_stream: BoxStream> = state + .db + .fluent() + .select() + .from(Self::COLLECTION) + .parent(&parent_path) + .filter(|q| { + q.for_all( + [ + Some(q.field(path_camel_case!(Snapshot::object_id)).eq(object_id)), + Some( + q.field(path_camel_case!(Snapshot::revision_id)) + .greater_than_or_equal(range.0), + ), + range.1.map(|h| { + q.field(path_camel_case!(Snapshot::revision_id)) + .less_than_or_equal(h) + }), + ] + .into_iter() + .flatten(), + ) + }) + .limit(1) + .order_by([( + path_camel_case!(Snapshot::revision_id), + FirestoreQueryDirection::Descending, + )]) + .obj() + .stream_query_with_errors() + .await?; + + let snapshots: Vec = object_stream.try_collect().await?; + tracing::debug!( + "snapshots ({} <= s <= {:?}): {} snapshots, obj={object_id}", + range.0, + range.1, + snapshots.len(), + ); + match snapshots.first() { + Some(snapshot) => Ok(snapshot.clone()), + None => { + // TODO we could already create the first snapshot on object creation? + Ok(Snapshot::new(object_id.clone()).store(state, gym).await?) + } + } + } +} diff --git a/bin/all-o-stasis/src/ws.rs b/bin/all-o-stasis/src/ws.rs index 2d6e436..4dd72a7 100644 --- a/bin/all-o-stasis/src/ws.rs +++ b/bin/all-o-stasis/src/ws.rs @@ -2,22 +2,16 @@ use crate::types::Patch; use axum::body::Bytes; use axum::extract::ws::{Message, Utf8Bytes, WebSocket}; use chrono::{DateTime, Utc}; -use firestore::{ - FirestoreDb, FirestoreListenEvent, FirestoreListener, FirestoreListenerTarget, - FirestoreMemListenStateStorage, ParentPathBuilder, -}; +use firestore::{FirestoreDb, FirestoreListenEvent, ParentPathBuilder}; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt, TryStreamExt}; use otp::ObjectId; use serde::Serialize; -use std::collections::hash_map::DefaultHasher; use std::error::Error; -use std::hash::{Hash, Hasher}; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::{Mutex, mpsc, mpsc::Receiver, mpsc::Sender}; -use crate::storage::PATCHES_COLLECTION; use crate::{AppError, AppState}; #[derive(Serialize)] @@ -28,55 +22,6 @@ struct WsPatchResponse { ot_type: String, } -fn hash_addr(addr: &SocketAddr) -> u64 { - let mut hasher = DefaultHasher::new(); - - match addr { - SocketAddr::V4(v4) => { - v4.ip().octets().hash(&mut hasher); - v4.port().hash(&mut hasher); - } - SocketAddr::V6(v6) => { - v6.ip().octets().hash(&mut hasher); - v6.port().hash(&mut hasher); - } - } - - hasher.finish() -} - -async fn patch_listener( - state: AppState, - parent_path: ParentPathBuilder, - who: SocketAddr, -) -> Option> { - let client_id = hash_addr(&who) as u32; - let listener_id: FirestoreListenerTarget = FirestoreListenerTarget::new(client_id); - tracing::debug!("connection {who} gets firestore listener id: {client_id:?}"); - - // now start streaming patches using firestore listeners: https://github.com/abdolence/firestore-rs/blob/master/examples/listen-changes.rs - // do we have enough mem? - let mut listener = match state - .db - .create_listener(FirestoreMemListenStateStorage::new()) - .await - { - Ok(l) => l, - Err(..) => return None, - }; - - let _ = state - .db - .fluent() - .select() - .from(PATCHES_COLLECTION) - .parent(parent_path) - .listen() - .add_target(listener_id, &mut listener); - - Some(listener) -} - async fn handle_listener_event( event: FirestoreListenEvent, send_tx_patch: Sender, @@ -175,7 +120,7 @@ async fn drain_channel( } Message::Ping(bytes) => Message::Ping(bytes), t => { - tracing::error!("received unexpected message from on ws_send: {t:?}"); + tracing::error!("received unexpected message on ws_send: {t:?}"); continue; } }; @@ -260,7 +205,7 @@ pub(crate) async fn handle_socket( // collect all objects ids the client wants to get notified about changes let subscriptions: Arc>> = Arc::new(Mutex::new(Vec::new())); - let mut listener = match patch_listener(state, parent_path, who).await { + let mut listener = match Patch::listener(&state, &parent_path, who).await { Some(listener) => listener, None => return, };