Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions examples/rust/7-events_stream/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use clap::Parser;
use futures_util::StreamExt;
use pubky::{EventCursor, EventStreamBuilder, EventType, Pubky, PublicKey};
use pubky::{EventCursor, EventStreamBuilder, Pubky, PublicKey};
use std::env;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -125,15 +125,14 @@ async fn main() -> Result<()> {
// Process events
while let Some(result) = stream.next().await {
let event = result?;
let hash_str = event
.event_type
.content_hash()
.map(|h| h.to_string())
.unwrap_or("-".into());
println!(
"[{}] {} (cursor: {}, hash: {})",
match event.event_type {
EventType::Put => "PUT",
EventType::Delete => "DEL",
},
event.resource,
event.cursor,
event.content_hash.unwrap_or_else(|| "-".to_string())
event.event_type, event.resource, event.cursor, hash_str
);
}

Expand Down
174 changes: 174 additions & 0 deletions pubky-common/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//! Event types shared across Pubky crates.
//!
//! This module provides unified types for event streaming functionality,
//! used by both the homeserver and SDK.

use std::fmt::Display;
use std::str::FromStr;

use crate::crypto::Hash;

/// Cursor for pagination in event queries.
///
/// The cursor represents the ID of an event and is used for pagination.
/// It can be parsed from a string representation of an integer.
///
/// Note: Uses `u64` internally, but Postgres BIGINT is signed (`i64`).
/// sea_query/sqlx binds `u64` values, which works correctly as long as
/// IDs stay within `i64::MAX` (~9.2 quintillion). Since event IDs are
/// auto-incrementing from 1, this is not a practical concern.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct EventCursor(u64);

impl EventCursor {
/// Create a new cursor from an event ID.
#[must_use]
pub fn new(id: u64) -> Self {
Self(id)
}

/// Get the underlying ID value.
#[must_use]
pub fn id(&self) -> u64 {
self.0
}
}

impl Display for EventCursor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl FromStr for EventCursor {
type Err = std::num::ParseIntError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(EventCursor(s.parse()?))
}
}

impl From<u64> for EventCursor {
fn from(id: u64) -> Self {
EventCursor(id)
}
}

impl TryFrom<&str> for EventCursor {
type Error = std::num::ParseIntError;

fn try_from(s: &str) -> Result<Self, Self::Error> {
s.parse()
}
}

impl TryFrom<String> for EventCursor {
type Error = std::num::ParseIntError;

fn try_from(s: String) -> Result<Self, Self::Error> {
s.parse()
}
}

/// Type of event in the event stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventType {
/// PUT event - resource created or updated, with its content hash.
Put {
/// Blake3 hash of the content.
content_hash: Hash,
},
/// DELETE event - resource deleted.
Delete,
}

impl EventType {
/// Get the string representation of the event type.
pub fn as_str(&self) -> &'static str {
match self {
EventType::Put { .. } => "PUT",
EventType::Delete => "DEL",
}
}

/// Get the content hash if this is a PUT event.
pub fn content_hash(&self) -> Option<&Hash> {
match self {
EventType::Put { content_hash } => Some(content_hash),
EventType::Delete => None,
}
}
}

impl Display for EventType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn cursor_display_and_from_str() {
let cursor = EventCursor::new(12345);
assert_eq!(cursor.to_string(), "12345");

let parsed: EventCursor = "67890".parse().unwrap();
assert_eq!(parsed.id(), 67890);

let from_u64: EventCursor = 111u64.into();
assert_eq!(from_u64.id(), 111);

let try_from_str = EventCursor::try_from("222").unwrap();
assert_eq!(try_from_str.id(), 222);

let try_from_string = EventCursor::try_from("333".to_string()).unwrap();
assert_eq!(try_from_string.id(), 333);
}

#[test]
fn cursor_ordering() {
let c1 = EventCursor::new(1);
let c2 = EventCursor::new(2);
let c3 = EventCursor::new(2);

assert!(c1 < c2);
assert!(c2 > c1);
assert_eq!(c2, c3);
}

#[test]
fn event_type_display() {
let put = EventType::Put {
content_hash: Hash::from_bytes([0; 32]),
};
let del = EventType::Delete;

assert_eq!(put.to_string(), "PUT");
assert_eq!(del.to_string(), "DEL");
assert_eq!(put.as_str(), "PUT");
assert_eq!(del.as_str(), "DEL");
}

#[test]
fn event_type_content_hash() {
let hash = Hash::from_bytes([1; 32]);
let put = EventType::Put {
content_hash: hash.clone(),
};
let del = EventType::Delete;

assert_eq!(put.content_hash(), Some(&hash));
assert_eq!(del.content_hash(), None);
}

#[test]
fn cursor_parse_error() {
assert!("abc".parse::<EventCursor>().is_err());
assert!("".parse::<EventCursor>().is_err());
assert!("-1".parse::<EventCursor>().is_err());
assert!("12.34".parse::<EventCursor>().is_err());
}
}
1 change: 1 addition & 0 deletions pubky-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod auth;
pub mod capabilities;
pub mod constants;
pub mod crypto;
pub mod events;
mod keys;
pub mod namespaces;
pub mod recovery_file;
Expand Down
15 changes: 4 additions & 11 deletions pubky-homeserver/src/persistence/files/events/events_entity.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
use pubky_common::crypto::Hash;
use pubky_common::crypto::PublicKey;
use pubky_common::events::{EventCursor, EventType};
use sea_query::Iden;
use sqlx::{postgres::PgRow, FromRow, Row};

use crate::{
persistence::{
files::events::{
events_repository::{EventCursor, EventIden},
EventType,
},
sql::user::UserIden,
},
persistence::{files::events::events_repository::EventIden, sql::user::UserIden},
shared::webdav::{EntryPath, WebDavPath},
};

Expand Down Expand Up @@ -39,8 +34,6 @@ impl FromRow<'_, PgRow> for EventEntity {
let user_pubkey =
PublicKey::try_from_z32(&user_public_key).map_err(|e| sqlx::Error::Decode(e.into()))?;
let event_type_str: String = row.try_get(EventIden::Type.to_string().as_str())?;
let user_public_key =
PublicKey::try_from_z32(&user_public_key).map_err(|e| sqlx::Error::Decode(e.into()))?;
let path: String = row.try_get(EventIden::Path.to_string().as_str())?;
let path = WebDavPath::new(&path).map_err(|e| sqlx::Error::Decode(e.into()))?;
let created_at: sqlx::types::chrono::NaiveDateTime =
Expand Down Expand Up @@ -78,8 +71,8 @@ impl FromRow<'_, PgRow> for EventEntity {
id,
event_type,
user_id,
user_pubkey,
path: EntryPath::new(user_public_key, path),
user_pubkey: user_pubkey.clone(),
path: EntryPath::new(user_pubkey, path),
created_at,
})
}
Expand Down
Loading
Loading