Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nexus-common/src/db/graph/queries/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn create_post(
.param("content", post.content.to_string())
.param("indexed_at", post.indexed_at)
.param("kind", kind.trim_matches('"'))
.param("attachments", post.attachments.clone().unwrap_or(vec![]));
.param("attachments", post.attachments.clone().unwrap_or_default());

// Handle "replied" relationship
cypher_query = add_relationship_params(
Expand Down
192 changes: 122 additions & 70 deletions nexus-watcher/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ mod processor_runner;
mod stats;
mod traits;

/// Module exports
pub use constants::{PROCESSING_TIMEOUT_SECS, WATCHER_CONFIG_FILE_NAME};
use nexus_common::types::DynError;
pub use processor::EventProcessor;
pub use processor_runner::EventProcessorRunner;
pub use stats::ProcessedStats;
pub use traits::{TEventProcessor, TEventProcessorRunner};

use crate::NexusWatcherBuilder;
Expand All @@ -17,6 +17,7 @@ use nexus_common::models::homeserver::Homeserver;
use nexus_common::utils::create_shutdown_rx;
use nexus_common::{DaemonConfig, WatcherConfig};
use pubky_app_specs::PubkyId;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -68,84 +69,135 @@ impl NexusWatcher {
NexusWatcherBuilder(watcher_config).start(shutdown_rx).await
}

/// Starts the Nexus Watcher with 3 parallel loops:
///
/// 1. **Default homeserver loop**: Processes events from the default homeserver defined in [`WatcherConfig`].
/// 2. **External homeservers loop**: Processes events from all external monitored homeservers, excluding the default.
/// 3. **Reserved loop**: Placeholder for future use.
///
/// All loops share the same tick interval ([`WatcherConfig::watcher_sleep`]) and listen for
/// the shutdown signal to exit gracefully.
/// Initializes configuration, persists the default homeserver, and delegates to [`Self::run_tasks`].
pub async fn start(shutdown_rx: Receiver<bool>, config: WatcherConfig) -> Result<(), DynError> {
debug!(?config, "Running NexusWatcher with ");

let config_hs = PubkyId::try_from(config.homeserver.as_str())?;
Homeserver::persist_if_unknown(config_hs).await?;

let watcher_sleep = config.watcher_sleep;
let ev_processor_runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone());
let ev_processor_runner = Arc::new(ev_processor_runner);

// Thread 1: Default homeserver processing
let default_hs_handle = {
let runner = ev_processor_runner.clone();
let mut shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep));
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("SIGINT received, exiting default homeserver loop");
break;
}
_ = interval.tick() => {
debug!("Indexing default homeserver…");
_ = runner
.run_default_homeserver()
.await
.inspect_err(|e| error!("Failed to run default homeserver event processor: {e}"));
}
}
let runner = EventProcessorRunner::from_config(&config, shutdown_rx.clone());
Self::run_tasks(shutdown_rx, Arc::new(runner), config.watcher_sleep).await
}

/// Spawns processing tasks and waits for completion using a [`tokio::task::JoinSet`].
///
/// Three parallel tasks are spawned:
/// 1. **Default homeserver task**: calls [`TEventProcessorRunner::run_default_homeserver`] each tick.
/// 2. **External homeservers task**: calls [`TEventProcessorRunner::run_external_homeservers`] each tick.
/// 3. **Shutdown forwarder task**: bridges the external `shutdown_rx` (e.g. SIGINT) into an internal channel.
///
/// When any task exits (normally or via panic), `JoinSet::join_next` returns and the
/// remaining tasks are signalled to shut down gracefully via the internal channel.
///
/// Separated from [`Self::start`] to allow injection of mock runners in tests.
pub async fn run_tasks(
shutdown_rx: Receiver<bool>,
runner: Arc<dyn TEventProcessorRunner>,
watcher_sleep: u64,
) -> Result<(), DynError> {
let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::watch::channel(false);
let mut set: tokio::task::JoinSet<&'static str> = tokio::task::JoinSet::new();

// Task 1: Default homeserver processing
Self::spawn_processing_loop(
&mut set,
runner.clone(),
internal_shutdown_rx.clone(),
watcher_sleep,
"default homeserver",
|runner| async move { runner.run_default_homeserver().await },
);

// Task 2: External homeservers processing
Self::spawn_processing_loop(
&mut set,
runner.clone(),
internal_shutdown_rx.clone(),
watcher_sleep,
"external homeservers",
|runner| async move { runner.run_external_homeservers().await },
);

// Task 3: Forwarder — bridges external SIGINT into the internal shutdown channel
set.spawn(async move {
let mut shutdown_rx = shutdown_rx;
let mut internal_shutdown_rx = internal_shutdown_rx;
tokio::select! {
_ = shutdown_rx.changed() => {
info!("SIGINT received, forwarding shutdown to watcher tasks");
}
})
};

// Thread 2: External homeservers processing
let external_hss_handle = {
let runner = ev_processor_runner.clone();
let mut shutdown = shutdown_rx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep));
loop {
tokio::select! {
_ = shutdown.changed() => {
info!("SIGINT received, exiting other homeservers loop");
break;
}
_ = interval.tick() => {
debug!("Indexing other homeservers…");
_ = runner
.run_external_homeservers()
.await
.inspect_err(|e| error!("Failed to start event processors run: {e}"));
}
}
_ = internal_shutdown_rx.changed() => {
info!("Internal shutdown received in forwarder, exiting");
}
})
};

