diff --git a/nexus-common/src/db/graph/queries/put.rs b/nexus-common/src/db/graph/queries/put.rs index a1f8bdf7..23b2e81b 100644 --- a/nexus-common/src/db/graph/queries/put.rs +++ b/nexus-common/src/db/graph/queries/put.rs @@ -81,7 +81,7 @@ pub fn create_post( .param("content", post.content.to_string()) .param("indexed_at", post.indexed_at) .param("kind", kind.trim_matches('"')) - .param("attachments", post.attachments.clone().unwrap_or(vec![])); + .param("attachments", post.attachments.clone().unwrap_or_default()); // Handle "replied" relationship cypher_query = add_relationship_params( diff --git a/nexus-watcher/src/service/mod.rs b/nexus-watcher/src/service/mod.rs index b07a6d98..d5d00e26 100644 --- a/nexus-watcher/src/service/mod.rs +++ b/nexus-watcher/src/service/mod.rs @@ -4,11 +4,11 @@ mod processor_runner; mod stats; mod traits; -/// Module exports pub use constants::{PROCESSING_TIMEOUT_SECS, WATCHER_CONFIG_FILE_NAME}; use nexus_common::types::DynError; pub use processor::EventProcessor; pub use processor_runner::EventProcessorRunner; +pub use stats::ProcessedStats; pub use traits::{TEventProcessor, TEventProcessorRunner}; use crate::NexusWatcherBuilder; @@ -17,6 +17,7 @@ use nexus_common::models::homeserver::Homeserver; use nexus_common::utils::create_shutdown_rx; use nexus_common::{DaemonConfig, WatcherConfig}; use pubky_app_specs::PubkyId; +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch::Receiver; @@ -68,84 +69,135 @@ impl NexusWatcher { NexusWatcherBuilder(watcher_config).start(shutdown_rx).await } - /// Starts the Nexus Watcher with 3 parallel loops: - /// - /// 1. **Default homeserver loop**: Processes events from the default homeserver defined in [`WatcherConfig`]. - /// 2. **External homeservers loop**: Processes events from all external monitored homeservers, excluding the default. - /// 3. **Reserved loop**: Placeholder for future use. - /// - /// All loops share the same tick interval ([`WatcherConfig::watcher_sleep`]) and listen for - /// the shutdown signal to exit gracefully. + /// Initializes configuration, persists the default homeserver, and delegates to [`Self::run_tasks`]. pub async fn start(shutdown_rx: Receiver, config: WatcherConfig) -> Result<(), DynError> { debug!(?config, "Running NexusWatcher with "); let config_hs = PubkyId::try_from(config.homeserver.as_str())?; Homeserver::persist_if_unknown(config_hs).await?; - let watcher_sleep = config.watcher_sleep; - let ev_processor_runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone()); - let ev_processor_runner = Arc::new(ev_processor_runner); - - // Thread 1: Default homeserver processing - let default_hs_handle = { - let runner = ev_processor_runner.clone(); - let mut shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); - loop { - tokio::select! { - _ = shutdown.changed() => { - info!("SIGINT received, exiting default homeserver loop"); - break; - } - _ = interval.tick() => { - debug!("Indexing default homeserver…"); - _ = runner - .run_default_homeserver() - .await - .inspect_err(|e| error!("Failed to run default homeserver event processor: {e}")); - } - } + let runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone()); + Self::run_tasks(shutdown_rx, Arc::new(runner), config.watcher_sleep).await + } + + /// Spawns processing tasks and waits for completion using a [`tokio::task::JoinSet`]. + /// + /// Three parallel tasks are spawned: + /// 1. **Default homeserver task**: calls [`TEventProcessorRunner::run_default_homeserver`] each tick. + /// 2. **External homeservers task**: calls [`TEventProcessorRunner::run_external_homeservers`] each tick. + /// 3. **Shutdown forwarder task**: bridges the external `shutdown_rx` (e.g. SIGINT) into an internal channel. + /// + /// When any task exits (normally or via panic), `JoinSet::join_next` returns and the + /// remaining tasks are signalled to shut down gracefully via the internal channel. + /// + /// Separated from [`Self::start`] to allow injection of mock runners in tests. + pub async fn run_tasks( + shutdown_rx: Receiver, + runner: Arc, + watcher_sleep: u64, + ) -> Result<(), DynError> { + let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::watch::channel(false); + let mut set: tokio::task::JoinSet<&'static str> = tokio::task::JoinSet::new(); + + // Task 1: Default homeserver processing + Self::spawn_processing_loop( + &mut set, + runner.clone(), + internal_shutdown_rx.clone(), + watcher_sleep, + "default homeserver", + |runner| async move { runner.run_default_homeserver().await }, + ); + + // Task 2: External homeservers processing + Self::spawn_processing_loop( + &mut set, + runner.clone(), + internal_shutdown_rx.clone(), + watcher_sleep, + "external homeservers", + |runner| async move { runner.run_external_homeservers().await }, + ); + + // Task 3: Forwarder — bridges external SIGINT into the internal shutdown channel + set.spawn(async move { + let mut shutdown_rx = shutdown_rx; + let mut internal_shutdown_rx = internal_shutdown_rx; + tokio::select! { + _ = shutdown_rx.changed() => { + info!("SIGINT received, forwarding shutdown to watcher tasks"); } - }) - }; - - // Thread 2: External homeservers processing - let external_hss_handle = { - let runner = ev_processor_runner.clone(); - let mut shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); - loop { - tokio::select! { - _ = shutdown.changed() => { - info!("SIGINT received, exiting other homeservers loop"); - break; - } - _ = interval.tick() => { - debug!("Indexing other homeservers…"); - _ = runner - .run_external_homeservers() - .await - .inspect_err(|e| error!("Failed to start event processors run: {e}")); - } - } + _ = internal_shutdown_rx.changed() => { + info!("Internal shutdown received in forwarder, exiting"); } - }) - }; - - // Thread 3: Reserved for future use - let reserved_handle = { - let mut shutdown = shutdown_rx.clone(); - tokio::spawn(async move { - // TODO: Reserved for future use - let _ = shutdown.changed().await; - info!("SIGINT received, exiting reserved loop"); - }) - }; - - let _ = tokio::try_join!(default_hs_handle, external_hss_handle, reserved_handle); + } + "shutdown forwarder" + }); + + // Block until the first task exits for any reason + let mut had_error = false; + + match set.join_next().await { + Some(Ok(label)) => info!("First task to exit: {label}"), + Some(Err(e)) => { + error!("Task failed (panic/cancel): {e}"); + had_error = true; + } + None => unreachable!("JoinSet is non-empty"), + } + + // Signal the remaining tasks to stop gracefully + let _ = internal_shutdown_tx.send(true); + + // Drain remaining tasks + while let Some(result) = set.join_next().await { + match &result { + Ok(label) => info!("Task exited: {label}"), + Err(e) => { + error!("Task failed (panic/cancel): {e}"); + had_error = true; + } + } + } + + if had_error { + return Err("Nexus Watcher stopped: one or more tasks failed".into()); + } + info!("Nexus Watcher shut down gracefully"); Ok(()) } + + fn spawn_processing_loop( + set: &mut tokio::task::JoinSet<&'static str>, + runner: Arc, + mut shutdown: Receiver, + watcher_sleep: u64, + label: &'static str, + process: F, + ) where + F: Fn(Arc) -> Fut + Send + 'static, + Fut: Future> + Send, + { + set.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = shutdown.changed() => { + info!("Shutdown received, exiting {label} loop"); + break; + } + _ = interval.tick() => { + debug!("Indexing {label}…"); + _ = process(runner.clone()) + .await + .inspect_err(|e| error!("Failed to run {label} event processor: {e}")); + } + } + } + label + }); + } } diff --git a/nexus-watcher/tests/service/missed_tick_skip.rs b/nexus-watcher/tests/service/missed_tick_skip.rs new file mode 100644 index 00000000..38790e41 --- /dev/null +++ b/nexus-watcher/tests/service/missed_tick_skip.rs @@ -0,0 +1,137 @@ +use anyhow::Result; +use nexus_common::models::event::EventProcessorError; +use nexus_common::types::DynError; +use nexus_watcher::service::{NexusWatcher, TEventProcessor, TEventProcessorRunner}; +use pubky_app_specs::PubkyId; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::watch::Receiver; + +const TEST_HS_ID: &str = "1hb71xx9km3f4pw5izsy1gn19ff1uuuqonw4mcygzobwkryujoiy"; + +/// Processor that sleeps on the first N invocations and is instant afterward. +/// This pattern is essential: `MissedTickBehavior::Skip` only produces +/// observable waiting when processing time drops *below* the interval after +/// a slow round. When every round exceeds the interval, Skip degenerates to +/// immediate-fire behavior identical to Burst. +struct VariableDelayProcessor { + call_count: AtomicUsize, + slow_first_n: usize, + slow_delay: Duration, + hs_id: PubkyId, +} + +#[async_trait::async_trait] +impl TEventProcessor for VariableDelayProcessor { + fn get_homeserver_id(&self) -> PubkyId { + self.hs_id.clone() + } + + async fn run_internal(self: Arc) -> Result<(), EventProcessorError> { + let n = self.call_count.fetch_add(1, Ordering::SeqCst); + if n < self.slow_first_n { + tokio::time::sleep(self.slow_delay).await; + } + Ok(()) + } +} + +/// Minimal runner that returns the shared `VariableDelayProcessor` on each +/// `build()` call. External homeservers are empty so only the default-HS +/// loop triggers builds. +struct VariableDelayRunner { + processor: Arc, + build_timestamps: std::sync::Mutex>, + shutdown_rx: Receiver, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for VariableDelayRunner { + fn shutdown_rx(&self) -> Receiver { + self.shutdown_rx.clone() + } + + fn default_homeserver(&self) -> &str { + TEST_HS_ID + } + + fn monitored_homeservers_limit(&self) -> usize { + 0 + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + Ok(vec![]) + } + + /// Records the instant of each `build()` call, then returns the shared processor. + /// + /// `NexusWatcher::run_tasks` calls `build()` on each interval tick for the + /// default homeserver loop. By collecting these timestamps we can assert that + /// calls stay spaced out (Skip) instead of clustering in burst catch-up mode. + async fn build(&self, _homeserver_id: String) -> Result, DynError> { + self.build_timestamps.lock().unwrap().push(Instant::now()); + Ok(self.processor.clone()) + } +} + +/// Verifies that `run_tasks` uses `MissedTickBehavior::Skip`. +/// +/// The first processing round sleeps 500 ms (exceeding the 100 ms interval), +/// causing several ticks to be missed. Subsequent rounds are instant. +/// +/// With **Skip**, missed ticks are dropped and the loop resumes with regular +/// spacing, i.e. no immediate back-to-back `build()` calls. +/// +/// With **Burst** (tokio default), missed ticks fire immediately after the +/// slow round and create near-zero inter-build gaps. +#[tokio_shared_rt::test(shared)] +async fn test_no_burst_after_slow_processing_round() -> Result<()> { + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + let processor = Arc::new(VariableDelayProcessor { + call_count: AtomicUsize::new(0), + slow_first_n: 1, + slow_delay: Duration::from_millis(500), + hs_id: PubkyId::try_from(TEST_HS_ID).unwrap(), + }); + + let runner = Arc::new(VariableDelayRunner { + processor, + build_timestamps: std::sync::Mutex::new(Vec::new()), + shutdown_rx: shutdown_rx.clone(), + }); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1500)).await; + let _ = shutdown_tx.send(true); + }); + + let _ = NexusWatcher::run_tasks(shutdown_rx, runner.clone(), 100).await; + + let timestamps = runner.build_timestamps.lock().unwrap().clone(); + + assert!( + timestamps.len() >= 3, + "Expected at least 3 build() calls, got {}", + timestamps.len() + ); + + // We do not require perfect 100 ms cadence because test hosts can be noisy, + // but any "burst catch-up" would produce near-zero gaps. Keep a conservative + // floor that catches burst behavior without being CI-fragile. + let min_expected_non_burst_gap = Duration::from_millis(30); + + for i in 1..timestamps.len() { + let gap = timestamps[i].duration_since(timestamps[i - 1]); + assert!( + gap >= min_expected_non_burst_gap, + "Gap between build() call {} and {} was {gap:?}, expected >= {min_expected_non_burst_gap:?}. \ + This suggests burst catch-up ticks fired instead of being skipped.", + i - 1, + i, + ); + } + + Ok(()) +} diff --git a/nexus-watcher/tests/service/mod.rs b/nexus-watcher/tests/service/mod.rs index 0b77f7bc..cdf3d9f4 100644 --- a/nexus-watcher/tests/service/mod.rs +++ b/nexus-watcher/tests/service/mod.rs @@ -1,5 +1,7 @@ pub mod event_processing_multiple_homeservers; pub mod event_processor_prioritization; +pub mod missed_tick_skip; pub mod mock_event_processor; +pub mod shutdown_guard; pub mod signal; pub mod utils; diff --git a/nexus-watcher/tests/service/shutdown_guard.rs b/nexus-watcher/tests/service/shutdown_guard.rs new file mode 100644 index 00000000..fb97f207 --- /dev/null +++ b/nexus-watcher/tests/service/shutdown_guard.rs @@ -0,0 +1,143 @@ +use crate::service::utils::{ + create_random_homeservers_and_persist, setup, MockEventProcessorResult, + MockEventProcessorRunner, +}; +use anyhow::Result; +use nexus_common::types::DynError; +use nexus_watcher::service::{ + NexusWatcher, ProcessedStats, TEventProcessor, TEventProcessorRunner, +}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch::Receiver; +use tokio::time::sleep; + +const WATCHER_SLEEP_MS: u64 = 100; +const SHUTDOWN_DELAY_MS: u64 = 500; + +/// Controls which processing path should panic. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum PanicTarget { + DefaultHomeserver, + ExternalHomeservers, +} + +/// Wraps a [`MockEventProcessorRunner`] and panics in one selected run method. +struct PanickingRunner { + inner: MockEventProcessorRunner, + panic_target: PanicTarget, +} + +#[async_trait::async_trait] +impl TEventProcessorRunner for PanickingRunner { + fn shutdown_rx(&self) -> Receiver { + self.inner.shutdown_rx() + } + + fn default_homeserver(&self) -> &str { + self.inner.default_homeserver() + } + + fn monitored_homeservers_limit(&self) -> usize { + self.inner.monitored_homeservers_limit() + } + + async fn external_homeservers_by_priority(&self) -> Result, DynError> { + self.inner.external_homeservers_by_priority().await + } + + async fn build(&self, homeserver_id: String) -> Result, DynError> { + self.inner.build(homeserver_id).await + } + + async fn run_default_homeserver(&self) -> Result { + if matches!(self.panic_target, PanicTarget::DefaultHomeserver) { + panic!("simulated default HS task crash"); + } + self.inner.run_default_homeserver().await + } + + async fn run_external_homeservers(&self) -> Result { + if matches!(self.panic_target, PanicTarget::ExternalHomeservers) { + panic!("simulated external HS task crash"); + } + self.inner.run_external_homeservers().await + } +} + +async fn setup_runner_and_channel() -> Result<( + MockEventProcessorRunner, + tokio::sync::watch::Sender, + tokio::sync::watch::Receiver, +)> { + let mut event_processor_list = setup().await?; + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + for _ in 0..3 { + create_random_homeservers_and_persist( + &mut event_processor_list, + None, + MockEventProcessorResult::Success, + None, + shutdown_rx.clone(), + ) + .await; + } + + let runner = MockEventProcessorRunner::new(event_processor_list, 3, shutdown_rx.clone()); + Ok((runner, shutdown_tx, shutdown_rx)) +} + +async fn assert_panic_propagates(panic_target: PanicTarget) -> Result<()> { + let (inner, _shutdown_tx, shutdown_rx) = setup_runner_and_channel().await?; + let runner = Arc::new(PanickingRunner { + inner, + panic_target, + }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, WATCHER_SLEEP_MS).await; + assert!( + result.is_err(), + "run_tasks should return Err when a task panics" + ); + Ok(()) +} + +/// Test A: Sending an external shutdown signal causes `run_tasks` to return `Ok`. +/// Verifies the forwarder task bridges the external signal into the internal channel, +/// causing all processing tasks to exit gracefully. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_clean_shutdown_via_external_signal() -> Result<()> { + let (runner, shutdown_tx, shutdown_rx) = setup_runner_and_channel().await?; + let runner = Arc::new(runner); + + tokio::spawn(async move { + sleep(Duration::from_millis(SHUTDOWN_DELAY_MS)).await; + let _ = shutdown_tx.send(true); + }); + + let result = NexusWatcher::run_tasks(shutdown_rx, runner, WATCHER_SLEEP_MS).await; + assert!( + result.is_ok(), + "run_tasks should return Ok on clean shutdown" + ); + Ok(()) +} + +/// Test B: A panic in `run_default_homeserver` causes the spawned task to crash. +/// `JoinSet::join_next` surfaces the panic, then the remaining tasks are signalled +/// to stop via the internal shutdown channel. +/// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_default_hs_panic_propagates_via_guard() -> Result<()> { + assert_panic_propagates(PanicTarget::DefaultHomeserver).await +} + +/// Test C: A panic in `run_external_homeservers` causes the spawned task to crash. +/// `JoinSet::join_next` surfaces the panic, then the remaining tasks are signalled +/// to stop via the internal shutdown channel. +/// `run_tasks` returns `Err` because the panicked `JoinError` is observed during drain. +#[tokio_shared_rt::test(shared)] +async fn test_run_tasks_external_hs_panic_propagates_via_guard() -> Result<()> { + assert_panic_propagates(PanicTarget::ExternalHomeservers).await +}