2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
@@ -720,7 +720,7 @@ def replica_size(
     "scale=1,workers=1,legacy": replica_size(1, 1, is_cc=False),
     "scale=1,workers=2,legacy": replica_size(1, 2, is_cc=False),
     # Intentionally not following the naming scheme
-    "free": replica_size(0, 0, disabled=True),
+    "free": replica_size(1, 1, disabled=True),
 }

 for i in range(0, 6):
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/builtin_table_updates.rs
@@ -2136,7 +2136,7 @@ impl CatalogState {

         let row = Row::pack_slice(&[
             size.as_str().into(),
-            u64::from(alloc.scale).into(),
+            u64::cast_from(alloc.scale).into(),
             u64::cast_from(alloc.workers).into(),
             cpu_limit.as_nanocpus().into(),
             memory_bytes.into(),
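
For context on the `u64::from` to `u64::cast_from` switch above: the standard library has no `From<NonZero<u16>> for u64` impl, so the infallible `From` conversion stops compiling once `alloc.scale` becomes `NonZero<u16>`. A minimal sketch of the lossless-cast pattern, with the trait shape assumed rather than copied from `mz_ore::cast`:

use std::num::NonZero;

/// Assumed shape of a lossless-cast trait like `mz_ore::cast::CastFrom`.
trait CastFrom<T> {
    fn cast_from(from: T) -> Self;
}

impl CastFrom<NonZero<u16>> for u64 {
    fn cast_from(from: NonZero<u16>) -> u64 {
        // Unwrap the niche type, then widen losslessly.
        u64::from(from.get())
    }
}

fn main() {
    let scale = NonZero::new(16u16).expect("not zero");
    assert_eq!(u64::cast_from(scale), 16);
}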
29 changes: 15 additions & 14 deletions src/catalog/src/config.rs
@@ -8,6 +8,7 @@
 // by the Apache License, Version 2.0.

 use std::collections::{BTreeMap, BTreeSet};
+use std::num::NonZero;

 use anyhow::bail;
 use bytesize::ByteSize;
@@ -122,7 +123,7 @@ impl ClusterReplicaSizeMap {
                 bail!("No memory limit found in cluster definition for {name}");
             };
             replica.credits_per_hour = Numeric::from(
-                (memory_limit.0 * replica.scale * u64::try_from(replica.workers)?).0,
+                (memory_limit.0 * replica.scale.get() * u64::cast_from(replica.workers)).0,
             ) / Numeric::from(1 * GIB);
         }
     }
@@ -170,7 +171,7 @@ impl ClusterReplicaSizeMap {
         // }
         let mut inner = (0..=5)
             .flat_map(|i| {
-                let workers: u8 = 1 << i;
+                let workers = 1 << i;
                 [
                     (format!("scale=1,workers={workers}"), None),
                     (format!("scale=1,workers={workers},mem=4GiB"), Some(4)),
@@ -185,8 +186,8 @@ impl ClusterReplicaSizeMap {
                     memory_limit: memory_limit.map(|gib| MemoryLimit(ByteSize::gib(gib))),
                     cpu_limit: None,
                     disk_limit: None,
-                    scale: 1,
-                    workers: workers.into(),
+                    scale: NonZero::new(1).expect("not zero"),
+                    workers: NonZero::new(workers).expect("not zero"),
                     credits_per_hour: 1.into(),
                     cpu_exclusive: false,
                     is_cc: false,
@@ -207,8 +208,8 @@ impl ClusterReplicaSizeMap {
                 memory_limit: None,
                 cpu_limit: None,
                 disk_limit: None,
-                scale,
-                workers: 1,
+                scale: NonZero::new(scale).expect("not zero"),
+                workers: NonZero::new(1).expect("not zero"),
                 credits_per_hour: scale.into(),
                 cpu_exclusive: false,
                 is_cc: false,
@@ -224,8 +225,8 @@ impl ClusterReplicaSizeMap {
                 memory_limit: None,
                 cpu_limit: None,
                 disk_limit: None,
-                scale,
-                workers: scale.into(),
+                scale: NonZero::new(scale).expect("not zero"),
+                workers: NonZero::new(scale.into()).expect("not zero"),
                 credits_per_hour: scale.into(),
                 cpu_exclusive: false,
                 is_cc: false,
@@ -241,8 +242,8 @@ impl ClusterReplicaSizeMap {
             memory_limit: Some(MemoryLimit(ByteSize(u64::cast_from(scale) * (1 << 30)))),
             cpu_limit: None,
             disk_limit: None,
-            scale: 1,
-            workers: 8,
+            scale: NonZero::new(1).expect("not zero"),
+            workers: NonZero::new(8).expect("not zero"),
             credits_per_hour: 1.into(),
             cpu_exclusive: false,
             is_cc: false,
@@ -259,8 +260,8 @@ impl ClusterReplicaSizeMap {
             memory_limit: None,
             cpu_limit: None,
             disk_limit: None,
-            scale: 2,
-            workers: 4,
+            scale: NonZero::new(2).expect("not zero"),
+            workers: NonZero::new(4).expect("not zero"),
             credits_per_hour: 2.into(),
             cpu_exclusive: false,
             is_cc: false,
@@ -276,8 +277,8 @@ impl ClusterReplicaSizeMap {
             memory_limit: None,
             cpu_limit: None,
             disk_limit: None,
-            scale: 0,
-            workers: 0,
+            scale: NonZero::new(1).expect("not zero"),
+            workers: NonZero::new(1).expect("not zero"),
             credits_per_hour: 0.into(),
             cpu_exclusive: false,
             is_cc: true,
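
A note on the `NonZero::new(...).expect("not zero")` pattern that now appears throughout this file: `NonZero::new` returns `Option<NonZero<T>>`, with `None` for zero, so the `expect` can only fire if the argument itself is zero. A standalone sketch:

use std::num::NonZero;

fn main() {
    // `NonZero::new` returns `Option<NonZero<T>>`.
    let one: NonZero<u16> = NonZero::new(1).expect("not zero");
    assert_eq!(one.get(), 1);

    // Zero is unrepresentable; construction fails rather than
    // silently producing an invalid replica size.
    assert!(NonZero::<u16>::new(0).is_none());
}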
38 changes: 24 additions & 14 deletions src/controller/src/clusters.rs
@@ -11,6 +11,7 @@

 use std::collections::{BTreeMap, BTreeSet};
 use std::fmt;
+use std::num::NonZero;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::sync::LazyLock;
@@ -34,9 +35,9 @@ use mz_orchestrator::{
     CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
     ServiceEvent, ServicePort,
 };
-use mz_ore::halt;
-use mz_ore::instrument;
+use mz_ore::cast::CastInto;
 use mz_ore::task::{self, AbortOnDropHandle};
+use mz_ore::{halt, instrument};
 use mz_repr::GlobalId;
 use mz_repr::adt::numeric::Numeric;
 use regex::Regex;
@@ -80,9 +81,9 @@ pub struct ReplicaAllocation {
     /// The disk limit for each process in the replica.
     pub disk_limit: Option<DiskLimit>,
     /// The number of processes in the replica.
-    pub scale: u16,
+    pub scale: NonZero<u16>,
     /// The number of worker threads in the replica.
-    pub workers: usize,
+    pub workers: NonZero<usize>,
     /// The number of credits per hour that the replica consumes.
     #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
     pub credits_per_hour: Numeric,
@@ -113,6 +114,7 @@ fn default_true() -> bool {
 #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
 fn test_replica_allocation_deserialization() {
     use bytesize::ByteSize;
+    use mz_ore::{assert_err, assert_ok};

     let data = r#"
     {
@@ -143,8 +145,8 @@ fn test_replica_allocation_deserialization() {
             cpu_exclusive: false,
             is_cc: true,
             swap_enabled: true,
-            scale: 16,
-            workers: 1,
+            scale: NonZero::new(16).unwrap(),
+            workers: NonZero::new(1).unwrap(),
             selectors: BTreeMap::from([
                 ("key1".to_string(), "value1".to_string()),
                 ("key2".to_string(), "value2".to_string())
@@ -157,8 +159,8 @@ fn test_replica_allocation_deserialization() {
         "cpu_limit": 0,
         "memory_limit": "0GiB",
         "disk_limit": "0MiB",
-        "scale": 0,
-        "workers": 0,
+        "scale": 1,
+        "workers": 1,
         "credits_per_hour": "0",
         "cpu_exclusive": true,
         "disabled": true
@@ -178,11 +180,19 @@ fn test_replica_allocation_deserialization() {
             cpu_exclusive: true,
             is_cc: true,
             swap_enabled: false,
-            scale: 0,
-            workers: 0,
+            scale: NonZero::new(1).unwrap(),
+            workers: NonZero::new(1).unwrap(),
             selectors: Default::default(),
         }
     );
+
+    // `scale` and `workers` must be non-zero.
+    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
+    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
+    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
+    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
+    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
+    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
 }

 /// Configures the location of a cluster replica.
@@ -202,7 +212,7 @@ impl ReplicaLocation {
                 computectl_addrs, ..
             }) => computectl_addrs.len(),
             ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
-                allocation.scale.into()
+                allocation.scale.cast_into()
             }
         }
     }
@@ -229,7 +239,7 @@ impl ReplicaLocation {
     pub fn workers(&self) -> Option<usize> {
         match self {
             ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
-                Some(allocation.workers * self.num_processes())
+                Some(allocation.workers.get() * self.num_processes())
             }
             ReplicaLocation::Unmanaged(_) => None,
         }
@@ -661,12 +671,12 @@ where
             init_container_image: self.init_container_image.clone(),
             args: Box::new(move |assigned| {
                 let storage_timely_config = TimelyConfig {
-                    workers: location.allocation.workers,
+                    workers: location.allocation.workers.get(),
                     addresses: assigned.peer_addresses("storage"),
                     ..storage_proto_timely_config
                 };
                 let compute_timely_config = TimelyConfig {
-                    workers: location.allocation.workers,
+                    workers: location.allocation.workers.get(),
                     addresses: assigned.peer_addresses("compute"),
                     ..compute_proto_timely_config
                 };
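
The new `assert_err!` cases lean on serde's built-in `Deserialize` impls for the `NonZero` integer types, which reject a literal `0` at deserialization time, so `ReplicaAllocation` needs no hand-written validation. A minimal sketch (the struct here is an illustrative stand-in, not the real type):

use std::num::NonZero;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Alloc {
    scale: NonZero<u16>,
    workers: NonZero<usize>,
}

fn main() {
    // Zero is rejected by serde itself...
    assert!(serde_json::from_str::<Alloc>(r#"{"scale": 0, "workers": 1}"#).is_err());
    // ...while any non-zero value round-trips as usual.
    let ok: Alloc = serde_json::from_str(r#"{"scale": 1, "workers": 8}"#).unwrap();
    assert_eq!(ok.workers.get(), 8);
}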
21 changes: 12 additions & 9 deletions src/orchestrator-kubernetes/src/lib.rs
@@ -9,6 +9,7 @@

 use std::collections::BTreeMap;
 use std::future::Future;
+use std::num::NonZero;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 use std::{env, fmt};
@@ -49,6 +50,7 @@ use mz_orchestrator::{
     OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
     ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
 };
+use mz_ore::cast::CastInto;
 use mz_ore::retry::Retry;
 use mz_ore::task::AbortOnDropHandle;
 use serde::Deserialize;
@@ -216,7 +218,7 @@ impl Orchestrator for KubernetesOrchestrator {

 #[derive(Clone, Copy)]
 struct ServiceInfo {
-    scale: u16,
+    scale: NonZero<u16>,
 }

 struct NamespacedKubernetesOrchestrator {
@@ -267,7 +269,7 @@ enum WorkerCommand {
 #[derive(Debug, Clone)]
 struct ServiceDescription {
     name: String,
-    scale: u16,
+    scale: NonZero<u16>,
     service: K8sService,
     stateful_set: StatefulSet,
     pod_template_hash: String,
@@ -655,7 +657,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
             status: None,
         };

-        let hosts = (0..scale)
+        let hosts = (0..scale.get())
             .map(|i| {
                 format!(
                     "{name}-{i}.{name}.{}.svc.cluster.local",
@@ -751,7 +753,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
         let topology_spread = if scheduling_config.topology_spread.enabled {
             let config = &scheduling_config.topology_spread;

-            if !config.ignore_non_singular_scale || scale <= 1 {
+            if !config.ignore_non_singular_scale || scale.get() == 1 {
                 let label_selector_requirements = (if config.ignore_non_singular_scale {
                     let mut replicas_selector_ignoring_scale = replicas_selector.clone();

@@ -1196,7 +1198,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
                 ..Default::default()
             },
             service_name: Some(name.clone()),
-            replicas: Some(scale.into()),
+            replicas: Some(scale.cast_into()),
             template: pod_template_spec,
             update_strategy: Some(StatefulSetUpdateStrategy {
                 type_: Some("OnDelete".to_owned()),
@@ -1418,7 +1420,7 @@ impl OrchestratorWorker {
         info: &ServiceInfo,
     ) -> Vec<ServiceProcessMetrics> {
         if !self.collect_pod_metrics {
-            return (0..info.scale)
+            return (0..info.scale.get())
                 .map(|_| ServiceProcessMetrics::default())
                 .collect();
         }
@@ -1559,8 +1561,9 @@ impl OrchestratorWorker {
             Ok(usage)
         }

-        let ret =
-            futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into())));
+        let ret = futures::future::join_all(
+            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
+        );

         ret.await
     }
@@ -1611,7 +1614,7 @@ impl OrchestratorWorker {
         // Our pod recreation policy is simple: If a pod's template hash changed, delete it, and
         // let the StatefulSet controller recreate it. Otherwise, patch the existing pod's
         // annotations to line up with the ones in the spec.
-        for pod_id in 0..desc.scale {
+        for pod_id in 0..desc.scale.get() {
             let pod_name = format!("{}-{pod_id}", desc.name);
             let pod = match self.pod_api.get(&pod_name).await {
                 Ok(pod) => pod,
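
A pattern that recurs in this file: a range like `0..scale` no longer compiles once `scale` is `NonZero<u16>`, both because `0` is not a `NonZero` value and because `NonZero` does not support range iteration, so each loop unwraps with `.get()` first. Sketch:

use std::num::NonZero;

fn main() {
    let scale = NonZero::new(3u16).expect("not zero");
    // Iterate process indices 0..scale by unwrapping to a plain u16.
    let hosts: Vec<String> = (0..scale.get())
        .map(|i| format!("replica-{i}"))
        .collect();
    assert_eq!(hosts.len(), 3);
}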
15 changes: 8 additions & 7 deletions src/orchestrator-process/src/lib.rs
@@ -14,6 +14,7 @@ use std::fmt::Debug;
 use std::fs::Permissions;
 use std::future::Future;
 use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
+use std::num::NonZero;
 use std::os::unix::fs::PermissionsExt;
 use std::os::unix::process::ExitStatusExt;
 use std::path::{Path, PathBuf};
@@ -453,7 +454,7 @@ struct EnsureServiceConfig {
     /// An optional limit on the CPU that the service can use.
     pub cpu_limit: Option<CpuLimit>,
     /// The number of copies of this service to run.
-    pub scale: u16,
+    pub scale: NonZero<u16>,
     /// Arbitrary key–value pairs to attach to the service in the orchestrator
     /// backend.
     ///
@@ -599,7 +600,7 @@ impl OrchestratorWorker {
             services.get(&id).map(|states| states.len())
         };
         match old_scale {
-            Some(old) if old == usize::from(scale) => return Ok(()),
+            Some(old) if old == usize::cast_from(scale) => return Ok(()),
             Some(_) => self.drop_service(&id).await?,
             None => (),
         }
@@ -622,7 +623,7 @@ impl OrchestratorWorker {

         // Create the state for new processes.
         let mut process_states = vec![];
-        for i in 0..scale.into() {
+        for i in 0..usize::cast_from(scale) {
             let listen_addrs = &peer_addrs[i];

             // Fill out placeholders in the command wrapper for this process.
@@ -1174,7 +1175,7 @@ impl From<ProcessStatus> for ServiceStatus {
     }
 }

-fn socket_path(run_dir: &Path, port: &str, process: usize) -> String {
+fn socket_path(run_dir: &Path, port: &str, process: u16) -> String {
     let desired = run_dir
         .join(format!("{port}-{process}"))
         .to_string_lossy()
@@ -1199,13 +1200,13 @@
 #[derive(Debug, Clone)]
 struct ProcessService {
     run_dir: PathBuf,
-    scale: u16,
+    scale: NonZero<u16>,
 }

 impl Service for ProcessService {
     fn addresses(&self, port: &str) -> Vec<String> {
-        (0..self.scale)
-            .map(|i| socket_path(&self.run_dir, port, i.into()))
+        (0..self.scale.get())
+            .map(|i| socket_path(&self.run_dir, port, i))
             .collect()
     }
 }
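
The `socket_path` signature change is the companion to the loop above: `(0..self.scale.get())` yields plain `u16` indices, so the helper can accept `u16` directly and the old `i.into()` widening disappears. A hypothetical, simplified stand-in for the real helper to show the shape:

fn socket_path(run_dir: &str, port: &str, process: u16) -> String {
    // The index comes straight from a `0..scale.get()` range,
    // so it is already a `u16`; no conversion needed.
    format!("{run_dir}/{port}-{process}")
}

fn main() {
    assert_eq!(socket_path("/run", "sql", 0), "/run/sql-0");
}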
3 changes: 2 additions & 1 deletion src/orchestrator/src/lib.rs
@@ -9,6 +9,7 @@

 use std::collections::BTreeMap;
 use std::fmt;
+use std::num::NonZero;
 use std::str::FromStr;
 use std::sync::Arc;

@@ -211,7 +212,7 @@ pub struct ServiceConfig {
     /// An optional limit on the CPU that the service can use.
     pub cpu_limit: Option<CpuLimit>,
     /// The number of copies of this service to run.
-    pub scale: u16,
+    pub scale: NonZero<u16>,
     /// Arbitrary key–value pairs to attach to the service in the orchestrator
     /// backend.
     ///
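
One reason this migration is free at runtime: `NonZero<T>` is guaranteed to have the same layout as `T`, and the forbidden zero bit pattern doubles as the `None` niche, so even `Option<NonZero<u16>>` is still two bytes. A quick check:

use std::mem::size_of;
use std::num::NonZero;

fn main() {
    assert_eq!(size_of::<NonZero<u16>>(), size_of::<u16>());
    // The zero bit pattern is reused as the `None` discriminant.
    assert_eq!(size_of::<Option<NonZero<u16>>>(), size_of::<u16>());
}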