From 007212bda394c4b99600401c9e7bf3c81928d394 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 2 Jul 2024 15:44:27 -0400 Subject: [PATCH] Introduce limits for number of active processes This also fuses together the unique ID generation with resource concurrency limits. As part of this, we expose some hooks that we can use to provide metrics about how many people are waiting for containers / processes. --- compiler/base/orchestrator/src/coordinator.rs | 209 +++++++------ .../orchestrator/src/coordinator/limits.rs | 296 ++++++++++++++++++ ui/src/main.rs | 66 +++- ui/src/metrics.rs | 14 + ui/src/server_axum/websocket.rs | 1 + 5 files changed, 482 insertions(+), 104 deletions(-) create mode 100644 compiler/base/orchestrator/src/coordinator/limits.rs diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 5846cc30..4a6e2719 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -19,7 +19,7 @@ use tokio::{ join, process::{Child, ChildStdin, ChildStdout, Command}, select, - sync::{mpsc, oneshot, OnceCell, OwnedSemaphorePermit, Semaphore}, + sync::{mpsc, oneshot, OnceCell}, task::{JoinHandle, JoinSet}, time::{self, MissedTickBehavior}, }; @@ -37,6 +37,8 @@ use crate::{ DropErrorDetailsExt, }; +pub mod limits; + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Versions { pub stable: ChannelVersions, @@ -820,83 +822,60 @@ enum DemultiplexCommand { ListenOnce(JobId, oneshot::Sender), } -/// The [`Coordinator`][] usually represents a mostly-global resource, -/// such as a Docker container. To avoid conflicts, each container -/// must have a unique name, but that uniqueness can only be -/// guaranteed by whoever is creating [`Coordinator`][]s via the -/// [`CoordinatorFactory`][]. -pub trait IdProvider: Send + Sync + fmt::Debug + 'static { - fn next(&self) -> String; -} +type ResourceError = Box; +type ResourceResult = std::result::Result; -/// A reasonable choice when there's a single [`IdProvider`][] in the -/// entire process. +/// Mediate resource limits and names for created objects. /// -/// This represents uniqueness via a combination of +/// The [`Coordinator`][] requires mostly-global resources, such as +/// Docker containers or running processes. This trait covers two cases: /// -/// 1. **process start time** — this helps avoid conflicts from other -/// processes, assuming they were started at least one second apart. +/// 1. To avoid conflicts, each container must have a unique name. +/// 2. Containers and processes compete for CPU / memory. /// -/// 2. **instance counter** — this avoids conflicts from other -/// [`Coordinator`][]s started inside this process. -#[derive(Debug)] -pub struct GlobalIdProvider { - start: u64, - id: AtomicU64, +/// Only a global view guarantees the unique names and resource +/// allocation, so whoever creates [`Coordinator`][]s via the +/// [`CoordinatorFactory`][] is responsible. +pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static { + /// Block until resources for a container are available. + fn next_container(&self) -> BoxFuture<'static, ResourceResult>>; } -impl GlobalIdProvider { - pub fn new() -> Self { - let now = std::time::SystemTime::now(); - let start = now - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - - let id = AtomicU64::new(0); - - Self { start, id } - } +/// Represents one allowed Docker container (or equivalent). 
+pub trait ContainerPermit: Send + Sync + fmt::Debug + fmt::Display + 'static { + /// Block until resources for a process are available. + fn next_process(&self) -> BoxFuture<'static, ResourceResult>>; } -impl IdProvider for GlobalIdProvider { - fn next(&self) -> String { - let start = self.start; - let id = self.id.fetch_add(1, Ordering::SeqCst); - - format!("{start}-{id}") - } -} +/// Represents one allowed process. +pub trait ProcessPermit: Send + Sync + fmt::Debug + 'static {} /// Enforces a limited number of concurrent `Coordinator`s. #[derive(Debug)] pub struct CoordinatorFactory { - semaphore: Arc, - ids: Arc, + limits: Arc, } impl CoordinatorFactory { - pub fn new(ids: Arc, maximum: usize) -> Self { - let semaphore = Arc::new(Semaphore::new(maximum)); - - Self { semaphore, ids } + pub fn new(limits: Arc) -> Self { + Self { limits } } pub fn build(&self) -> Coordinator where - B: Backend + From>, + B: Backend + Default, { - let semaphore = self.semaphore.clone(); + let limits = self.limits.clone(); - let backend = B::from(self.ids.clone()); + let backend = B::default(); - Coordinator::new(semaphore, backend) + Coordinator::new(limits, backend) } } #[derive(Debug)] pub struct Coordinator { - semaphore: Arc, + limits: Arc, backend: B, stable: OnceCell, beta: OnceCell, @@ -917,11 +896,11 @@ impl Coordinator where B: Backend, { - pub fn new(semaphore: Arc, backend: B) -> Self { + pub fn new(limits: Arc, backend: B) -> Self { let token = CancellationToken::new(); Self { - semaphore, + limits, backend, stable: OnceCell::new(), beta: OnceCell::new(), @@ -1159,9 +1138,9 @@ where container .get_or_try_init(|| { - let semaphore = self.semaphore.clone(); + let limits = self.limits.clone(); let token = self.token.clone(); - Container::new(channel, semaphore, token, &self.backend) + Container::new(channel, limits, token, &self.backend) }) .await } @@ -1169,7 +1148,7 @@ where #[derive(Debug)] struct Container { - permit: OwnedSemaphorePermit, + permit: Box, task: JoinHandle>, kill_child: Option, modify_cargo_toml: ModifyCargoToml, @@ -1179,16 +1158,14 @@ struct Container { impl Container { async fn new( channel: Channel, - semaphore: Arc, + limits: Arc, token: CancellationToken, backend: &impl Backend, ) -> Result { - let permit = semaphore - .acquire_owned() - .await - .context(AcquirePermitSnafu)?; + let permit = limits.next_container().await.context(AcquirePermitSnafu)?; - let (mut child, kill_child, stdin, stdout) = backend.run_worker_in_background(channel)?; + let (mut child, kill_child, stdin, stdout) = + backend.run_worker_in_background(channel, &permit)?; let IoQueue { mut tasks, to_worker_tx, @@ -1289,6 +1266,7 @@ impl Container { ) -> Result, VersionError> { let v = self.spawn_cargo_task(token.clone(), cmd).await?; let SpawnCargo { + permit: _permit, task, stdin_tx, stdout_rx, @@ -1320,6 +1298,7 @@ impl Container { let token = Default::default(); let ActiveExecution { + permit: _permit, task, stdin_tx, stdout_rx, @@ -1357,6 +1336,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1396,6 +1376,7 @@ impl Container { .boxed(); Ok(ActiveExecution { + permit, task, stdin_tx, stdout_rx, @@ -1411,6 +1392,7 @@ impl Container { let token = Default::default(); let ActiveCompilation { + permit: _permit, task, stdout_rx, stderr_rx, @@ -1448,6 +1430,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1493,6 +1476,7 @@ impl 
Container { .boxed(); Ok(ActiveCompilation { + permit, task, stdout_rx, stderr_rx, @@ -1506,6 +1490,7 @@ impl Container { let token = Default::default(); let ActiveFormatting { + permit: _permit, task, stdout_rx, stderr_rx, @@ -1540,6 +1525,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1578,6 +1564,7 @@ impl Container { .boxed(); Ok(ActiveFormatting { + permit, task, stdout_rx, stderr_rx, @@ -1591,6 +1578,7 @@ impl Container { let token = Default::default(); let ActiveClippy { + permit: _permit, task, stdout_rx, stderr_rx, @@ -1622,6 +1610,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1652,6 +1641,7 @@ impl Container { .boxed(); Ok(ActiveClippy { + permit, task, stdout_rx, stderr_rx, @@ -1662,6 +1652,7 @@ impl Container { let token = Default::default(); let ActiveMiri { + permit: _permit, task, stdout_rx, stderr_rx, @@ -1693,6 +1684,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1723,6 +1715,7 @@ impl Container { .boxed(); Ok(ActiveMiri { + permit, task, stdout_rx, stderr_rx, @@ -1736,6 +1729,7 @@ impl Container { let token = Default::default(); let ActiveMacroExpansion { + permit: _permit, task, stdout_rx, stderr_rx, @@ -1767,6 +1761,7 @@ impl Container { modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?; let SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1797,6 +1792,7 @@ impl Container { .boxed(); Ok(ActiveMacroExpansion { + permit, task, stdout_rx, stderr_rx, @@ -1810,6 +1806,12 @@ impl Container { ) -> Result { use spawn_cargo_error::*; + let permit = self + .permit + .next_process() + .await + .context(AcquirePermitSnafu)?; + let (stdin_tx, mut stdin_rx) = mpsc::channel(8); let (stdout_tx, stdout_rx) = mpsc::channel(8); let (stderr_tx, stderr_rx) = mpsc::channel(8); @@ -1893,6 +1895,7 @@ impl Container { }); Ok(SpawnCargo { + permit, task, stdin_tx, stdout_rx, @@ -1932,6 +1935,7 @@ impl Container { } pub struct ActiveExecution { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdin_tx: mpsc::Sender, pub stdout_rx: mpsc::Receiver, @@ -1976,6 +1980,7 @@ pub enum ExecuteError { } pub struct ActiveCompilation { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, @@ -2023,6 +2028,7 @@ pub enum CompileError { } pub struct ActiveFormatting { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, @@ -2070,6 +2076,7 @@ pub enum FormatError { } pub struct ActiveClippy { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, @@ -2111,6 +2118,7 @@ pub enum ClippyError { } pub struct ActiveMiri { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, @@ -2152,6 +2160,7 @@ pub enum MiriError { } pub struct ActiveMacroExpansion { + pub permit: Box, pub task: BoxFuture<'static, Result>, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, @@ -2193,6 +2202,7 @@ pub enum MacroExpansionError { } struct SpawnCargo { + permit: Box, task: JoinHandle>, stdin_tx: mpsc::Sender, stdout_rx: mpsc::Receiver, @@ -2220,6 +2230,9 @@ pub enum SpawnCargoError { #[snafu(display("Unable to send kill message"))] Kill { source: MultiplexedSenderError }, + + 
#[snafu(display("Could not acquire a process permit"))] + AcquirePermit { source: ResourceError }, } #[derive(Debug, Clone)] @@ -2533,8 +2546,9 @@ pub trait Backend { fn run_worker_in_background( &self, channel: Channel, + id: impl fmt::Display, ) -> Result<(Child, Option, ChildStdin, ChildStdout)> { - let (mut start, kill) = self.prepare_worker_command(channel); + let (mut start, kill) = self.prepare_worker_command(channel, id); let mut child = start .stdin(Stdio::piped()) @@ -2547,15 +2561,23 @@ pub trait Backend { Ok((child, kill, stdin, stdout)) } - fn prepare_worker_command(&self, channel: Channel) -> (Command, Option); + fn prepare_worker_command( + &self, + channel: Channel, + id: impl fmt::Display, + ) -> (Command, Option); } impl Backend for &B where B: Backend, { - fn prepare_worker_command(&self, channel: Channel) -> (Command, Option) { - B::prepare_worker_command(self, channel) + fn prepare_worker_command( + &self, + channel: Channel, + id: impl fmt::Display, + ) -> (Command, Option) { + B::prepare_worker_command(self, channel, id) } } @@ -2605,27 +2627,16 @@ fn basic_secure_docker_command() -> Command { ) } -pub struct DockerBackend { - ids: Arc, -} - -impl From> for DockerBackend { - fn from(ids: Arc) -> Self { - Self { ids } - } -} - -impl DockerBackend { - fn next_name(&self) -> String { - let id = self.ids.next(); - - format!("playground-{id}") - } -} +#[derive(Default)] +pub struct DockerBackend(()); impl Backend for DockerBackend { - fn prepare_worker_command(&self, channel: Channel) -> (Command, Option) { - let name = self.next_name(); + fn prepare_worker_command( + &self, + channel: Channel, + id: impl fmt::Display, + ) -> (Command, Option) { + let name = format!("playground-{id}"); let mut command = basic_secure_docker_command(); command @@ -2704,8 +2715,8 @@ pub enum Error { #[snafu(display("Unable to load original Cargo.toml"))] CouldNotLoadCargoToml { source: ModifyCargoTomlError }, - #[snafu(display("Could not acquire a semaphore permit"))] - AcquirePermit { source: tokio::sync::AcquireError }, + #[snafu(display("Could not acquire a container permit"))] + AcquirePermit { source: ResourceError }, } struct IoQueue { @@ -2809,8 +2820,8 @@ mod tests { project_dir: TempDir, } - impl From> for TestBackend { - fn from(_ids: Arc) -> Self { + impl Default for TestBackend { + fn default() -> Self { static COMPILE_WORKER_ONCE: Once = Once::new(); COMPILE_WORKER_ONCE.call_once(|| { @@ -2846,7 +2857,11 @@ mod tests { } impl Backend for TestBackend { - fn prepare_worker_command(&self, channel: Channel) -> (Command, Option) { + fn prepare_worker_command( + &self, + channel: Channel, + _id: impl fmt::Display, + ) -> (Command, Option) { let channel_dir = self.project_dir.path().join(channel.to_str()); let mut command = Command::new("./target/debug/worker"); @@ -2864,12 +2879,11 @@ mod tests { .unwrap_or(5) }); - static TEST_COORDINATOR_ID_PROVIDER: Lazy> = - Lazy::new(|| Arc::new(GlobalIdProvider::new())); + static TEST_COORDINATOR_ID_PROVIDER: Lazy> = + Lazy::new(|| Arc::new(limits::Global::new(100, *MAX_CONCURRENT_TESTS))); - static TEST_COORDINATOR_FACTORY: Lazy = Lazy::new(|| { - CoordinatorFactory::new(TEST_COORDINATOR_ID_PROVIDER.clone(), *MAX_CONCURRENT_TESTS) - }); + static TEST_COORDINATOR_FACTORY: Lazy = + Lazy::new(|| CoordinatorFactory::new(TEST_COORDINATOR_ID_PROVIDER.clone())); fn new_coordinator_test() -> Coordinator { TEST_COORDINATOR_FACTORY.build() @@ -3146,6 +3160,7 @@ mod tests { let token = Default::default(); let ActiveExecution { + permit: _permit, task, 
stdin_tx, stdout_rx, @@ -3198,6 +3213,7 @@ mod tests { let token = Default::default(); let ActiveExecution { + permit: _permit, task, stdin_tx, stdout_rx, @@ -3253,6 +3269,7 @@ mod tests { let token = CancellationToken::new(); let ActiveExecution { + permit: _permit, task, stdin_tx: _, stdout_rx, @@ -3316,6 +3333,7 @@ mod tests { let token = CancellationToken::new(); let ActiveExecution { + permit: _permit, task, stdin_tx: _, stdout_rx, @@ -3397,6 +3415,7 @@ mod tests { let token = Default::default(); let ActiveCompilation { + permit: _permit, task, stdout_rx, stderr_rx, diff --git a/compiler/base/orchestrator/src/coordinator/limits.rs b/compiler/base/orchestrator/src/coordinator/limits.rs new file mode 100644 index 00000000..0e1f741f --- /dev/null +++ b/compiler/base/orchestrator/src/coordinator/limits.rs @@ -0,0 +1,296 @@ +use futures::{future::BoxFuture, prelude::*}; +use std::{ + fmt, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; + +use super::{ContainerPermit, ProcessPermit, ResourceLimits, ResourceResult}; + +/// Describe how the resource was (or was not) acquired. +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum Acquisition { + /// The success path + Acquired, + /// The caller did not wait to acquire anything + Aborted, + /// Could not acquire + Error, +} + +impl Acquisition { + fn from_result(value: &Result) -> Self { + match value { + Ok(_) => Acquisition::Acquired, + Err(_) => Acquisition::Error, + } + } +} + +/// Hooks for monitoring how resources are requested and used. +pub trait Lifecycle: Send + Sync + fmt::Debug + Clone + 'static { + fn container_start(&self) {} + fn container_acquired(&self, #[allow(unused)] how: Acquisition) {} + fn container_release(&self) {} + + fn process_start(&self) {} + fn process_acquired(&self, #[allow(unused)] how: Acquisition) {} + fn process_release(&self) {} +} + +/// Does nothing for each event. +#[derive(Debug, Clone)] +pub struct NoOpLifecycle; + +impl Lifecycle for NoOpLifecycle {} + +/// Prints to stderr for each event. +#[derive(Debug, Clone)] +pub struct StderrLifecycle; + +impl Lifecycle for StderrLifecycle { + fn container_start(&self) { + eprintln!("container_start"); + } + + fn container_acquired(&self, how: Acquisition) { + eprintln!("container_acquired {how:?}"); + } + + fn container_release(&self) { + eprintln!("container_release"); + } + + fn process_start(&self) { + eprintln!("process_start"); + } + + fn process_acquired(&self, how: Acquisition) { + eprintln!("process_acquired {how:?}"); + } + + fn process_release(&self) { + eprintln!("process_release"); + } +} + +/// A reasonable choice when there's a single [`ResourceLimits`][] in +/// the entire process. +/// +/// This represents uniqueness via a combination of +/// +/// 1. **process start time** — this helps avoid conflicts from other +/// processes, assuming they were started at least one second +/// apart. +/// +/// 2. **instance counter** — this avoids conflicts from other +/// [`Coordinator`][super::Coordinator]s started inside this +/// process. 
+#[derive(Debug)] +pub struct Global { + lifecycle: L, + container_semaphore: Arc, + process_semaphore: Arc, + start: u64, + id: AtomicU64, +} + +/// Manages containers +#[derive(Debug)] +struct TrackContainer +where + L: Lifecycle, +{ + lifecycle: L, + #[allow(unused)] + container_permit: OwnedSemaphorePermit, + process_semaphore: Arc, + start: u64, + id: u64, +} + +/// Manages processess +#[derive(Debug)] +struct TrackProcess +where + L: Lifecycle, +{ + lifecycle: L, + #[allow(unused)] + process_permit: OwnedSemaphorePermit, +} + +impl Global { + pub fn new(container_limit: usize, process_limit: usize) -> Self { + Self::with_lifecycle(container_limit, process_limit, NoOpLifecycle) + } +} + +impl Global +where + L: Lifecycle, +{ + pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self { + let container_semaphore = Arc::new(Semaphore::new(container_limit)); + let process_semaphore = Arc::new(Semaphore::new(process_limit)); + + let now = std::time::SystemTime::now(); + let start = now + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let id = AtomicU64::new(0); + + Self { + lifecycle, + container_semaphore, + process_semaphore, + start, + id, + } + } +} + +impl ResourceLimits for Global +where + L: Lifecycle, +{ + fn next_container(&self) -> BoxFuture<'static, ResourceResult>> { + let lifecycle = self.lifecycle.clone(); + let container_semaphore = self.container_semaphore.clone(); + let process_semaphore = self.process_semaphore.clone(); + let start = self.start; + let id = self.id.fetch_add(1, Ordering::SeqCst); + + async move { + let guard = ContainerAcquireGuard::start(&lifecycle); + + let container_permit = container_semaphore.acquire_owned().await; + let container_permit = guard.complete(container_permit)?; + + let token = TrackContainer { + lifecycle, + container_permit, + process_semaphore, + start, + id, + }; + Ok(Box::new(token) as _) + } + .boxed() + } +} + +impl fmt::Display for TrackContainer +where + L: Lifecycle, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { start, id, .. 
} = self; + write!(f, "{start}-{id}") + } +} + +impl ContainerPermit for TrackContainer +where + L: Lifecycle, +{ + fn next_process(&self) -> BoxFuture<'static, ResourceResult>> { + let lifecycle = self.lifecycle.clone(); + let process_semaphore = self.process_semaphore.clone(); + + async move { + let guard = ProcessAcquireGuard::start(&lifecycle); + + let process_permit = process_semaphore.acquire_owned().await; + let process_permit = guard.complete(process_permit)?; + + let token = TrackProcess { + lifecycle, + process_permit, + }; + Ok(Box::new(token) as _) + } + .boxed() + } +} + +impl Drop for TrackContainer +where + L: Lifecycle, +{ + fn drop(&mut self) { + self.lifecycle.container_release() + } +} + +impl ProcessPermit for TrackProcess where L: Lifecycle {} + +impl Drop for TrackProcess +where + L: Lifecycle, +{ + fn drop(&mut self) { + self.lifecycle.process_release() + } +} + +/// Lifecycle drop guard for containers +struct ContainerAcquireGuard<'a, L: Lifecycle>(&'a L, Acquisition); + +impl<'a, L> ContainerAcquireGuard<'a, L> +where + L: Lifecycle, +{ + fn start(lifecycle: &'a L) -> Self { + lifecycle.container_start(); + Self(lifecycle, Acquisition::Aborted) + } + + fn complete(mut self, r: Result) -> Result { + self.1 = Acquisition::from_result(&r); + r + } +} + +impl<'a, L> Drop for ContainerAcquireGuard<'a, L> +where + L: Lifecycle, +{ + fn drop(&mut self) { + self.0.container_acquired(self.1); + } +} + +/// Lifecycle drop guard for processes +struct ProcessAcquireGuard<'a, L>(&'a L, Acquisition) +where + L: Lifecycle; + +impl<'a, L> ProcessAcquireGuard<'a, L> +where + L: Lifecycle, +{ + fn start(lifecycle: &'a L) -> Self { + lifecycle.process_start(); + Self(lifecycle, Acquisition::Aborted) + } + + fn complete(mut self, r: Result) -> Result { + self.1 = Acquisition::from_result(&r); + r + } +} + +impl<'a, L> Drop for ProcessAcquireGuard<'a, L> +where + L: Lifecycle, +{ + fn drop(&mut self) { + self.0.process_acquired(self.1); + } +} diff --git a/ui/src/main.rs b/ui/src/main.rs index 710881f7..8decaaad 100644 --- a/ui/src/main.rs +++ b/ui/src/main.rs @@ -1,6 +1,9 @@ #![deny(rust_2018_idioms)] -use orchestrator::coordinator::{CoordinatorFactory, GlobalIdProvider, IdProvider}; +use orchestrator::coordinator::{ + limits::{self, Acquisition}, + CoordinatorFactory, ResourceLimits, +}; use std::{ net::SocketAddr, path::{Path, PathBuf}, @@ -11,7 +14,8 @@ use tracing_subscriber::EnvFilter; const DEFAULT_ADDRESS: &str = "127.0.0.1"; const DEFAULT_PORT: u16 = 5000; -const DEFAULT_COORDINATORS_LIMIT: usize = 10; +const DEFAULT_COORDINATORS_LIMIT: usize = 25; +const DEFAULT_PROCESSES_LIMIT: usize = 10; mod env; mod gist; @@ -46,8 +50,7 @@ struct Config { metrics_token: Option, feature_flags: FeatureFlags, request_db_path: Option, - id_provider: Arc, - coordinators_limit: usize, + limits: Arc, port: u16, root: PathBuf, } @@ -105,13 +108,22 @@ impl Config { let request_db_path = env::var_os("PLAYGROUND_REQUEST_DATABASE").map(Into::into); - let id_provider = Arc::new(GlobalIdProvider::new()); - let coordinators_limit = env::var("PLAYGROUND_COORDINATORS_LIMIT") .ok() .and_then(|l| l.parse().ok()) .unwrap_or(DEFAULT_COORDINATORS_LIMIT); + let processes_limit = env::var("PLAYGROUND_PROCESSES_LIMIT") + .ok() + .and_then(|l| l.parse().ok()) + .unwrap_or(DEFAULT_PROCESSES_LIMIT); + + let limits = Arc::new(limits::Global::with_lifecycle( + coordinators_limit, + processes_limit, + LifecycleMetrics, + )); + Self { address, cors_enabled, @@ -119,8 +131,7 @@ impl Config { metrics_token, feature_flags, 
request_db_path, - id_provider, - coordinators_limit, + limits, port, root, } @@ -158,7 +169,7 @@ impl Config { } fn coordinator_factory(&self) -> CoordinatorFactory { - CoordinatorFactory::new(self.id_provider.clone(), self.coordinators_limit) + CoordinatorFactory::new(self.limits.clone()) } fn server_socket_addr(&self) -> SocketAddr { @@ -184,3 +195,40 @@ impl MetricsToken { MetricsToken(Arc::new(token.into())) } } + +#[derive(Debug, Copy, Clone)] +struct LifecycleMetrics; + +impl limits::Lifecycle for LifecycleMetrics { + fn container_start(&self) { + metrics::CONTAINER_QUEUE.inc(); + } + + fn container_acquired(&self, how: limits::Acquisition) { + metrics::CONTAINER_QUEUE.dec(); + + if how == Acquisition::Acquired { + metrics::CONTAINER_ACTIVE.inc(); + } + } + + fn container_release(&self) { + metrics::CONTAINER_ACTIVE.dec(); + } + + fn process_start(&self) { + metrics::PROCESS_QUEUE.inc(); + } + + fn process_acquired(&self, how: limits::Acquisition) { + metrics::PROCESS_QUEUE.dec(); + + if how == Acquisition::Acquired { + metrics::PROCESS_ACTIVE.inc(); + } + } + + fn process_release(&self) { + metrics::PROCESS_ACTIVE.dec(); + } +} diff --git a/ui/src/metrics.rs b/ui/src/metrics.rs index b223f46f..37b64044 100644 --- a/ui/src/metrics.rs +++ b/ui/src/metrics.rs @@ -44,6 +44,20 @@ lazy_static! { &["success"], ) .unwrap(); + pub(crate) static ref CONTAINER_QUEUE: IntGauge = register_int_gauge!( + "playground_container_queue", + "Number of waiters for a container" + ) + .unwrap(); + pub(crate) static ref CONTAINER_ACTIVE: IntGauge = + register_int_gauge!("playground_container_active", "Number of active containers").unwrap(); + pub(crate) static ref PROCESS_QUEUE: IntGauge = register_int_gauge!( + "playground_process_queue", + "Number of waiters for a process" + ) + .unwrap(); + pub(crate) static ref PROCESS_ACTIVE: IntGauge = + register_int_gauge!("playground_process_active", "Number of active processs").unwrap(); } #[derive(Debug, Copy, Clone, strum::IntoStaticStr)] diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index d97ed9df..d86b6232 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -662,6 +662,7 @@ async fn handle_execute_inner( use CompletedOrAbandoned::*; let coordinator::ActiveExecution { + permit: _permit, mut task, stdin_tx, mut stdout_rx,
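
A minimal sketch of wiring the new limits into a factory, mirroring what this patch does in ui/src/main.rs with LifecycleMetrics. The text of the patch above has dropped most angle-bracketed type parameters (for example, `Arc<dyn ResourceLimits>` and `Box<dyn ContainerPermit>` appear as bare `Arc` and `Box`), so the signatures below are reconstructed assumptions; the `QueueDepth` hook and the 25/10 limits are illustrative stand-ins rather than anything defined by the patch.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use orchestrator::coordinator::{
    limits::{self, Acquisition, Lifecycle},
    CoordinatorFactory, ResourceLimits,
};

// Hypothetical hook: tracks how many callers are currently waiting for a
// container permit, analogous to the playground_container_queue gauge that
// ui/src/metrics.rs adds.
#[derive(Debug, Clone, Default)]
struct QueueDepth(Arc<AtomicU64>);

impl Lifecycle for QueueDepth {
    fn container_start(&self) {
        // A caller has started waiting for a container.
        self.0.fetch_add(1, Ordering::SeqCst);
    }

    fn container_acquired(&self, _how: Acquisition) {
        // The wait ended, whether it was Acquired, Aborted, or an Error.
        self.0.fetch_sub(1, Ordering::SeqCst);
    }

    // The remaining Lifecycle methods keep their default empty bodies.
}

fn build_factory() -> CoordinatorFactory {
    // 25 containers / 10 processes match the new defaults in ui/src/main.rs.
    let limits: Arc<dyn ResourceLimits> =
        Arc::new(limits::Global::with_lifecycle(25, 10, QueueDepth::default()));
    CoordinatorFactory::new(limits)
}

Because every `Lifecycle` method has a default empty body, a hook only overrides the events it cares about; `NoOpLifecycle` and `StderrLifecycle` in limits.rs are the two ready-made extremes, and the `LifecycleMetrics` type added to ui/src/main.rs feeds the same events into the Prometheus gauges from ui/src/metrics.rs.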