diff --git a/misc/python/materialize/cli/orchestratord.py b/misc/python/materialize/cli/orchestratord.py index 551702198c84c..f8fb08ae5d159 100644 --- a/misc/python/materialize/cli/orchestratord.py +++ b/misc/python/materialize/cli/orchestratord.py @@ -64,6 +64,7 @@ def main(): "--environment-name", default="12345678-1234-1234-1234-123456789012", ) + parser_environment.add_argument("--environment-id", required=False) parser_environment.add_argument("--postgres-url", default=DEFAULT_POSTGRES) parser_environment.add_argument("--s3-bucket", default=DEFAULT_MINIO) parser_environment.add_argument("--license-key-file") @@ -206,7 +207,10 @@ def environment(args: argparse.Namespace): cluster=args.kind_cluster_name, ) - environment_id = str(uuid4()) + if args.environment_id is None: + environment_id = str(uuid4()) + else: + environment_id = args.environment_id def root_psql(cmd: str): env_kubectl( diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 84c807435dd63..f6df1e68fce84 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -108,7 +108,9 @@ use mz_compute_client::controller::error::InstanceMissing; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::Plan; -use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId}; +use mz_controller::clusters::{ + ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation, +}; use mz_controller::{ControllerConfig, Readiness}; use mz_controller_types::{ClusterId, ReplicaId, WatchSetId}; use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing}; @@ -129,6 +131,7 @@ use mz_ore::{ use mz_persist_client::PersistClient; use mz_persist_client::batch::ProtoBatch; use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient}; +use mz_repr::adt::numeric::Numeric; use mz_repr::explain::{ExplainConfig, ExplainFormat}; use mz_repr::global_id::TransientIdGen; use mz_repr::optimize::OptimizerFeatures; @@ -145,7 +148,7 @@ use mz_sql::plan::{ OnTimeoutAction, Params, QueryWhen, }; use mz_sql::session::user::User; -use mz_sql::session::vars::SystemVars; +use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var}; use mz_sql_parser::ast::ExplainStage; use mz_sql_parser::ast::display::AstDisplay; use mz_storage_client::client::TableData; @@ -1869,6 +1872,18 @@ impl Coordinator { .update_orchestrator_scheduling_config(scheduling_config); self.controller.update_configuration(dyncfg_updates); + self.validate_resource_limit_numeric( + Numeric::zero(), + self.current_credit_consumption_rate(), + |system_vars| { + self.license_key + .max_credit_consumption_rate() + .map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from) + }, + "cluster replica", + MAX_CREDIT_CONSUMPTION_RATE.name(), + )?; + let mut policies_to_set: BTreeMap = Default::default(); @@ -3875,6 +3890,24 @@ impl Coordinator { let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired)); }); } + + fn current_credit_consumption_rate(&self) -> Numeric { + self.catalog() + .user_cluster_replicas() + .filter_map(|replica| match &replica.config.location { + ReplicaLocation::Managed(location) => Some(location.size_for_billing()), + ReplicaLocation::Unmanaged(_) => None, + }) + .map(|size| { + self.catalog() + .cluster_replica_sizes() + .0 + .get(size) + .expect("location size is validated against the cluster replica sizes") + .credits_per_hour + }) + .sum() + } } #[cfg(test)] diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 5e10c0c36894e..4119cbaa6e41d 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1695,24 +1695,8 @@ impl Coordinator { MAX_REPLICAS_PER_CLUSTER.name(), )?; } - let current_credit_consumption_rate = self - .catalog() - .user_cluster_replicas() - .filter_map(|replica| match &replica.config.location { - ReplicaLocation::Managed(location) => Some(location.size_for_billing()), - ReplicaLocation::Unmanaged(_) => None, - }) - .map(|size| { - self.catalog() - .cluster_replica_sizes() - .0 - .get(size) - .expect("location size is validated against the cluster replica sizes") - .credits_per_hour - }) - .sum(); self.validate_resource_limit_numeric( - current_credit_consumption_rate, + self.current_credit_consumption_rate(), new_credit_consumption_rate, |system_vars| { self.license_key @@ -1830,7 +1814,7 @@ impl Coordinator { /// Validate a specific type of float resource limit and return an error if that limit is exceeded. /// /// This is very similar to [`Self::validate_resource_limit`] but for numerics. - fn validate_resource_limit_numeric( + pub(crate) fn validate_resource_limit_numeric( &self, current_amount: Numeric, new_amount: Numeric,