From 7f561a6e752aeda47ef01bb65300c5bbbadff2ed Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 15:08:55 +0000 Subject: [PATCH 1/6] refactor: define system-wide Event types and use in homeserver and SDK --- Cargo.lock | 3 +- examples/rust/7-events_stream/main.rs | 15 +- pubky-common/src/events.rs | 174 ++++++++++++ pubky-common/src/lib.rs | 1 + .../persistence/files/events/events_entity.rs | 15 +- .../files/events/events_repository.rs | 80 +----- .../files/events/events_service.rs | 2 +- .../src/persistence/files/events/mod.rs | 5 +- pubky-sdk/bindings/js/Cargo.toml | 3 +- .../bindings/js/src/wrappers/event_stream.rs | 170 ++++++++++-- pubky-sdk/src/actors/event_stream.rs | 261 +++++++++--------- 11 files changed, 480 insertions(+), 249 deletions(-) create mode 100644 pubky-common/src/events.rs diff --git a/Cargo.lock b/Cargo.lock index 200f8928c..70346c038 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4143,6 +4143,7 @@ dependencies = [ name = "pubky-wasm" version = "0.7.0-rc.3" dependencies = [ + "base64 0.22.1", "console_log", "futures-util", "getrandom 0.3.4", @@ -4160,7 +4161,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", - "wasm-streams 0.5.0", + "wasm-streams 0.4.2", "web-sys", ] diff --git a/examples/rust/7-events_stream/main.rs b/examples/rust/7-events_stream/main.rs index 065000e06..b502bd0ef 100644 --- a/examples/rust/7-events_stream/main.rs +++ b/examples/rust/7-events_stream/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::Parser; use futures_util::StreamExt; -use pubky::{EventCursor, EventStreamBuilder, EventType, Pubky, PublicKey}; +use pubky::{EventCursor, EventStreamBuilder, Pubky, PublicKey}; use std::env; #[derive(Parser, Debug)] @@ -125,15 +125,14 @@ async fn main() -> Result<()> { // Process events while let Some(result) = stream.next().await { let event = result?; + let hash_str = event + .event_type + .content_hash() + .map(|h| h.to_string()) + .unwrap_or("-".into()); println!( "[{}] {} (cursor: {}, hash: {})", - match event.event_type { - EventType::Put => "PUT", - EventType::Delete => "DEL", - }, - event.resource, - event.cursor, - event.content_hash.unwrap_or_else(|| "-".to_string()) + event.event_type, event.resource, event.cursor, hash_str ); } diff --git a/pubky-common/src/events.rs b/pubky-common/src/events.rs new file mode 100644 index 000000000..9d4d15702 --- /dev/null +++ b/pubky-common/src/events.rs @@ -0,0 +1,174 @@ +//! Event types shared across Pubky crates. +//! +//! This module provides unified types for event streaming functionality, +//! used by both the homeserver and SDK. + +use std::fmt::Display; +use std::str::FromStr; + +use crate::crypto::Hash; + +/// Cursor for pagination in event queries. +/// +/// The cursor represents the ID of an event and is used for pagination. +/// It can be parsed from a string representation of an integer. +/// +/// Note: Uses `u64` internally, but Postgres BIGINT is signed (`i64`). +/// sea_query/sqlx binds `u64` values, which works correctly as long as +/// IDs stay within `i64::MAX` (~9.2 quintillion). Since event IDs are +/// auto-incrementing from 1, this is not a practical concern. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct EventCursor(u64); + +impl EventCursor { + /// Create a new cursor from an event ID. + #[must_use] + pub fn new(id: u64) -> Self { + Self(id) + } + + /// Get the underlying ID value. + #[must_use] + pub fn id(&self) -> u64 { + self.0 + } +} + +impl Display for EventCursor { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl FromStr for EventCursor { + type Err = std::num::ParseIntError; + + fn from_str(s: &str) -> Result { + Ok(EventCursor(s.parse()?)) + } +} + +impl From for EventCursor { + fn from(id: u64) -> Self { + EventCursor(id) + } +} + +impl TryFrom<&str> for EventCursor { + type Error = std::num::ParseIntError; + + fn try_from(s: &str) -> Result { + s.parse() + } +} + +impl TryFrom for EventCursor { + type Error = std::num::ParseIntError; + + fn try_from(s: String) -> Result { + s.parse() + } +} + +/// Type of event in the event stream. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EventType { + /// PUT event - resource created or updated, with its content hash. + Put { + /// Blake3 hash of the content. + content_hash: Hash, + }, + /// DELETE event - resource deleted. + Delete, +} + +impl EventType { + /// Get the string representation of the event type. + pub fn as_str(&self) -> &'static str { + match self { + EventType::Put { .. } => "PUT", + EventType::Delete => "DEL", + } + } + + /// Get the content hash if this is a PUT event. + pub fn content_hash(&self) -> Option<&Hash> { + match self { + EventType::Put { content_hash } => Some(content_hash), + EventType::Delete => None, + } + } +} + +impl Display for EventType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cursor_display_and_from_str() { + let cursor = EventCursor::new(12345); + assert_eq!(cursor.to_string(), "12345"); + + let parsed: EventCursor = "67890".parse().unwrap(); + assert_eq!(parsed.id(), 67890); + + let from_u64: EventCursor = 111u64.into(); + assert_eq!(from_u64.id(), 111); + + let try_from_str = EventCursor::try_from("222").unwrap(); + assert_eq!(try_from_str.id(), 222); + + let try_from_string = EventCursor::try_from("333".to_string()).unwrap(); + assert_eq!(try_from_string.id(), 333); + } + + #[test] + fn cursor_ordering() { + let c1 = EventCursor::new(1); + let c2 = EventCursor::new(2); + let c3 = EventCursor::new(2); + + assert!(c1 < c2); + assert!(c2 > c1); + assert_eq!(c2, c3); + } + + #[test] + fn event_type_display() { + let put = EventType::Put { + content_hash: Hash::from_bytes([0; 32]), + }; + let del = EventType::Delete; + + assert_eq!(put.to_string(), "PUT"); + assert_eq!(del.to_string(), "DEL"); + assert_eq!(put.as_str(), "PUT"); + assert_eq!(del.as_str(), "DEL"); + } + + #[test] + fn event_type_content_hash() { + let hash = Hash::from_bytes([1; 32]); + let put = EventType::Put { + content_hash: hash.clone(), + }; + let del = EventType::Delete; + + assert_eq!(put.content_hash(), Some(&hash)); + assert_eq!(del.content_hash(), None); + } + + #[test] + fn cursor_parse_error() { + assert!("abc".parse::().is_err()); + assert!("".parse::().is_err()); + assert!("-1".parse::().is_err()); + assert!("12.34".parse::().is_err()); + } +} diff --git a/pubky-common/src/lib.rs b/pubky-common/src/lib.rs index 7be1849ce..01595e577 100644 --- a/pubky-common/src/lib.rs +++ b/pubky-common/src/lib.rs @@ -9,6 +9,7 @@ pub mod auth; pub mod capabilities; pub mod constants; pub mod crypto; +pub mod events; mod keys; pub mod namespaces; pub mod recovery_file; diff --git a/pubky-homeserver/src/persistence/files/events/events_entity.rs b/pubky-homeserver/src/persistence/files/events/events_entity.rs index f352276e8..0847f0e49 100644 --- a/pubky-homeserver/src/persistence/files/events/events_entity.rs +++ b/pubky-homeserver/src/persistence/files/events/events_entity.rs @@ -1,16 +1,11 @@ use pubky_common::crypto::Hash; use pubky_common::crypto::PublicKey; +use pubky_common::events::{EventCursor, EventType}; use sea_query::Iden; use sqlx::{postgres::PgRow, FromRow, Row}; use crate::{ - persistence::{ - files::events::{ - events_repository::{EventCursor, EventIden}, - EventType, - }, - sql::user::UserIden, - }, + persistence::{files::events::events_repository::EventIden, sql::user::UserIden}, shared::webdav::{EntryPath, WebDavPath}, }; @@ -39,8 +34,6 @@ impl FromRow<'_, PgRow> for EventEntity { let user_pubkey = PublicKey::try_from_z32(&user_public_key).map_err(|e| sqlx::Error::Decode(e.into()))?; let event_type_str: String = row.try_get(EventIden::Type.to_string().as_str())?; - let user_public_key = - PublicKey::try_from_z32(&user_public_key).map_err(|e| sqlx::Error::Decode(e.into()))?; let path: String = row.try_get(EventIden::Path.to_string().as_str())?; let path = WebDavPath::new(&path).map_err(|e| sqlx::Error::Decode(e.into()))?; let created_at: sqlx::types::chrono::NaiveDateTime = @@ -78,8 +71,8 @@ impl FromRow<'_, PgRow> for EventEntity { id, event_type, user_id, - user_pubkey, - path: EntryPath::new(user_public_key, path), + user_pubkey: user_pubkey.clone(), + path: EntryPath::new(user_pubkey, path), created_at, }) } diff --git a/pubky-homeserver/src/persistence/files/events/events_repository.rs b/pubky-homeserver/src/persistence/files/events/events_repository.rs index a6dbd8741..09fc68cff 100644 --- a/pubky-homeserver/src/persistence/files/events/events_repository.rs +++ b/pubky-homeserver/src/persistence/files/events/events_repository.rs @@ -1,6 +1,4 @@ -use std::{fmt::Display, str::FromStr}; - -use pubky_common::crypto::Hash; +use pubky_common::events::{EventCursor, EventType}; use pubky_common::timestamp::Timestamp; use sea_query::{Expr, Iden, LikeExpr, Order, PostgresQueryBuilder, Query, SimpleExpr}; use sea_query_binder::SqlxBinder; @@ -24,41 +22,6 @@ use crate::{ pub const EVENT_TABLE: &str = "events"; -/// Cursor for pagination in event queries. -/// -/// Note: Uses `u64` internally, but Postgres BIGINT is signed (`i64`). -/// sea_query/sqlx binds `u64` values, which works correctly as long as -/// IDs stay within `i64::MAX` (~9.2 quintillion). Since event IDs are -/// auto-incrementing from 1, this is not a practical concern. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct EventCursor(u64); - -impl EventCursor { - /// Create a new cursor from an event ID - pub fn new(id: u64) -> Self { - Self(id) - } - - /// Get the underlying ID value - pub fn id(&self) -> u64 { - self.0 - } -} - -impl Display for EventCursor { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl FromStr for EventCursor { - type Err = std::num::ParseIntError; - - fn from_str(s: &str) -> Result { - Ok(EventCursor(s.parse()?)) - } -} - /// Repository that handles all the queries regarding the EventEntity. pub struct EventRepository; @@ -309,37 +272,10 @@ pub enum EventIden { ContentHash, } -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum EventType { - Put { content_hash: Hash }, - Delete, -} - -impl EventType { - pub fn as_str(&self) -> &'static str { - match self { - EventType::Put { .. } => "PUT", - EventType::Delete => "DEL", - } - } - - pub fn content_hash(&self) -> Option<&Hash> { - match self { - EventType::Put { content_hash } => Some(content_hash), - EventType::Delete => None, - } - } -} - -impl Display for EventType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.as_str()) - } -} #[cfg(test)] mod tests { - use pubky_common::crypto::Keypair; + use pubky_common::crypto::{Hash, Keypair}; use crate::{ persistence::sql::{user::UserRepository, SqlDb}, @@ -355,12 +291,12 @@ mod tests { let db = SqlDb::test().await; let user_pubkey = Keypair::random().public_key(); - // Test create user + // Create user let user = UserRepository::create(&user_pubkey, &mut db.pool().into()) .await .unwrap(); - // Test create session + // Create events for _ in 0..10 { let path = EntryPath::new(user_pubkey.clone(), WebDavPath::new("/test").unwrap()); let _ = EventRepository::create( @@ -375,7 +311,7 @@ mod tests { .unwrap(); } - // Test get session + // Test get events by cursor let events = EventRepository::get_by_cursor( Some(EventCursor::new(5)), Some(4), @@ -399,13 +335,13 @@ mod tests { let db = SqlDb::test().await; let user_pubkey = Keypair::random().public_key(); - // Test create user + // Create user let user = UserRepository::create(&user_pubkey, &mut db.pool().into()) .await .unwrap(); let mut timestamp_events = Vec::new(); - // Test create session + // Create events with specific timestamps for i in 0..10 { let timestamp = Timestamp::now().add(1_000_000 * i); // Add 1s for each event let created_at = timestamp_to_sqlx_datetime(×tamp); @@ -424,7 +360,7 @@ mod tests { timestamp_events.push((timestamp, event.id)); } - // Test get session + // Test legacy cursor parsing for (timestamp, should_be_event_id) in timestamp_events { let cursor = EventRepository::parse_cursor(×tamp.to_string(), &mut db.pool().into()) diff --git a/pubky-homeserver/src/persistence/files/events/events_service.rs b/pubky-homeserver/src/persistence/files/events/events_service.rs index 46b1339a3..4c8af5e10 100644 --- a/pubky-homeserver/src/persistence/files/events/events_service.rs +++ b/pubky-homeserver/src/persistence/files/events/events_service.rs @@ -1,5 +1,5 @@ use crate::persistence::{ - files::events::{events_repository::EventCursor, EventEntity, EventRepository, EventType}, + files::events::{EventCursor, EventEntity, EventRepository, EventType}, sql::UnifiedExecutor, }; use crate::shared::webdav::EntryPath; diff --git a/pubky-homeserver/src/persistence/files/events/mod.rs b/pubky-homeserver/src/persistence/files/events/mod.rs index 87844f8a2..46f43f5c1 100644 --- a/pubky-homeserver/src/persistence/files/events/mod.rs +++ b/pubky-homeserver/src/persistence/files/events/mod.rs @@ -5,5 +5,8 @@ mod events_service; pub use events_entity::EventEntity; pub use events_layer::EventsLayer; -pub use events_repository::{EventCursor, EventIden, EventRepository, EventType, EVENT_TABLE}; +pub use events_repository::{EventIden, EventRepository, EVENT_TABLE}; pub use events_service::{EventsService, MAX_EVENT_STREAM_USERS}; + +// Re-export from pubky_common for convenience +pub use pubky_common::events::{EventCursor, EventType}; diff --git a/pubky-sdk/bindings/js/Cargo.toml b/pubky-sdk/bindings/js/Cargo.toml index ac877c1ef..cec46b087 100644 --- a/pubky-sdk/bindings/js/Cargo.toml +++ b/pubky-sdk/bindings/js/Cargo.toml @@ -58,7 +58,8 @@ getrandom = { version = "0.3.3", features = ["wasm_js"] } reqwest = { workspace = true, features = [ "stream", ] } -wasm-streams = "0.5" +wasm-streams = "0.4" +base64 = "0.22" [dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/pubky-sdk/bindings/js/src/wrappers/event_stream.rs b/pubky-sdk/bindings/js/src/wrappers/event_stream.rs index 3944ac395..ad18b23c6 100644 --- a/pubky-sdk/bindings/js/src/wrappers/event_stream.rs +++ b/pubky-sdk/bindings/js/src/wrappers/event_stream.rs @@ -1,57 +1,176 @@ +use base64::Engine; use wasm_bindgen::prelude::*; use crate::wrappers::resource::PubkyResource; /// Type of event in the event stream. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum EventType { +/// +/// Use the helper methods to check the event type and access the content hash. +/// +/// @example +/// ```typescript +/// if (event.eventType.isPut()) { +/// console.log("PUT event with hash:", event.eventType.contentHash()); +/// } else if (event.eventType.isDelete()) { +/// console.log("DELETE event"); +/// } +/// ``` +#[wasm_bindgen] +#[derive(Debug, Clone)] +pub struct EventType { + kind: EventKind, + /// Content hash (blake3) in base64 format (only for PUT events). + content_hash: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum EventKind { Put, Delete, } -impl From for EventType { - fn from(value: pubky::EventType) -> Self { +#[wasm_bindgen] +impl EventType { + /// Returns true if this is a PUT event (resource created or updated). + #[wasm_bindgen(js_name = "isPut")] + pub fn is_put(&self) -> bool { + self.kind == EventKind::Put + } + + /// Returns true if this is a DELETE event (resource deleted). + #[wasm_bindgen(js_name = "isDelete")] + pub fn is_delete(&self) -> bool { + self.kind == EventKind::Delete + } + + /// Get the content hash in base64 format. + /// Returns the blake3 hash for PUT events, or undefined for DELETE events. + #[wasm_bindgen(js_name = "contentHash", getter)] + pub fn content_hash(&self) -> Option { + self.content_hash.clone() + } + + /// Get the string representation ("PUT" or "DEL"). + #[wasm_bindgen(js_name = "toString")] + pub fn to_string_js(&self) -> String { + match self.kind { + EventKind::Put => "PUT".to_string(), + EventKind::Delete => "DEL".to_string(), + } + } +} + +impl From<&pubky::EventType> for EventType { + fn from(value: &pubky::EventType) -> Self { match value { - pubky::EventType::Put => EventType::Put, - pubky::EventType::Delete => EventType::Delete, + pubky::EventType::Put { content_hash } => { + let hash_b64 = + base64::engine::general_purpose::STANDARD.encode(content_hash.as_bytes()); + EventType { + kind: EventKind::Put, + content_hash: Some(hash_b64), + } + } + pubky::EventType::Delete => EventType { + kind: EventKind::Delete, + content_hash: None, + }, } } } +/// Cursor for pagination in event queries. +/// +/// The cursor represents the ID of an event and is used for pagination. +/// +/// @example +/// ```typescript +/// // Get cursor from an event +/// const cursor = event.cursor; +/// console.log(cursor.id()); // numeric ID as string +/// console.log(cursor.toString()); // same as id() +/// +/// // Create a cursor for querying +/// const cursor = EventCursor.from("12345"); +/// ``` +#[wasm_bindgen] +#[derive(Debug, Clone)] +pub struct EventCursor(pubky::EventCursor); + +#[wasm_bindgen] +impl EventCursor { + /// Create a cursor from a string representation of the event ID. + #[wasm_bindgen(js_name = "from")] + pub fn parse(id: &str) -> Result { + id.parse::() + .map(EventCursor) + .map_err(|e| JsValue::from_str(&format!("Invalid cursor: {}", e))) + } + + /// Get the event ID as a string. + /// Returns a string to safely handle large numbers in JavaScript. + #[wasm_bindgen] + pub fn id(&self) -> String { + self.0.id().to_string() + } + + /// Get the string representation (same as id()). + #[wasm_bindgen(js_name = "toString")] + pub fn to_string_js(&self) -> String { + self.0.to_string() + } +} + +impl From for EventCursor { + fn from(value: pubky::EventCursor) -> Self { + EventCursor(value) + } +} + +impl EventCursor { + /// Get the inner native cursor (for Rust-side usage). + pub fn into_inner(self) -> pubky::EventCursor { + self.0 + } +} + /// A single event from the event stream. /// /// @example /// ```typescript /// for await (const event of stream) { -/// console.log(event.eventType); // "PUT" or "DEL" +/// // Check event type +/// if (event.eventType.isPut()) { +/// console.log("PUT:", event.resource.toPubkyUrl()); +/// console.log("Hash:", event.eventType.contentHash()); +/// } else { +/// console.log("DEL:", event.resource.toPubkyUrl()); +/// } +/// +/// // Access resource details /// console.log(event.resource.owner.z32()); // User's public key /// console.log(event.resource.path); // "/pub/example.txt" /// console.log(event.resource.toPubkyUrl()); // Full pubky:// URL -/// console.log(event.cursor); // Cursor for pagination +/// console.log(event.cursor.id()); // Cursor for pagination /// } /// ``` #[wasm_bindgen] #[derive(Debug, Clone)] pub struct Event { - /// Type of event (PUT or DEL). + /// Type of event (PUT or DELETE). event_type: EventType, /// The resource that was created, updated, or deleted. resource: PubkyResource, - /// Cursor for pagination (event id as string). - cursor: String, - /// Content hash (blake3) in hex format (only for PUT events with available hash). - content_hash: Option, + /// Cursor for pagination. + cursor: EventCursor, } #[wasm_bindgen] impl Event { - /// Get the event type ("PUT" or "DEL"). + /// Get the event type. #[wasm_bindgen(getter, js_name = "eventType")] - pub fn event_type(&self) -> String { - match self.event_type { - EventType::Put => "PUT".to_string(), - EventType::Delete => "DEL".to_string(), - } + pub fn event_type(&self) -> EventType { + self.event_type.clone() } /// Get the resource that was created, updated, or deleted. @@ -62,24 +181,17 @@ impl Event { /// Get the cursor for pagination. #[wasm_bindgen(getter)] - pub fn cursor(&self) -> String { + pub fn cursor(&self) -> EventCursor { self.cursor.clone() } - - /// Get the content hash (only for PUT events with available hash). - #[wasm_bindgen(getter, js_name = "contentHash")] - pub fn content_hash(&self) -> Option { - self.content_hash.clone() - } } impl From for Event { fn from(value: pubky::Event) -> Self { Event { - event_type: value.event_type.into(), + event_type: EventType::from(&value.event_type), resource: PubkyResource::from(value.resource), - cursor: value.cursor.to_string(), - content_hash: value.content_hash, + cursor: EventCursor::from(value.cursor), } } } diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index 7c2501816..19cc8a9f0 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -54,107 +54,32 @@ //! # } //! ``` -use std::fmt::Display; use std::pin::Pin; -use std::str::FromStr; use crate::PublicKey; +use base64::Engine; use eventsource_stream::Eventsource; use futures_util::{Stream, StreamExt}; +use pubky_common::crypto::Hash; use reqwest::Method; use url::Url; +pub use pubky_common::events::{EventCursor, EventType}; + use crate::{ Pkdns, PubkyHttpClient, PubkyResource, cross_log, errors::{Error, RequestError, Result}, }; -/// Cursor for pagination in event queries. -/// -/// The cursor represents the ID of an event and is used for pagination. -/// It can be parsed from a string representation of an integer. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct EventCursor(u64); - -impl EventCursor { - /// Create a new cursor from an event ID. - #[must_use] - pub fn new(id: u64) -> Self { - Self(id) - } - - /// Get the underlying ID value. - #[must_use] - pub fn id(&self) -> u64 { - self.0 - } -} - -impl Display for EventCursor { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl FromStr for EventCursor { - type Err = std::num::ParseIntError; - - fn from_str(s: &str) -> std::result::Result { - Ok(EventCursor(s.parse()?)) - } -} - -impl From for EventCursor { - fn from(id: u64) -> Self { - EventCursor(id) - } -} - -impl TryFrom<&str> for EventCursor { - type Error = std::num::ParseIntError; - - fn try_from(s: &str) -> std::result::Result { - s.parse() - } -} - -impl TryFrom for EventCursor { - type Error = std::num::ParseIntError; - - fn try_from(s: String) -> std::result::Result { - s.parse() - } -} - -/// Type of event in the event stream. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum EventType { - /// PUT event - resource created or updated. - Put, - /// DELETE event - resource deleted. - Delete, -} - -impl std::fmt::Display for EventType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - EventType::Put => write!(f, "PUT"), - EventType::Delete => write!(f, "DEL"), - } - } -} - /// A single event from the event stream. #[derive(Debug, Clone)] pub struct Event { - /// Type of event (PUT or DEL). + /// Type of event (PUT with content hash, or DELETE). pub event_type: EventType, /// The resource that was created, updated, or deleted. pub resource: PubkyResource, /// Cursor for pagination (event ID). pub cursor: EventCursor, - /// Content hash (blake3) in hex format (only for PUT events with available hash). - pub content_hash: Option, } /// Builder for creating an event stream subscription. @@ -569,23 +494,13 @@ impl EventStreamBuilder { /// event: PUT /// data: pubky://user_pubkey/pub/example.txt /// data: cursor: 42 -/// data: content_hash: abc123... (optional) +/// data: content_hash: (required for PUT events) /// ``` fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { - let event_type = match sse.event.as_str() { - "PUT" => EventType::Put, - "DEL" => EventType::Delete, - _ => { - return Err(Error::from(RequestError::Validation { - message: format!("Unknown event type: {}", sse.event), - })); - } - }; - // Parse SSE data by prefix let mut path: Option = None; let mut cursor: Option = None; - let mut content_hash: Option = None; + let mut content_hash_base64: Option = None; for (i, line) in sse.data.lines().enumerate() { if let Some(cursor_str) = line.strip_prefix("cursor: ") { @@ -595,7 +510,7 @@ fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { }) })?); } else if let Some(hash) = line.strip_prefix("content_hash: ") { - content_hash = Some(hash.to_string()); + content_hash_base64 = Some(hash.to_string()); } else if i == 0 { // First line without a known prefix is the path path = Some(line.to_string()); @@ -621,14 +536,59 @@ fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { }) })?; + let event_type = match sse.event.as_str() { + "PUT" => { + let content_hash = decode_content_hash(content_hash_base64)?; + EventType::Put { content_hash } + } + "DEL" => EventType::Delete, + other => { + return Err(Error::from(RequestError::Validation { + message: format!("Unknown event type: {other}"), + })); + } + }; + Ok(Event { event_type, resource, cursor, - content_hash, }) } +/// Decode a base64-encoded content hash into a Hash. +/// If the hash is missing or invalid for a PUT event, falls back to zero hash. +fn decode_content_hash(content_hash_base64: Option) -> Result { + match content_hash_base64 { + Some(b64) if !b64.is_empty() => { + let bytes = base64::engine::general_purpose::STANDARD + .decode(&b64) + .map_err(|e| { + Error::from(RequestError::Validation { + message: format!("Invalid content_hash base64 encoding: {e}"), + }) + })?; + + let hash_bytes: [u8; 32] = bytes.try_into().map_err(|bytes: Vec| { + Error::from(RequestError::Validation { + message: format!( + "content_hash must be exactly 32 bytes, got {} bytes", + bytes.len() + ), + }) + })?; + + Ok(Hash::from_bytes(hash_bytes)) + } + _ => { + // Fallback to zero hash for missing/empty content_hash (legacy/error case) + // This matches homeserver's fallback behavior in events_entity.rs + cross_log!(warn, "PUT event missing content_hash. Using zero hash as fallback."); + Ok(Hash::from_bytes([0; 32])) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -643,19 +603,29 @@ mod tests { } } + /// Helper to create a base64-encoded hash from bytes + fn encode_hash(bytes: [u8; 32]) -> String { + base64::engine::general_purpose::STANDARD.encode(bytes) + } + #[test] fn parse_put_event_with_content_hash() { + let hash_bytes = [1u8; 32]; + let hash_b64 = encode_hash(hash_bytes); let sse = make_sse( "PUT", - "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/example.txt\ncursor: 42\ncontent_hash: abc123def456", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/example.txt\ncursor: 42\ncontent_hash: {hash_b64}"), ); let event = parse_sse_event(&sse).unwrap(); - assert_eq!(event.event_type, EventType::Put); + assert!(matches!(event.event_type, EventType::Put { .. })); assert_eq!(event.resource.path.as_str(), "/pub/example.txt"); assert_eq!(event.cursor.id(), 42); - assert_eq!(event.content_hash, Some("abc123def456".to_string())); + assert_eq!( + event.event_type.content_hash(), + Some(&Hash::from_bytes(hash_bytes)) + ); } #[test] @@ -670,35 +640,45 @@ mod tests { assert_eq!(event.event_type, EventType::Delete); assert_eq!(event.resource.path.as_str(), "/pub/deleted.txt"); assert_eq!(event.cursor.id(), 100); - assert_eq!(event.content_hash, None); + assert_eq!(event.event_type.content_hash(), None); } #[test] fn parse_event_with_unknown_prefixed_lines_for_forward_compatibility() { + let hash_bytes = [2u8; 32]; + let hash_b64 = encode_hash(hash_bytes); let sse = make_sse( "PUT", - "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 50\nfuture_field: some_value\nanother_future: 123\ncontent_hash: hashvalue", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 50\nfuture_field: some_value\nanother_future: 123\ncontent_hash: {hash_b64}"), ); let event = parse_sse_event(&sse).unwrap(); - assert_eq!(event.event_type, EventType::Put); + assert!(matches!(event.event_type, EventType::Put { .. })); assert_eq!(event.cursor.id(), 50); - assert_eq!(event.content_hash, Some("hashvalue".to_string())); + assert_eq!( + event.event_type.content_hash(), + Some(&Hash::from_bytes(hash_bytes)) + ); } #[test] fn parse_event_with_lines_in_different_order() { + let hash_bytes = [3u8; 32]; + let hash_b64 = encode_hash(hash_bytes); // cursor before content_hash, both after path let sse = make_sse( "PUT", - "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/test.txt\ncontent_hash: hash123\ncursor: 999", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/test.txt\ncontent_hash: {hash_b64}\ncursor: 999"), ); let event = parse_sse_event(&sse).unwrap(); assert_eq!(event.cursor.id(), 999); - assert_eq!(event.content_hash, Some("hash123".to_string())); + assert_eq!( + event.event_type.content_hash(), + Some(&Hash::from_bytes(hash_bytes)) + ); } #[test] @@ -717,7 +697,8 @@ mod tests { #[test] fn error_on_missing_path() { - let sse = make_sse("PUT", "cursor: 42\ncontent_hash: abc"); + let hash_b64 = encode_hash([0u8; 32]); + let sse = make_sse("PUT", &format!("cursor: 42\ncontent_hash: {hash_b64}")); let result = parse_sse_event(&sse); @@ -731,9 +712,10 @@ mod tests { #[test] fn error_on_missing_cursor() { + let hash_b64 = encode_hash([0u8; 32]); let sse = make_sse( "PUT", - "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncontent_hash: abc", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncontent_hash: {hash_b64}"), ); let result = parse_sse_event(&sse); @@ -783,7 +765,7 @@ mod tests { } #[test] - fn parse_event_with_empty_content_hash() { + fn parse_put_event_with_empty_content_hash_uses_zero_hash() { let sse = make_sse( "PUT", "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: ", @@ -791,44 +773,73 @@ mod tests { let event = parse_sse_event(&sse).unwrap(); - // Empty string after prefix is still captured - assert_eq!(event.content_hash, Some("".to_string())); + // Empty content_hash falls back to zero hash + assert_eq!( + event.event_type.content_hash(), + Some(&Hash::from_bytes([0; 32])) + ); } #[test] - fn parse_event_with_large_cursor() { + fn parse_put_event_without_content_hash_uses_zero_hash() { let sse = make_sse( "PUT", - "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 9223372036854775807", + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1", ); let event = parse_sse_event(&sse).unwrap(); - assert_eq!(event.cursor.id(), 9223372036854775807u64); + // Missing content_hash falls back to zero hash + assert_eq!( + event.event_type.content_hash(), + Some(&Hash::from_bytes([0; 32])) + ); } #[test] - fn cursor_display_and_from_str() { - let cursor = EventCursor::new(12345); - assert_eq!(cursor.to_string(), "12345"); + fn parse_event_with_large_cursor() { + let hash_b64 = encode_hash([0u8; 32]); + let sse = make_sse( + "PUT", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 9223372036854775807\ncontent_hash: {hash_b64}"), + ); - let parsed: EventCursor = "67890".parse().unwrap(); - assert_eq!(parsed.id(), 67890); + let event = parse_sse_event(&sse).unwrap(); - let from_u64: EventCursor = 111u64.into(); - assert_eq!(from_u64.id(), 111); + assert_eq!(event.cursor.id(), 9223372036854775807u64); + } + + // Note: EventCursor and EventType trait tests are in pubky-common/src/events.rs + // SDK tests focus on SSE parsing behavior specific to the SDK - let try_from_str = EventCursor::try_from("222").unwrap(); - assert_eq!(try_from_str.id(), 222); + #[test] + fn error_on_invalid_base64_content_hash() { + let sse = make_sse( + "PUT", + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: not-valid-base64!!!", + ); - let try_from_string = EventCursor::try_from("333".to_string()).unwrap(); - assert_eq!(try_from_string.id(), 333); + let result = parse_sse_event(&sse); + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("Invalid content_hash"), "Got: {err}"); } #[test] - fn event_type_display() { - assert_eq!(EventType::Put.to_string(), "PUT"); - assert_eq!(EventType::Delete.to_string(), "DEL"); + fn error_on_wrong_length_content_hash() { + // Base64-encode only 16 bytes instead of 32 + let short_hash = base64::engine::general_purpose::STANDARD.encode([1u8; 16]); + let sse = make_sse( + "PUT", + &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: {short_hash}"), + ); + + let result = parse_sse_event(&sse); + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("32 bytes"), "Got: {err}"); } // === Builder tests === From 5446f5f003c9ebabd08ccf6c655c45b2c81763e8 Mon Sep 17 00:00:00 2001 From: tomos Date: Thu, 5 Feb 2026 17:49:30 +0700 Subject: [PATCH 2/6] test: formatting and wasm tests --- .../files/events/events_repository.rs | 1 - pubky-sdk/bindings/js/pkg/tests/events.ts | 33 +++++++++---------- pubky-sdk/src/actors/event_stream.rs | 29 ++++++++++++---- 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/pubky-homeserver/src/persistence/files/events/events_repository.rs b/pubky-homeserver/src/persistence/files/events/events_repository.rs index 09fc68cff..cdd04810f 100644 --- a/pubky-homeserver/src/persistence/files/events/events_repository.rs +++ b/pubky-homeserver/src/persistence/files/events/events_repository.rs @@ -272,7 +272,6 @@ pub enum EventIden { ContentHash, } - #[cfg(test)] mod tests { use pubky_common::crypto::{Hash, Keypair}; diff --git a/pubky-sdk/bindings/js/pkg/tests/events.ts b/pubky-sdk/bindings/js/pkg/tests/events.ts index 1e3fee682..20ff6637c 100644 --- a/pubky-sdk/bindings/js/pkg/tests/events.ts +++ b/pubky-sdk/bindings/js/pkg/tests/events.ts @@ -62,12 +62,12 @@ test("eventStream: comprehensive", async (t) => { t.equal(events1.length, 10, "should receive exactly 10 events"); for (const event of events1) { - t.equal(typeof event.eventType, "string", "event type should be string"); + t.ok(event.eventType, "event should have eventType"); t.ok(event.resource, "event should have a resource"); t.ok(event.resource.path, "resource should have a path"); t.ok(event.cursor, "event should have a cursor"); - t.equal(event.eventType, "PUT", "first 10 events should all be PUT"); - t.ok(event.contentHash, "PUT events should have contentHash"); + t.ok(event.eventType.isPut(), "first 10 events should all be PUT"); + t.ok(event.eventType.contentHash, "PUT events should have contentHash"); } } finally { reader1.releaseLock(); @@ -99,8 +99,8 @@ test("eventStream: comprehensive", async (t) => { "should receive 18 events from /pub/app1/ (15 PUT + 3 DEL)", ); - const putCount = events2.filter((e) => e.eventType === "PUT").length; - const delCount = events2.filter((e) => e.eventType === "DEL").length; + const putCount = events2.filter((e) => e.eventType.isPut()).length; + const delCount = events2.filter((e) => e.eventType.isDelete()).length; t.equal(putCount, 15, "should have 15 PUT events"); t.equal(delCount, 3, "should have 3 DEL events"); @@ -142,7 +142,7 @@ test("eventStream: comprehensive", async (t) => { event.resource.path.includes("/pub/app2/"), `event path should contain /pub/app2/: ${event.resource.path}`, ); - t.equal(event.eventType, "PUT", "app2 events should all be PUT"); + t.ok(event.eventType.isPut(), "app2 events should all be PUT"); } } finally { reader3.releaseLock(); @@ -170,7 +170,7 @@ test("eventStream: comprehensive", async (t) => { } // In reverse order, the 3 DELETE events should come first - const delEvents = events4.filter((e) => e.eventType === "DEL"); + const delEvents = events4.filter((e) => e.eventType.isDelete()); t.ok(delEvents.length >= 3, "should have at least 3 DEL events"); @@ -178,7 +178,7 @@ test("eventStream: comprehensive", async (t) => { t.ok(delEvent.resource, "DEL event should have a resource"); t.ok(delEvent.resource.path, "DEL event resource should have a path"); t.ok(delEvent.cursor, "DEL event should have a cursor"); - t.notOk(delEvent.contentHash, "DEL event should not have contentHash"); + t.notOk(delEvent.eventType.contentHash, "DEL event should not have contentHash"); } } finally { reader4.releaseLock(); @@ -203,8 +203,8 @@ test("eventStream: comprehensive", async (t) => { // In reverse order, cursors should be decreasing if (events5.length > 1) { - const firstCursor = parseInt(events5[0].cursor); - const lastCursor = parseInt(events5[events5.length - 1].cursor); + const firstCursor = parseInt(events5[0].cursor.id()); + const lastCursor = parseInt(events5[events5.length - 1].cursor.id()); t.ok( firstCursor > lastCursor, "reverse order: first cursor should be greater than last cursor", @@ -212,9 +212,8 @@ test("eventStream: comprehensive", async (t) => { } // First events in reverse should be the DELETEs (most recent) - t.equal( - events5[0].eventType, - "DEL", + t.ok( + events5[0].eventType.isDelete(), "reverse order: first event should be DEL (most recent)", ); } finally { @@ -243,7 +242,7 @@ test("eventStream: comprehensive", async (t) => { t.equal(firstBatch.length, 5, "first batch should have 5 events"); // Get next batch using cursor - const lastCursor = firstBatch[firstBatch.length - 1].cursor; + const lastCursor = firstBatch[firstBatch.length - 1].cursor.toString(); const streamP2 = await sdk .eventStream() .addUser(userPk, lastCursor) @@ -298,10 +297,10 @@ test("eventStream: comprehensive", async (t) => { t.equal(events7.length, 3, "should receive 3 photo events"); for (const event of events7) { - t.equal(typeof event.eventType, "string", "eventType should be string"); + t.ok(event.eventType, "event should have eventType"); t.ok(event.resource, "event should have a resource"); t.equal(typeof event.resource.path, "string", "path should be string"); - t.equal(typeof event.cursor, "string", "cursor should be string"); + t.ok(event.cursor, "event should have a cursor"); t.ok(event.resource.path.includes("/pub/photos/"), "should be from photos directory"); } } finally { @@ -428,7 +427,7 @@ test("eventStream: multi-user subscription", async (t) => { reader1.releaseLock(); } - const cursor = batch1[batch1.length - 1].cursor; + const cursor = batch1[batch1.length - 1].cursor.toString(); // Add same user with updated cursor - should work without error const streamBatch2 = await sdk diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index 19cc8a9f0..f3649a25c 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -583,7 +583,10 @@ fn decode_content_hash(content_hash_base64: Option) -> Result { _ => { // Fallback to zero hash for missing/empty content_hash (legacy/error case) // This matches homeserver's fallback behavior in events_entity.rs - cross_log!(warn, "PUT event missing content_hash. Using zero hash as fallback."); + cross_log!( + warn, + "PUT event missing content_hash. Using zero hash as fallback." + ); Ok(Hash::from_bytes([0; 32])) } } @@ -614,7 +617,9 @@ mod tests { let hash_b64 = encode_hash(hash_bytes); let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/example.txt\ncursor: 42\ncontent_hash: {hash_b64}"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/example.txt\ncursor: 42\ncontent_hash: {hash_b64}" + ), ); let event = parse_sse_event(&sse).unwrap(); @@ -649,7 +654,9 @@ mod tests { let hash_b64 = encode_hash(hash_bytes); let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 50\nfuture_field: some_value\nanother_future: 123\ncontent_hash: {hash_b64}"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 50\nfuture_field: some_value\nanother_future: 123\ncontent_hash: {hash_b64}" + ), ); let event = parse_sse_event(&sse).unwrap(); @@ -669,7 +676,9 @@ mod tests { // cursor before content_hash, both after path let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/test.txt\ncontent_hash: {hash_b64}\ncursor: 999"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/test.txt\ncontent_hash: {hash_b64}\ncursor: 999" + ), ); let event = parse_sse_event(&sse).unwrap(); @@ -715,7 +724,9 @@ mod tests { let hash_b64 = encode_hash([0u8; 32]); let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncontent_hash: {hash_b64}"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncontent_hash: {hash_b64}" + ), ); let result = parse_sse_event(&sse); @@ -801,7 +812,9 @@ mod tests { let hash_b64 = encode_hash([0u8; 32]); let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 9223372036854775807\ncontent_hash: {hash_b64}"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 9223372036854775807\ncontent_hash: {hash_b64}" + ), ); let event = parse_sse_event(&sse).unwrap(); @@ -832,7 +845,9 @@ mod tests { let short_hash = base64::engine::general_purpose::STANDARD.encode([1u8; 16]); let sse = make_sse( "PUT", - &format!("pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: {short_hash}"), + &format!( + "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: {short_hash}" + ), ); let result = parse_sse_event(&sse); From b9d0d7a651da2e660aeadfa8de5187b5e65ede9e Mon Sep 17 00:00:00 2001 From: tomos Date: Mon, 23 Feb 2026 08:52:25 +0000 Subject: [PATCH 3/6] fix: resolve wasm-stream version mismatch --- pubky-sdk/bindings/js/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubky-sdk/bindings/js/Cargo.toml b/pubky-sdk/bindings/js/Cargo.toml index cec46b087..1d39f0e33 100644 --- a/pubky-sdk/bindings/js/Cargo.toml +++ b/pubky-sdk/bindings/js/Cargo.toml @@ -58,7 +58,7 @@ getrandom = { version = "0.3.3", features = ["wasm_js"] } reqwest = { workspace = true, features = [ "stream", ] } -wasm-streams = "0.4" +wasm-streams = "0.5" base64 = "0.22" [dev-dependencies] From d1f8d78ae2e5548885d6d9c210308df6e4635a50 Mon Sep 17 00:00:00 2001 From: tomos Date: Mon, 23 Feb 2026 12:17:21 +0000 Subject: [PATCH 4/6] fix: skip Event which is incorrectly missing content hash --- pubky-sdk/src/actors/event_stream.rs | 88 ++++++++++++---------------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index f3649a25c..083815671 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -430,8 +430,8 @@ impl EventStreamBuilder { Ok(sse_event) => match parse_sse_event(&sse_event) { Ok(event) => Some(Ok(event)), Err(e) => { - cross_log!(warn, "Failed to parse SSE event: {}", e); - Some(Err(e)) + cross_log!(error, "Failed to parse SSE event, skipping: {}", e); + None } }, Err(e) => { @@ -538,7 +538,7 @@ fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { let event_type = match sse.event.as_str() { "PUT" => { - let content_hash = decode_content_hash(content_hash_base64)?; + let content_hash = decode_content_hash(content_hash_base64.as_deref())?; EventType::Put { content_hash } } "DEL" => EventType::Delete, @@ -557,39 +557,33 @@ fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { } /// Decode a base64-encoded content hash into a Hash. -/// If the hash is missing or invalid for a PUT event, falls back to zero hash. -fn decode_content_hash(content_hash_base64: Option) -> Result { - match content_hash_base64 { - Some(b64) if !b64.is_empty() => { - let bytes = base64::engine::general_purpose::STANDARD - .decode(&b64) - .map_err(|e| { - Error::from(RequestError::Validation { - message: format!("Invalid content_hash base64 encoding: {e}"), - }) - })?; - - let hash_bytes: [u8; 32] = bytes.try_into().map_err(|bytes: Vec| { - Error::from(RequestError::Validation { - message: format!( - "content_hash must be exactly 32 bytes, got {} bytes", - bytes.len() - ), - }) - })?; +fn decode_content_hash(content_hash_base64: Option<&str>) -> Result { + let b64 = content_hash_base64 + .filter(|s| !s.is_empty()) + .ok_or_else(|| { + Error::from(RequestError::Validation { + message: "PUT event missing required content_hash".into(), + }) + })?; + + let bytes = base64::engine::general_purpose::STANDARD + .decode(b64) + .map_err(|e| { + Error::from(RequestError::Validation { + message: format!("Invalid content_hash base64 encoding: {e}"), + }) + })?; + + let hash_bytes: [u8; 32] = bytes.try_into().map_err(|bytes: Vec| { + Error::from(RequestError::Validation { + message: format!( + "content_hash must be exactly 32 bytes, got {} bytes", + bytes.len() + ), + }) + })?; - Ok(Hash::from_bytes(hash_bytes)) - } - _ => { - // Fallback to zero hash for missing/empty content_hash (legacy/error case) - // This matches homeserver's fallback behavior in events_entity.rs - cross_log!( - warn, - "PUT event missing content_hash. Using zero hash as fallback." - ); - Ok(Hash::from_bytes([0; 32])) - } - } + Ok(Hash::from_bytes(hash_bytes)) } #[cfg(test)] @@ -776,35 +770,31 @@ mod tests { } #[test] - fn parse_put_event_with_empty_content_hash_uses_zero_hash() { + fn error_on_empty_content_hash() { let sse = make_sse( "PUT", "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1\ncontent_hash: ", ); - let event = parse_sse_event(&sse).unwrap(); + let result = parse_sse_event(&sse); - // Empty content_hash falls back to zero hash - assert_eq!( - event.event_type.content_hash(), - Some(&Hash::from_bytes([0; 32])) - ); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("missing required content_hash"), "Got: {err}"); } #[test] - fn parse_put_event_without_content_hash_uses_zero_hash() { + fn error_on_missing_content_hash() { let sse = make_sse( "PUT", "pubky://o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo/pub/file.txt\ncursor: 1", ); - let event = parse_sse_event(&sse).unwrap(); + let result = parse_sse_event(&sse); - // Missing content_hash falls back to zero hash - assert_eq!( - event.event_type.content_hash(), - Some(&Hash::from_bytes([0; 32])) - ); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("missing required content_hash"), "Got: {err}"); } #[test] From 0ae4e75d722be555649a25cab22ab5260296a61a Mon Sep 17 00:00:00 2001 From: tomos Date: Thu, 26 Feb 2026 15:18:17 +0000 Subject: [PATCH 5/6] fix: Justify skipping unparseable events --- pubky-sdk/src/actors/event_stream.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index 083815671..fc9fa7128 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -430,6 +430,10 @@ impl EventStreamBuilder { Ok(sse_event) => match parse_sse_event(&sse_event) { Ok(event) => Some(Ok(event)), Err(e) => { + // Skip unparseable events rather than failing the entire stream. + // We don't control what homeservers return, and we shouldn't panic + // on unexpected data. + // This also provides forward compatibility for new event types. cross_log!(error, "Failed to parse SSE event, skipping: {}", e); None } @@ -494,7 +498,7 @@ impl EventStreamBuilder { /// event: PUT /// data: pubky://user_pubkey/pub/example.txt /// data: cursor: 42 -/// data: content_hash: (required for PUT events) +/// data: content_hash: (required for PUT events) /// ``` fn parse_sse_event(sse: &eventsource_stream::Event) -> Result { // Parse SSE data by prefix From 5bbb8e8d4646285ba00ab6b0e149e03bf8b7857d Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 15:07:16 +0000 Subject: [PATCH 6/6] fix: Simplify exposed JS types --- pubky-sdk/bindings/js/pkg/tests/events.ts | 35 ++-- .../bindings/js/src/wrappers/event_stream.rs | 180 ++++-------------- 2 files changed, 51 insertions(+), 164 deletions(-) diff --git a/pubky-sdk/bindings/js/pkg/tests/events.ts b/pubky-sdk/bindings/js/pkg/tests/events.ts index 20ff6637c..6faa6af18 100644 --- a/pubky-sdk/bindings/js/pkg/tests/events.ts +++ b/pubky-sdk/bindings/js/pkg/tests/events.ts @@ -62,12 +62,12 @@ test("eventStream: comprehensive", async (t) => { t.equal(events1.length, 10, "should receive exactly 10 events"); for (const event of events1) { - t.ok(event.eventType, "event should have eventType"); + t.equal(typeof event.eventType, "string", "event type should be string"); t.ok(event.resource, "event should have a resource"); t.ok(event.resource.path, "resource should have a path"); - t.ok(event.cursor, "event should have a cursor"); - t.ok(event.eventType.isPut(), "first 10 events should all be PUT"); - t.ok(event.eventType.contentHash, "PUT events should have contentHash"); + t.equal(typeof event.cursor, "string", "cursor should be string"); + t.equal(event.eventType, "PUT", "first 10 events should all be PUT"); + t.ok(event.contentHash, "PUT events should have contentHash"); } } finally { reader1.releaseLock(); @@ -99,8 +99,8 @@ test("eventStream: comprehensive", async (t) => { "should receive 18 events from /pub/app1/ (15 PUT + 3 DEL)", ); - const putCount = events2.filter((e) => e.eventType.isPut()).length; - const delCount = events2.filter((e) => e.eventType.isDelete()).length; + const putCount = events2.filter((e) => e.eventType === "PUT").length; + const delCount = events2.filter((e) => e.eventType === "DEL").length; t.equal(putCount, 15, "should have 15 PUT events"); t.equal(delCount, 3, "should have 3 DEL events"); @@ -142,7 +142,7 @@ test("eventStream: comprehensive", async (t) => { event.resource.path.includes("/pub/app2/"), `event path should contain /pub/app2/: ${event.resource.path}`, ); - t.ok(event.eventType.isPut(), "app2 events should all be PUT"); + t.equal(event.eventType, "PUT", "app2 events should all be PUT"); } } finally { reader3.releaseLock(); @@ -170,7 +170,7 @@ test("eventStream: comprehensive", async (t) => { } // In reverse order, the 3 DELETE events should come first - const delEvents = events4.filter((e) => e.eventType.isDelete()); + const delEvents = events4.filter((e) => e.eventType === "DEL"); t.ok(delEvents.length >= 3, "should have at least 3 DEL events"); @@ -178,7 +178,7 @@ test("eventStream: comprehensive", async (t) => { t.ok(delEvent.resource, "DEL event should have a resource"); t.ok(delEvent.resource.path, "DEL event resource should have a path"); t.ok(delEvent.cursor, "DEL event should have a cursor"); - t.notOk(delEvent.eventType.contentHash, "DEL event should not have contentHash"); + t.notOk(delEvent.contentHash, "DEL event should not have contentHash"); } } finally { reader4.releaseLock(); @@ -203,8 +203,8 @@ test("eventStream: comprehensive", async (t) => { // In reverse order, cursors should be decreasing if (events5.length > 1) { - const firstCursor = parseInt(events5[0].cursor.id()); - const lastCursor = parseInt(events5[events5.length - 1].cursor.id()); + const firstCursor = parseInt(events5[0].cursor); + const lastCursor = parseInt(events5[events5.length - 1].cursor); t.ok( firstCursor > lastCursor, "reverse order: first cursor should be greater than last cursor", @@ -212,8 +212,9 @@ test("eventStream: comprehensive", async (t) => { } // First events in reverse should be the DELETEs (most recent) - t.ok( - events5[0].eventType.isDelete(), + t.equal( + events5[0].eventType, + "DEL", "reverse order: first event should be DEL (most recent)", ); } finally { @@ -242,7 +243,7 @@ test("eventStream: comprehensive", async (t) => { t.equal(firstBatch.length, 5, "first batch should have 5 events"); // Get next batch using cursor - const lastCursor = firstBatch[firstBatch.length - 1].cursor.toString(); + const lastCursor = firstBatch[firstBatch.length - 1].cursor; const streamP2 = await sdk .eventStream() .addUser(userPk, lastCursor) @@ -297,10 +298,10 @@ test("eventStream: comprehensive", async (t) => { t.equal(events7.length, 3, "should receive 3 photo events"); for (const event of events7) { - t.ok(event.eventType, "event should have eventType"); + t.equal(typeof event.eventType, "string", "eventType should be string"); t.ok(event.resource, "event should have a resource"); t.equal(typeof event.resource.path, "string", "path should be string"); - t.ok(event.cursor, "event should have a cursor"); + t.equal(typeof event.cursor, "string", "cursor should be string"); t.ok(event.resource.path.includes("/pub/photos/"), "should be from photos directory"); } } finally { @@ -427,7 +428,7 @@ test("eventStream: multi-user subscription", async (t) => { reader1.releaseLock(); } - const cursor = batch1[batch1.length - 1].cursor.toString(); + const cursor = batch1[batch1.length - 1].cursor; // Add same user with updated cursor - should work without error const streamBatch2 = await sdk diff --git a/pubky-sdk/bindings/js/src/wrappers/event_stream.rs b/pubky-sdk/bindings/js/src/wrappers/event_stream.rs index ad18b23c6..bd583a344 100644 --- a/pubky-sdk/bindings/js/src/wrappers/event_stream.rs +++ b/pubky-sdk/bindings/js/src/wrappers/event_stream.rs @@ -3,173 +3,42 @@ use wasm_bindgen::prelude::*; use crate::wrappers::resource::PubkyResource; -/// Type of event in the event stream. -/// -/// Use the helper methods to check the event type and access the content hash. -/// -/// @example -/// ```typescript -/// if (event.eventType.isPut()) { -/// console.log("PUT event with hash:", event.eventType.contentHash()); -/// } else if (event.eventType.isDelete()) { -/// console.log("DELETE event"); -/// } -/// ``` -#[wasm_bindgen] -#[derive(Debug, Clone)] -pub struct EventType { - kind: EventKind, - /// Content hash (blake3) in base64 format (only for PUT events). - content_hash: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum EventKind { - Put, - Delete, -} - -#[wasm_bindgen] -impl EventType { - /// Returns true if this is a PUT event (resource created or updated). - #[wasm_bindgen(js_name = "isPut")] - pub fn is_put(&self) -> bool { - self.kind == EventKind::Put - } - - /// Returns true if this is a DELETE event (resource deleted). - #[wasm_bindgen(js_name = "isDelete")] - pub fn is_delete(&self) -> bool { - self.kind == EventKind::Delete - } - - /// Get the content hash in base64 format. - /// Returns the blake3 hash for PUT events, or undefined for DELETE events. - #[wasm_bindgen(js_name = "contentHash", getter)] - pub fn content_hash(&self) -> Option { - self.content_hash.clone() - } - - /// Get the string representation ("PUT" or "DEL"). - #[wasm_bindgen(js_name = "toString")] - pub fn to_string_js(&self) -> String { - match self.kind { - EventKind::Put => "PUT".to_string(), - EventKind::Delete => "DEL".to_string(), - } - } -} - -impl From<&pubky::EventType> for EventType { - fn from(value: &pubky::EventType) -> Self { - match value { - pubky::EventType::Put { content_hash } => { - let hash_b64 = - base64::engine::general_purpose::STANDARD.encode(content_hash.as_bytes()); - EventType { - kind: EventKind::Put, - content_hash: Some(hash_b64), - } - } - pubky::EventType::Delete => EventType { - kind: EventKind::Delete, - content_hash: None, - }, - } - } -} - -/// Cursor for pagination in event queries. -/// -/// The cursor represents the ID of an event and is used for pagination. -/// -/// @example -/// ```typescript -/// // Get cursor from an event -/// const cursor = event.cursor; -/// console.log(cursor.id()); // numeric ID as string -/// console.log(cursor.toString()); // same as id() -/// -/// // Create a cursor for querying -/// const cursor = EventCursor.from("12345"); -/// ``` -#[wasm_bindgen] -#[derive(Debug, Clone)] -pub struct EventCursor(pubky::EventCursor); - -#[wasm_bindgen] -impl EventCursor { - /// Create a cursor from a string representation of the event ID. - #[wasm_bindgen(js_name = "from")] - pub fn parse(id: &str) -> Result { - id.parse::() - .map(EventCursor) - .map_err(|e| JsValue::from_str(&format!("Invalid cursor: {}", e))) - } - - /// Get the event ID as a string. - /// Returns a string to safely handle large numbers in JavaScript. - #[wasm_bindgen] - pub fn id(&self) -> String { - self.0.id().to_string() - } - - /// Get the string representation (same as id()). - #[wasm_bindgen(js_name = "toString")] - pub fn to_string_js(&self) -> String { - self.0.to_string() - } -} - -impl From for EventCursor { - fn from(value: pubky::EventCursor) -> Self { - EventCursor(value) - } -} - -impl EventCursor { - /// Get the inner native cursor (for Rust-side usage). - pub fn into_inner(self) -> pubky::EventCursor { - self.0 - } -} - /// A single event from the event stream. /// /// @example /// ```typescript /// for await (const event of stream) { -/// // Check event type -/// if (event.eventType.isPut()) { -/// console.log("PUT:", event.resource.toPubkyUrl()); -/// console.log("Hash:", event.eventType.contentHash()); -/// } else { -/// console.log("DEL:", event.resource.toPubkyUrl()); +/// console.log(event.eventType); // "PUT" or "DEL" +/// console.log(event.cursor); // cursor string for pagination +/// +/// if (event.eventType === "PUT") { +/// console.log("Hash:", event.contentHash); /// } /// /// // Access resource details /// console.log(event.resource.owner.z32()); // User's public key /// console.log(event.resource.path); // "/pub/example.txt" /// console.log(event.resource.toPubkyUrl()); // Full pubky:// URL -/// console.log(event.cursor.id()); // Cursor for pagination /// } /// ``` #[wasm_bindgen] #[derive(Debug, Clone)] pub struct Event { - /// Type of event (PUT or DELETE). - event_type: EventType, + /// Type of event ("PUT" or "DEL"). + event_type: String, /// The resource that was created, updated, or deleted. resource: PubkyResource, - /// Cursor for pagination. - cursor: EventCursor, + /// Cursor for pagination (event id as string). + cursor: String, + /// Content hash (blake3) in raw 32-byte base64 format (only for PUT events). + content_hash: Option, } #[wasm_bindgen] impl Event { - /// Get the event type. + /// Get the event type ("PUT" or "DEL"). #[wasm_bindgen(getter, js_name = "eventType")] - pub fn event_type(&self) -> EventType { + pub fn event_type(&self) -> String { self.event_type.clone() } @@ -181,17 +50,34 @@ impl Event { /// Get the cursor for pagination. #[wasm_bindgen(getter)] - pub fn cursor(&self) -> EventCursor { + pub fn cursor(&self) -> String { self.cursor.clone() } + + /// Get the content hash (only for PUT events). + /// Returns the blake3 hash in base64 format, or undefined for DELETE events. + #[wasm_bindgen(getter, js_name = "contentHash")] + pub fn content_hash(&self) -> Option { + self.content_hash.clone() + } } impl From for Event { fn from(value: pubky::Event) -> Self { + let (event_type, content_hash) = match &value.event_type { + pubky::EventType::Put { content_hash } => { + let hash_b64 = + base64::engine::general_purpose::STANDARD.encode(content_hash.as_bytes()); + ("PUT".to_string(), Some(hash_b64)) + } + pubky::EventType::Delete => ("DEL".to_string(), None), + }; + Event { - event_type: EventType::from(&value.event_type), + event_type, resource: PubkyResource::from(value.resource), - cursor: EventCursor::from(value.cursor), + cursor: value.cursor.to_string(), + content_hash, } } }