// Thread 3: Reserved for future use
let reserved_handle = {
let mut shutdown = shutdown_rx.clone();
tokio::spawn(async move {
// TODO: Reserved for future use
let _ = shutdown.changed().await;
info!("SIGINT received, exiting reserved loop");
})
};

let _ = tokio::try_join!(default_hs_handle, external_hss_handle, reserved_handle);
}
"shutdown forwarder"
});

// Block until the first task exits for any reason
let mut had_error = false;

match set.join_next().await {
Some(Ok(label)) => info!("First task to exit: {label}"),
Some(Err(e)) => {
error!("Task failed (panic/cancel): {e}");
had_error = true;
}
None => unreachable!("JoinSet is non-empty"),
}

// Signal the remaining tasks to stop gracefully
let _ = internal_shutdown_tx.send(true);

// Drain remaining tasks
while let Some(result) = set.join_next().await {
match &result {
Ok(label) => info!("Task exited: {label}"),
Err(e) => {
error!("Task failed (panic/cancel): {e}");
had_error = true;
}
}
}

if had_error {
return Err("Nexus Watcher stopped: one or more tasks failed".into());
}

info!("Nexus Watcher shut down gracefully");
Ok(())
}

/// Spawns onto `set` a task that invokes `process` on a fixed tick interval
/// until `shutdown` signals a change, then resolves with `label`.
///
/// * `set` — [`tokio::task::JoinSet`] collecting the watcher tasks; each task
///   yields its `label` on exit so the caller can log which task finished first.
/// * `runner` — shared runner cloned into `process` on every tick.
/// * `shutdown` — watch channel; any change observed on it breaks the loop.
/// * `watcher_sleep` — tick period in milliseconds.
/// * `label` — static task name used in log messages and as the return value.
/// * `process` — factory producing the per-tick future from the runner.
///
/// Per-tick errors are logged and swallowed so a single failed round does not
/// terminate the loop.
fn spawn_processing_loop<F, Fut>(
    set: &mut tokio::task::JoinSet<&'static str>,
    runner: Arc<dyn TEventProcessorRunner>,
    mut shutdown: Receiver<bool>,
    watcher_sleep: u64,
    label: &'static str,
    process: F,
) where
    F: Fn(Arc<dyn TEventProcessorRunner>) -> Fut + Send + 'static,
    Fut: Future<Output = Result<ProcessedStats, DynError>> + Send,
{
    set.spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(watcher_sleep));
        // Skip missed ticks instead of bursting to catch up: if one processing
        // round overruns the interval, the next tick waits for the regular
        // cadence rather than firing back-to-back.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = shutdown.changed() => {
                    info!("Shutdown received, exiting {label} loop");
                    break;
                }
                _ = interval.tick() => {
                    debug!("Indexing {label}…");
                    // Log-and-continue: a failed round must not kill the loop.
                    _ = process(runner.clone())
                        .await
                        .inspect_err(|e| error!("Failed to run {label} event processor: {e}"));
                }
            }
        }
        label
    });
}
}
137 changes: 137 additions & 0 deletions nexus-watcher/tests/service/missed_tick_skip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use anyhow::Result;
use nexus_common::models::event::EventProcessorError;
use nexus_common::types::DynError;
use nexus_watcher::service::{NexusWatcher, TEventProcessor, TEventProcessorRunner};
use pubky_app_specs::PubkyId;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch::Receiver;

const TEST_HS_ID: &str = "1hb71xx9km3f4pw5izsy1gn19ff1uuuqonw4mcygzobwkryujoiy";

/// Processor that sleeps on the first N invocations and is instant afterward.
/// This pattern is essential: `MissedTickBehavior::Skip` only produces
/// observable waiting when processing time drops *below* the interval after
/// a slow round. When every round exceeds the interval, Skip degenerates to
/// immediate-fire behavior identical to Burst.
struct VariableDelayProcessor {
    /// Number of `run_internal` invocations so far (atomic: incremented from the spawned task).
    call_count: AtomicUsize,
    /// How many initial invocations sleep before the processor becomes instant.
    slow_first_n: usize,
    /// Sleep duration applied to each of the first `slow_first_n` invocations.
    slow_delay: Duration,
    /// Homeserver id reported by `get_homeserver_id`.
    hs_id: PubkyId,
}

#[async_trait::async_trait]
impl TEventProcessor for VariableDelayProcessor {
    /// Reports the homeserver this processor is bound to.
    fn get_homeserver_id(&self) -> PubkyId {
        self.hs_id.clone()
    }

