From 3f23f6903267973817f7fd55a166f4121e01263b Mon Sep 17 00:00:00 2001 From: rebelzion Date: Wed, 29 Apr 2026 04:03:57 -0700 Subject: [PATCH 1/5] =?UTF-8?q?feat(IN-09):=20#79=20#80=20=E2=80=94=20serv?= =?UTF-8?q?er=20map=20+=20reassign=5Fcluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace `allocated_servers: Vec` with `servers: HashMap` (cluster_id key) so execute_merge/split can look up and release servers by cluster_id. Add `SpatialIndex::reassign_cluster(from, to)` — O(n) bulk entity reassignment called during merge execution. Closes #79, closes #80. Co-Authored-By: Claude Opus 4.6 --- crates/arcane-infra/src/cluster_manager.rs | 12 ++++++------ crates/arcane-spatial/src/index.rs | 9 +++++++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/arcane-infra/src/cluster_manager.rs b/crates/arcane-infra/src/cluster_manager.rs index e4d088e..cff450c 100644 --- a/crates/arcane-infra/src/cluster_manager.rs +++ b/crates/arcane-infra/src/cluster_manager.rs @@ -17,8 +17,8 @@ pub struct ClusterManager { model: Arc, pool: Arc, spatial_index: SpatialIndex, - /// Allocated cluster servers. active_count = allocated_servers.len(). - allocated_servers: Vec, + /// cluster_id → ServerHandle. One entry per live cluster server. + servers: HashMap, } impl ClusterManager { @@ -31,7 +31,7 @@ impl ClusterManager { model, pool, spatial_index, - allocated_servers: Vec::new(), + servers: HashMap::new(), } } @@ -128,12 +128,12 @@ impl ClusterManager { }; let _decisions = self.model.evaluate(&view); // Minimal apply: if we have clusters in the world and no servers allocated, allocate one. 
- if !self.allocated_servers.is_empty() { + if !self.servers.is_empty() { return Ok(()); } match self.pool.allocate() { Ok(handle) => { - self.allocated_servers.push(handle); + self.servers.insert(handle.server_id, handle); Ok(()) } Err(e) => Err(format!( @@ -145,7 +145,7 @@ impl ClusterManager { /// Current number of active clusters (for tests / metrics). pub fn active_cluster_count(&self) -> u32 { - self.allocated_servers.len() as u32 + self.servers.len() as u32 } /// Snapshot of cluster geometry from the spatial index (for visualization / debugging). diff --git a/crates/arcane-spatial/src/index.rs b/crates/arcane-spatial/src/index.rs index f9dfa4e..d61d263 100644 --- a/crates/arcane-spatial/src/index.rs +++ b/crates/arcane-spatial/src/index.rs @@ -111,6 +111,15 @@ impl SpatialIndex { cluster_ids } + /// Bulk-reassign all entities currently in `from` to `to`. Called by ClusterManager during merge. + pub fn reassign_cluster(&mut self, from: Uuid, to: Uuid) { + for (cluster_id, _) in self.entities.values_mut() { + if *cluster_id == from { + *cluster_id = to; + } + } + } + /// Return all entities as (entity_id, cluster_id, position) triples. /// Used by ClusterManager to populate WorldStateView.players. pub fn snapshot_entities(&self) -> Vec<(Uuid, Uuid, Vec3)> { From be2f313daac4b61760e8d6ef0acd82f4093ef836 Mon Sep 17 00:00:00 2001 From: rebelzion Date: Wed, 29 Apr 2026 04:04:28 -0700 Subject: [PATCH 2/5] =?UTF-8?q?feat(IN-09):=20#81=20=E2=80=94=20ExecutionC?= =?UTF-8?q?onfig=20and=20guardrail=20state?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ExecutionConfig struct (min_confidence, merge/split cooldown ticks, max_per_cycle) with sensible defaults. Add merge_cooldowns and split_cooldowns hashmaps to ClusterManager. Tick down and prune cooldowns at the start of each evaluation cycle. Closes #81. 
Co-Authored-By: Claude Opus 4.6 --- crates/arcane-infra/src/cluster_manager.rs | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/crates/arcane-infra/src/cluster_manager.rs b/crates/arcane-infra/src/cluster_manager.rs index cff450c..32ad1eb 100644 --- a/crates/arcane-infra/src/cluster_manager.rs +++ b/crates/arcane-infra/src/cluster_manager.rs @@ -12,6 +12,30 @@ use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; +/// Guardrails for decision execution. All thresholds are per-cycle or per-pair. +#[derive(Clone, Debug)] +pub struct ExecutionConfig { + /// Minimum model confidence to execute a decision (0.0–1.0). + pub min_confidence: f32, + /// Ticks to suppress further merges involving the surviving cluster after a merge. + pub merge_cooldown_ticks: u32, + /// Ticks to suppress further splits involving either resulting cluster after a split. + pub split_cooldown_ticks: u32, + /// Maximum decisions executed per evaluation cycle (merge + split combined). + pub max_per_cycle: usize, +} + +impl Default for ExecutionConfig { + fn default() -> Self { + Self { + min_confidence: 0.7, + merge_cooldown_ticks: 20, + split_cooldown_ticks: 30, + max_per_cycle: 3, + } + } +} + /// Central coordinator: assignments, topology, clustering model. pub struct ClusterManager { model: Arc, @@ -19,6 +43,11 @@ pub struct ClusterManager { spatial_index: SpatialIndex, /// cluster_id → ServerHandle. One entry per live cluster server. servers: HashMap, + exec_config: ExecutionConfig, + /// cluster_id → remaining cooldown ticks after a merge. + merge_cooldowns: HashMap, + /// cluster_id → remaining cooldown ticks after a split. 
+ split_cooldowns: HashMap<Uuid, u32>, } impl ClusterManager { @@ -32,6 +61,9 @@ impl ClusterManager { pool, spatial_index, servers: HashMap::new(), + exec_config: ExecutionConfig::default(), + merge_cooldowns: HashMap::new(), + split_cooldowns: HashMap::new(), } } @@ -79,6 +111,15 @@ impl ClusterManager { /// Run one evaluation cycle: build view from spatial snapshot, run model, apply decisions. /// Without SpacetimeDB we allocate from pool when we have clusters (entities) and no servers yet. pub fn run_evaluation_cycle(&mut self) -> Result<(), String> { + self.merge_cooldowns + .values_mut() + .for_each(|v| *v = v.saturating_sub(1)); + self.merge_cooldowns.retain(|_, v| *v > 0); + self.split_cooldowns + .values_mut() + .for_each(|v| *v = v.saturating_sub(1)); + self.split_cooldowns.retain(|_, v| *v > 0); + let snapshot = self.spatial_index.snapshot_for_view(); if snapshot.is_empty() { return Ok(()); From 8394a5ec5bee5ff6ceedb1af36c8632fc9ec9268 Mon Sep 17 00:00:00 2001 From: rebelzion Date: Wed, 29 Apr 2026 04:05:28 -0700 Subject: [PATCH 3/5] =?UTF-8?q?feat(IN-09):=20#82=20#83=20=E2=80=94=20exec?= =?UTF-8?q?ute=5Fmerge=20and=20execute=5Fsplit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add execute_merge(): confidence/cooldown guardrails, SpatialIndex reassign_cluster, pool.release, servers map cleanup, merge cooldown recording. Add execute_split(): confidence/cooldown guardrails, pool.allocate, group_b entity migration via update_entity, split cooldown on both resulting cluster ids. Pool exhaustion is a non-fatal skip. Add tracing dep to arcane-infra for warn! calls. Closes #82, closes #83. 
Co-Authored-By: Claude Opus 4.6 --- crates/arcane-infra/Cargo.toml | 1 + crates/arcane-infra/src/cluster_manager.rs | 91 +++++++++++++++++++++- 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/crates/arcane-infra/Cargo.toml b/crates/arcane-infra/Cargo.toml index 7faa036..e2077a1 100644 --- a/crates/arcane-infra/Cargo.toml +++ b/crates/arcane-infra/Cargo.toml @@ -20,6 +20,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "net", "sync", "macros"], optional = true } tokio-tungstenite = { version = "0.21", optional = true } +tracing = "0.1" uuid = { version = "1.0", features = ["v4"] } [features] diff --git a/crates/arcane-infra/src/cluster_manager.rs b/crates/arcane-infra/src/cluster_manager.rs index 32ad1eb..4a5ca07 100644 --- a/crates/arcane-infra/src/cluster_manager.rs +++ b/crates/arcane-infra/src/cluster_manager.rs @@ -1,7 +1,7 @@ //! ClusterManager (IN-01) — central coordinator. use arcane_core::{ - clustering_model::{ClusterInfo, PlayerInfo, WorldStateView}, + clustering_model::{ClusterDecision, ClusterInfo, DecisionType, PlayerInfo, WorldStateView}, types::Vec2, IClusteringModel, IServerPool, ServerHandle, }; @@ -184,6 +184,95 @@ impl ClusterManager { } } + /// Execute a Merge decision: migrate all entities from source to target, release source server. + /// Returns Ok(()) for skipped decisions (confidence/cooldown) and Err only for malformed input. 
+ fn execute_merge(&mut self, decision: &ClusterDecision) -> Result<(), String> { + let source = decision + .source_cluster_id + .ok_or("merge decision missing source_cluster_id")?; + let target = decision + .target_cluster_id + .ok_or("merge decision missing target_cluster_id")?; + + if decision.confidence < self.exec_config.min_confidence { + return Ok(()); + } + if self.merge_cooldowns.contains_key(&source) + || self.merge_cooldowns.contains_key(&target) + { + return Ok(()); + } + + self.spatial_index.reassign_cluster(source, target); + if let Err(e) = self.pool.release(source) { + tracing::warn!("merge: pool.release({}) failed: {}", source, e.detail); + } + self.servers.remove(&source); + self.merge_cooldowns + .insert(target, self.exec_config.merge_cooldown_ticks); + Ok(()) + } + + /// Execute a Split decision: allocate a new server, migrate group_b entities to it. + /// Returns Ok(()) for skipped decisions (confidence/cooldown/pool exhaustion) and Err only for + /// malformed input. + fn execute_split(&mut self, decision: &ClusterDecision) -> Result<(), String> { + let cluster = decision + .cluster_id + .ok_or("split decision missing cluster_id")?; + let group_b = decision + .split_group_b + .as_ref() + .ok_or("split decision missing split_group_b")?; + + if group_b.is_empty() { + return Err("split_group_b is empty".to_string()); + } + if decision + .split_group_a + .as_ref() + .map_or(true, |g| g.is_empty()) + { + return Err("split_group_a is empty".to_string()); + } + if decision.confidence < self.exec_config.min_confidence { + return Ok(()); + } + if self.split_cooldowns.contains_key(&cluster) { + return Ok(()); + } + + let new_handle = match self.pool.allocate() { + Ok(h) => h, + Err(e) => { + tracing::warn!("split: pool exhausted, skipping: {}", e.detail); + return Ok(()); + } + }; + let new_cluster_id = new_handle.server_id; + self.servers.insert(new_cluster_id, new_handle); + + // Snapshot current positions before mutating the index. 
+ let positions: HashMap<Uuid, Vec3> = self + .spatial_index + .snapshot_entities() + .into_iter() + .map(|(eid, _, pos)| (eid, pos)) + .collect(); + + for &entity_id in group_b { + if let Some(&pos) = positions.get(&entity_id) { + self.spatial_index + .update_entity(entity_id, new_cluster_id, pos); + } + } + + let cooldown = self.exec_config.split_cooldown_ticks; + self.split_cooldowns.insert(cluster, cooldown); + self.split_cooldowns.insert(new_cluster_id, cooldown); + Ok(()) + } + /// Current number of active clusters (for tests / metrics). pub fn active_cluster_count(&self) -> u32 { self.servers.len() as u32 From a2f9f1672f54e32ca0c03a92ded4b4f5dad5fbea Mon Sep 17 00:00:00 2001 From: rebelzion Date: Wed, 29 Apr 2026 04:05:50 -0700 Subject: [PATCH 4/5] =?UTF-8?q?feat(IN-09):=20#84=20=E2=80=94=20wire=20exe?= =?UTF-8?q?cute=5Fmerge/split=20into=20run=5Fevaluation=5Fcycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace discarded `let _decisions` with a decision execution loop: bootstrap server allocation now happens before model evaluation; model decisions are applied in priority order up to max_per_cycle per tick; execution errors are logged as warnings and do not abort the cycle. Closes #84. Co-Authored-By: Claude Opus 4.6 --- crates/arcane-infra/src/cluster_manager.rs | 41 +++++++++++++++------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/crates/arcane-infra/src/cluster_manager.rs b/crates/arcane-infra/src/cluster_manager.rs index 4a5ca07..4b17e15 100644 --- a/crates/arcane-infra/src/cluster_manager.rs +++ b/crates/arcane-infra/src/cluster_manager.rs @@ -167,21 +167,38 @@ impl ClusterManager { clusters, players, }; - let _decisions = self.model.evaluate(&view); - // Minimal apply: if we have clusters in the world and no servers allocated, allocate one. - if !self.servers.is_empty() { - return Ok(()); + // Bootstrap: allocate one server when clusters exist but none are tracked yet. 
+ if self.servers.is_empty() { + match self.pool.allocate() { + Ok(handle) => { + self.servers.insert(handle.server_id, handle); + } + Err(e) => { + return Err(format!( + "pool allocate failed: {} - {}", + e.code as u32, e.detail + )); + } + } } - match self.pool.allocate() { - Ok(handle) => { - self.servers.insert(handle.server_id, handle); - Ok(()) + + let decisions = self.model.evaluate(&view); + let max = self.exec_config.max_per_cycle; + let mut executed = 0; + for decision in decisions.iter() { + if executed >= max { + break; + } + let result = match decision.decision_type { + DecisionType::Merge => self.execute_merge(decision), + DecisionType::Split => self.execute_split(decision), + }; + match result { + Ok(()) => executed += 1, + Err(e) => tracing::warn!("decision execution error: {}", e), } - Err(e) => Err(format!( - "pool allocate failed: {} - {}", - e.code as u32, e.detail - )), } + Ok(()) } /// Execute a Merge decision: migrate all entities from source to target, release source server. From 42ef47136bc3b8eaeaadc4804c964dce1a3cfce0 Mon Sep 17 00:00:00 2001 From: rebelzion Date: Wed, 29 Apr 2026 04:09:22 -0700 Subject: [PATCH 5/5] =?UTF-8?q?feat(IN-09):=20#85=20=E2=80=94=20integratio?= =?UTF-8?q?n=20tests=20+=20bootstrap=20and=20cooldown=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 7 integration tests: merge execution, split execution, confidence guardrail, merge cooldown, pool exhaustion skip, max_per_cycle cap, cooldown expiry. Fix bootstrap: allocate servers keyed by cluster_id (not server_id) so execute_merge/split can look up the correct handle. Fix cooldown edge case: skip inserting cooldown entry when ticks=0 to avoid blocking same-cycle decisions (would break max_per_cycle=∞ case). Export ExecutionConfig from arcane-infra lib.rs. Closes #85. 
Co-Authored-By: Claude Opus 4.6 --- crates/arcane-infra/src/cluster_manager.rs | 32 +- crates/arcane-infra/src/lib.rs | 2 +- .../tests/decision_execution_tests.rs | 314 ++++++++++++++++++ 3 files changed, 340 insertions(+), 8 deletions(-) create mode 100644 crates/arcane-infra/tests/decision_execution_tests.rs diff --git a/crates/arcane-infra/src/cluster_manager.rs b/crates/arcane-infra/src/cluster_manager.rs index 4b17e15..b7bebe3 100644 --- a/crates/arcane-infra/src/cluster_manager.rs +++ b/crates/arcane-infra/src/cluster_manager.rs @@ -76,6 +76,12 @@ impl ClusterManager { ) } + /// Override the execution config (for tests or custom deployments). + pub fn with_exec_config(mut self, config: ExecutionConfig) -> Self { + self.exec_config = config; + self + } + /// Create with a named clustering model. Supported values: "rules" (default), "affinity". /// The "affinity" variant requires the `affinity-clustering` feature flag. pub fn with_model(model_type: &str) -> Self { @@ -167,11 +173,19 @@ impl ClusterManager { clusters, players, }; - // Bootstrap: allocate one server when clusters exist but none are tracked yet. - if self.servers.is_empty() { + // Bootstrap: allocate a server for each cluster that doesn't have one yet. + // Key = cluster_id so execute_merge/split can look up and release by cluster_id. 
+ let unserved: Vec<Uuid> = self + .spatial_index + .snapshot_for_view() + .into_iter() + .map(|g| g.cluster_id) + .filter(|id| !self.servers.contains_key(id)) + .collect(); + for cluster_id in unserved { match self.pool.allocate() { Ok(handle) => { - self.servers.insert(handle.server_id, handle); + self.servers.insert(cluster_id, handle); } Err(e) => { return Err(format!( @@ -225,8 +239,10 @@ impl ClusterManager { tracing::warn!("merge: pool.release({}) failed: {}", source, e.detail); } self.servers.remove(&source); - self.merge_cooldowns - .insert(target, self.exec_config.merge_cooldown_ticks); + if self.exec_config.merge_cooldown_ticks > 0 { + self.merge_cooldowns + .insert(target, self.exec_config.merge_cooldown_ticks); + } Ok(()) } @@ -285,8 +301,10 @@ impl ClusterManager { } let cooldown = self.exec_config.split_cooldown_ticks; - self.split_cooldowns.insert(cluster, cooldown); - self.split_cooldowns.insert(new_cluster_id, cooldown); + if cooldown > 0 { + self.split_cooldowns.insert(cluster, cooldown); + self.split_cooldowns.insert(new_cluster_id, cooldown); + } Ok(()) } diff --git a/crates/arcane-infra/src/lib.rs b/crates/arcane-infra/src/lib.rs index 470ef97..38a53fd 100644 --- a/crates/arcane-infra/src/lib.rs +++ b/crates/arcane-infra/src/lib.rs @@ -34,7 +34,7 @@ pub mod ws_server; #[cfg(feature = "cluster-ws")] pub use arcane_core::cluster_simulation::{ClusterSimulation, ClusterTickContext, GameAction}; -pub use cluster_manager::ClusterManager; +pub use cluster_manager::{ClusterManager, ExecutionConfig}; pub use cluster_server::ClusterServer; pub use redis_channel::RedisReplicationChannel; pub use replication_channel_manager::ReplicationChannelManager; diff --git a/crates/arcane-infra/tests/decision_execution_tests.rs b/crates/arcane-infra/tests/decision_execution_tests.rs new file mode 100644 index 0000000..c22ed8c --- /dev/null +++ b/crates/arcane-infra/tests/decision_execution_tests.rs @@ -0,0 +1,314 @@ +//! 
Integration tests for ClusterManager decision execution (IN-09 #85). +//! +//! Tests: merge, split, confidence guardrail, cooldown guardrail, +//! pool exhaustion, max_per_cycle cap, cooldown expiry. + +use arcane_core::{ + clustering_model::{ + ClusterDecision, DecisionReason, DecisionType, ModelInfo, ValidationResult, WorldStateView, + }, + IClusteringModel, Vec3, +}; +use arcane_infra::{ClusterManager, ExecutionConfig}; +use arcane_pool::LocalPool; +use std::sync::{Arc, Mutex}; +use uuid::Uuid; + +// ── helpers ──────────────────────────────────────────────────────────────── + +fn uid(n: u8) -> Uuid { + Uuid::from_bytes([n, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) +} + +fn pos(x: f64) -> Vec3 { + Vec3::new(x, 0.0, 0.0) +} + +fn reason() -> DecisionReason { + DecisionReason { + code: "test".to_string(), + detail: String::new(), + } +} + +fn merge_decision(source: Uuid, target: Uuid, confidence: f32) -> ClusterDecision { + ClusterDecision { + decision_type: DecisionType::Merge, + priority: 1, + reason: reason(), + confidence, + source_cluster_id: Some(source), + target_cluster_id: Some(target), + cluster_id: None, + split_group_a: None, + split_group_b: None, + } +} + +fn split_decision(cluster: Uuid, group_a: Vec, group_b: Vec, confidence: f32) -> ClusterDecision { + ClusterDecision { + decision_type: DecisionType::Split, + priority: 1, + reason: reason(), + confidence, + source_cluster_id: None, + target_cluster_id: None, + cluster_id: Some(cluster), + split_group_a: Some(group_a), + split_group_b: Some(group_b), + } +} + +// ── mock model ───────────────────────────────────────────────────────────── + +/// Returns a fixed list of decisions on every evaluate() call. +struct MockModel { + decisions: Mutex>, +} + +impl MockModel { + fn new(decisions: Vec) -> Arc { + Arc::new(Self { + decisions: Mutex::new(decisions), + }) + } + /// Replace the decision list mid-test (for multi-cycle tests). 
+ fn set_decisions(&self, decisions: Vec) { + *self.decisions.lock().unwrap() = decisions; + } +} + +impl IClusteringModel for MockModel { + fn evaluate(&self, _view: &WorldStateView) -> Vec { + self.decisions.lock().unwrap().clone() + } + fn get_model_info(&self) -> ModelInfo { + ModelInfo { + model_type: "mock".to_string(), + version: "0".to_string(), + trained_at: None, + feature_count: None, + } + } + fn validate_view(&self, _view: &WorldStateView) -> ValidationResult { + ValidationResult { + valid: true, + warnings: vec![], + errors: vec![], + } + } +} + +// ── tests ────────────────────────────────────────────────────────────────── + +/// After a merge (A→B) all entities previously in A are now in B, and active_cluster_count drops. +#[test] +fn merge_moves_entities_and_releases_server() { + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(8)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()); + + let (ca, cb) = (uid(1), uid(2)); + mgr.update_entity(uid(10), ca, pos(0.0)); + mgr.update_entity(uid(11), ca, pos(1.0)); + mgr.update_entity(uid(20), cb, pos(100.0)); + + // Bootstrap: allocate a server for each cluster. + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 2); + + // Issue a merge decision. + model.set_decisions(vec![merge_decision(ca, cb, 0.9)]); + mgr.run_evaluation_cycle().unwrap(); + + // All entities should now be in cb. + let snapshot = mgr.snapshot_for_view(); + let ca_geom = snapshot.iter().find(|g| g.cluster_id == ca); + let cb_geom = snapshot.iter().find(|g| g.cluster_id == cb); + assert!(ca_geom.is_none(), "cluster A should be gone after merge"); + assert!(cb_geom.is_some(), "cluster B should still exist"); + assert_eq!(cb_geom.unwrap().entity_count, 3, "all 3 entities now in B"); + assert_eq!(mgr.active_cluster_count(), 1); +} + +/// After a split, group_b entities belong to a new cluster; group_a stays; server count increases. 
+#[test] +fn split_creates_new_cluster_and_partitions_entities() { + let (e1, e2, e3, e4) = (uid(1), uid(2), uid(3), uid(4)); + let cluster = uid(10); + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(8)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()); + + for &(eid, x) in &[(e1, 0.0), (e2, 1.0), (e3, 100.0), (e4, 101.0)] { + mgr.update_entity(eid, cluster, pos(x)); + } + + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 1); + + let decision = split_decision(cluster, vec![e1, e2], vec![e3, e4], 0.9); + model.set_decisions(vec![decision]); + mgr.run_evaluation_cycle().unwrap(); + + let snapshot = mgr.snapshot_for_view(); + assert_eq!(snapshot.len(), 2, "two clusters after split"); + assert_eq!(mgr.active_cluster_count(), 2); + + // Original cluster retains group_a entities (e1, e2). + let orig = snapshot.iter().find(|g| g.cluster_id == cluster).unwrap(); + assert_eq!(orig.entity_count, 2); +} + +/// A decision with confidence below the threshold is silently skipped. +#[test] +fn merge_skipped_below_confidence_threshold() { + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(8)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()) + .with_exec_config(ExecutionConfig { + min_confidence: 0.7, + ..ExecutionConfig::default() + }); + + let (ca, cb) = (uid(1), uid(2)); + mgr.update_entity(uid(10), ca, pos(0.0)); + mgr.update_entity(uid(20), cb, pos(100.0)); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 2); + + // Confidence 0.5 < 0.7 threshold → skip. + model.set_decisions(vec![merge_decision(ca, cb, 0.5)]); + mgr.run_evaluation_cycle().unwrap(); + + // Still 2 clusters, nothing moved. 
+ assert_eq!(mgr.active_cluster_count(), 2); + let snapshot = mgr.snapshot_for_view(); + assert!(snapshot.iter().any(|g| g.cluster_id == ca)); + assert!(snapshot.iter().any(|g| g.cluster_id == cb)); +} + +/// After a merge, a second merge involving the surviving cluster is blocked by cooldown. +#[test] +fn merge_skipped_during_cooldown() { + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(8)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()) + .with_exec_config(ExecutionConfig { + merge_cooldown_ticks: 10, + ..ExecutionConfig::default() + }); + + let (ca, cb, cc) = (uid(1), uid(2), uid(3)); + mgr.update_entity(uid(10), ca, pos(0.0)); + mgr.update_entity(uid(20), cb, pos(100.0)); + mgr.update_entity(uid(30), cc, pos(200.0)); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 3); + + // Merge A→B (succeeds). + model.set_decisions(vec![merge_decision(ca, cb, 0.9)]); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 2); + + // Next tick: try merge C→B — B is under cooldown, skip. + model.set_decisions(vec![merge_decision(cc, cb, 0.9)]); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 2, "merge blocked by cooldown on B"); +} + +/// Pool exhaustion on allocate() during a split is a non-fatal skip — cluster is unchanged. +#[test] +fn split_skipped_on_pool_exhaustion() { + let (e1, e2, e3, e4) = (uid(1), uid(2), uid(3), uid(4)); + let cluster = uid(10); + let model = MockModel::new(vec![]); + // capacity=1: bootstrap uses the only server, leaving pool empty for the split. 
+ let pool = Arc::new(LocalPool::new(1)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()); + + for &(eid, x) in &[(e1, 0.0), (e2, 1.0), (e3, 100.0), (e4, 101.0)] { + mgr.update_entity(eid, cluster, pos(x)); + } + + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 1); + + let decision = split_decision(cluster, vec![e1, e2], vec![e3, e4], 0.9); + model.set_decisions(vec![decision]); + // Pool exhausted → skip; should NOT error. + mgr.run_evaluation_cycle().unwrap(); + + // Still 1 cluster, all 4 entities still in it. + assert_eq!(mgr.active_cluster_count(), 1); + let geom = mgr.snapshot_for_view(); + assert_eq!(geom.len(), 1); + assert_eq!(geom[0].entity_count, 4); +} + +/// Only max_per_cycle decisions are executed per tick; the rest are deferred. +#[test] +fn max_per_cycle_cap_enforced() { + // Build 4 clusters that could be merged into cluster 0. + let clusters: Vec = (0..5).map(uid).collect(); + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(16)); + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()) + .with_exec_config(ExecutionConfig { + max_per_cycle: 2, + merge_cooldown_ticks: 0, // no cooldown to isolate cap behavior + ..ExecutionConfig::default() + }); + + for (i, &cid) in clusters.iter().enumerate() { + mgr.update_entity(uid(20 + i as u8), cid, pos(i as f64 * 50.0)); + } + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 5); + + // 4 merge decisions targeting clusters[0]; only 2 should execute. + let decisions: Vec = (1..5) + .map(|i| merge_decision(uid(i), uid(0), 0.9)) + .collect(); + model.set_decisions(decisions); + mgr.run_evaluation_cycle().unwrap(); + + // 5 - 2 = 3 clusters remaining after exactly 2 merges. + assert_eq!(mgr.active_cluster_count(), 3); +} + +/// After cooldown_ticks evaluation cycles the blocked cluster can merge again. 
+#[test] +fn cooldown_expires_after_n_ticks() { + let model = MockModel::new(vec![]); + let pool = Arc::new(LocalPool::new(8)); + let cooldown = 3u32; + let mut mgr = ClusterManager::new(model.clone(), pool, arcane_spatial::SpatialIndex::new()) + .with_exec_config(ExecutionConfig { + merge_cooldown_ticks: cooldown, + ..ExecutionConfig::default() + }); + + let (ca, cb, cc) = (uid(1), uid(2), uid(3)); + mgr.update_entity(uid(10), ca, pos(0.0)); + mgr.update_entity(uid(20), cb, pos(100.0)); + mgr.update_entity(uid(30), cc, pos(200.0)); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 3); + + // First merge A→B; B now under cooldown for 3 ticks. + model.set_decisions(vec![merge_decision(ca, cb, 0.9)]); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 2); + + // Idle cycles to burn down the cooldown. + model.set_decisions(vec![]); + for _ in 0..cooldown { + mgr.run_evaluation_cycle().unwrap(); + } + + // Now merge C→B should succeed (cooldown expired). + model.set_decisions(vec![merge_decision(cc, cb, 0.9)]); + mgr.run_evaluation_cycle().unwrap(); + assert_eq!(mgr.active_cluster_count(), 1, "second merge should execute after cooldown"); +}