diff --git a/crates/base/src/worker/pool.rs b/crates/base/src/worker/pool.rs
index 318a7844d..d89b2133f 100644
--- a/crates/base/src/worker/pool.rs
+++ b/crates/base/src/worker/pool.rs
@@ -26,16 +26,19 @@ use ext_workers::context::UserWorkerProfile;
 use ext_workers::context::WorkerContextInitOpts;
 use ext_workers::context::WorkerRuntimeOpts;
 use ext_workers::errors::WorkerError;
+use futures_util::future::join_all;
 use http_v02::Request;
 use hyper_v014::Body;
 use log::error;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::oneshot;
 use tokio::sync::oneshot::Sender;
 use tokio::sync::Notify;
 use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 use tokio::sync::TryAcquireError;
+use tokio::time::timeout;
 use tokio_util::sync::CancellationToken;
 use uuid::Uuid;

@@ -442,6 +445,7 @@ impl WorkerPool {
       is_retired: Arc::new(AtomicFlag::default()),
     };

+    let (early_drop_tx, early_drop_rx) = mpsc::unbounded_channel();
     let (req_end_timing_tx, req_end_timing_rx) =
       mpsc::unbounded_channel::<()>();

@@ -453,6 +457,7 @@ impl WorkerPool {
       user_worker_rt_opts.cancel = Some(cancel.clone());

       worker_options.timing = Some(Timing {
+        early_drop_rx,
         status: status.clone(),
         req: (req_start_timing_rx, req_end_timing_rx),
       });
@@ -472,6 +477,7 @@ impl WorkerPool {
         Ok(surface) => {
           let profile = UserWorkerProfile {
             worker_request_msg_tx: surface.msg_tx,
+            early_drop_tx,
             timing_tx_pair: (req_start_timing_tx, req_end_timing_tx),
             service_path,
             permit: permit.map(Arc::new),
@@ -656,6 +662,24 @@ impl WorkerPool {
     self.metric_src.decl_active_user_workers();
   }

+  async fn try_cleanup_idle_workers(&mut self, timeout_ms: usize) -> usize {
+    let mut rxs = vec![];
+    for profile in self.user_workers.values_mut() {
+      let (tx, rx) = oneshot::channel();
+      if profile.early_drop_tx.send(tx).is_ok() {
+        rxs.push(timeout(Duration::from_millis(timeout_ms as u64), rx));
+      }
+    }
+
+    join_all(rxs)
+      .await
+      .into_iter()
+      .filter_map(|it| it.ok())
+      .map(|it| it.unwrap_or_default())
+      .filter(|it| *it)
+      .count()
+  }
+
   fn retire(&mut self, key: &Uuid) {
     if let Some(profile) = self.user_workers.get_mut(key) {
       let registry = self
@@ -795,6 +819,10 @@ pub async fn create_user_worker_pool(
             break;
           }
         }
+
+        Some(UserWorkerMsgs::TryCleanupIdleWorkers(timeout_ms, res_tx)) => {
+          let _ = res_tx.send(worker_pool.try_cleanup_idle_workers(timeout_ms).await);
+        }
       }
     }
   }
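The new `try_cleanup_idle_workers` above fans a `oneshot::Sender<bool>` out to every live worker and counts how many acknowledge with `true` within `timeout_ms`. The following standalone sketch shows the same fan-out/ack-count pattern in isolation; all names are illustrative, and only tokio and futures-util are assumed:

use std::time::Duration;

use futures_util::future::join_all;
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Stand-ins for the per-worker `early_drop_tx` channels held in
    // `UserWorkerProfile`; each stub worker acks `true` only when "idle".
    let mut worker_txs = Vec::new();
    for idle in [true, false, true] {
        let (tx, mut rx) = mpsc::unbounded_channel::<oneshot::Sender<bool>>();
        tokio::spawn(async move {
            while let Some(ack) = rx.recv().await {
                let _ = ack.send(idle);
            }
        });
        worker_txs.push(tx);
    }

    // Pool side, mirroring `try_cleanup_idle_workers`: fan out a oneshot
    // sender to every worker, then count the `true` acks that arrive in time.
    let mut rxs = vec![];
    for tx in &worker_txs {
        let (ack_tx, ack_rx) = oneshot::channel();
        if tx.send(ack_tx).is_ok() {
            rxs.push(timeout(Duration::from_millis(1000), ack_rx));
        }
    }

    let dropped = join_all(rxs)
        .await
        .into_iter()
        .filter_map(|it| it.ok()) // worker answered within the deadline
        .map(|it| it.unwrap_or_default()) // closed channel counts as `false`
        .filter(|acked| *acked)
        .count();

    assert_eq!(dropped, 2); // two of the three stub workers were idle
}

A worker that is busy, already gone, or too slow simply never acks, so the pool is never blocked for longer than the timeout and the returned count only reflects workers that actually dropped.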
diff --git a/crates/base/src/worker/supervisor/strategy_per_worker.rs b/crates/base/src/worker/supervisor/strategy_per_worker.rs
index 29cccb0b3..c952e4ecc 100644
--- a/crates/base/src/worker/supervisor/strategy_per_worker.rs
+++ b/crates/base/src/worker/supervisor/strategy_per_worker.rs
@@ -143,8 +143,10 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
   } = args;

   let Timing {
+    mut early_drop_rx,
     status: TimingStatus { demand, is_retired },
     req: (_, mut req_end_rx),
+    ..
   } = timing.unwrap_or_default();

   let UserWorkerRuntimeOpts {
@@ -467,6 +469,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
         complete_reason = Some(ShutdownReason::Memory);
       }

+      Some(tx) = early_drop_rx.recv() => {
+        let mut acknowledged = false;
+        if state.have_all_pending_tasks_been_resolved() {
+          if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
+            early_retire_fn();
+            func();
+            acknowledged = true;
+          }
+        }
+        let _ = tx.send(acknowledged);
+      }
+
       _ = &mut early_drop_fut => {
         info!("early termination has been triggered: isolate: {:?}", key);
         complete_reason = Some(ShutdownReason::EarlyDrop);
diff --git a/crates/base/test_cases/main/index.ts b/crates/base/test_cases/main/index.ts
index b8b85cf34..f19dcd544 100644
--- a/crates/base/test_cases/main/index.ts
+++ b/crates/base/test_cases/main/index.ts
@@ -14,10 +14,18 @@ function parseIntFromHeadersOrDefault(req: Request, key: string, val?: number) {
   return parsedValue;
 }

-Deno.serve((req: Request) => {
+Deno.serve(async (req: Request) => {
   console.log(req.url);
   const url = new URL(req.url);
   const { pathname } = url;
+
+  // handle idle worker cleanup requests
+  if (pathname === "/_internal/cleanup-idle-workers") {
+    return Response.json({
+      count: await EdgeRuntime.userWorkers.tryCleanupIdleWorkers(1000),
+    });
+  }
+
   const path_parts = pathname.split("/");
   let service_name = path_parts[1];

diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs
index 92fdfe0e5..4d866f206 100644
--- a/crates/base/tests/integration_tests.rs
+++ b/crates/base/tests/integration_tests.rs
@@ -3911,6 +3911,66 @@ async fn test_request_absent_timeout() {
   unreachable!("test failed");
 }

+#[tokio::test]
+#[serial]
+async fn test_user_workers_cleanup_idle_workers() {
+  let (tx, mut rx) = mpsc::unbounded_channel();
+  let tb = TestBedBuilder::new("./test_cases/main")
+    .with_per_worker_policy(None)
+    .with_worker_event_sender(Some(tx))
+    .build()
+    .await;
+
+  let resp = tb
+    .request(|b| {
+      b.uri("/sleep-5000ms")
+        .header("x-worker-timeout-ms", HeaderValue::from_static("3600000"))
+        .body(Body::empty())
+        .context("can't make request")
+    })
+    .await
+    .unwrap();
+
+  assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+  let mut resp = tb
+    .request(|b| {
+      b.uri("/_internal/cleanup-idle-workers")
+        .body(Body::empty())
+        .context("can't make request")
+    })
+    .await
+    .unwrap();
+
+  assert_eq!(resp.status().as_u16(), StatusCode::OK);
+
+  let bytes = hyper_v014::body::HttpBody::collect(resp.body_mut())
+    .await
+    .unwrap()
+    .to_bytes();
+
+  let payload = serde_json::from_slice::<serde_json::Value>(&bytes).unwrap();
+  let count = payload.get("count").unwrap().as_u64().unwrap();
+
+  assert_eq!(count, 1);
+
+  sleep(Duration::from_secs(3)).await;
+
+  rx.close();
+  tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+  while let Some(ev) = rx.recv().await {
+    let WorkerEvents::Shutdown(ev) = ev.event else {
+      continue;
+    };
+    if ev.reason != ShutdownReason::EarlyDrop {
+      break;
+    }
+    return;
+  }
+
+  unreachable!("test failed");
+}
+
 #[derive(Deserialize)]
 struct ErrorResponsePayload {
   msg: String,
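On the worker side, the new select! arm in strategy_per_worker.rs above only acknowledges an early-drop request once every pending task has settled. Below is a runnable sketch of that handshake under hypothetical names: a plain counter stands in for the real `state.have_all_pending_tasks_been_resolved()` check, and the real arm also retires the worker and dispatches `beforeunload` before replying.

use tokio::sync::{mpsc, oneshot};

async fn supervise_sketch(
    mut early_drop_rx: mpsc::UnboundedReceiver<oneshot::Sender<bool>>,
    mut req_start_rx: mpsc::UnboundedReceiver<()>,
    mut req_end_rx: mpsc::UnboundedReceiver<()>,
) {
    let mut pending = 0usize; // in-flight requests
    loop {
        tokio::select! {
            Some(()) = req_start_rx.recv() => pending += 1,
            Some(()) = req_end_rx.recv() => pending = pending.saturating_sub(1),
            Some(ack) = early_drop_rx.recv() => {
                // Acknowledge only when the worker is idle; the real code
                // fires `beforeunload` and retires the worker first.
                let idle = pending == 0;
                let _ = ack.send(idle);
                if idle {
                    break; // proceed with early-drop shutdown
                }
            }
            else => break, // every channel closed
        }
    }
}

#[tokio::main]
async fn main() {
    let (drop_tx, drop_rx) = mpsc::unbounded_channel();
    let (_start_tx, start_rx) = mpsc::unbounded_channel();
    let (_end_tx, end_rx) = mpsc::unbounded_channel();
    let sup = tokio::spawn(supervise_sketch(drop_rx, start_rx, end_rx));

    // No requests are in flight, so the supervisor acknowledges the drop.
    let (ack_tx, ack_rx) = oneshot::channel();
    drop_tx.send(ack_tx).unwrap();
    assert!(ack_rx.await.unwrap());
    sup.await.unwrap();
}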
diff --git a/ext/workers/context.rs b/ext/workers/context.rs
index 75f9ad2a0..1e2ef16db 100644
--- a/ext/workers/context.rs
+++ b/ext/workers/context.rs
@@ -138,6 +138,7 @@ impl Default for UserWorkerRuntimeOpts {
 #[derive(Debug, Clone)]
 pub struct UserWorkerProfile {
   pub worker_request_msg_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
+  pub early_drop_tx: mpsc::UnboundedSender<oneshot::Sender<bool>>,
   pub timing_tx_pair: (
     mpsc::UnboundedSender<Arc<Notify>>,
     mpsc::UnboundedSender<()>,
   ),
@@ -226,6 +227,7 @@ pub struct TimingStatus {

 #[derive(Debug)]
 pub struct Timing {
+  pub early_drop_rx: mpsc::UnboundedReceiver<oneshot::Sender<bool>>,
   pub status: TimingStatus,
   pub req: (
     mpsc::UnboundedReceiver<Arc<Notify>>,
@@ -235,10 +237,12 @@ pub struct Timing {

 impl Default for Timing {
   fn default() -> Self {
+    let (_, dumb_early_drop_rx) = unbounded_channel();
     let (_, dumb_start_rx) = unbounded_channel::<Arc<Notify>>();
     let (_, dumb_end_rx) = unbounded_channel::<()>();

     Self {
+      early_drop_rx: dumb_early_drop_rx,
       status: TimingStatus::default(),
       req: (dumb_start_rx, dumb_end_rx),
     }
@@ -277,6 +281,7 @@ pub enum UserWorkerMsgs {
   ),
   Idle(Uuid),
   Shutdown(Uuid),
+  TryCleanupIdleWorkers(usize, oneshot::Sender<usize>),
 }

 pub type SendRequestResult = (Response, mpsc::UnboundedSender<()>);
diff --git a/ext/workers/lib.rs b/ext/workers/lib.rs
index 8bb258f41..eb098f9ac 100644
--- a/ext/workers/lib.rs
+++ b/ext/workers/lib.rs
@@ -69,6 +69,7 @@ deno_core::extension!(
     op_user_worker_create,
     op_user_worker_fetch_build,
     op_user_worker_fetch_send,
+    op_user_worker_cleanup_idle_workers,
   ],
   esm_entry_point = "ext:user_workers/user_workers.js",
   esm = ["user_workers.js",]
@@ -631,6 +632,30 @@ pub async fn op_user_worker_fetch_send(
   Ok(response)
 }

+#[op2(async)]
+#[number]
+pub async fn op_user_worker_cleanup_idle_workers(
+  state: Rc<RefCell<OpState>>,
+  #[number] timeout_ms: usize,
+) -> usize {
+  let msg_tx = {
+    state
+      .borrow()
+      .borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
+      .clone()
+  };
+
+  let (tx, rx) = oneshot::channel();
+  if msg_tx
+    .send(UserWorkerMsgs::TryCleanupIdleWorkers(timeout_ms, tx))
+    .is_err()
+  {
+    return 0;
+  }
+
+  (rx.await).unwrap_or_default()
+}
+
 /// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
 pub struct BodyStream(pub mpsc::Receiver<Result<Bytes, Error>>);
diff --git a/ext/workers/user_workers.js b/ext/workers/user_workers.js
index 7a95f2a3b..91525ce51 100644
--- a/ext/workers/user_workers.js
+++ b/ext/workers/user_workers.js
@@ -12,6 +12,7 @@ const { TypeError } = primordials;
 const {
   op_user_worker_fetch_send,
   op_user_worker_create,
+  op_user_worker_cleanup_idle_workers,
 } = ops;

 const NO_SUPABASE_TAG_WARN_MSG =
@@ -144,6 +145,10 @@ class UserWorker {
     return new UserWorker(key);
   }
+
+  static async tryCleanupIdleWorkers(timeoutMs) {
+    return await op_user_worker_cleanup_idle_workers(timeoutMs);
+  }
 }

 const SUPABASE_USER_WORKERS = UserWorker;
diff --git a/types/global.d.ts b/types/global.d.ts
index 853d139d4..798a411b5 100644
--- a/types/global.d.ts
+++ b/types/global.d.ts
@@ -121,7 +121,9 @@ declare namespace EdgeRuntime {
       request: Request,
       options?: UserWorkerFetchOptions,
     ): Promise<Response>;
+    static create(opts: UserWorkerCreateOptions): Promise<UserWorker>;
+    static tryCleanupIdleWorkers(timeoutMs: number): Promise<number>;
   }

   export function scheduleTermination(): void;
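End to end, `EdgeRuntime.userWorkers.tryCleanupIdleWorkers(timeoutMs)` resolves through `op_user_worker_cleanup_idle_workers`, which posts a message carrying a oneshot reply channel onto the pool's command stream and falls back to 0 if either end of the exchange is gone. A condensed, self-contained sketch of that command/reply round trip, with the single-variant `PoolMsg` enum standing in for `UserWorkerMsgs`:

use tokio::sync::{mpsc, oneshot};

enum PoolMsg {
    TryCleanupIdleWorkers(usize, oneshot::Sender<usize>),
}

// Op side: fire the command, await the reply, default to 0 on any failure.
async fn cleanup_idle_workers(
    msg_tx: &mpsc::UnboundedSender<PoolMsg>,
    timeout_ms: usize,
) -> usize {
    let (tx, rx) = oneshot::channel();
    if msg_tx
        .send(PoolMsg::TryCleanupIdleWorkers(timeout_ms, tx))
        .is_err()
    {
        return 0; // pool loop already shut down
    }
    rx.await.unwrap_or_default() // 0 if the pool dropped the reply channel
}

#[tokio::main]
async fn main() {
    let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();

    // Stand-in for the pool's event loop in `create_user_worker_pool`.
    tokio::spawn(async move {
        while let Some(PoolMsg::TryCleanupIdleWorkers(_timeout_ms, reply)) =
            msg_rx.recv().await
        {
            let _ = reply.send(1); // pretend one idle worker was dropped
        }
    });

    assert_eq!(cleanup_idle_workers(&msg_tx, 1000).await, 1);
}

Defaulting to 0 instead of surfacing an error keeps the op infallible, which matches the diff: JavaScript callers only ever observe a count.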