    /// Sleeps for `slow_delay` on each of the first `slow_first_n` calls,
    /// returning immediately on every call after that.
    async fn run_internal(self: Arc<Self>) -> Result<(), EventProcessorError> {
        // Atomically claim this invocation's ordinal before deciding whether to stall.
        let invocation = self.call_count.fetch_add(1, Ordering::SeqCst);
        let is_slow_round = invocation < self.slow_first_n;
        if is_slow_round {
            tokio::time::sleep(self.slow_delay).await;
        }
        Ok(())
    }
}

/// Minimal runner that returns the shared `VariableDelayProcessor` on each
/// `build()` call. External homeservers are empty so only the default-HS
/// loop triggers builds.
struct VariableDelayRunner {
    /// Shared processor handed back from every `build()` call.
    processor: Arc<VariableDelayProcessor>,
    /// Instants at which `build()` was invoked, in call order.
    build_timestamps: std::sync::Mutex<Vec<Instant>>,
    /// Receiver returned by `shutdown_rx()`.
    shutdown_rx: Receiver<bool>,
}

#[async_trait::async_trait]
impl TEventProcessorRunner for VariableDelayRunner {
    /// Returns a clone of the shutdown receiver shared with the test harness.
    fn shutdown_rx(&self) -> Receiver<bool> {
        self.shutdown_rx.clone()
    }

    /// Fixed default homeserver id; the default-HS loop builds processors for it.
    fn default_homeserver(&self) -> &str {
        TEST_HS_ID
    }

    /// No external homeservers are monitored in this test.
    fn monitored_homeservers_limit(&self) -> usize {
        0
    }

    /// Empty priority list, so only the default-homeserver loop produces `build()` calls.
    async fn external_homeservers_by_priority(&self) -> Result<Vec<String>, DynError> {
        Ok(vec![])
    }

    /// Records the instant of each `build()` call, then returns the shared processor.
    ///
    /// `NexusWatcher::run_tasks` calls `build()` on each interval tick for the
    /// default homeserver loop. By collecting these timestamps we can assert that
    /// calls stay spaced out (Skip) instead of clustering in burst catch-up mode.
    async fn build(&self, _homeserver_id: String) -> Result<Arc<dyn TEventProcessor>, DynError> {
        self.build_timestamps.lock().unwrap().push(Instant::now());
        Ok(self.processor.clone())
    }
}

/// Verifies that `run_tasks` uses `MissedTickBehavior::Skip`.
///
/// The first processing round sleeps 500 ms (exceeding the 100 ms interval),
/// causing several ticks to be missed. Subsequent rounds are instant.
///
/// With **Skip**, missed ticks are dropped and the loop resumes with regular
/// spacing, i.e. no immediate back-to-back `build()` calls.
///
/// With **Burst** (tokio default), missed ticks fire immediately after the
/// slow round and create near-zero inter-build gaps.
#[tokio_shared_rt::test(shared)]
async fn test_no_burst_after_slow_processing_round() -> Result<()> {
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    // One slow 500 ms round, instant thereafter.
    let delay_processor = Arc::new(VariableDelayProcessor {
        call_count: AtomicUsize::new(0),
        slow_first_n: 1,
        slow_delay: Duration::from_millis(500),
        hs_id: PubkyId::try_from(TEST_HS_ID).unwrap(),
    });

    let runner = Arc::new(VariableDelayRunner {
        processor: delay_processor,
        build_timestamps: std::sync::Mutex::new(Vec::new()),
        shutdown_rx: shutdown_rx.clone(),
    });

    // Let the watcher run long enough for several ticks, then signal shutdown.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(1500)).await;
        let _ = shutdown_tx.send(true);
    });

    let _ = NexusWatcher::run_tasks(shutdown_rx, runner.clone(), 100).await;

    let timestamps = runner.build_timestamps.lock().unwrap().clone();

    assert!(
        timestamps.len() >= 3,
        "Expected at least 3 build() calls, got {}",
        timestamps.len()
    );

    // We do not require perfect 100 ms cadence because test hosts can be noisy,
    // but any "burst catch-up" would produce near-zero gaps. Keep a conservative
    // floor that catches burst behavior without being CI-fragile.
    let min_expected_non_burst_gap = Duration::from_millis(30);

    for (idx, pair) in timestamps.windows(2).enumerate() {
        let gap = pair[1].duration_since(pair[0]);
        assert!(
            gap >= min_expected_non_burst_gap,
            "Gap between build() call {} and {} was {gap:?}, expected >= {min_expected_non_burst_gap:?}. \
This suggests burst catch-up ticks fired instead of being skipped.",
            idx,
            idx + 1,
        );
    }

    Ok(())
}
2 changes: 2 additions & 0 deletions nexus-watcher/tests/service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Test modules for the watcher service; each file exercises one aspect of
// event processing, prioritization, tick scheduling, or shutdown behavior.
pub mod event_processing_multiple_homeservers;
pub mod event_processor_prioritization;
pub mod missed_tick_skip;
pub mod mock_event_processor;
pub mod shutdown_guard;
pub mod signal;
pub mod utils;
Loading