From e0cd29926eab0594f681f47731ba41c6a58fddf9 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 24 Nov 2025 14:08:20 +0100 Subject: [PATCH] Enforce non-zero replica scale and workers This commit changes `ReplicaAllocation` to enforce non-zero values for `scale` and `workers`. This makes Materialize refuse to start with replica sizes that specify zero for these values. Previously Materialize would start up but would panic as soon as one tried to create a replica with such an invalid size. --- misc/python/materialize/mzcompose/__init__.py | 2 +- .../src/catalog/builtin_table_updates.rs | 2 +- src/catalog/src/config.rs | 29 +++++++------- src/controller/src/clusters.rs | 38 ++++++++++++------- src/orchestrator-kubernetes/src/lib.rs | 21 +++++----- src/orchestrator-process/src/lib.rs | 15 ++++---- src/orchestrator/src/lib.rs | 3 +- src/ore/src/cast.rs | 7 ++++ 8 files changed, 70 insertions(+), 47 deletions(-) diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 2a2362cf24431..b9738845acac1 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -720,7 +720,7 @@ def replica_size( "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False), "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False), # Intentionally not following the naming scheme - "free": replica_size(0, 0, disabled=True), + "free": replica_size(1, 1, disabled=True), } for i in range(0, 6): diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 714b731d87d7c..b0f8571b9c783 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -2136,7 +2136,7 @@ impl CatalogState { let row = Row::pack_slice(&[ size.as_str().into(), - u64::from(alloc.scale).into(), + u64::cast_from(alloc.scale).into(), u64::cast_from(alloc.workers).into(), cpu_limit.as_nanocpus().into(), memory_bytes.into(), diff --git a/src/catalog/src/config.rs b/src/catalog/src/config.rs index 7c333da851159..63f5ad26e2ba6 100644 --- a/src/catalog/src/config.rs +++ b/src/catalog/src/config.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::{BTreeMap, BTreeSet}; +use std::num::NonZero; use anyhow::bail; use bytesize::ByteSize; @@ -122,7 +123,7 @@ impl ClusterReplicaSizeMap { bail!("No memory limit found in cluster definition for {name}"); }; replica.credits_per_hour = Numeric::from( - (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0, + (memory_limit.0 * replica.scale.get() * u64::cast_from(replica.workers)).0, ) / Numeric::from(1 * GIB); } } @@ -170,7 +171,7 @@ impl ClusterReplicaSizeMap { // } let mut inner = (0..=5) .flat_map(|i| { - let workers: u8 = 1 << i; + let workers = 1 << i; [ (format!("scale=1,workers={workers}"), None), (format!("scale=1,workers={workers},mem=4GiB"), Some(4)), @@ -185,8 +186,8 @@ impl ClusterReplicaSizeMap { memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))), cpu_limit: None, disk_limit: None, - scale: 1, - workers: workers.into(), + scale: NonZero::new(1).expect("not zero"), + workers: NonZero::new(workers).expect("not zero"), credits_per_hour: 1.into(), cpu_exclusive: false, is_cc: false, @@ -207,8 +208,8 @@ impl ClusterReplicaSizeMap { memory_limit: None, cpu_limit: None, disk_limit: None, - scale, - workers: 1, + scale: NonZero::new(scale).expect("not zero"), + workers: NonZero::new(1).expect("not zero"), credits_per_hour: scale.into(), cpu_exclusive: false, is_cc: false, @@ -224,8 +225,8 @@ impl ClusterReplicaSizeMap { memory_limit: None, cpu_limit: None, disk_limit: None, - scale, - workers: scale.into(), + scale: NonZero::new(scale).expect("not zero"), + workers: NonZero::new(scale.into()).expect("not zero"), credits_per_hour: scale.into(), cpu_exclusive: false, is_cc: false, @@ -241,8 +242,8 @@ impl ClusterReplicaSizeMap { memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))), cpu_limit: None, disk_limit: None, - scale: 1, - workers: 8, + scale: NonZero::new(1).expect("not zero"), + workers: NonZero::new(8).expect("not zero"), credits_per_hour: 1.into(), cpu_exclusive: false, is_cc: false, @@ -259,8 +260,8 @@ impl ClusterReplicaSizeMap { memory_limit: None, cpu_limit: None, disk_limit: None, - scale: 2, - workers: 4, + scale: NonZero::new(2).expect("not zero"), + workers: NonZero::new(4).expect("not zero"), credits_per_hour: 2.into(), cpu_exclusive: false, is_cc: false, @@ -276,8 +277,8 @@ impl ClusterReplicaSizeMap { memory_limit: None, cpu_limit: None, disk_limit: None, - scale: 0, - workers: 0, + scale: NonZero::new(1).expect("not zero"), + workers: NonZero::new(1).expect("not zero"), credits_per_hour: 0.into(), cpu_exclusive: false, is_cc: true, diff --git a/src/controller/src/clusters.rs b/src/controller/src/clusters.rs index bc5abde955cdc..3a3af24ffa99d 100644 --- a/src/controller/src/clusters.rs +++ b/src/controller/src/clusters.rs @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt; +use std::num::NonZero; use std::str::FromStr; use std::sync::Arc; use std::sync::LazyLock; @@ -34,9 +35,9 @@ use mz_orchestrator::{ CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig, ServiceEvent, ServicePort, }; -use mz_ore::halt; -use mz_ore::instrument; +use mz_ore::cast::CastInto; use mz_ore::task::{self, AbortOnDropHandle}; +use mz_ore::{halt, instrument}; use mz_repr::GlobalId; use mz_repr::adt::numeric::Numeric; use regex::Regex; @@ -80,9 +81,9 @@ pub struct ReplicaAllocation { /// The disk limit for each process in the replica. pub disk_limit: Option, /// The number of processes in the replica. - pub scale: u16, + pub scale: NonZero, /// The number of worker threads in the replica. - pub workers: usize, + pub workers: NonZero, /// The number of credits per hour that the replica consumes. #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")] pub credits_per_hour: Numeric, @@ -113,6 +114,7 @@ fn default_true() -> bool { #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux` fn test_replica_allocation_deserialization() { use bytesize::ByteSize; + use mz_ore::{assert_err, assert_ok}; let data = r#" { @@ -143,8 +145,8 @@ fn test_replica_allocation_deserialization() { cpu_exclusive: false, is_cc: true, swap_enabled: true, - scale: 16, - workers: 1, + scale: NonZero::new(16).unwrap(), + workers: NonZero::new(1).unwrap(), selectors: BTreeMap::from([ ("key1".to_string(), "value1".to_string()), ("key2".to_string(), "value2".to_string()) @@ -157,8 +159,8 @@ fn test_replica_allocation_deserialization() { "cpu_limit": 0, "memory_limit": "0GiB", "disk_limit": "0MiB", - "scale": 0, - "workers": 0, + "scale": 1, + "workers": 1, "credits_per_hour": "0", "cpu_exclusive": true, "disabled": true @@ -178,11 +180,19 @@ fn test_replica_allocation_deserialization() { cpu_exclusive: true, is_cc: true, swap_enabled: false, - scale: 0, - workers: 0, + scale: NonZero::new(1).unwrap(), + workers: NonZero::new(1).unwrap(), selectors: Default::default(), } ); + + // `scale` and `workers` must be non-zero. + let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#; + assert_err!(serde_json::from_str::(data)); + let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#; + assert_err!(serde_json::from_str::(data)); + let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#; + assert_ok!(serde_json::from_str::(data)); } /// Configures the location of a cluster replica. @@ -202,7 +212,7 @@ impl ReplicaLocation { computectl_addrs, .. }) => computectl_addrs.len(), ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => { - allocation.scale.into() + allocation.scale.cast_into() } } } @@ -229,7 +239,7 @@ impl ReplicaLocation { pub fn workers(&self) -> Option { match self { ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => { - Some(allocation.workers * self.num_processes()) + Some(allocation.workers.get() * self.num_processes()) } ReplicaLocation::Unmanaged(_) => None, } @@ -661,12 +671,12 @@ where init_container_image: self.init_container_image.clone(), args: Box::new(move |assigned| { let storage_timely_config = TimelyConfig { - workers: location.allocation.workers, + workers: location.allocation.workers.get(), addresses: assigned.peer_addresses("storage"), ..storage_proto_timely_config }; let compute_timely_config = TimelyConfig { - workers: location.allocation.workers, + workers: location.allocation.workers.get(), addresses: assigned.peer_addresses("compute"), ..compute_proto_timely_config }; diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs index 0ea7a282c73c5..c51c05745b0c7 100644 --- a/src/orchestrator-kubernetes/src/lib.rs +++ b/src/orchestrator-kubernetes/src/lib.rs @@ -9,6 +9,7 @@ use std::collections::BTreeMap; use std::future::Future; +use std::num::NonZero; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::{env, fmt}; @@ -49,6 +50,7 @@ use mz_orchestrator::{ OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent, ServiceProcessMetrics, ServiceStatus, scheduling_config::*, }; +use mz_ore::cast::CastInto; use mz_ore::retry::Retry; use mz_ore::task::AbortOnDropHandle; use serde::Deserialize; @@ -216,7 +218,7 @@ impl Orchestrator for KubernetesOrchestrator { #[derive(Clone, Copy)] struct ServiceInfo { - scale: u16, + scale: NonZero, } struct NamespacedKubernetesOrchestrator { @@ -267,7 +269,7 @@ enum WorkerCommand { #[derive(Debug, Clone)] struct ServiceDescription { name: String, - scale: u16, + scale: NonZero, service: K8sService, stateful_set: StatefulSet, pod_template_hash: String, @@ -655,7 +657,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { status: None, }; - let hosts = (0..scale) + let hosts = (0..scale.get()) .map(|i| { format!( "{name}-{i}.{name}.{}.svc.cluster.local", @@ -751,7 +753,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { let topology_spread = if scheduling_config.topology_spread.enabled { let config = &scheduling_config.topology_spread; - if !config.ignore_non_singular_scale || scale <= 1 { + if !config.ignore_non_singular_scale || scale.get() == 1 { let label_selector_requirements = (if config.ignore_non_singular_scale { let mut replicas_selector_ignoring_scale = replicas_selector.clone(); @@ -1196,7 +1198,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { ..Default::default() }, service_name: Some(name.clone()), - replicas: Some(scale.into()), + replicas: Some(scale.cast_into()), template: pod_template_spec, update_strategy: Some(StatefulSetUpdateStrategy { type_: Some("OnDelete".to_owned()), @@ -1418,7 +1420,7 @@ impl OrchestratorWorker { info: &ServiceInfo, ) -> Vec { if !self.collect_pod_metrics { - return (0..info.scale) + return (0..info.scale.get()) .map(|_| ServiceProcessMetrics::default()) .collect(); } @@ -1559,8 +1561,9 @@ impl OrchestratorWorker { Ok(usage) } - let ret = - futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into()))); + let ret = futures::future::join_all( + (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)), + ); ret.await } @@ -1611,7 +1614,7 @@ impl OrchestratorWorker { // Our pod recreation policy is simple: If a pod's template hash changed, delete it, and // let the StatefulSet controller recreate it. Otherwise, patch the existing pod's // annotations to line up with the ones in the spec. - for pod_id in 0..desc.scale { + for pod_id in 0..desc.scale.get() { let pod_name = format!("{}-{pod_id}", desc.name); let pod = match self.pod_api.get(&pod_name).await { Ok(pod) => pod, diff --git a/src/orchestrator-process/src/lib.rs b/src/orchestrator-process/src/lib.rs index 9f7c8613c725a..345f99b60878b 100644 --- a/src/orchestrator-process/src/lib.rs +++ b/src/orchestrator-process/src/lib.rs @@ -14,6 +14,7 @@ use std::fmt::Debug; use std::fs::Permissions; use std::future::Future; use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener}; +use std::num::NonZero; use std::os::unix::fs::PermissionsExt; use std::os::unix::process::ExitStatusExt; use std::path::{Path, PathBuf}; @@ -453,7 +454,7 @@ struct EnsureServiceConfig { /// An optional limit on the CPU that the service can use. pub cpu_limit: Option, /// The number of copies of this service to run. - pub scale: u16, + pub scale: NonZero, /// Arbitrary key–value pairs to attach to the service in the orchestrator /// backend. /// @@ -599,7 +600,7 @@ impl OrchestratorWorker { services.get(&id).map(|states| states.len()) }; match old_scale { - Some(old) if old == usize::from(scale) => return Ok(()), + Some(old) if old == usize::cast_from(scale) => return Ok(()), Some(_) => self.drop_service(&id).await?, None => (), } @@ -622,7 +623,7 @@ impl OrchestratorWorker { // Create the state for new processes. let mut process_states = vec![]; - for i in 0..scale.into() { + for i in 0..usize::cast_from(scale) { let listen_addrs = &peer_addrs[i]; // Fill out placeholders in the command wrapper for this process. @@ -1174,7 +1175,7 @@ impl From for ServiceStatus { } } -fn socket_path(run_dir: &Path, port: &str, process: usize) -> String { +fn socket_path(run_dir: &Path, port: &str, process: u16) -> String { let desired = run_dir .join(format!("{port}-{process}")) .to_string_lossy() @@ -1199,13 +1200,13 @@ struct AddressedTcpListener { #[derive(Debug, Clone)] struct ProcessService { run_dir: PathBuf, - scale: u16, + scale: NonZero, } impl Service for ProcessService { fn addresses(&self, port: &str) -> Vec { - (0..self.scale) - .map(|i| socket_path(&self.run_dir, port, i.into())) + (0..self.scale.get()) + .map(|i| socket_path(&self.run_dir, port, i)) .collect() } } diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs index 2879e82b64c4a..d47dd83fdfec2 100644 --- a/src/orchestrator/src/lib.rs +++ b/src/orchestrator/src/lib.rs @@ -9,6 +9,7 @@ use std::collections::BTreeMap; use std::fmt; +use std::num::NonZero; use std::str::FromStr; use std::sync::Arc; @@ -211,7 +212,7 @@ pub struct ServiceConfig { /// An optional limit on the CPU that the service can use. pub cpu_limit: Option, /// The number of copies of this service to run. - pub scale: u16, + pub scale: NonZero, /// Arbitrary key–value pairs to attach to the service in the orchestrator /// backend. /// diff --git a/src/ore/src/cast.rs b/src/ore/src/cast.rs index 703da1003a5a5..4ae1226747e9b 100644 --- a/src/ore/src/cast.rs +++ b/src/ore/src/cast.rs @@ -68,6 +68,13 @@ macro_rules! cast_from { pub const fn [< $from _to_ $to >](from: $from) -> $to { from as $to } + + impl crate::cast::CastFrom> for $to { + #[allow(clippy::as_conversions)] + fn cast_from(from: std::num::NonZero<$from>) -> $to { + from.get() as $to + } + } } }; }