From 5ceb96d0d527667b10b30a245162c2f806a42e14 Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 09:35:35 +0000 Subject: [PATCH 1/8] chore: sdk version 0.7.0-rc.1 --- Cargo.lock | 12 ++++++------ e2e/Cargo.toml | 2 +- examples/rust/Cargo.toml | 2 +- pubky-common/Cargo.toml | 2 +- pubky-homeserver/Cargo.toml | 4 ++-- pubky-sdk/Cargo.toml | 6 +++--- pubky-sdk/bindings/js/Cargo.toml | 2 +- pubky-sdk/bindings/js/pkg/package.json | 2 +- pubky-testnet/Cargo.toml | 6 +++--- 9 files changed, 19 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54b3ee9e6..7756279ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,7 +1467,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] name = "e2e" -version = "0.6.0" +version = "0.7.0-rc.1" dependencies = [ "bytes", "eventsource-stream", @@ -3969,7 +3969,7 @@ checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" [[package]] name = "pubky" -version = "0.6.0" +version = "0.7.0-rc.1" dependencies = [ "base64 0.22.1", "cookie", @@ -3995,7 +3995,7 @@ dependencies = [ [[package]] name = "pubky-common" -version = "0.6.0" +version = "0.7.0-rc.1" dependencies = [ "argon2", "base32", @@ -4016,7 +4016,7 @@ dependencies = [ [[package]] name = "pubky-core-examples" -version = "0.7.0" +version = "0.7.0-rc.1" dependencies = [ "anyhow", "base64 0.22.1", @@ -4035,7 +4035,7 @@ dependencies = [ [[package]] name = "pubky-homeserver" -version = "0.6.0" +version = "0.7.0-rc.1" dependencies = [ "anyhow", "async-dropper", @@ -4139,7 +4139,7 @@ dependencies = [ [[package]] name = "pubky-wasm" -version = "0.6.0" +version = "0.7.0-rc.1" dependencies = [ "console_log", "futures-util", diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 3f21dd97a..5a09d6f15 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "e2e" edition.workspace = true -version = "0.6.0" +version = "0.7.0-rc.1" publish = false [dependencies] diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 2767fe423..bd9b02575 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubky-core-examples" -version = "0.7.0" +version = "0.7.0-rc.1" edition.workspace = true publish = false diff --git a/pubky-common/Cargo.toml b/pubky-common/Cargo.toml index e2b2727b3..75045de98 100644 --- a/pubky-common/Cargo.toml +++ b/pubky-common/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-common" description = "Types and struct in common between Pubky client and homeserver" -version = "0.6.0" +version = "0.7.0-rc.1" edition.workspace = true authors.workspace = true license.workspace = true diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index a99519a7c..7fdab0e27 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-homeserver" description = "Pubky core's homeserver." -version = "0.6.0" +version = "0.7.0-rc.1" edition.workspace = true authors.workspace = true license.workspace = true @@ -33,7 +33,7 @@ hex = "0.4.3" httpdate = "1.0.3" postcard = { version = "1.1.1", features = ["alloc"] } pkarr = { workspace = true, features = ["dht", "lmdb-cache", "tls"] } -pubky-common = { path = "../pubky-common", version = "0.6.0" } +pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } serde = { version = "1.0.217", features = ["derive"] } tokio = { workspace = true, features = ["full"] } toml = "0.8.20" diff --git a/pubky-sdk/Cargo.toml b/pubky-sdk/Cargo.toml index 69b418fd7..4aeb5a586 100644 --- a/pubky-sdk/Cargo.toml +++ b/pubky-sdk/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky" description = "Pubky SDK" -version = "0.6.0" +version = "0.7.0-rc.1" edition = "2024" authors = [ "SeverinAlexB ", @@ -28,8 +28,8 @@ default = [] json = ["dep:serde", "reqwest/json"] [dependencies] -pubky-common = { path = "../pubky-common", version = "0.6.0" } -thiserror.workspace = true +pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } +thiserror = "2.0.11" eventsource-stream = "0.2.3" url.workspace = true base64 = "0.22.1" diff --git a/pubky-sdk/bindings/js/Cargo.toml b/pubky-sdk/bindings/js/Cargo.toml index 2a5b398f1..144b26012 100644 --- a/pubky-sdk/bindings/js/Cargo.toml +++ b/pubky-sdk/bindings/js/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubky-wasm" -version = "0.6.0" +version = "0.7.0-rc.1" edition = "2024" description = "Pubky-Core Client WASM bindings" authors = [ diff --git a/pubky-sdk/bindings/js/pkg/package.json b/pubky-sdk/bindings/js/pkg/package.json index e33d5391e..1f6b2fc52 100644 --- a/pubky-sdk/bindings/js/pkg/package.json +++ b/pubky-sdk/bindings/js/pkg/package.json @@ -2,7 +2,7 @@ "name": "@synonymdev/pubky", "type": "module", "description": "Pubky client", - "version": "0.6.0", + "version": "0.7.0-rc.1", "license": "MIT", "repository": { "type": "git", diff --git a/pubky-testnet/Cargo.toml b/pubky-testnet/Cargo.toml index e495c554c..7ea13cffb 100644 --- a/pubky-testnet/Cargo.toml +++ b/pubky-testnet/Cargo.toml @@ -19,9 +19,9 @@ tokio = { workspace = true, features = ["full"] } tracing-subscriber.workspace = true url.workspace = true -pubky = { path = "../pubky-sdk", version = "0.6.0", features = ["json"] } -pubky-common = { path = "../pubky-common", version = "0.6.0" } -pubky-homeserver = { path = "../pubky-homeserver", version = "0.6.0", default-features = false, features = [ +pubky = { path = "../pubky-sdk", version = "0.7.0-rc.1", features = ["json"] } +pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } +pubky-homeserver = { path = "../pubky-homeserver", version = "0.7.0-rc.1", default-features = false, features = [ "testing", ] } pubky_test_utils = { path = "../test_utils/pubky_test", version = "0.1.0" } From 92da9bfbd8c710656c27d0de3c2f928677d11e38 Mon Sep 17 00:00:00 2001 From: tomos Date: Mon, 2 Mar 2026 17:08:25 +0000 Subject: [PATCH 2/8] Testnet: expose unseeded constructor (#329) --- pubky-testnet/src/testnet.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pubky-testnet/src/testnet.rs b/pubky-testnet/src/testnet.rs index 303c0be67..e9ac9e40b 100644 --- a/pubky-testnet/src/testnet.rs +++ b/pubky-testnet/src/testnet.rs @@ -30,9 +30,12 @@ pub struct Testnet { } impl Testnet { - /// Run a new testnet with a local DHT. - pub async fn new() -> Result { - let dht = pkarr::mainline::Testnet::builder(2).build()?; + fn new_inner(seeded: bool) -> Result { + let dht = match seeded { + true => pkarr::mainline::Testnet::new(2)?, + false => pkarr::mainline::Testnet::new_unseeded(2)?, + }; + let testnet = Self { dht, pkarr_relays: vec![], @@ -46,6 +49,16 @@ impl Testnet { Ok(testnet) } + /// Run a new testnet with a (fully-initialized) local DHT. + pub async fn new() -> Result { + Self::new_inner(true) + } + + /// Run a new testnet with a (faster, but partially-initialized) local DHT. + pub async fn new_unseeded() -> Result { + Self::new_inner(false) + } + /// Run a new testnet with a local DHT. /// Pass an optional postgres connection string to use for the homeserver. /// If None, the default test connection string is used. From 01444592de0aecbf4afd5a3dcb469d3ea9562f1d Mon Sep 17 00:00:00 2001 From: tomos <55086152+86667@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:06:39 +0000 Subject: [PATCH 3/8] feat: SDK `EventStreamBuilder.event_stream_for()` and `EventStreamBuilder.add_users()` (#311) * feat: sdk EventStreamBuilder.event_stream_for() * feat: add_users() and deprecations --- examples/rust/7-events_stream/main.rs | 37 +- .../bindings/js/src/actors/event_stream.rs | 72 ++++ pubky-sdk/bindings/js/src/pubky.rs | 75 +++- pubky-sdk/src/actors/event_stream.rs | 332 ++++++++++++++++-- pubky-sdk/src/pubky.rs | 76 +++- 5 files changed, 538 insertions(+), 54 deletions(-) diff --git a/examples/rust/7-events_stream/main.rs b/examples/rust/7-events_stream/main.rs index a26708fa9..065000e06 100644 --- a/examples/rust/7-events_stream/main.rs +++ b/examples/rust/7-events_stream/main.rs @@ -1,11 +1,11 @@ use anyhow::Result; use clap::Parser; use futures_util::StreamExt; -use pubky::{EventCursor, EventType, Pubky, PublicKey}; +use pubky::{EventCursor, EventStreamBuilder, EventType, Pubky, PublicKey}; use std::env; #[derive(Parser, Debug)] -#[command(version, about = "Subscribe to multiple Pubky users' event streams")] +#[command(version, about = "Subscribe to Pubky users' event streams")] struct Cli { /// User public keys (z32 format, 1-50 users) #[arg(required = true)] @@ -72,14 +72,33 @@ async fn main() -> Result<()> { vec![None; args.users.len()] }; - // Build event stream subscription - let mut builder = pubky.event_stream(); + // Parse users and their cursors + let users: Vec<(PublicKey, Option)> = args + .users + .iter() + .zip(cursors.iter()) + .map(|(user_str, cursor)| { + let user_pubkey = PublicKey::try_from(user_str.as_str())?; + Ok((user_pubkey, *cursor)) + }) + .collect::>>()?; - // Add users with their cursors - for (user_str, cursor) in args.users.iter().zip(cursors.iter()) { - let user_pubkey = PublicKey::try_from(user_str.as_str())?; - builder = builder.add_user(&user_pubkey, *cursor)?; - } + // Build event stream subscription using the appropriate constructor + let mut builder: EventStreamBuilder = if users.len() == 1 { + // Single user: use the simple constructor + let (user, cursor) = &users[0]; + pubky.event_stream_for_user(user, *cursor) + } else { + // Multiple users: resolve homeserver from first user, then add all users + let (first_user, _) = &users[0]; + let homeserver = pubky + .get_homeserver_of(first_user) + .await + .ok_or_else(|| anyhow::anyhow!("Could not resolve homeserver for user {first_user}"))?; + + let users_refs: Vec<_> = users.iter().map(|(u, c)| (u, *c)).collect(); + pubky.event_stream_for(&homeserver).add_users(users_refs)? + }; if let Some(limit) = args.limit { builder = builder.limit(limit); diff --git a/pubky-sdk/bindings/js/src/actors/event_stream.rs b/pubky-sdk/bindings/js/src/actors/event_stream.rs index 70c45fa9e..872fb43bd 100644 --- a/pubky-sdk/bindings/js/src/actors/event_stream.rs +++ b/pubky-sdk/bindings/js/src/actors/event_stream.rs @@ -30,6 +30,9 @@ pub struct EventStreamBuilder(pub(crate) pubky::EventStreamBuilder); impl EventStreamBuilder { /// Add a user to the event stream subscription. /// + /// **Deprecated**: Use `eventStreamForUser()` for single-user streams or + /// `addUsers()` for adding multiple users. + /// /// You can add up to 50 users total. /// If the user is already in the subscription, their cursor position will be updated. /// @@ -37,7 +40,9 @@ impl EventStreamBuilder { /// @param {string | null} cursor - Optional cursor position (event ID as string) to start from /// @returns {EventStreamBuilder} - Builder for chaining /// @throws {Error} - If trying to add more than 50 users or if cursor is invalid + /// @deprecated Use `eventStreamForUser()` or `addUsers()` instead #[wasm_bindgen(js_name = "addUser")] + #[allow(deprecated)] pub fn add_user( self, user: &PublicKey, @@ -56,6 +61,73 @@ impl EventStreamBuilder { Ok(EventStreamBuilder(builder)) } + /// Add multiple users to the event stream subscription at once. + /// + /// Each user can have an independent cursor position. If a user already exists, + /// their cursor value is overwritten. + /// + /// @param {Array<[string, string | null]>} users - Array of [z32PublicKey, cursor] tuples + /// @returns {EventStreamBuilder} - Builder for chaining + /// @throws {Error} - If total users would exceed 50 or if any cursor/pubkey is invalid + /// + /// @example + /// ```typescript + /// const users: [string, string | null][] = [ + /// [user1.z32(), null], + /// [user2.z32(), "100"], + /// ]; + /// const stream = await pubky.eventStreamFor(homeserver) + /// .addUsers(users) + /// .live() + /// .subscribe(); + /// ``` + #[wasm_bindgen(js_name = "addUsers")] + pub fn add_users(self, users: js_sys::Array) -> Result { + // Parse all users first + let mut parsed_users: Vec<(pubky::PublicKey, Option)> = Vec::new(); + + for item in users.iter() { + let tuple = js_sys::Array::from(&item); + if tuple.length() != 2 { + return Err(JsValue::from_str( + "Each user entry must be a [PublicKey, cursor] tuple", + )); + } + + // Parse the public key from z32 string + let user_str = tuple.get(0).as_string().ok_or_else(|| { + JsValue::from_str("First element must be a z32 public key string") + })?; + let user = pubky::PublicKey::try_from(user_str) + .map_err(|e| JsValue::from_str(&format!("Invalid public key: {e}")))?; + + let cursor_val = tuple.get(1); + let event_cursor = if cursor_val.is_null() || cursor_val.is_undefined() { + None + } else { + let cursor_str = cursor_val + .as_string() + .ok_or_else(|| JsValue::from_str("Cursor must be a string or null"))?; + Some( + cursor_str + .parse::() + .map_err(|e| JsValue::from_str(&format!("Invalid cursor: {e}")))?, + ) + }; + + parsed_users.push((user, event_cursor)); + } + + // Use add_users with references + let user_refs: Vec<_> = parsed_users.iter().map(|(u, c)| (u, *c)).collect(); + let builder = self + .0 + .add_users(user_refs) + .map_err(|e| JsValue::from_str(&format!("Failed to add users: {e}")))?; + + Ok(EventStreamBuilder(builder)) + } + /// Set maximum number of events to receive before closing the connection. /// /// If omitted: diff --git a/pubky-sdk/bindings/js/src/pubky.rs b/pubky-sdk/bindings/js/src/pubky.rs index d3cbc8dab..0f42673b9 100644 --- a/pubky-sdk/bindings/js/src/pubky.rs +++ b/pubky-sdk/bindings/js/src/pubky.rs @@ -179,29 +179,78 @@ impl Pubky { /// events on a homeserver `/events-stream` endpoint. Use `.addUser()` to add /// users (up to 50), then call `.subscribe()`. /// - /// IMPORTANT: Only the first User's pubky is used to identify the Homeserver which this code calls. - /// It is the responsibility of the caller to ensure that all Users added are on the same Homeserver. - /// /// @returns {EventStreamBuilder} A builder to add users and configure the event stream + /// @deprecated Use `eventStreamForUser()` for single-user streams or `eventStreamFor()` for multi-user streams + #[wasm_bindgen(js_name = "eventStream")] + #[allow(deprecated)] + pub fn event_stream(&self) -> EventStreamBuilder { + EventStreamBuilder(pubky::EventStreamBuilder::new(self.0.client().clone())) + } + + /// Create an event stream builder for a single user. + /// + /// This is the simplest way to subscribe to events for one user. The homeserver + /// is automatically resolved from the user's Pkarr record. + /// + /// @param {PublicKey} user - The user's public key + /// @param {string | null} cursor - Optional cursor position to start from + /// @returns {EventStreamBuilder} - Builder for configuring and subscribing to the stream /// /// @example /// ```typescript - /// const user1 = PublicKey.from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"); - /// const user2 = PublicKey.from("pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy"); - /// const stream = await pubky.eventStream() - /// .addUser(user1, null) - /// .addUser(user2, "100") + /// const user = PublicKey.from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo"); + /// const stream = await pubky.eventStreamForUser(user, null) /// .live() - /// .limit(100) - /// .path("/pub/") /// .subscribe(); /// /// for await (const event of stream) { /// console.log(`${event.eventType}: ${event.resource.path}`); /// } /// ``` - #[wasm_bindgen(js_name = "eventStream")] - pub fn event_stream(&self) -> EventStreamBuilder { - EventStreamBuilder(pubky::EventStreamBuilder::new(self.0.client().clone())) + #[wasm_bindgen(js_name = "eventStreamForUser")] + pub fn event_stream_for_user( + &self, + user: &PublicKey, + cursor: Option, + ) -> Result { + let event_cursor = cursor + .map(|c| { + c.parse::() + .map_err(|e| JsValue::from_str(&format!("Invalid cursor: {e}"))) + }) + .transpose()?; + Ok(EventStreamBuilder(pubky::EventStreamBuilder::for_user( + self.0.client().clone(), + user.as_inner(), + event_cursor, + ))) + } + + /// Create an event stream builder for a specific homeserver. + /// + /// Use this when you already know the homeserver pubkey. This avoids + /// Pkarr resolution overhead. Obtain a homeserver pubkey via `getHomeserverOf()`. + /// + /// @param {PublicKey} homeserver - The homeserver public key + /// @returns {EventStreamBuilder} - Builder for configuring and subscribing to the stream + /// + /// @example + /// ```typescript + /// const homeserver = await pubky.getHomeserverOf(user1); + /// const stream = await pubky.eventStreamFor(homeserver) + /// .addUsers([[user1.z32(), null], [user2.z32(), null]]) + /// .live() + /// .subscribe(); + /// + /// for await (const event of stream) { + /// console.log(`${event.eventType}: ${event.resource.path}`); + /// } + /// ``` + #[wasm_bindgen(js_name = "eventStreamFor")] + pub fn event_stream_for(&self, homeserver: &PublicKey) -> EventStreamBuilder { + EventStreamBuilder(pubky::EventStreamBuilder::for_homeserver( + self.0.client().clone(), + homeserver.as_inner(), + )) } } diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index ff916c463..0a12e3143 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -1,12 +1,31 @@ //! Event stream actor for subscribing to multi-user event feeds. //! //! This module provides a builder-style API for subscribing to Server-Sent Events (SSE) -//! from a user's homeserver `/events-stream` endpoint. +//! from a homeserver's `/events-stream` endpoint. //! -//! IMPORTANT: Only the first User's pubky is used to identify the Homeserver which this code calls. -//! It is the responsibility of the caller to ensure that all Users added are on the same Homeserver. +//! # Example: Single user +//! ```no_run +//! use pubky::{Pubky, PublicKey}; +//! use futures_util::StreamExt; +//! +//! # async fn example() -> pubky::Result<()> { +//! let pubky = Pubky::new()?; +//! let user = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); +//! +//! let mut stream = pubky.event_stream_for_user(&user, None) +//! .live() +//! .subscribe() +//! .await?; //! -//! # Example +//! while let Some(result) = stream.next().await { +//! let event = result?; +//! println!("Event: {:?} at {}", event.event_type, event.resource); +//! } +//! # Ok(()) +//! # } +//! ``` +//! +//! # Example: Multiple users on the same homeserver //! ```no_run //! use pubky::{Pubky, PublicKey, EventCursor}; //! use futures_util::StreamExt; @@ -16,9 +35,11 @@ //! let user1 = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); //! let user2 = PublicKey::try_from("pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy").unwrap(); //! -//! let mut stream = pubky.event_stream() -//! .add_user(&user1, None)? -//! .add_user(&user2, Some(EventCursor::new(100)))? +//! // When subscribing to multiple users, specify the homeserver directly +//! let homeserver = pubky.get_homeserver_of(&user1).await.unwrap(); +//! +//! let mut stream = pubky.event_stream_for(&homeserver) +//! .add_users([(&user1, None), (&user2, Some(EventCursor::new(100)))])? //! .live() //! .limit(100) //! .path("/pub/") @@ -138,11 +159,12 @@ pub struct Event { /// Builder for creating an event stream subscription. /// -/// Construct via [`crate::Pubky::event_stream`]. +/// Construct via [`crate::Pubky::event_stream`] or [`crate::Pubky::event_stream_for`]. #[derive(Clone, Debug)] pub struct EventStreamBuilder { client: PubkyHttpClient, users: Vec<(PublicKey, Option)>, + homeserver: Option, limit: Option, live: bool, reverse: bool, @@ -154,10 +176,100 @@ impl EventStreamBuilder { /// /// Typically called via [`crate::Pubky::event_stream`]. #[must_use] + #[deprecated(since = "0.7.0", note = "Use `for_user` or `for_homeserver` instead")] pub fn new(client: PubkyHttpClient) -> Self { Self { client, users: Vec::new(), + homeserver: None, + limit: None, + live: false, + reverse: false, + path: None, + } + } + + /// Create an event stream builder for a single user. + /// + /// This is the simplest way to subscribe to events for one user. The homeserver + /// is automatically resolved from the user's Pkarr record. + /// + /// # Example + /// ```no_run + /// use pubky::{Pubky, PublicKey, EventCursor}; + /// use futures_util::StreamExt; + /// + /// # async fn example() -> pubky::Result<()> { + /// let pubky = Pubky::new()?; + /// let user = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); + /// + /// let mut stream = pubky.event_stream_for_user(&user, None) + /// .live() + /// .subscribe() + /// .await?; + /// + /// while let Some(result) = stream.next().await { + /// let event = result?; + /// println!("Event: {:?}", event); + /// } + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn for_user( + client: PubkyHttpClient, + user: &PublicKey, + cursor: Option, + ) -> Self { + Self { + client, + users: vec![(user.clone(), cursor)], + homeserver: None, + limit: None, + live: false, + reverse: false, + path: None, + } + } + + /// Create a new event stream builder for a specific homeserver. + /// + /// Use this when you already know the homeserver pubkey, avoiding Pkarr resolution. + /// You can obtain a homeserver pubkey via [`crate::Pubky::get_homeserver_of`]. + /// + /// # Example + /// ```no_run + /// use pubky::{Pubky, PublicKey}; + /// use futures_util::StreamExt; + /// + /// # async fn example() -> pubky::Result<()> { + /// let pubky = Pubky::new()?; + /// let user1 = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); + /// let user2 = PublicKey::try_from("pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy").unwrap(); + /// + /// // When subscribing to multiple users on the same homeserver, + /// // specify the homeserver directly to avoid redundant Pkarr lookups + /// let homeserver = pubky.get_homeserver_of(&user1).await.unwrap(); + /// + /// let mut stream = pubky.event_stream_for(&homeserver) + /// .add_users([(&user1, None), (&user2, None)])? + /// .live() + /// .subscribe() + /// .await?; + /// + /// while let Some(result) = stream.next().await { + /// let event = result?; + /// println!("Event: {:?}", event); + /// } + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn for_homeserver(client: PubkyHttpClient, homeserver: &PublicKey) -> Self { + Self { + client, + users: Vec::new(), + homeserver: Some(homeserver.clone()), limit: None, live: false, reverse: false, @@ -167,14 +279,15 @@ impl EventStreamBuilder { /// Add a user to the event stream subscription. /// + /// **Deprecated**: Use [`Self::for_user`] for single-user streams or + /// [`Self::add_users`] for adding multiple users. + /// /// You can add up to 50 users total. Each user can have an independent cursor position. /// If a user is added who already exists then their cursor value is overwritten with the newest value. /// - /// IMPORTANT: Only the first added User's pubky is used to identify the Homeserver. - /// It is the responsibility of the caller to ensure that all Users added are on the same Homeserver. - /// /// # Errors /// - Returns an error if trying to add more than 50 users + #[deprecated(since = "0.7.0", note = "Use `for_user` or `add_users` instead")] pub fn add_user(mut self, user: &PublicKey, cursor: Option) -> Result { if let Some(existing) = self.users.iter_mut().find(|(u, _)| u == user) { existing.1 = cursor; @@ -191,6 +304,56 @@ impl EventStreamBuilder { Ok(self) } + /// Add multiple users to the event stream subscription at once. + /// + /// # Errors + /// - Returns an error if the total number of users would exceed 50 + /// + /// # Example + /// ```no_run + /// use pubky::{Pubky, PublicKey, EventCursor}; + /// + /// # async fn example() -> pubky::Result<()> { + /// let pubky = Pubky::new()?; + /// let homeserver = PublicKey::try_from("h9m4r...").unwrap(); + /// let user1 = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); + /// let user2 = PublicKey::try_from("pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy").unwrap(); + /// + /// let users = [ + /// (&user1, None), + /// (&user2, Some(EventCursor::new(100))), + /// ]; + /// + /// let stream = pubky.event_stream_for(&homeserver) + /// .add_users(users)? + /// .live() + /// .subscribe() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn add_users<'a>( + mut self, + users: impl IntoIterator)>, + ) -> Result { + for (user, cursor) in users { + // Check if user already exists - update cursor if so + if let Some(existing) = self.users.iter_mut().find(|(u, _)| u == user) { + existing.1 = cursor; + continue; + } + + if self.users.len() >= 50 { + return Err(Error::from(RequestError::Validation { + message: "Cannot subscribe to more than 50 users".into(), + })); + } + + self.users.push((user.clone(), cursor)); + } + Ok(self) + } + /// Set maximum number of events to receive before closing the connection. /// /// If omitted: @@ -300,16 +463,20 @@ impl EventStreamBuilder { })); } - // Resolve homeserver for the first user - let (first_user, _) = &self.users[0]; - let homeserver = Pkdns::with_client(self.client.clone()) - .get_homeserver_of(first_user) - .await - .ok_or_else(|| { - Error::from(RequestError::Validation { - message: format!("Could not resolve homeserver for user {first_user}"), - }) - })?; + // Use pre-set homeserver or resolve from first user + let homeserver = if let Some(hs) = &self.homeserver { + hs.clone() + } else { + let (first_user, _) = &self.users[0]; + Pkdns::with_client(self.client.clone()) + .get_homeserver_of(first_user) + .await + .ok_or_else(|| { + Error::from(RequestError::Validation { + message: format!("Could not resolve homeserver for user {first_user}"), + }) + })? + }; cross_log!( info, @@ -663,4 +830,127 @@ mod tests { assert_eq!(EventType::Put.to_string(), "PUT"); assert_eq!(EventType::Delete.to_string(), "DEL"); } + + // === Builder tests === + + fn test_pubkeys(count: usize) -> Vec { + // Generate random test public keys + (0..count) + .map(|_| crate::Keypair::random().public_key()) + .collect() + } + + #[test] + fn builder_constructors_and_add_users() { + let client = crate::PubkyHttpClient::testnet().unwrap(); + let keys = test_pubkeys(4); + + // for_user initializes with single user and cursor + let builder = + EventStreamBuilder::for_user(client.clone(), &keys[0], Some(EventCursor::new(42))); + assert_eq!(builder.users.len(), 1); + assert_eq!(builder.users[0].0, keys[0]); + assert_eq!(builder.users[0].1, Some(EventCursor::new(42))); + assert!(builder.homeserver.is_none()); + + // for_homeserver initializes with homeserver and no users + let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0]); + assert!(builder.users.is_empty()); + assert_eq!(builder.homeserver.as_ref(), Some(&keys[0])); + + // add_users adds multiple users with cursors + let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0]) + .add_users([ + (&keys[1], None), + (&keys[2], Some(EventCursor::new(100))), + (&keys[3], Some(EventCursor::new(200))), + ]) + .unwrap(); + assert_eq!(builder.users.len(), 3); + assert_eq!(builder.users[0], (keys[1].clone(), None)); + assert_eq!( + builder.users[1], + (keys[2].clone(), Some(EventCursor::new(100))) + ); + assert_eq!( + builder.users[2], + (keys[3].clone(), Some(EventCursor::new(200))) + ); + + // add_users updates existing user's cursor + let builder = EventStreamBuilder::for_homeserver(client.clone(), &keys[0]) + .add_users([(&keys[1], Some(EventCursor::new(10))), (&keys[2], None)]) + .unwrap() + .add_users([(&keys[1], Some(EventCursor::new(999)))]) + .unwrap(); + assert_eq!(builder.users.len(), 2); + assert_eq!(builder.users[0].1, Some(EventCursor::new(999))); // Updated + assert_eq!(builder.users[1].1, None); // Unchanged + + // Builder chaining with live mode + let builder = EventStreamBuilder::for_user(client.clone(), &keys[0], None) + .limit(100) + .live() + .path("/pub/posts/".to_string()); + assert_eq!(builder.limit, Some(100)); + assert!(builder.live); + assert!(!builder.reverse); + assert_eq!(builder.path, Some("/pub/posts/".to_string())); + + // Builder chaining with reverse mode + let builder = EventStreamBuilder::for_user(client, &keys[0], None) + .limit(50) + .reverse() + .path("/pub/files/".to_string()); + assert_eq!(builder.limit, Some(50)); + assert!(!builder.live); + assert!(builder.reverse); + assert_eq!(builder.path, Some("/pub/files/".to_string())); + } + + #[test] + fn add_users_errors_on_exceeding_50_users() { + let client = crate::PubkyHttpClient::testnet().unwrap(); + let keys = test_pubkeys(52); + let homeserver = &keys[0]; + let users = &keys[1..]; // 51 users + + let user_refs: Vec<_> = users.iter().map(|u| (u, None)).collect(); + + let result = EventStreamBuilder::for_homeserver(client, homeserver).add_users(user_refs); + + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!(err.contains("50 users"), "Got: {err}"); + } + + #[tokio::test] + async fn subscribe_fails_with_no_users() { + let client = crate::PubkyHttpClient::testnet().unwrap(); + let keys = test_pubkeys(1); + let homeserver = &keys[0]; + + // Building with empty users list succeeds + let empty: [(&PublicKey, Option); 0] = []; + let builder = EventStreamBuilder::for_homeserver(client, homeserver) + .add_users(empty) + .unwrap(); + + assert!(builder.users.is_empty()); + assert_eq!(builder.homeserver.as_ref(), Some(homeserver)); + + // But subscribe should fail + let result = builder.subscribe().await; + + match result { + Ok(_) => panic!("Expected error, but subscribe succeeded"), + Err(e) => { + let err = e.to_string(); + assert!( + err.contains("At least one user must be specified"), + "Got: {err}" + ); + } + } + } } diff --git a/pubky-sdk/src/pubky.rs b/pubky-sdk/src/pubky.rs index 84175f2a7..9dbfa63a9 100644 --- a/pubky-sdk/src/pubky.rs +++ b/pubky-sdk/src/pubky.rs @@ -53,8 +53,8 @@ use crate::PublicKey; use crate::{ - Capabilities, EventStreamBuilder, Pkdns, PubkyAuthFlow, PubkyHttpClient, PubkySigner, - PublicStorage, Result, actors::AuthFlowKind, + Capabilities, EventCursor, EventStreamBuilder, Pkdns, PubkyAuthFlow, PubkyHttpClient, + PubkySigner, PublicStorage, Result, actors::AuthFlowKind, }; #[cfg(not(target_arch = "wasm32"))] @@ -160,22 +160,76 @@ impl Pubky { /// This allows you to subscribe to Server-Sent Events (SSE) from multiple users' /// events on a homeserver `/events-stream` endpoint. Use `.add_user()` to add /// users (up to 50), then call `.subscribe()`. + #[must_use] + #[deprecated( + since = "0.7.0", + note = "Use `event_stream_for_user` for single-user streams or `event_stream_for` for multi-user streams" + )] + #[allow( + deprecated, + reason = "This deprecated method calls the deprecated EventStreamBuilder::new" + )] + pub fn event_stream(&self) -> EventStreamBuilder { + EventStreamBuilder::new(self.client.clone()) + } + + /// Create an event stream builder for a single user. + /// + /// This is the simplest way to subscribe to events for one user. The homeserver + /// is automatically resolved from the user's Pkarr record. + /// + /// # Example + /// ```no_run + /// use pubky::{Pubky, PublicKey, EventCursor}; + /// use futures_util::StreamExt; + /// + /// # async fn example() -> pubky::Result<()> { + /// let pubky = Pubky::new()?; + /// let user = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); + /// + /// let mut stream = pubky.event_stream_for_user(&user, None) + /// .live() + /// .subscribe() + /// .await?; + /// + /// while let Some(result) = stream.next().await { + /// let event = result?; + /// println!("Event: {:?} at {}", event.event_type, event.resource); + /// } + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn event_stream_for_user( + &self, + user: &PublicKey, + cursor: Option, + ) -> EventStreamBuilder { + EventStreamBuilder::for_user(self.client.clone(), user, cursor) + } + + /// Create an event stream builder for a specific homeserver. + /// + /// Use this when you already know the homeserver pubkey. This avoids + /// Pkarr resolution overhead. Obtain a homeserver pubkey via [`Self::get_homeserver_of`]. /// /// # Example /// ```no_run - /// # use pubky::{Pubky, PublicKey, EventCursor}; - /// # use futures_util::StreamExt; + /// use pubky::{Pubky, PublicKey}; + /// use futures_util::StreamExt; + /// /// # async fn example() -> pubky::Result<()> { /// let pubky = Pubky::new()?; /// let user1 = PublicKey::try_from("o1gg96ewuojmopcjbz8895478wdtxtzzuxnfjjz8o8e77csa1ngo").unwrap(); /// let user2 = PublicKey::try_from("pxnu33x7jtpx9ar1ytsi4yxbp6a5o36gwhffs8zoxmbuptici1jy").unwrap(); /// - /// let mut stream = pubky.event_stream() + /// // When subscribing to multiple users on the same homeserver, + /// // specify the homeserver directly to avoid redundant Pkarr lookups + /// let homeserver = pubky.get_homeserver_of(&user1).await.unwrap(); + /// + /// let mut stream = pubky.event_stream_for(&homeserver) /// .add_user(&user1, None)? - /// .add_user(&user2, Some(EventCursor::new(100)))? - /// .live() - /// .limit(100) - /// .path("/pub/") + /// .add_user(&user2, None)? /// .subscribe() /// .await?; /// @@ -187,8 +241,8 @@ impl Pubky { /// # } /// ``` #[must_use] - pub fn event_stream(&self) -> EventStreamBuilder { - EventStreamBuilder::new(self.client.clone()) + pub fn event_stream_for(&self, homeserver: &PublicKey) -> EventStreamBuilder { + EventStreamBuilder::for_homeserver(self.client.clone(), homeserver) } // ------ Persistance helpers ---------- From 5df3998bb5343d5118a1a53e1eb20ee22263951a Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 13:26:50 +0000 Subject: [PATCH 4/8] chore: Sort out dependency versioning --- Cargo.lock | 8 ++++---- e2e/Cargo.toml | 2 +- examples/rust/Cargo.toml | 6 +++--- pubky-common/Cargo.toml | 2 +- pubky-homeserver/Cargo.toml | 4 ++-- pubky-sdk/Cargo.toml | 2 +- pubky-testnet/Cargo.toml | 6 +++--- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7756279ec..556d79caa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,7 +1467,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] name = "e2e" -version = "0.7.0-rc.1" +version = "0.6.0" dependencies = [ "bytes", "eventsource-stream", @@ -3995,7 +3995,7 @@ dependencies = [ [[package]] name = "pubky-common" -version = "0.7.0-rc.1" +version = "0.6.0" dependencies = [ "argon2", "base32", @@ -4016,7 +4016,7 @@ dependencies = [ [[package]] name = "pubky-core-examples" -version = "0.7.0-rc.1" +version = "0.7.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -4035,7 +4035,7 @@ dependencies = [ [[package]] name = "pubky-homeserver" -version = "0.7.0-rc.1" +version = "0.6.0" dependencies = [ "anyhow", "async-dropper", diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 5a09d6f15..3f21dd97a 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "e2e" edition.workspace = true -version = "0.7.0-rc.1" +version = "0.6.0" publish = false [dependencies] diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index bd9b02575..03a8313e2 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubky-core-examples" -version = "0.7.0-rc.1" +version = "0.7.0" edition.workspace = true publish = false @@ -45,8 +45,8 @@ futures-util = "0.3.31" anyhow.workspace = true base64 = "0.22.1" clap = { version = "4.5.48", features = ["derive"] } -pubky = { path = "../../pubky-sdk", version = "0.6.0" } -pubky-testnet = { path = "../../pubky-testnet", version = "0.6.0" } +pubky = { path = "../../pubky-sdk", version = "0.7.0-rc.1" } +pubky-testnet = { path = "../../pubky-testnet" } pubky-common = { path = "../../pubky-common", version = "0.6.0" } reqwest.workspace = true rpassword = "7.4.0" diff --git a/pubky-common/Cargo.toml b/pubky-common/Cargo.toml index 75045de98..e2b2727b3 100644 --- a/pubky-common/Cargo.toml +++ b/pubky-common/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-common" description = "Types and struct in common between Pubky client and homeserver" -version = "0.7.0-rc.1" +version = "0.6.0" edition.workspace = true authors.workspace = true license.workspace = true diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 7fdab0e27..a99519a7c 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-homeserver" description = "Pubky core's homeserver." -version = "0.7.0-rc.1" +version = "0.6.0" edition.workspace = true authors.workspace = true license.workspace = true @@ -33,7 +33,7 @@ hex = "0.4.3" httpdate = "1.0.3" postcard = { version = "1.1.1", features = ["alloc"] } pkarr = { workspace = true, features = ["dht", "lmdb-cache", "tls"] } -pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } +pubky-common = { path = "../pubky-common", version = "0.6.0" } serde = { version = "1.0.217", features = ["derive"] } tokio = { workspace = true, features = ["full"] } toml = "0.8.20" diff --git a/pubky-sdk/Cargo.toml b/pubky-sdk/Cargo.toml index 4aeb5a586..2873e18f7 100644 --- a/pubky-sdk/Cargo.toml +++ b/pubky-sdk/Cargo.toml @@ -28,7 +28,7 @@ default = [] json = ["dep:serde", "reqwest/json"] [dependencies] -pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } +pubky-common = { path = "../pubky-common", version = "0.6.0" } thiserror = "2.0.11" eventsource-stream = "0.2.3" url.workspace = true diff --git a/pubky-testnet/Cargo.toml b/pubky-testnet/Cargo.toml index 7ea13cffb..bde36271b 100644 --- a/pubky-testnet/Cargo.toml +++ b/pubky-testnet/Cargo.toml @@ -19,9 +19,9 @@ tokio = { workspace = true, features = ["full"] } tracing-subscriber.workspace = true url.workspace = true -pubky = { path = "../pubky-sdk", version = "0.7.0-rc.1", features = ["json"] } -pubky-common = { path = "../pubky-common", version = "0.7.0-rc.1" } -pubky-homeserver = { path = "../pubky-homeserver", version = "0.7.0-rc.1", default-features = false, features = [ +pubky = { version = "0.7.0-rc.1", path = "../pubky-sdk" } +pubky-common = { path = "../pubky-common", version = "0.6.0" } +pubky-homeserver = { path = "../pubky-homeserver", version = "0.6.0", default-features = false, features = [ "testing", ] } pubky_test_utils = { path = "../test_utils/pubky_test", version = "0.1.0" } From 45e7fbb7a2e97eb70093218a29824c490452da60 Mon Sep 17 00:00:00 2001 From: tomos Date: Mon, 2 Mar 2026 17:51:10 +0000 Subject: [PATCH 5/8] fix: A spot of testing and pipeline build --- e2e/src/tests/events.rs | 291 +++++++++++++++++++++++++++ pubky-sdk/Cargo.toml | 2 +- pubky-sdk/src/actors/event_stream.rs | 72 +++++++ pubky-testnet/Cargo.toml | 2 +- 4 files changed, 365 insertions(+), 2 deletions(-) diff --git a/e2e/src/tests/events.rs b/e2e/src/tests/events.rs index f806af9db..53ba774f7 100644 --- a/e2e/src/tests/events.rs +++ b/e2e/src/tests/events.rs @@ -1387,3 +1387,294 @@ async fn events_stream_path_filter() { "Wildcard: Should get exactly 1 event (underscore not treated as wildcard)" ); } + +/// Test the SDK builder API: `event_stream_for()` and `add_users()` +/// This tests the high-level SDK interface rather than raw HTTP requests. +#[tokio::test] +#[pubky_testnet::test] +async fn events_stream_sdk_builder_api() { + use futures::StreamExt; + use pubky_testnet::pubky::EventCursor; + use tokio::time::{timeout, Duration}; + + let testnet = build_full_testnet().await; + let server = testnet.homeserver_app(); + let pubky = testnet.sdk().unwrap(); + + // Create three users + let keypair1 = Keypair::random(); + let keypair2 = Keypair::random(); + let keypair3 = Keypair::random(); + + let signer1 = pubky.signer(keypair1); + let signer2 = pubky.signer(keypair2); + let signer3 = pubky.signer(keypair3); + + let session1 = signer1.signup(&server.public_key(), None).await.unwrap(); + let session2 = signer2.signup(&server.public_key(), None).await.unwrap(); + let session3 = signer3.signup(&server.public_key(), None).await.unwrap(); + + let pubky1 = signer1.public_key(); + let pubky2 = signer2.public_key(); + let pubky3 = signer3.public_key(); + + // Create events for each user + for i in 0..3 { + session1 + .storage() + .put(format!("/pub/user1_{i}.txt"), vec![i as u8]) + .await + .unwrap(); + } + for i in 0..2 { + session2 + .storage() + .put(format!("/pub/user2_{i}.txt"), vec![i as u8]) + .await + .unwrap(); + } + for i in 0..4 { + session3 + .storage() + .put(format!("/pub/user3_{i}.txt"), vec![i as u8]) + .await + .unwrap(); + } + + // ==== Test 1: event_stream_for_user() - single user stream ==== + let mut stream = pubky + .event_stream_for_user(&pubky1, None) + .limit(10) + .subscribe() + .await + .unwrap(); + + let mut user1_events = Vec::new(); + while let Some(result) = stream.next().await { + let event = result.unwrap(); + user1_events.push(event.resource.path.to_string()); + if user1_events.len() >= 3 { + break; + } + } + drop(stream); + + assert_eq!( + user1_events.len(), + 3, + "event_stream_for_user: Should get 3 events for user1" + ); + assert!( + user1_events.iter().all(|p| p.contains("user1_")), + "event_stream_for_user: All events should be from user1" + ); + + // ==== Test 2: event_stream_for() with add_users() - multi-user stream ==== + let homeserver = server.public_key(); + + let mut stream = pubky + .event_stream_for(&homeserver) + .add_users([(&pubky1, None), (&pubky2, None)]) + .unwrap() + .limit(10) + .subscribe() + .await + .unwrap(); + + let mut multi_user_events = Vec::new(); + while let Some(result) = stream.next().await { + let event = result.unwrap(); + multi_user_events.push((event.resource.owner.z32(), event.resource.path.to_string())); + if multi_user_events.len() >= 5 { + break; + } + } + drop(stream); + + assert_eq!( + multi_user_events.len(), + 5, + "add_users: Should get 5 events total (3 from user1 + 2 from user2)" + ); + + let user1_count = multi_user_events + .iter() + .filter(|(u, _)| *u == pubky1.z32()) + .count(); + let user2_count = multi_user_events + .iter() + .filter(|(u, _)| *u == pubky2.z32()) + .count(); + let user3_count = multi_user_events + .iter() + .filter(|(u, _)| *u == pubky3.z32()) + .count(); + + assert_eq!(user1_count, 3, "add_users: Should get 3 events from user1"); + assert_eq!(user2_count, 2, "add_users: Should get 2 events from user2"); + assert_eq!( + user3_count, 0, + "add_users: Should get 0 events from user3 (not subscribed)" + ); + + // ==== Test 3: add_users() with per-user cursors ==== + // First, get events and capture cursor at position 2 for user1 + let mut stream = pubky + .event_stream_for(&homeserver) + .add_users([(&pubky1, None)]) + .unwrap() + .limit(3) + .subscribe() + .await + .unwrap(); + + let mut cursor_at_2: Option = None; + let mut count = 0; + while let Some(result) = stream.next().await { + let event = result.unwrap(); + count += 1; + if count == 2 { + cursor_at_2 = Some(event.cursor); + } + if count >= 3 { + break; + } + } + drop(stream); + + let cursor = cursor_at_2.expect("Should have captured cursor at position 2"); + + // Now subscribe with cursor - should get only 1 remaining event from user1 + let mut stream = pubky + .event_stream_for(&homeserver) + .add_users([(&pubky1, Some(cursor)), (&pubky2, None)]) + .unwrap() + .limit(10) + .subscribe() + .await + .unwrap(); + + let mut events_after_cursor = Vec::new(); + while let Some(result) = stream.next().await { + let event = result.unwrap(); + events_after_cursor.push((event.resource.owner.z32(), event.resource.path.to_string())); + if events_after_cursor.len() >= 3 { + break; + } + } + drop(stream); + + assert_eq!( + events_after_cursor.len(), + 3, + "Cursor: Should get 3 events (1 from user1 after cursor + 2 from user2)" + ); + + let user1_after = events_after_cursor + .iter() + .filter(|(u, _)| *u == pubky1.z32()) + .count(); + let user2_after = events_after_cursor + .iter() + .filter(|(u, _)| *u == pubky2.z32()) + .count(); + + assert_eq!( + user1_after, 1, + "Cursor: Should get 1 event from user1 (after cursor)" + ); + assert_eq!( + user2_after, 2, + "Cursor: Should get 2 events from user2 (no cursor, from beginning)" + ); + + // ==== Test 4: Builder options - reverse, path filter ==== + let mut stream = pubky + .event_stream_for(&homeserver) + .add_users([(&pubky1, None)]) + .unwrap() + .reverse() + .limit(3) + .subscribe() + .await + .unwrap(); + + let mut reverse_events = Vec::new(); + while let Some(result) = stream.next().await { + let event = result.unwrap(); + reverse_events.push(event.resource.path.to_string()); + if reverse_events.len() >= 3 { + break; + } + } + drop(stream); + + assert_eq!(reverse_events.len(), 3, "Reverse: Should get 3 events"); + assert!( + reverse_events[0].contains("user1_2"), + "Reverse: First should be newest (user1_2), got: {}", + reverse_events[0] + ); + assert!( + reverse_events[2].contains("user1_0"), + "Reverse: Last should be oldest (user1_0), got: {}", + reverse_events[2] + ); + + // ==== Test 5: Live mode with add_users() ==== + let mut stream = pubky + .event_stream_for(&homeserver) + .add_users([(&pubky1, None), (&pubky2, None)]) + .unwrap() + .live() + .subscribe() + .await + .unwrap(); + + // Consume historical events first (5 total) + for _ in 0..5 { + let _ = stream.next().await; + } + + // Spawn task to create a live event + let session1_clone = session1.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(100)).await; + session1_clone + .storage() + .put("/pub/live_event.txt", vec![99]) + .await + .unwrap(); + }); + + // Should receive the live event + let result = timeout(Duration::from_secs(5), stream.next()).await; + assert!(result.is_ok(), "Live: Should receive event within timeout"); + + let event = result + .unwrap() + .expect("Live: Stream should have event") + .unwrap(); + assert!( + event.resource.path.as_str().contains("live_event"), + "Live: Should receive the live event, got: {}", + event.resource.path + ); + + // ==== Test 6: Error case - add_users() with >50 users ==== + let many_keys: Vec<_> = (0..51).map(|_| Keypair::random().public_key()).collect(); + let many_refs: Vec<_> = many_keys.iter().map(|k| (k, None)).collect(); + + let result = pubky.event_stream_for(&homeserver).add_users(many_refs); + + assert!( + result.is_err(), + "add_users: Should error when adding >50 users" + ); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("50 users"), + "add_users: Error should mention 50 user limit, got: {}", + err + ); +} diff --git a/pubky-sdk/Cargo.toml b/pubky-sdk/Cargo.toml index 2873e18f7..22da3a778 100644 --- a/pubky-sdk/Cargo.toml +++ b/pubky-sdk/Cargo.toml @@ -29,7 +29,7 @@ json = ["dep:serde", "reqwest/json"] [dependencies] pubky-common = { path = "../pubky-common", version = "0.6.0" } -thiserror = "2.0.11" +thiserror.workspace = true eventsource-stream = "0.2.3" url.workspace = true base64 = "0.22.1" diff --git a/pubky-sdk/src/actors/event_stream.rs b/pubky-sdk/src/actors/event_stream.rs index 0a12e3143..7c2501816 100644 --- a/pubky-sdk/src/actors/event_stream.rs +++ b/pubky-sdk/src/actors/event_stream.rs @@ -924,6 +924,78 @@ mod tests { assert!(err.contains("50 users"), "Got: {err}"); } + #[test] + fn build_request_url_constructs_correct_query_params() { + let client = crate::PubkyHttpClient::testnet().unwrap(); + let keys = test_pubkeys(3); + let homeserver = &keys[0]; + let user1 = &keys[1]; + let user2 = &keys[2]; + + // Test with all options: multiple users, cursors, limit, live, path + let builder = EventStreamBuilder::for_homeserver(client.clone(), homeserver) + .add_users([(user1, None), (user2, Some(EventCursor::new(42)))]) + .unwrap() + .limit(100) + .live() + .path("/pub/posts/"); + + let url = builder.build_request_url(homeserver).unwrap(); + + // Verify base URL + assert_eq!(url.scheme(), "https"); + assert_eq!(url.path(), "/events-stream"); + + // Verify query parameters + let query: Vec<_> = url.query_pairs().collect(); + + // Should have: user (x2), limit, live, path + assert_eq!(query.len(), 5, "Should have 5 query params: {:?}", query); + + // Check user params + let user_params: Vec<_> = query + .iter() + .filter(|(k, _)| k == "user") + .map(|(_, v)| v.to_string()) + .collect(); + assert_eq!(user_params.len(), 2); + assert!( + user_params.iter().any(|v| v == &user1.z32()), + "Should have user1 without cursor" + ); + assert!( + user_params + .iter() + .any(|v| v == &format!("{}:42", user2.z32())), + "Should have user2 with cursor" + ); + + // Check other params + assert!(query.iter().any(|(k, v)| k == "limit" && v == "100")); + assert!(query.iter().any(|(k, v)| k == "live" && v == "true")); + assert!(query.iter().any(|(k, v)| k == "path" && v == "/pub/posts/")); + + // Test reverse mode (mutually exclusive with live) + let builder_reverse = EventStreamBuilder::for_homeserver(client, homeserver) + .add_users([(user1, None)]) + .unwrap() + .reverse() + .limit(50); + + let url_reverse = builder_reverse.build_request_url(homeserver).unwrap(); + let query_reverse: Vec<_> = url_reverse.query_pairs().collect(); + + assert!( + query_reverse + .iter() + .any(|(k, v)| k == "reverse" && v == "true") + ); + assert!( + !query_reverse.iter().any(|(k, _)| k == "live"), + "Should not have live param when reverse is set" + ); + } + #[tokio::test] async fn subscribe_fails_with_no_users() { let client = crate::PubkyHttpClient::testnet().unwrap(); diff --git a/pubky-testnet/Cargo.toml b/pubky-testnet/Cargo.toml index bde36271b..0f7e458d2 100644 --- a/pubky-testnet/Cargo.toml +++ b/pubky-testnet/Cargo.toml @@ -19,7 +19,7 @@ tokio = { workspace = true, features = ["full"] } tracing-subscriber.workspace = true url.workspace = true -pubky = { version = "0.7.0-rc.1", path = "../pubky-sdk" } +pubky = { version = "0.7.0-rc.1", path = "../pubky-sdk", features = ["json"] } pubky-common = { path = "../pubky-common", version = "0.6.0" } pubky-homeserver = { path = "../pubky-homeserver", version = "0.6.0", default-features = false, features = [ "testing", From e25e4aad2d439e2accaa1372e546ade6a474e79d Mon Sep 17 00:00:00 2001 From: tomos Date: Mon, 2 Mar 2026 19:34:15 +0000 Subject: [PATCH 6/8] chore: v0.7.0-rc.2 --- Cargo.lock | 2 +- pubky-sdk/Cargo.toml | 2 +- pubky-sdk/bindings/js/pkg/package.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 556d79caa..df719f0c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3969,7 +3969,7 @@ checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" [[package]] name = "pubky" -version = "0.7.0-rc.1" +version = "0.7.0-rc.2" dependencies = [ "base64 0.22.1", "cookie", diff --git a/pubky-sdk/Cargo.toml b/pubky-sdk/Cargo.toml index 22da3a778..1adf4fb49 100644 --- a/pubky-sdk/Cargo.toml +++ b/pubky-sdk/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky" description = "Pubky SDK" -version = "0.7.0-rc.1" +version = "0.7.0-rc.2" edition = "2024" authors = [ "SeverinAlexB ", diff --git a/pubky-sdk/bindings/js/pkg/package.json b/pubky-sdk/bindings/js/pkg/package.json index 1f6b2fc52..c6cf17072 100644 --- a/pubky-sdk/bindings/js/pkg/package.json +++ b/pubky-sdk/bindings/js/pkg/package.json @@ -2,7 +2,7 @@ "name": "@synonymdev/pubky", "type": "module", "description": "Pubky client", - "version": "0.7.0-rc.1", + "version": "0.7.0-rc.2", "license": "MIT", "repository": { "type": "git", From 789131edfd35cc68e2e711e5c4405dddbadd73a0 Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 13:40:24 +0000 Subject: [PATCH 7/8] chore: v0.7.0-rc.3 --- Cargo.lock | 16 +++++++++------- e2e/Cargo.toml | 2 +- examples/rust/Cargo.toml | 6 +++--- pubky-common/Cargo.toml | 2 +- pubky-homeserver/Cargo.toml | 4 ++-- pubky-sdk/Cargo.toml | 4 ++-- pubky-sdk/bindings/js/Cargo.toml | 2 +- pubky-sdk/bindings/js/pkg/package.json | 2 +- pubky-testnet/Cargo.toml | 14 ++++++++------ 9 files changed, 28 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df719f0c6..200f8928c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,7 +1467,7 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] name = "e2e" -version = "0.6.0" +version = "0.7.0-rc.3" dependencies = [ "bytes", "eventsource-stream", @@ -3969,7 +3969,7 @@ checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" [[package]] name = "pubky" -version = "0.7.0-rc.2" +version = "0.7.0-rc.3" dependencies = [ "base64 0.22.1", "cookie", @@ -3995,7 +3995,7 @@ dependencies = [ [[package]] name = "pubky-common" -version = "0.6.0" +version = "0.7.0-rc.3" dependencies = [ "argon2", "base32", @@ -4016,7 +4016,7 @@ dependencies = [ [[package]] name = "pubky-core-examples" -version = "0.7.0" +version = "0.7.0-rc.3" dependencies = [ "anyhow", "base64 0.22.1", @@ -4035,7 +4035,7 @@ dependencies = [ [[package]] name = "pubky-homeserver" -version = "0.6.0" +version = "0.7.0-rc.3" dependencies = [ "anyhow", "async-dropper", @@ -4099,7 +4099,7 @@ dependencies = [ [[package]] name = "pubky-testnet" -version = "0.6.0" +version = "0.7.0-rc.3" dependencies = [ "anyhow", "clap", @@ -4109,6 +4109,8 @@ dependencies = [ "once_cell", "pkarr", "pkarr-relay", + "postgresql_archive", + "postgresql_commands", "postgresql_embedded", "pubky", "pubky-common", @@ -4139,7 +4141,7 @@ dependencies = [ [[package]] name = "pubky-wasm" -version = "0.7.0-rc.1" +version = "0.7.0-rc.3" dependencies = [ "console_log", "futures-util", diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 3f21dd97a..eb32644ba 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "e2e" edition.workspace = true -version = "0.6.0" +version = "0.7.0-rc.3" publish = false [dependencies] diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 03a8313e2..386534dce 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubky-core-examples" -version = "0.7.0" +version = "0.7.0-rc.3" edition.workspace = true publish = false @@ -45,9 +45,9 @@ futures-util = "0.3.31" anyhow.workspace = true base64 = "0.22.1" clap = { version = "4.5.48", features = ["derive"] } -pubky = { path = "../../pubky-sdk", version = "0.7.0-rc.1" } +pubky = { path = "../../pubky-sdk" } pubky-testnet = { path = "../../pubky-testnet" } -pubky-common = { path = "../../pubky-common", version = "0.6.0" } +pubky-common = { path = "../../pubky-common" } reqwest.workspace = true rpassword = "7.4.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/pubky-common/Cargo.toml b/pubky-common/Cargo.toml index e2b2727b3..28324d5a7 100644 --- a/pubky-common/Cargo.toml +++ b/pubky-common/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-common" description = "Types and struct in common between Pubky client and homeserver" -version = "0.6.0" +version = "0.7.0-rc.3" edition.workspace = true authors.workspace = true license.workspace = true diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index a99519a7c..b02a407a2 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-homeserver" description = "Pubky core's homeserver." -version = "0.6.0" +version = "0.7.0-rc.3" edition.workspace = true authors.workspace = true license.workspace = true @@ -33,7 +33,7 @@ hex = "0.4.3" httpdate = "1.0.3" postcard = { version = "1.1.1", features = ["alloc"] } pkarr = { workspace = true, features = ["dht", "lmdb-cache", "tls"] } -pubky-common = { path = "../pubky-common", version = "0.6.0" } +pubky-common = { path = "../pubky-common" } serde = { version = "1.0.217", features = ["derive"] } tokio = { workspace = true, features = ["full"] } toml = "0.8.20" diff --git a/pubky-sdk/Cargo.toml b/pubky-sdk/Cargo.toml index 1adf4fb49..215ae1c7b 100644 --- a/pubky-sdk/Cargo.toml +++ b/pubky-sdk/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky" description = "Pubky SDK" -version = "0.7.0-rc.2" +version = "0.7.0-rc.3" edition = "2024" authors = [ "SeverinAlexB ", @@ -28,7 +28,7 @@ default = [] json = ["dep:serde", "reqwest/json"] [dependencies] -pubky-common = { path = "../pubky-common", version = "0.6.0" } +pubky-common = { path = "../pubky-common" } thiserror.workspace = true eventsource-stream = "0.2.3" url.workspace = true diff --git a/pubky-sdk/bindings/js/Cargo.toml b/pubky-sdk/bindings/js/Cargo.toml index 144b26012..ac877c1ef 100644 --- a/pubky-sdk/bindings/js/Cargo.toml +++ b/pubky-sdk/bindings/js/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pubky-wasm" -version = "0.7.0-rc.1" +version = "0.7.0-rc.3" edition = "2024" description = "Pubky-Core Client WASM bindings" authors = [ diff --git a/pubky-sdk/bindings/js/pkg/package.json b/pubky-sdk/bindings/js/pkg/package.json index c6cf17072..ef4439da9 100644 --- a/pubky-sdk/bindings/js/pkg/package.json +++ b/pubky-sdk/bindings/js/pkg/package.json @@ -2,7 +2,7 @@ "name": "@synonymdev/pubky", "type": "module", "description": "Pubky client", - "version": "0.7.0-rc.2", + "version": "0.7.0-rc.3", "license": "MIT", "repository": { "type": "git", diff --git a/pubky-testnet/Cargo.toml b/pubky-testnet/Cargo.toml index 0f7e458d2..144bba135 100644 --- a/pubky-testnet/Cargo.toml +++ b/pubky-testnet/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pubky-testnet" description = "A local test network for Pubky Core development." -version = "0.6.0" +version = "0.7.0-rc.3" edition.workspace = true authors.workspace = true license.workspace = true @@ -19,9 +19,9 @@ tokio = { workspace = true, features = ["full"] } tracing-subscriber.workspace = true url.workspace = true -pubky = { version = "0.7.0-rc.1", path = "../pubky-sdk", features = ["json"] } -pubky-common = { path = "../pubky-common", version = "0.6.0" } -pubky-homeserver = { path = "../pubky-homeserver", version = "0.6.0", default-features = false, features = [ +pubky = { path = "../pubky-sdk", features = ["json"] } +pubky-common = { path = "../pubky-common" } +pubky-homeserver = { path = "../pubky-homeserver", default-features = false, features = [ "testing", ] } pubky_test_utils = { path = "../test_utils/pubky_test", version = "0.1.0" } @@ -33,9 +33,11 @@ mainline = { workspace = true } clap.workspace = true dirs = "6.0.0" once_cell = "1.21.3" -postgresql_embedded = { version = "0.20", optional = true } +postgresql_embedded = { version = "=0.20.0", optional = true } +postgresql_commands = { version = "=0.20.0", optional = true } +postgresql_archive = { version = "=0.20.0", optional = true } rand = { version = "0.9", optional = true } [features] default = [] -embedded-postgres = ["postgresql_embedded", "rand"] +embedded-postgres = ["postgresql_embedded", "postgresql_commands", "postgresql_archive", "rand"] From ea5a27d40ede4c55a18332beb9c960f21162a4c7 Mon Sep 17 00:00:00 2001 From: tomos Date: Wed, 4 Mar 2026 14:10:07 +0000 Subject: [PATCH 8/8] docs: Explain why postgres dependencies are verison fixed --- pubky-testnet/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pubky-testnet/Cargo.toml b/pubky-testnet/Cargo.toml index 144bba135..b6d18576f 100644 --- a/pubky-testnet/Cargo.toml +++ b/pubky-testnet/Cargo.toml @@ -33,6 +33,7 @@ mainline = { workspace = true } clap.workspace = true dirs = "6.0.0" once_cell = "1.21.3" +# Pinned to 0.20.0 to maintain compatibility with Rust 1.89.0 (0.20.2+ requires Rust 1.92.0) postgresql_embedded = { version = "=0.20.0", optional = true } postgresql_commands = { version = "=0.20.0", optional = true } postgresql_archive = { version = "=0.20.0", optional = true }