Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion misc/python/materialize/cli/orchestratord.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def main():
"--environment-name",
default="12345678-1234-1234-1234-123456789012",
)
parser_environment.add_argument("--environment-id", required=False)
parser_environment.add_argument("--postgres-url", default=DEFAULT_POSTGRES)
parser_environment.add_argument("--s3-bucket", default=DEFAULT_MINIO)
parser_environment.add_argument("--license-key-file")
Expand Down Expand Up @@ -206,7 +207,10 @@ def environment(args: argparse.Namespace):
cluster=args.kind_cluster_name,
)

environment_id = str(uuid4())
if args.environment_id is None:
environment_id = str(uuid4())
else:
environment_id = args.environment_id

def root_psql(cmd: str):
env_kubectl(
Expand Down
37 changes: 35 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller::clusters::{ClusterConfig, ClusterEvent, ClusterStatus, ProcessId};
use mz_controller::clusters::{
ClusterConfig, ClusterEvent, ClusterStatus, ProcessId, ReplicaLocation,
};
use mz_controller::{ControllerConfig, Readiness};
use mz_controller_types::{ClusterId, ReplicaId, WatchSetId};
use mz_expr::{MapFilterProject, OptimizedMirRelationExpr, RowSetFinishing};
Expand All @@ -129,6 +131,7 @@ use mz_ore::{
use mz_persist_client::PersistClient;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
use mz_repr::adt::numeric::Numeric;
use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
Expand All @@ -145,7 +148,7 @@ use mz_sql::plan::{
OnTimeoutAction, Params, QueryWhen,
};
use mz_sql::session::user::User;
use mz_sql::session::vars::SystemVars;
use mz_sql::session::vars::{MAX_CREDIT_CONSUMPTION_RATE, SystemVars, Var};
use mz_sql_parser::ast::ExplainStage;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
Expand Down Expand Up @@ -1869,6 +1872,18 @@ impl Coordinator {
.update_orchestrator_scheduling_config(scheduling_config);
self.controller.update_configuration(dyncfg_updates);

self.validate_resource_limit_numeric(
Numeric::zero(),
self.current_credit_consumption_rate(),
|system_vars| {
self.license_key
.max_credit_consumption_rate()
.map_or_else(|| system_vars.max_credit_consumption_rate(), Numeric::from)
},
"cluster replica",
MAX_CREDIT_CONSUMPTION_RATE.name(),
)?;

let mut policies_to_set: BTreeMap<CompactionWindow, CollectionIdBundle> =
Default::default();

Expand Down Expand Up @@ -3875,6 +3890,24 @@ impl Coordinator {
let _ = internal_cmd_tx.send(Message::StorageUsagePrune(expired));
});
}

fn current_credit_consumption_rate(&self) -> Numeric {
self.catalog()
.user_cluster_replicas()
.filter_map(|replica| match &replica.config.location {
ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
ReplicaLocation::Unmanaged(_) => None,
})
.map(|size| {
self.catalog()
.cluster_replica_sizes()
.0
.get(size)
.expect("location size is validated against the cluster replica sizes")
.credits_per_hour
})
.sum()
}
}

#[cfg(test)]
Expand Down
20 changes: 2 additions & 18 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1695,24 +1695,8 @@ impl Coordinator {
MAX_REPLICAS_PER_CLUSTER.name(),
)?;
}
let current_credit_consumption_rate = self
.catalog()
.user_cluster_replicas()
.filter_map(|replica| match &replica.config.location {
ReplicaLocation::Managed(location) => Some(location.size_for_billing()),
ReplicaLocation::Unmanaged(_) => None,
})
.map(|size| {
self.catalog()
.cluster_replica_sizes()
.0
.get(size)
.expect("location size is validated against the cluster replica sizes")
.credits_per_hour
})
.sum();
self.validate_resource_limit_numeric(
current_credit_consumption_rate,
self.current_credit_consumption_rate(),
new_credit_consumption_rate,
|system_vars| {
self.license_key
Expand Down Expand Up @@ -1830,7 +1814,7 @@ impl Coordinator {
/// Validate a specific type of float resource limit and return an error if that limit is exceeded.
///
/// This is very similar to [`Self::validate_resource_limit`] but for numerics.
fn validate_resource_limit_numeric<F>(
pub(crate) fn validate_resource_limit_numeric<F>(
&self,
current_amount: Numeric,
new_amount: Numeric,
Expand Down