From 3c2d3ca7a8c74920bfce9c5b5fd0d80ef9a53497 Mon Sep 17 00:00:00 2001
From: Adrien Guillo
Date: Mon, 1 Dec 2025 13:50:23 -0800
Subject: [PATCH] Improve control plane logging

---
 quickwit/quickwit-common/src/metrics.rs       |  20 +-
 quickwit/quickwit-common/src/pretty.rs        |  75 +++++--
 .../src/control_plane.rs                      | 171 +++++++++------
 .../src/indexing_scheduler/mod.rs             |   4 +-
 .../src/ingest/ingest_controller.rs           |  59 ++++--
 .../src/ingest/scaling_arbiter.rs             |  23 +++
 .../quickwit-control-plane/src/metrics.rs     |  70 +++++--
 .../quickwit-control-plane/src/model/mod.rs   |  19 +-
 .../src/model/shard_table.rs                  | 194 +++++++++++-------
 .../quickwit-ingest/src/ingest_v2/ingester.rs |  20 +-
 .../quickwit-ingest/src/ingest_v2/state.rs    |  20 +-
 quickwit/quickwit-proto/src/types/position.rs |  43 +++-
 12 files changed, 490 insertions(+), 228 deletions(-)

diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs
index db88280794f..193def5e01a 100644
--- a/quickwit/quickwit-common/src/metrics.rs
+++ b/quickwit/quickwit-common/src/metrics.rs
@@ -13,9 +13,8 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, HashMap};
-use std::sync::OnceLock;
+use std::sync::{LazyLock, OnceLock};
 
-use once_cell::sync::Lazy;
 use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder};
 pub use prometheus::{
     Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter,
@@ -438,16 +437,17 @@ impl InFlightDataGauges {
     }
 }
 
-/// This function returns `index_name` or projects it to `` if per-index metrics are disabled.
-pub fn index_label(index_name: &str) -> &str {
-    static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
-    let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
-        .get_or_init(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
-    if per_index_metrics_enabled {
-        index_name
+/// This function returns `index_id` as is if per-index metrics are enabled, or projects it to
+/// `"__any__"` otherwise.
+pub fn index_label(index_id: &str) -> &str {
+    static PER_INDEX_METRICS_ENABLED: LazyLock<bool> =
+        LazyLock::new(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
+
+    if *PER_INDEX_METRICS_ENABLED {
+        index_id
     } else {
         "__any__"
     }
 }
 
-pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
+pub static MEMORY_METRICS: LazyLock<MemoryMetrics> = LazyLock::new(MemoryMetrics::default);
diff --git a/quickwit/quickwit-common/src/pretty.rs b/quickwit/quickwit-common/src/pretty.rs
index 77f1b9c9c5f..ae2ce56113a 100644
--- a/quickwit/quickwit-common/src/pretty.rs
+++ b/quickwit/quickwit-common/src/pretty.rs
@@ -23,14 +23,15 @@ impl<I> PrettySample<I> {
     }
 }
 
-impl<I, T> fmt::Debug for PrettySample<I>
+impl<I> fmt::Debug for PrettySample<I>
 where
-    I: IntoIterator<Item = T> + Clone,
-    T: fmt::Debug,
+    I: IntoIterator + Clone,
+    I::Item: fmt::Debug,
 {
     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(formatter, "[")?;
-        // in general we will get passed a reference (&[...], &HashMap...) or a Map<_> of them.
+
+        // In general, we will receive a reference (&[...], &HashMap...) or a Map<_> of them.
         // So we either perform a Copy, or a cheap Clone of a simple struct
         let mut iter = self.0.clone().into_iter().enumerate();
         for (i, item) in &mut iter {
@@ -55,9 +56,9 @@ pub trait PrettyDisplay {
     fn pretty_display(&self) -> impl fmt::Display;
 }
 
-struct PrettyDurationDisplay<'a>(&'a Duration);
+struct DurationPrettyDisplay<'a>(&'a Duration);
 
-impl fmt::Display for PrettyDurationDisplay<'_> {
+impl fmt::Display for DurationPrettyDisplay<'_> {
     fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
         // This is enough for my current use cases. To be extended as you see fit.
         let duration_millis = self.0.as_millis();
@@ -76,7 +77,37 @@ impl fmt::Display for PrettyDurationDisplay<'_> {
 
 impl PrettyDisplay for Duration {
     fn pretty_display(&self) -> impl fmt::Display {
-        PrettyDurationDisplay(self)
+        DurationPrettyDisplay(self)
+    }
+}
+
+struct SequencePrettyDisplay<I>(I);
+
+impl<I> fmt::Display for SequencePrettyDisplay<I>
+where
+    I: IntoIterator + Clone,
+    I::Item: fmt::Display,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "[")?;
+
+        // In general, we will receive a reference (&[...], &HashMap...) or a Map<_> of them.
+        // So we either perform a Copy, or a cheap Clone of a simple struct
+        let mut iter = self.0.clone().into_iter().peekable();
+
+        while let Some(item) = iter.next() {
+            write!(f, "{item}")?;
+            if iter.peek().is_some() {
+                write!(f, ", ")?;
+            }
+        }
+        write!(f, "]")
+    }
+}
+
+impl<T: fmt::Display> PrettyDisplay for &[T] {
+    fn pretty_display(&self) -> impl fmt::Display {
+        SequencePrettyDisplay(*self)
     }
 }
 
@@ -103,17 +134,29 @@ mod tests {
     }
 
     #[test]
-    fn test_pretty_duration() {
-        let pretty_duration = Duration::from_millis(0);
-        assert_eq!(format!("{}", pretty_duration.pretty_display()), "0ms");
+    fn test_duration_pretty_display() {
+        let duration = Duration::from_millis(0);
+        assert_eq!(format!("{}", duration.pretty_display()), "0ms");
 
-        let pretty_duration = Duration::from_millis(125);
-        assert_eq!(format!("{}", pretty_duration.pretty_display()), "125ms");
+        let duration = Duration::from_millis(125);
+        assert_eq!(format!("{}", duration.pretty_display()), "125ms");
+
+        let duration = Duration::from_millis(1_000);
+        assert_eq!(format!("{}", duration.pretty_display()), "1.0s");
+
+        let duration = Duration::from_millis(1_125);
+        assert_eq!(format!("{}", duration.pretty_display()), "1.12s");
+    }
+
+    #[test]
+    fn test_sequence_pretty_display() {
+        let empty_slice: &[i32] = &[];
+        assert_eq!(format!("{}", empty_slice.pretty_display()), "[]");
 
-        let pretty_duration = Duration::from_millis(1_000);
-        assert_eq!(format!("{}", pretty_duration.pretty_display()), "1.0s");
+        let slice_one: &[i32] = &[1];
+        assert_eq!(format!("{}", slice_one.pretty_display()), "[1]");
 
-        let pretty_duration = Duration::from_millis(1_125);
-        assert_eq!(format!("{}", pretty_duration.pretty_display()), "1.12s");
+        let slice_two: &[i32] = &[1, 2];
+        assert_eq!(format!("{}", slice_two.pretty_display()), "[1, 2]");
     }
 }
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 451e6651681..01530ea422f 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::fmt::Formatter; use std::num::NonZeroUsize; @@ -22,6 +22,7 @@ use anyhow::Context; use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; +use itertools::Itertools; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, Supervisor, Universe, WeakMailbox, @@ -29,6 +30,7 @@ use quickwit_actors::{ use quickwit_cluster::{ ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode, }; +use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::uri::Uri; use quickwit_common::{Progress, shared_consts}; @@ -52,7 +54,7 @@ use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, Source use serde::Serialize; use serde_json::{Value as JsonValue, json}; use tokio::sync::watch; -use tracing::{debug, error, info}; +use tracing::{Level, debug, enabled, error, info}; use crate::IndexerPool; use crate::cooldown_map::{CooldownMap, CooldownStatus}; @@ -219,6 +221,7 @@ impl Actor for ControlPlane { async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { crate::metrics::CONTROL_PLANE_METRICS.restart_total.inc(); + self.model .load_from_metastore(&mut self.metastore, ctx.progress()) .await @@ -312,7 +315,12 @@ impl ControlPlane { shard_ids: &[ShardId], progress: &Progress, ) -> anyhow::Result<()> { - info!(shard_ids=?shard_ids, source_uid=?source_uid, "deleting shards"); + debug!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + shard_ids=%shard_ids.pretty_display(), + "deleting shards" + ); let delete_shards_request = DeleteShardsRequest { index_uid: Some(source_uid.index_uid.clone()), source_id: source_uid.source_id.clone(), @@ -329,7 +337,15 @@ impl ControlPlane { .protect_future(self.metastore.delete_shards(delete_shards_request)) .await .context("failed to delete shards from metastore")?; + self.model.delete_shards(source_uid, shard_ids); + + info!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + shard_ids=%shard_ids.pretty_display(), + "deleted shards" + ); Ok(()) } @@ -351,30 +367,33 @@ impl ControlPlane { }) .unwrap_or_default(); - let mut per_index_shards_json: HashMap> = HashMap::new(); + let mut per_index_and_leader_shards_json: BTreeMap< + IndexUid, + BTreeMap>, + > = BTreeMap::new(); for (source_uid, shard_entries) in self.model.all_shards_with_source() { - let index_uid = source_uid.index_uid.clone(); - let source_id = source_uid.source_id.clone(); - let shards_json = shard_entries.map(|shard_entry| { - json!({ - "index_uid": index_uid, - "source_id": source_id, - "shard_id": shard_entry.shard_id.clone(), + for shard_entry in shard_entries { + let shard_json = json!({ + "index_uid": source_uid.index_uid, + "source_id": source_uid.source_id, + "shard_id": shard_entry.shard_id, "shard_state": shard_entry.shard_state().as_json_str_name(), - "leader_id": shard_entry.leader_id.clone(), - "follower_id": shard_entry.follower_id.clone(), + "leader_id": shard_entry.leader_id, + "follower_id": shard_entry.follower_id, "publish_position_inclusive": shard_entry.publish_position_inclusive(), - }) - }); - per_index_shards_json - .entry(index_uid.clone()) - .or_default() - .extend(shards_json); + }); + per_index_and_leader_shards_json + .entry(source_uid.index_uid.clone()) + .or_default() + .entry(shard_entry.leader_id.clone()) + 
.or_default() + .push(shard_json); + } } json!({ "physical_indexing_plan": physical_indexing_plan, - "shard_table": per_index_shards_json, + "shard_table": per_index_and_leader_shards_json, }) } @@ -422,7 +441,21 @@ impl Handler for ControlPlane { shard_positions_update: ShardPositionsUpdate, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - debug!(shard_positions_update=?shard_positions_update, "shard positions update"); + if enabled!(Level::DEBUG) { + let pretty_positions: Vec = shard_positions_update + .updated_shard_positions + .iter() + .map(|(shard_id, position)| format!("{shard_id}:{}", position.pretty_display())) + .sorted() + .collect(); + + debug!( + index_uid=%shard_positions_update.source_uid.index_uid, + source_id=%shard_positions_update.source_uid.source_id, + positions=%pretty_positions.as_slice().pretty_display(), + "received shard positions update" + ); + } let Some(shard_entries) = self .model .get_shards_for_source_mut(&shard_positions_update.source_uid) @@ -438,7 +471,13 @@ impl Handler for ControlPlane { Some(shard.publish_position_inclusive().max(position.clone())); if position.is_eof() { // identify shards that have reached EOF but have not yet been removed. - info!(shard_id=%shard_id, position=?position, "received eof shard via gossip"); + info!( + index_uid=%shard_positions_update.source_uid.index_uid, + source_id=%shard_positions_update.source_uid.source_id, + %shard_id, + position=%position.pretty_display(), + "received shard eof via gossip" + ); shard_ids_to_close.push(shard_id); } } @@ -534,6 +573,8 @@ impl DeferableReplyHandler for ControlPlane { reply: impl FnOnce(Self::Reply) + Send + Sync + 'static, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + debug!("creating index"); + let response = match ctx .protect_future(self.metastore.create_index(request)) .await @@ -551,6 +592,8 @@ impl DeferableReplyHandler for ControlPlane { return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error))); } }; + let index_uid = index_metadata.index_uid.clone(); + // Now, create index can also add sources to support creating indexes automatically from // index and source config templates. let should_rebuild_plan = !index_metadata.sources.is_empty(); @@ -565,6 +608,7 @@ impl DeferableReplyHandler for ControlPlane { } else { reply(Ok(response)); } + info!(%index_uid, "created index"); Ok(()) } } @@ -630,8 +674,6 @@ impl Handler for ControlPlane { { return convert_metastore_error(metastore_error); }; - info!(%index_uid, "deleted index"); - let ingester_needing_resync: BTreeSet = self .model .list_shards_for_index(&index_uid) @@ -648,6 +690,7 @@ impl Handler for ControlPlane { // the metastore. We should update the state of the control plane. 
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); + info!(%index_uid, "deleted index"); let response = EmptyResponse {}; Ok(Ok(response)) } @@ -669,6 +712,7 @@ impl Handler for ControlPlane { match serde_utils::from_json_str(&request.source_config_json) { Ok(source_config) => source_config, Err(error) => { + error!(%error, "failed to deserialize source config"); return Ok(Err(ControlPlaneError::from(error))); } }; @@ -707,6 +751,7 @@ impl Handler for ControlPlane { match serde_utils::from_json_str(&request.source_config_json) { Ok(source_config) => source_config, Err(error) => { + error!(%error, "failed to deserialize source config"); return Ok(Err(ControlPlaneError::from(error))); } }; @@ -723,12 +768,11 @@ impl Handler for ControlPlane { .update_source(&index_uid, source_config) .context("failed to add source")?; - info!(%index_uid, source_id, "updated source"); - // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); + info!(%index_uid, source_id, "updated source"); let response = EmptyResponse {}; Ok(Ok(response)) } @@ -756,7 +800,6 @@ impl Handler for ControlPlane { { return Ok(Err(ControlPlaneError::from(error))); }; - info!(%index_uid, source_id, enabled=enable, "toggled source"); let mutation_occurred = self .model @@ -766,7 +809,9 @@ impl Handler for ControlPlane { if mutation_occurred { let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); } - Ok(Ok(EmptyResponse {})) + info!(%index_uid, source_id, enabled=enable, "toggled source"); + let response = EmptyResponse {}; + Ok(Ok(response)) } } @@ -783,6 +828,7 @@ impl Handler for ControlPlane { ) -> Result, ActorExitStatus> { let index_uid: IndexUid = request.index_uid().clone(); let source_id = request.source_id.clone(); + debug!(%index_uid, source_id, "deleting source"); let source_uid = SourceUid { index_uid: index_uid.clone(), @@ -815,10 +861,14 @@ impl Handler for ControlPlane { .sync_with_ingesters(&ingesters_needing_resync, &self.model); self.model.delete_source(&source_uid); - let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); - let response = EmptyResponse {}; + info!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + "deleted source" + ); + let response = EmptyResponse {}; Ok(Ok(response)) } } @@ -851,7 +901,8 @@ impl Handler for ControlPlane { return convert_metastore_error(metastore_error); }; // Return ok regardless of whether the call was successful or debounced - Ok(Ok(EmptyResponse {})) + let response = EmptyResponse {}; + Ok(Ok(response)) } } @@ -956,7 +1007,7 @@ impl EventSubscriber for ControlPlaneEventSubscriber { .send_message(local_shards_update) .await { - error!(error=%error, "failed to forward local shards update to control plane"); + error!(%error, "failed to forward local shards update to control plane"); } } } @@ -969,7 +1020,7 @@ impl EventSubscriber for ControlPlaneEventSubscriber { .send_message(shard_positions_update) .await { - error!(error=%error, "failed to forward shard positions update to control plane"); + error!(%error, "failed to forward shard positions update to control plane"); } } } @@ -1058,10 +1109,9 @@ impl Handler for ControlPlane { message: RebalanceShardsCallback, _ctx: &ActorContext, ) -> Result { - info!( - "closing {} shards after rebalance", - message.closed_shards.len() - ); + let num_closed_shards = message.closed_shards.len(); + debug!("closing {num_closed_shards} 
shards after rebalance"); + for closed_shard in message.closed_shards { let shard_id = closed_shard.shard_id().clone(); let source_uid = SourceUid { @@ -1097,14 +1147,14 @@ async fn watcher_indexers( if node.enabled_services().contains(&QuickwitService::Indexer) && let Err(error) = mailbox.send_message(IndexerJoined(node)).await { - error!(error=%error, "failed to forward `IndexerJoined` event to control plane"); + error!(%error, "failed to forward `IndexerJoined` event to control plane"); } } ClusterChange::Remove(node) => { if node.enabled_services().contains(&QuickwitService::Indexer) && let Err(error) = mailbox.send_message(IndexerLeft(node)).await { - error!(error=%error, "failed to forward `IndexerLeft` event to control plane"); + error!(%error, "failed to forward `IndexerLeft` event to control plane"); } } ClusterChange::Update(_) => { @@ -1755,14 +1805,13 @@ mod tests { #[tokio::test] async fn test_delete_shard_on_eof() { - quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); - let node_id = NodeId::new("control-plane-node".to_string()); + let node_id = NodeId::new("test-control-plane".to_string()); let indexer_pool = IndexerPool::default(); let (client_mailbox, client_inbox) = universe.create_test_mailbox(); let client = IndexingServiceClient::from_mailbox::(client_mailbox); let indexer_node_info = IndexerNodeInfo { - node_id: NodeId::new("indexer-node-1".to_string()), + node_id: NodeId::new("test-indexer".to_string()), generation_id: 0, client, indexing_tasks: Vec::new(), @@ -1808,7 +1857,7 @@ mod tests { index_uid: Some(index_0.index_uid.clone()), source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: Some(ShardId::from(17)), - leader_id: "test_node".to_string(), + leader_id: "test-ingester".to_string(), publish_position_inclusive: Some(Position::Beginning), ..Default::default() }; @@ -1861,14 +1910,14 @@ mod tests { .unwrap(); let indexing_tasks = last_applied_physical_plan .indexing_tasks_per_indexer() - .get("indexer-node-1") + .get("test-indexer") .unwrap(); assert_eq!(indexing_tasks.len(), 1); assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = - &control_plane_debug_info["shard_table"]["test-index-0:00000000000000000000000000"][0]; + let shard = &control_plane_debug_info["shard_table"] + ["test-index-0:00000000000000000000000000"]["test-ingester"][0]; assert_eq!(shard["shard_id"], "00000000000000000017"); assert_eq!(shard["publish_position_inclusive"], "00000000000000001000"); @@ -1892,7 +1941,7 @@ mod tests { .unwrap(); let indexing_tasks = last_applied_physical_plan .indexing_tasks_per_indexer() - .get("indexer-node-1") + .get("test-indexer") .unwrap(); assert!(indexing_tasks.is_empty()); @@ -1909,12 +1958,12 @@ mod tests { #[tokio::test] async fn test_fill_shard_table_position_from_metastore_on_startup() { let universe = Universe::with_accelerated_time(); - let node_id = NodeId::new("control-plane-node".to_string()); + let node_id = NodeId::new("test-control-plane".to_string()); let indexer_pool = IndexerPool::default(); let (client_mailbox, _client_inbox) = universe.create_test_mailbox(); let client = IndexingServiceClient::from_mailbox::(client_mailbox); let indexer_node_info = IndexerNodeInfo { - node_id: NodeId::new("indexer-node-1".to_string()), + node_id: NodeId::new("test-indexer".to_string()), generation_id: 0, client, indexing_tasks: Vec::new(), @@ -1943,7 +1992,7 @@ mod tests { index_uid: 
Some(index_metadata.index_uid.clone()), source_id: INGEST_V2_SOURCE_ID.to_string(), shard_id: Some(ShardId::from(17)), - leader_id: "test_node".to_string(), + leader_id: "test-ingester".to_string(), publish_position_inclusive: Some(Position::Offset(1234u64.into())), ..Default::default() }; @@ -1975,8 +2024,8 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), ); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = - &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; + let shard = &control_plane_debug_info["shard_table"] + ["test-index:00000000000000000000000000"]["test-ingester"][0]; assert_eq!(shard["shard_id"], "00000000000000000017"); assert_eq!(shard["publish_position_inclusive"], "00000000000000001234"); @@ -1987,12 +2036,12 @@ mod tests { async fn test_delete_non_existing_shard() { quickwit_common::setup_logging_for_tests(); let universe = Universe::default(); - let node_id = NodeId::new("control-plane-node".to_string()); + let node_id = NodeId::new("test-control-plane".to_string()); let indexer_pool = IndexerPool::default(); let (client_mailbox, _client_inbox) = universe.create_test_mailbox(); let client = IndexingServiceClient::from_mailbox::(client_mailbox); let indexer_node_info = IndexerNodeInfo { - node_id: NodeId::new("indexer-node-1".to_string()), + node_id: NodeId::new("test-indexer".to_string()), generation_id: 0, client, indexing_tasks: Vec::new(), @@ -2080,7 +2129,7 @@ mod tests { async fn test_delete_index() { quickwit_common::setup_logging_for_tests(); let universe = Universe::default(); - let node_id = NodeId::new("control-plane-node".to_string()); + let node_id = NodeId::new("test-control-plane".to_string()); let indexer_pool = IndexerPool::default(); let ingester_pool = IngesterPool::default(); @@ -2194,7 +2243,7 @@ mod tests { async fn test_delete_source() { quickwit_common::setup_logging_for_tests(); let universe = Universe::default(); - let node_id = NodeId::new("control-plane-node".to_string()); + let node_id = NodeId::new("test-control-plane".to_string()); let indexer_pool = IndexerPool::default(); let ingester_pool = IngesterPool::default(); @@ -2610,8 +2659,8 @@ mod tests { control_plane_mailbox.ask(callback).await.unwrap(); let control_plane_debug_info = control_plane_mailbox.ask(GetDebugInfo).await.unwrap(); - let shard = - &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; + let shard = &control_plane_debug_info["shard_table"] + ["test-index:00000000000000000000000000"]["test-ingester"][0]; assert_eq!(shard["shard_id"], "00000000000000000000"); assert_eq!(shard["shard_state"], "closed"); @@ -2749,8 +2798,8 @@ mod tests { control_plane_debug_info["physical_indexing_plan"][0]["node_id"], "test-ingester" ); - let shard = - &control_plane_debug_info["shard_table"]["test-index:00000000000000000000000000"][0]; + let shard = &control_plane_debug_info["shard_table"] + ["test-index:00000000000000000000000000"]["test-ingester"][0]; assert_eq!(shard["index_uid"], "test-index:00000000000000000000000000"); assert_eq!(shard["source_id"], INGEST_V2_SOURCE_ID); assert_eq!(shard["shard_id"], "00000000000000000000"); @@ -2759,7 +2808,7 @@ mod tests { assert_eq!(shard["follower_id"], JsonValue::Null); assert_eq!( shard["publish_position_inclusive"], - Position::Beginning.to_string() + json!(Position::Beginning) ); universe.assert_quit().await; diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs 
b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 24992209b9c..8c42dea40fc 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -392,7 +392,7 @@ impl IndexingScheduler { notify_on_drop: Option>, ) { debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan"); - crate::metrics::CONTROL_PLANE_METRICS.apply_total.inc(); + crate::metrics::CONTROL_PLANE_METRICS.apply_plan_total.inc(); for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() { // We don't want to block on a slow indexer so we apply this change asynchronously // TODO not blocking is cool, but we need to make sure there is not accumulation @@ -413,7 +413,7 @@ impl IndexingScheduler { .await { warn!( - error=%error, + %error, node_id=%indexer.node_id, generation_id=indexer.generation_id, "failed to apply indexing plan to indexer" diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 4296c57311b..123efbe60b3 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -85,7 +85,7 @@ fn fire_and_forget( ) { tokio::spawn(async move { if let Err(_timeout_elapsed) = tokio::time::timeout(FIRE_AND_FORGET_TIMEOUT, fut).await { - error!(operation=%operation, "timeout elapsed"); + error!(%operation, "timeout elapsed"); } }); } @@ -1019,7 +1019,6 @@ impl IngestController { debug!("no ingester available"); return Vec::new(); } - let mut per_leader_open_shards: HashMap<&str, Vec<&ShardEntry>> = HashMap::new(); for shard in model.all_shards() { @@ -1031,11 +1030,11 @@ impl IngestController { .push(shard); } } - // We tolerate an ingester with 10% more shards than the average. // Let's first identify the list of shards we want to "move". 
         let num_open_shards_per_leader_threshold =
             (num_open_shards * 11).div_ceil(10 * num_ingesters);
+
         let mut shards_to_move: Vec<Shard> = Vec::new();
         let mut rng = thread_rng();
@@ -1051,7 +1050,19 @@ impl IngestController {
                 );
             }
         }
+        let num_shards_to_rebalance = shards_to_move.len();
+        if num_shards_to_rebalance == 0 {
+            info!("no shards to rebalance");
+        } else {
+            info!(
+                num_open_shards,
+                num_available_ingesters = num_ingesters,
+                rebalance_threshold = num_open_shards_per_leader_threshold,
+                num_shards_to_rebalance,
+                "rebalancing shards"
+            );
+        }
         shards_to_move
     }
 
@@ -1068,40 +1079,60 @@ impl IngestController {
         progress: &Progress,
     ) -> MetastoreResult>> {
         let Ok(rebalance_guard) = self.rebalance_lock.clone().try_lock_owned() else {
+            info!("skipping rebalance: another rebalance is already in progress");
             return Ok(None);
         };
         self.stats.num_rebalance_shards_ops += 1;
 
         let shards_to_move: Vec<Shard> = self.rebalance_compute_shards_to_move(model);
+
+        crate::metrics::CONTROL_PLANE_METRICS
+            .rebalance_shards
+            .set(shards_to_move.len() as i64);
+
         if shards_to_move.is_empty() {
             return Ok(None);
         }
+        let mut per_source_num_shards_to_open: HashMap<SourceUid, usize> = HashMap::new();
 
-        let num_shards_to_move = shards_to_move.len();
-        info!("rebalancing {} shards", num_shards_to_move);
-
-        let mut new_shards_source_uids: HashMap<SourceUid, usize> = HashMap::new();
         for shard in &shards_to_move {
-            *new_shards_source_uids
+            *per_source_num_shards_to_open
                 .entry(shard.source_uid())
                 .or_default() += 1;
         }
-        let mut successfully_source_uids: HashMap<SourceUid, usize> = self
-            .try_open_shards(new_shards_source_uids, model, &Default::default(), progress)
-            .await?;
+        let mut per_source_num_opened_shards: HashMap<SourceUid, usize> = self
+            .try_open_shards(
+                per_source_num_shards_to_open,
+                model,
+                &Default::default(),
+                progress,
+            )
+            .await
+            .inspect_err(|error| {
+                error!(%error, "failed to open shards during rebalance");
+                crate::metrics::CONTROL_PLANE_METRICS
+                    .rebalance_shards
+                    .set(0);
+            })?;
+
+        let num_opened_shards: usize = per_source_num_opened_shards.values().sum();
+
+        crate::metrics::CONTROL_PLANE_METRICS
+            .rebalance_shards
+            .set(num_opened_shards as i64);
 
-        for source_uid in successfully_source_uids.keys() {
+        for source_uid in per_source_num_opened_shards.keys() {
             // We temporarily disable the ability the scale down the number of shards for
             // the source to avoid closing the shards we just opened.
             model.drain_scaling_permits(source_uid, ScalingMode::Down);
         }
 
         // Let's close one of the shard to move for every successfully newly opened shards.
- let mut shards_to_close = Vec::new(); + let mut shards_to_close = Vec::with_capacity(shards_to_move.len()); for shard in shards_to_move { let source_uid = shard.source_uid(); - let Some(num_open_shards) = successfully_source_uids.get_mut(&source_uid) else { + let Some(num_open_shards) = per_source_num_opened_shards.get_mut(&source_uid) else { continue; }; if *num_open_shards == 0 { diff --git a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs index 3cd6911fbaa..7294b9f9176 100644 --- a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs +++ b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs @@ -129,6 +129,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 0, + num_closed_shards: 0, avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }, @@ -140,6 +141,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.0, }, @@ -151,6 +153,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, @@ -162,6 +165,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, @@ -173,6 +177,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, @@ -184,6 +189,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, @@ -195,6 +201,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.0, avg_long_term_ingestion_rate: 3.0, }, @@ -213,6 +220,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 0, + num_closed_shards: 0, avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }, @@ -224,6 +232,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.0, }, @@ -235,6 +244,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, @@ -246,6 +256,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.1, avg_long_term_ingestion_rate: 8.1, }, @@ -257,6 +268,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, @@ -268,6 +280,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 3.0, avg_long_term_ingestion_rate: 1.5, }, @@ -279,6 +292,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 8.0, avg_long_term_ingestion_rate: 3.1, }, @@ -291,6 +305,7 @@ mod tests { scaling_arbiter.should_scale( ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 
8.1, avg_long_term_ingestion_rate: 5., }, @@ -308,6 +323,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 0, + num_closed_shards: 0, avg_short_term_ingestion_rate: 0., avg_long_term_ingestion_rate: 0., }; @@ -322,6 +338,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.1, }; @@ -336,6 +353,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 1.1, }; @@ -350,6 +368,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 2, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 6.1, }; @@ -364,6 +383,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 5, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 1.1, }; @@ -384,6 +404,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 0, + num_closed_shards: 0, avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }; @@ -393,6 +414,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 0.0, avg_long_term_ingestion_rate: 0.0, }; @@ -408,6 +430,7 @@ mod tests { let shard_stats = ShardStats { num_open_shards: 1, + num_closed_shards: 0, avg_short_term_ingestion_rate: 5.0, avg_long_term_ingestion_rate: 1.0, }; diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index 119ea4af0b6..5e534c4f176 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -24,13 +24,22 @@ pub struct ShardLocalityMetrics { } pub struct ControlPlaneMetrics { + // Indexes and shards tracked by the control plane. pub indexes_total: IntGauge, + pub open_shards: IntGaugeVec<1>, + pub closed_shards: IntGaugeVec<1>, + + // Operations performed by the control plane. + pub apply_plan_total: IntCounter, + pub rebalance_shards: IntGauge, pub restart_total: IntCounter, pub schedule_total: IntCounter, - pub apply_total: IntCounter, + + // Metastore errors. pub metastore_error_aborted: IntCounter, pub metastore_error_maybe_executed: IntCounter, - pub open_shards_total: IntGaugeVec<1>, + + // Indexing plan metrics. 
pub local_shards: IntGauge, pub remote_shards: IntGauge, } @@ -46,20 +55,54 @@ impl ControlPlaneMetrics { impl Default for ControlPlaneMetrics { fn default() -> Self { - let shards = new_gauge_vec( + let open_shards = new_gauge_vec( "shards", + "Number of open and closed shards tracked by the ingest controller", + "control_plane", + &[("state", "open")], + ["index_id"], + ); + let closed_shards = new_gauge_vec( + "shards", + "Number of open and closed shards tracked by the ingest controller", + "control_plane", + &[("state", "closed")], + ["index_id"], + ); + let indexed_shards = new_gauge_vec( + "indexed_shards", "Number of (remote/local) shards in the indexing plan", "control_plane", &[], ["locality"], ); - let local_shards = shards.with_label_values(["local"]); - let remote_shards = shards.with_label_values(["remote"]); + let local_shards = indexed_shards.with_label_values(["local"]); + let remote_shards = indexed_shards.with_label_values(["remote"]); + ControlPlaneMetrics { - indexes_total: new_gauge("indexes_total", "Number of indexes.", "control_plane", &[]), + indexes_total: new_gauge( + "indexes_total", + "Number of indexes tracked by the control plane.", + "control_plane", + &[], + ), + open_shards, + closed_shards, + apply_plan_total: new_counter( + "apply_plan_total", + "Number of control plane `apply plan` operations.", + "control_plane", + &[], + ), + rebalance_shards: new_gauge( + "rebalance_shards", + "Number of shards rebalanced by the control plane.", + "control_plane", + &[], + ), restart_total: new_counter( "restart_total", - "Number of control plane restart.", + "Number of control plane restarts.", "control_plane", &[], ), @@ -69,12 +112,6 @@ impl Default for ControlPlaneMetrics { "control_plane", &[], ), - apply_total: new_counter( - "apply_total", - "Number of control plane `apply plan` operations.", - "control_plane", - &[], - ), metastore_error_aborted: new_counter( "metastore_error_aborted", "Number of aborted metastore transaction (= do not trigger a control plane \ @@ -89,13 +126,6 @@ impl Default for ControlPlaneMetrics { "control_plane", &[], ), - open_shards_total: new_gauge_vec( - "open_shards_total", - "Number of open shards per source.", - "control_plane", - &[], - ["index_id"], - ), local_shards, remote_shards, } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 688654b99ec..0d0431a67ce 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -216,7 +216,6 @@ impl ControlPlaneModel { }; let fp_changed = !index_model.index_config.equals_fingerprint(&index_config); index_model.index_config = index_config; - self.update_metrics(); Ok(fp_changed) } @@ -377,18 +376,34 @@ impl ControlPlaneModel { source_uid: &SourceUid, shard_infos: &ShardInfos, ) -> ShardStats { + debug!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + "updating shards" + ); self.shard_table.update_shards(source_uid, shard_infos) } /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to /// `Closed`. pub fn close_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) -> Vec { + debug!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + shard_ids=%shard_ids.pretty_display(), + "closing shards" + ); self.shard_table.close_shards(source_uid, shard_ids) } /// Removes the shards identified by their index UID, source ID, and shard IDs. 
pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { - info!(source_uid=%source_uid, shard_ids=?shard_ids, "removing shards from model"); + debug!( + index_uid=%source_uid.index_uid, + source_id=%source_uid.source_id, + shard_ids=%shard_ids.pretty_display(), + "deleting shards" + ); self.shard_table.delete_shards(source_uid, shard_ids); } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 8d2d0943785..0377d553ac6 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -18,6 +18,7 @@ use std::ops::{Deref, DerefMut}; use std::time::Duration; use fnv::{FnvHashMap, FnvHashSet}; +use quickwit_common::metrics::index_label; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_common::tower::ConstantRate; use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos}; @@ -102,11 +103,37 @@ impl ShardTableEntry { self.shard_entries.is_empty() } - fn num_open_shards(&self) -> usize { - self.shard_entries - .values() - .filter(|shard_entry| shard_entry.is_open()) - .count() + fn shards_stats(&self) -> ShardStats { + let mut num_open_shards = 0; + let mut num_closed_shards = 0; + let mut short_term_ingestion_rate_sum = 0; + let mut long_term_ingestion_rate_sum = 0; + + for shard_entry in self.shard_entries.values() { + if shard_entry.is_open() { + num_open_shards += 1; + short_term_ingestion_rate_sum += shard_entry.short_term_ingestion_rate.0 as usize; + long_term_ingestion_rate_sum += shard_entry.long_term_ingestion_rate.0 as usize; + } else if shard_entry.is_closed() { + num_closed_shards += 1; + } + } + let avg_short_term_ingestion_rate = if num_open_shards > 0 { + short_term_ingestion_rate_sum as f32 / num_open_shards as f32 + } else { + 0.0 + }; + let avg_long_term_ingestion_rate = if num_open_shards > 0 { + long_term_ingestion_rate_sum as f32 / num_open_shards as f32 + } else { + 0.0 + }; + ShardStats { + num_open_shards, + num_closed_shards, + avg_short_term_ingestion_rate, + avg_long_term_ingestion_rate, + } } } @@ -138,7 +165,7 @@ impl<'a> ShardLocations<'a> { // A table that keeps track of the existing shards for each index and source, // and for each ingester, the list of shards it is supposed to host. // -// (All mutable methods must maintain the two consistent) +// (All mutable methods must maintain these two invariants.) #[derive(Debug, Default)] pub(crate) struct ShardTable { table_entries: FnvHashMap, @@ -424,18 +451,45 @@ impl ShardTable { } pub fn update_shard_metrics_for_source_uid(&self, source_uid: &SourceUid) { - let num_open_shards: usize = - if let Some(shard_table_entry) = self.table_entries.get(source_uid) { - shard_table_entry.num_open_shards() - } else { - 0 - }; - let index_label = - quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str()); + let Some(table_entry) = self.table_entries.get(source_uid) else { + return; + }; + let index_id = source_uid.index_uid.index_id.as_str(); + let index_label = index_label(index_id); + + // If `index_label(index_id)` returns `index_id`, then per-index metrics are enabled and we + // can update the metrics for this specific index. 
+ if index_label == index_id { + let shard_stats = table_entry.shards_stats(); + crate::metrics::CONTROL_PLANE_METRICS + .open_shards + .with_label_values([index_label]) + .set(shard_stats.num_open_shards as i64); + crate::metrics::CONTROL_PLANE_METRICS + .closed_shards + .with_label_values([index_label]) + .set(shard_stats.num_closed_shards as i64); + return; + } + // Per-index metrics are disabled, so we update the metrics for all sources. + let mut num_open_shards = 0; + let mut num_closed_shards = 0; + + for shard_entry in self.all_shards() { + if shard_entry.is_open() { + num_open_shards += 1; + } else if shard_entry.is_closed() { + num_closed_shards += 1; + } + } crate::metrics::CONTROL_PLANE_METRICS - .open_shards_total + .open_shards .with_label_values([index_label]) .set(num_open_shards as i64); + crate::metrics::CONTROL_PLANE_METRICS + .closed_shards + .with_label_values([index_label]) + .set(num_closed_shards as i64); } pub fn update_shards( @@ -443,71 +497,51 @@ impl ShardTable { source_uid: &SourceUid, shard_infos: &ShardInfos, ) -> ShardStats { - let mut num_open_shards = 0; - let mut short_term_ingestion_rate_sum = RateMibPerSec::default(); - let mut long_term_ingestion_rate_sum = RateMibPerSec::default(); - - if let Some(table_entry) = self.table_entries.get_mut(source_uid) { - for shard_info in shard_infos { - let ShardInfo { - shard_id, - shard_state, - short_term_ingestion_rate, - long_term_ingestion_rate, - } = shard_info; - - if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { - shard_entry.short_term_ingestion_rate = *short_term_ingestion_rate; - shard_entry.long_term_ingestion_rate = *long_term_ingestion_rate; - // `ShardInfos` are broadcasted via Chitchat and eventually consistent. As a - // result, we can only trust the `Closed` state, which is final. - if shard_state.is_closed() { - shard_entry.set_shard_state(ShardState::Closed); - } - } - } - for shard_entry in table_entry.shard_entries.values() { - if shard_entry.is_open() { - num_open_shards += 1; - short_term_ingestion_rate_sum += shard_entry.short_term_ingestion_rate; - long_term_ingestion_rate_sum += shard_entry.long_term_ingestion_rate; + let Some(table_entry) = self.table_entries.get_mut(source_uid) else { + return ShardStats::default(); + }; + for shard_info in shard_infos { + let ShardInfo { + shard_id, + shard_state, + short_term_ingestion_rate, + long_term_ingestion_rate, + } = shard_info; + + if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { + shard_entry.short_term_ingestion_rate = *short_term_ingestion_rate; + shard_entry.long_term_ingestion_rate = *long_term_ingestion_rate; + // `ShardInfos` are broadcasted via Chitchat and eventually consistent. As a + // result, we can only trust the `Closed` state, which is final. + if shard_state.is_closed() { + shard_entry.set_shard_state(ShardState::Closed); } } } - let avg_short_term_ingestion_rate = if num_open_shards > 0 { - short_term_ingestion_rate_sum.0 as f32 / num_open_shards as f32 - } else { - 0.0 - }; - - let avg_long_term_ingestion_rate = if num_open_shards > 0 { - long_term_ingestion_rate_sum.0 as f32 / num_open_shards as f32 - } else { - 0.0 - }; - - ShardStats { - num_open_shards, - avg_short_term_ingestion_rate, - avg_long_term_ingestion_rate, - } + table_entry.shards_stats() } /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to /// `Closed`. 
pub fn close_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) -> Vec { + let Some(table_entry) = self.table_entries.get_mut(source_uid) else { + return Vec::new(); + }; let mut closed_shard_ids = Vec::new(); - if let Some(table_entry) = self.table_entries.get_mut(source_uid) { - for shard_id in shard_ids { - if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { - if !shard_entry.is_closed() { - shard_entry.set_shard_state(ShardState::Closed); - closed_shard_ids.push(shard_id.clone()); - } - } else { - info!(shard=%shard_id, "ignoring attempt to close shard: it is unknown (probably because it has been deleted)"); + for shard_id in shard_ids { + if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { + if !shard_entry.is_closed() { + shard_entry.set_shard_state(ShardState::Closed); + closed_shard_ids.push(shard_id.clone()); } + } else { + info!( + index_id=%source_uid.index_uid.index_id, + source_id=%source_uid.source_id, + %shard_id, + "ignoring attempt to close shard: it is unknown (probably because it has been deleted)" + ); } } self.update_shard_metrics_for_source_uid(source_uid); @@ -516,14 +550,15 @@ impl ShardTable { /// Removes the shards identified by their index UID, source ID, and shard IDs. pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { + let Some(table_entry) = self.table_entries.get_mut(source_uid) else { + return; + }; let mut shard_entries_to_remove: Vec = Vec::new(); - if let Some(table_entry) = self.table_entries.get_mut(source_uid) { - for shard_id in shard_ids { - if let Some(shard_entry) = table_entry.shard_entries.remove(shard_id) { - shard_entries_to_remove.push(shard_entry); - } else { - warn!(shard=%shard_id, "deleting a non-existing shard"); - } + for shard_id in shard_ids { + if let Some(shard_entry) = table_entry.shard_entries.remove(shard_id) { + shard_entries_to_remove.push(shard_entry); + } else { + warn!(shard=%shard_id, "deleting a non-existing shard"); } } for shard_entry in shard_entries_to_remove { @@ -533,8 +568,8 @@ impl ShardTable { &mut self.ingester_shards, ); } - self.update_shard_metrics_for_source_uid(source_uid); self.check_invariant(); + self.update_shard_metrics_for_source_uid(source_uid); } pub fn acquire_scaling_permits( @@ -574,9 +609,10 @@ impl ShardTable { #[derive(Clone, Copy, Default)] pub(crate) struct ShardStats { pub num_open_shards: usize, - /// Average short-term ingestion rate (MiB/s) per open shard + pub num_closed_shards: usize, + /// Average short-term ingestion rate (MiB/s) over all open shards. pub avg_short_term_ingestion_rate: f32, - /// Average long-term ingestion rate (MiB/s) per open shard + /// Average long-term ingestion rate (MiB/s) over all open shards. pub avg_long_term_ingestion_rate: f32, } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index af8f7ece9f8..a6a2c5f5474 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::path::Path; use std::sync::Arc; @@ -995,10 +995,10 @@ impl Ingester { let truncate_up_to_position_inclusive = subrequest.truncate_up_to_position_inclusive(); if truncate_up_to_position_inclusive.is_eof() { - state_guard.delete_shard(&queue_id).await; + state_guard.delete_shard(&queue_id, "indexer-rpc").await; } else { state_guard - .truncate_shard(&queue_id, truncate_up_to_position_inclusive) + .truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer-rpc") .await; } } @@ -1054,12 +1054,12 @@ impl Ingester { Err(_) => { return json!({ "status": "initializing", - "shards": [], + "shards": {}, "mrecordlog": {}, }); } }; - let mut per_index_shards_json: HashMap> = HashMap::new(); + let mut per_index_shards_json: BTreeMap> = BTreeMap::new(); for (queue_id, shard) in &state_guard.shards { let Some((index_uid, source_id, shard_id)) = split_queue_id(queue_id) else { @@ -1179,7 +1179,9 @@ impl IngesterService for Ingester { .collect(); info!(queues=?remove_queue_ids, "removing queues"); for queue_id in remove_queue_ids { - state_guard.delete_shard(&queue_id).await; + state_guard + .delete_shard(&queue_id, "control-plane-retain-shards-rpc") + .await; } self.check_decommissioning_status(&mut state_guard); Ok(RetainShardsResponse {}) @@ -1226,9 +1228,11 @@ impl EventSubscriber for WeakIngesterState { for (shard_id, shard_position) in shard_positions_update.updated_shard_positions { let queue_id = queue_id(&index_uid, &source_id, &shard_id); if shard_position.is_eof() { - state_guard.delete_shard(&queue_id).await; + state_guard.delete_shard(&queue_id, "indexer-gossip").await; } else if !shard_position.is_beginning() { - state_guard.truncate_shard(&queue_id, shard_position).await; + state_guard + .truncate_shard(&queue_id, shard_position, "indexer-gossip") + .await; } } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 401e31e258e..ebeba9ccfaf 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -336,14 +336,14 @@ impl DerefMut for FullyLockedIngesterState<'_> { impl FullyLockedIngesterState<'_> { /// Deletes the shard identified by `queue_id` from the ingester state. It removes the /// mrecordlog queue first and then removes the associated in-memory shard and rate trackers. - pub async fn delete_shard(&mut self, queue_id: &QueueId) { + pub async fn delete_shard(&mut self, queue_id: &QueueId, initiator: &'static str) { match self.mrecordlog.delete_queue(queue_id).await { Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => { self.rate_trackers.remove(queue_id); // Log only if the shard was actually removed. if let Some(shard) = self.shards.remove(queue_id) { - info!("deleted shard `{queue_id}`"); + info!("deleted shard `{queue_id}` initiated via `{initiator}`"); if let Some(doc_mapper) = shard.doc_mapper_opt { // At this point, we hold the lock so we can safely check the strong count. @@ -371,6 +371,7 @@ impl FullyLockedIngesterState<'_> { &mut self, queue_id: &QueueId, truncate_up_to_position_inclusive: Position, + initiator: &'static str, ) { // TODO: Replace with if-let-chains when stabilized. 
let Some(truncate_up_to_offset_inclusive) = truncate_up_to_position_inclusive.as_u64() @@ -389,7 +390,10 @@ impl FullyLockedIngesterState<'_> { .await { Ok(_) => { - info!("truncated shard `{queue_id}` at {truncate_up_to_position_inclusive}"); + info!( + "truncated shard `{queue_id}` at {truncate_up_to_position_inclusive} \ + initiated via `{initiator}`" + ); shard.truncation_position_inclusive = truncate_up_to_position_inclusive; } Err(TruncateError::MissingQueue(_)) => { @@ -410,12 +414,18 @@ impl FullyLockedIngesterState<'_> { info!("resetting shards"); for shard_ids in &advise_reset_shards_response.shards_to_delete { for queue_id in shard_ids.queue_ids() { - self.delete_shard(&queue_id).await; + self.delete_shard(&queue_id, "control-plane-reset-shards-rpc") + .await; } } for shard_id_positions in &advise_reset_shards_response.shards_to_truncate { for (queue_id, publish_position) in shard_id_positions.queue_id_positions() { - self.truncate_shard(&queue_id, publish_position).await; + self.truncate_shard( + &queue_id, + publish_position, + "control-plane-reset-shards-rpc", + ) + .await; } } } diff --git a/quickwit/quickwit-proto/src/types/position.rs b/quickwit/quickwit-proto/src/types/position.rs index ba2cafddcfc..983aeae2b65 100644 --- a/quickwit/quickwit-proto/src/types/position.rs +++ b/quickwit/quickwit-proto/src/types/position.rs @@ -18,6 +18,7 @@ use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; use prost::{self, DecodeError}; +use quickwit_common::pretty::PrettyDisplay; use serde::{Deserialize, Serialize}; const BEGINNING: &str = ""; @@ -109,6 +110,37 @@ impl Debug for Position { } } +// Caution: This is also the serialization format for chitchat and serde. Modify with care. +impl Display for Position { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Beginning => write!(f, "{BEGINNING}"), + Self::Offset(offset) => write!(f, "{offset}"), + Self::Eof(Some(offset)) => write!(f, "{EOF_PREFIX}{offset}"), + Self::Eof(None) => write!(f, "{EOF_PREFIX}"), + } + } +} + +struct PositionPrettyDisplay<'a>(&'a Position); + +impl fmt::Display for PositionPrettyDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Position::Beginning => write!(f, "beginning"), + Position::Offset(offset) => write!(f, "{offset}"), + Position::Eof(Some(offset)) => write!(f, "eof({offset})"), + Position::Eof(None) => write!(f, "eof"), + } + } +} + +impl PrettyDisplay for Position { + fn pretty_display(&self) -> impl fmt::Display { + PositionPrettyDisplay(self) + } +} + impl Position { pub fn offset(offset: impl Into) -> Self { Self::Offset(offset.into()) @@ -178,17 +210,6 @@ impl Position { } } -impl Display for Position { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::Beginning => write!(f, "{BEGINNING}"), - Self::Offset(offset) => write!(f, "{offset}"), - Self::Eof(Some(offset)) => write!(f, "{EOF_PREFIX}{offset}"), - Self::Eof(None) => write!(f, "{EOF_PREFIX}"), - } - } -} - impl From for Position { fn from(position: ByteString) -> Self { match &position[..] {
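
A quick illustration of the new `PrettyDisplay` output (not part of the diff above): this is a minimal sketch of how the impls introduced in this patch are meant to read in log fields, assuming the patched `quickwit_common` and `quickwit_proto` crates are available as dependencies. The numeric values are made up; the expected strings mirror the unit tests added in `pretty.rs` and the `PositionPrettyDisplay` arms above.

    use std::time::Duration;

    use quickwit_common::pretty::PrettyDisplay;
    use quickwit_proto::types::Position;

    fn main() {
        // Durations keep the compact millisecond/second rendering.
        assert_eq!(Duration::from_millis(1_125).pretty_display().to_string(), "1.12s");

        // Slices render as a comma-separated list: this is the formatting behind
        // `shard_ids=%shard_ids.pretty_display()` in the new control plane log lines.
        let values: &[u64] = &[17, 18];
        assert_eq!(values.pretty_display().to_string(), "[17, 18]");

        // Positions render as "beginning", the raw offset, or "eof"/"eof(offset)",
        // instead of the Debug output that was previously logged.
        assert_eq!(Position::Beginning.pretty_display().to_string(), "beginning");
        assert_eq!(Position::Eof(None).pretty_display().to_string(), "eof");
    }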