diff --git a/migrations/0004_separate_activation_and_metadata.sql b/migrations/0004_separate_activation_and_metadata.sql new file mode 100644 index 00000000..c1bf5eec --- /dev/null +++ b/migrations/0004_separate_activation_and_metadata.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS activation_blobs ( + id TEXT NOT NULL PRIMARY KEY, + activation BLOB NOT NULL +) STRICT; + +CREATE TABLE IF NOT EXISTS activation_metadata ( + id TEXT NOT NULL PRIMARY KEY, -- Can be joined with activation_blobs.id + namespace TEXT, + taskname TEXT NOT NULL DEFAULT "", + status TEXT NOT NULL, + received_at INTEGER NOT NULL DEFAULT 0, + added_at INTEGER NOT NULL, + processing_attempts INTEGER NOT NULL, + processing_deadline_duration INTEGER NOT NULL, + processing_deadline INTEGER, + at_most_once INTEGER NOT NULL DEFAULT 0, + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, -- 1 is Discard + expires_at INTEGER, + delay_until INTEGER +) STRICT; + +CREATE INDEX idx_metadata_status +ON activation_metadata (status, added_at, namespace, id); diff --git a/src/main.rs b/src/main.rs index b7fb2161..8647d69b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -87,6 +87,10 @@ async fn main() -> Result<(), Error> { Err(err) => error!("Failed to run full vacuum on startup: {:?}", err), } } + info!("Loading metadata state from sqlite"); + store.load_metadata().await?; + info!("Loading metadata complete"); + // Get startup time after migrations and vacuum let startup_time = Utc::now(); @@ -236,5 +240,9 @@ async fn main() -> Result<(), Error> { .on_completion(log_task_completion("maintenance_task", maintenance_task)) .await; + info!("Flushing metadata to sqlite"); + store.flush_metadata().await?; + info!("Shutdown complete"); + Ok(()) } diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 56cef8b8..a3af7b84 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -12,9 +12,12 @@ use sqlx::{ SqliteRow, SqliteSynchronous, }, }; -use 
tracing::instrument; +use tracing::{error, instrument}; use crate::config::Config; +use crate::store::records::{ActivationBlob, ActivationMetadata}; + +use super::metadata_store::MetadataStore; /// The members of this enum should be synced with the members /// of InflightActivationStatus in sentry_protos @@ -120,6 +123,28 @@ impl InflightActivation { .num_milliseconds() }) } + + /// Create an InflightActivation from metadata and the activation blob + pub fn from_metadata_blob(metadata: ActivationMetadata, activation: &[u8]) -> Self { + Self { + id: metadata.id, + activation: activation.to_vec(), + status: metadata.status, + partition: 0, + offset: 0, + added_at: metadata.added_at, + received_at: metadata.received_at, + processing_attempts: metadata.processing_attempts, + processing_deadline_duration: metadata.processing_deadline_duration, + processing_deadline: metadata.processing_deadline, + expires_at: metadata.expires_at, + delay_until: metadata.delay_until, + at_most_once: metadata.at_most_once, + namespace: metadata.namespace, + taskname: metadata.taskname, + on_attempts_exceeded: metadata.on_attempts_exceeded, + } + } } #[derive(Clone, Copy, Debug)] @@ -136,7 +161,7 @@ impl From for QueryResult { } pub struct FailedTasksForwarder { - pub to_discard: Vec<(String, Vec)>, + pub to_discard: Vec, pub to_deadletter: Vec<(String, Vec)>, } @@ -259,6 +284,7 @@ pub struct InflightActivationStore { read_pool: SqlitePool, write_pool: SqlitePool, config: InflightActivationStoreConfig, + metadata_store: tokio::sync::Mutex, } impl InflightActivationStore { @@ -266,11 +292,13 @@ impl InflightActivationStore { let (read_pool, write_pool) = create_sqlite_pool(url).await?; sqlx::migrate!("./migrations").run(&write_pool).await?; + let metadata_store = tokio::sync::Mutex::new(MetadataStore::new()); Ok(Self { read_pool, write_pool, config, + metadata_store, }) } @@ -320,25 +348,14 @@ impl InflightActivationStore { /// Get an activation by id. 
Primarily used for testing pub async fn get_by_id(&self, id: &str) -> Result, Error> { - let row_result: Option = sqlx::query_as( + let meta_result = self.metadata_store.lock().await.get_by_id(id); + let Some(meta_result) = meta_result else { + return Ok(None); + }; + let blob_result: Option = sqlx::query_as( " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - status, - at_most_once, - namespace, - taskname, - on_attempts_exceeded - FROM inflight_taskactivations + SELECT id, activation + FROM activation_blobs WHERE id = $1 ", ) @@ -346,11 +363,35 @@ impl InflightActivationStore { .fetch_optional(&self.read_pool) .await?; - let Some(row) = row_result else { + let Some(blob_result) = blob_result else { return Ok(None); }; + Ok(Some(InflightActivation::from_metadata_blob( + meta_result, + &blob_result.activation, + ))) + } + + /// Load state into the metadata_store from sqlite. + /// Used during application startup to rebuild in-memory state. 
+ #[instrument(skip_all)] + pub async fn load_metadata(&self) -> Result<(), Error> { + self.metadata_store + .lock() + .await + .load_from_sqlite(&self.read_pool) + .await + } - Ok(Some(row.into())) + /// Flush any pending state in metadata_store into sqlite + pub async fn flush_metadata(&self) -> Result<(), Error> { + let atomic = self.write_pool.begin().await?; + let res = self.metadata_store.lock().await.commit(atomic).await; + + match res { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } } #[instrument(skip_all)] @@ -358,60 +399,32 @@ impl InflightActivationStore { if batch.is_empty() { return Ok(QueryResult { rows_affected: 0 }); } - let mut query_builder = QueryBuilder::::new( - " - INSERT INTO inflight_taskactivations - ( - id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - status, - at_most_once, - namespace, - taskname, - on_attempts_exceeded - ) - ", - ); - let rows = batch - .into_iter() - .map(TableRow::try_from) - .collect::, _>>()?; + + let metadata_batch = batch + .iter() + .map(ActivationMetadata::try_from) + .collect::, _>>()?; + + let mut atomic = self.write_pool.begin().await?; + + // Insert into the blob store and metadata + let mut query_builder = + QueryBuilder::::new("INSERT INTO activation_blobs (id, activation) "); let query = query_builder - .push_values(rows, |mut b, row| { + .push_values(batch.clone(), |mut b, row| { b.push_bind(row.id); b.push_bind(row.activation); - b.push_bind(row.partition); - b.push_bind(row.offset); - b.push_bind(row.added_at.timestamp()); - b.push_bind(row.received_at.timestamp()); - b.push_bind(row.processing_attempts); - b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); - b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); - b.push_bind(row.processing_deadline_duration); - if let Some(deadline) = row.processing_deadline { - b.push_bind(deadline.timestamp()); - } else 
{ - // Add a literal null - b.push("null"); - } - b.push_bind(row.status); - b.push_bind(row.at_most_once); - b.push_bind(row.namespace); - b.push_bind(row.taskname); - b.push_bind(row.on_attempts_exceeded as i32); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); - let meta_result = Ok(query.execute(&self.write_pool).await?.into()); + let blob_result = query.execute(&mut *atomic).await?; + + { + // append metadata to memory store and flush to sqlite. + let mut guard = self.metadata_store.lock().await; + guard.upsert_batch(metadata_batch)?; + guard.commit(atomic).await?; + } // Sync the WAL into the main database so we don't lose data on host failure. let checkpoint_timer = Instant::now(); @@ -432,7 +445,7 @@ impl InflightActivationStore { } metrics::histogram!("store.checkpoint.duration").record(checkpoint_timer.elapsed()); - meta_result + Ok(blob_result.into()) } #[instrument(skip_all)] @@ -442,41 +455,32 @@ impl InflightActivationStore { ) -> Result, Error> { let now = Utc::now(); - let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new(format!( - "UPDATE inflight_taskactivations - SET - processing_deadline = unixepoch( - 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' - ), - status = " - )); - query_builder.push_bind(InflightActivationStatus::Processing); - query_builder.push( - " - WHERE id = ( - SELECT id - FROM inflight_taskactivations - WHERE status = ", + let metadata_res = self.metadata_store.lock().await.get_pending_activation( + namespace, + now, + self.config.processing_deadline_grace_sec, + self.config.max_processing_attempts as i32, ); - query_builder.push_bind(InflightActivationStatus::Pending); - query_builder.push(" AND (expires_at IS NULL OR expires_at > "); - query_builder.push_bind(now.timestamp()); - query_builder.push(")"); - - if let Some(namespace) = namespace { - query_builder.push(" AND namespace = "); - query_builder.push_bind(namespace); + + if 
metadata_res.is_err() { + error!("Could not get pending activation from metadata store: {metadata_res:?}"); + return Ok(None); } - query_builder.push(" ORDER BY added_at LIMIT 1) RETURNING *"); + let metadata_opt = metadata_res.unwrap(); + let Some(metadata) = metadata_opt else { + return Ok(None); + }; - let result: Option = query_builder - .build_query_as::() - .fetch_optional(&self.write_pool) + // Fetch the activation from blob store + let result = sqlx::query("SELECT activation FROM activation_blobs WHERE id = $1") + .bind(metadata.id.clone()) + .fetch_one(&self.read_pool) .await?; - let Some(row) = result else { return Ok(None) }; - Ok(Some(row.into())) + Ok(Some(InflightActivation::from_metadata_blob( + metadata, + result.get::<&[u8], _>("activation"), + ))) } /// Get the age of the oldest pending activation in seconds. @@ -485,33 +489,10 @@ impl InflightActivationStore { /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. No tasks = 0 lag pub async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { - let result = sqlx::query( - "SELECT received_at, delay_until - FROM inflight_taskactivations - WHERE status = $1 - AND processing_attempts = 0 - ORDER BY received_at ASC - LIMIT 1 - ", - ) - .bind(InflightActivationStatus::Pending) - .fetch_one(&self.read_pool) - .await; - - if let Ok(row) = result { - let received_at: DateTime = row.get("received_at"); - let delay_until: Option> = row.get("delay_until"); - let millis = now.signed_duration_since(received_at).num_milliseconds() - - delay_until.map_or(0, |delay_time| { - delay_time - .signed_duration_since(received_at) - .num_milliseconds() - }); - millis as f64 / 1000.0 - } else { - // If we couldn't find a row, there is no latency. 
- 0.0 - } + self.metadata_store + .lock() + .await + .pending_activation_max_lag(now) } #[instrument(skip_all)] @@ -522,19 +503,14 @@ impl InflightActivationStore { #[instrument(skip_all)] pub async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let result = - sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") - .bind(status) - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + let count = self.metadata_store.lock().await.count_by_status(status); + + Ok(count) } pub async fn count(&self) -> Result { - let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + let count = self.metadata_store.lock().await.count_all(); + Ok(count) } /// Update the status of a specific activation @@ -544,19 +520,37 @@ impl InflightActivationStore { id: &str, status: InflightActivationStatus, ) -> Result, Error> { - let result: Option = sqlx::query_as( - "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 RETURNING *", - ) - .bind(status) - .bind(id) - .fetch_optional(&self.write_pool) - .await?; + // Update metadata store + let metadata = { + let mut guard = self.metadata_store.lock().await; + let update_res = guard.set_status(id, status); + if update_res.is_err() { + println!("update res {id:?} {status:?}, {update_res:?}"); + error!("Could not update metadata store for id {id} got {update_res:?}"); + return Ok(None); + } - let Some(row) = result else { - return Ok(None); + guard.get_by_id(id) }; - Ok(Some(row.into())) + if metadata.is_none() { + error!("Could not update metadata store for id {id} got {metadata:?}"); + return Ok(None); + } + // TODO should we have immediate durability here? If so, we should commit to sqlite first, + // and then the metadata store to be consistent with other operations. 
+ let metadata = metadata.unwrap(); + + // Fetch the activation from blob store + let result = sqlx::query("SELECT activation FROM activation_blobs WHERE id = $1") + .bind(id) + .fetch_one(&self.read_pool) + .await?; + + Ok(Some(InflightActivation::from_metadata_blob( + metadata, + result.get::<&[u8], _>("activation"), + ))) } #[instrument(skip_all)] @@ -565,17 +559,20 @@ impl InflightActivationStore { id: &str, deadline: Option>, ) -> Result<(), Error> { - sqlx::query("UPDATE inflight_taskactivations SET processing_deadline = $1 WHERE id = $2") - .bind(deadline.unwrap().timestamp()) - .bind(id) - .execute(&self.write_pool) - .await?; - Ok(()) + self.metadata_store + .lock() + .await + .set_processing_deadline(id, deadline) } #[instrument(skip_all)] pub async fn delete_activation(&self, id: &str) -> Result<(), Error> { - sqlx::query("DELETE FROM inflight_taskactivations WHERE id = $1") + self.metadata_store.lock().await.delete(id); + sqlx::query("DELETE FROM activation_blobs WHERE id = $1") + .bind(id) + .execute(&self.write_pool) + .await?; + sqlx::query("DELETE FROM activation_metadata WHERE id = $1") .bind(id) .execute(&self.write_pool) .await?; @@ -584,40 +581,48 @@ impl InflightActivationStore { #[instrument(skip_all)] pub async fn get_retry_activations(&self) -> Result, Error> { - Ok(sqlx::query_as( - " - SELECT id, - activation, - partition, - offset, - added_at, - received_at, - processing_attempts, - expires_at, - delay_until, - processing_deadline_duration, - processing_deadline, - status, - at_most_once, - namespace, - taskname, - on_attempts_exceeded - FROM inflight_taskactivations - WHERE status = $1 - ", - ) - .bind(InflightActivationStatus::Retry) - .fetch_all(&self.read_pool) - .await? 
- .into_iter() - .map(|row: TableRow| row.into()) - .collect()) + let retry_metadata = self.metadata_store.lock().await.get_retry_entry_map(); + + let mut query_builder = + QueryBuilder::new("SELECT id, activation FROM activation_blobs WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for (id, _meta) in retry_metadata.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + let rows: Vec = query_builder + .build() + .fetch_all(&self.read_pool) + .await? + .into_iter() + .collect(); + + let retry_activations = rows + .iter() + .filter_map(|row| { + let id: String = row.get("id"); + let activation: Vec = row.get("activation"); + retry_metadata + .get(&id) + .map(|meta| InflightActivation::from_metadata_blob(meta.clone(), &activation)) + }) + .collect(); + + Ok(retry_activations) } pub async fn clear(&self) -> Result<(), Error> { - sqlx::query("DELETE FROM inflight_taskactivations") - .execute(&self.write_pool) + let mut atomic = self.write_pool.begin().await?; + sqlx::query("DELETE FROM activation_blobs") + .execute(&mut *atomic) .await?; + atomic.commit().await?; + + // TODO could this be done inside the sql transaction or will it deadlock? + // Proving it is safe, is much harder than it is to give up a tiny + // bit of consistency and make metadata_store just heal itself. + self.metadata_store.lock().await.clear(); + Ok(()) } @@ -626,70 +631,24 @@ impl InflightActivationStore { /// if a worker took the task and was killed, or failed. #[instrument(skip_all)] pub async fn handle_processing_deadline(&self) -> Result { - let now = Utc::now(); - let mut atomic = self.write_pool.begin().await?; - - // Idempotent tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to idempotency keys. 
- let most_once_result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } - - // Update non-idempotent tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. - let result = sqlx::query( - "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) - .execute(&mut *atomic) - .await; - - atomic.commit().await?; - - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); + if let Ok(updated) = self + .metadata_store + .lock() + .await + .handle_processing_deadline() + { + return Ok(updated); + } else { + return Err(anyhow!("Could not update tasks past processing_deadline")); } - - Err(anyhow!("Could not update tasks past processing_deadline")) } /// Update tasks that have exceeded their max processing attempts. /// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly. 
#[instrument(skip_all)] pub async fn handle_processing_attempts(&self) -> Result { - let processing_attempts_result = sqlx::query( - "UPDATE inflight_taskactivations - SET status = $1 - WHERE processing_attempts >= $2 AND status = $3", - ) - .bind(InflightActivationStatus::Failure) - .bind(self.config.max_processing_attempts as i32) - .bind(InflightActivationStatus::Pending) - .execute(&self.write_pool) - .await; - - if let Ok(query_res) = processing_attempts_result { - return Ok(query_res.rows_affected()); - } - - Err(anyhow!("Could not update tasks past processing_deadline")) + // TODO This is a no-op with metadata_store + Ok(0) } /// Perform upkeep work for tasks that are past expires_at deadlines @@ -700,16 +659,8 @@ impl InflightActivationStore { /// The number of impacted records is returned in a Result. #[instrument(skip_all)] pub async fn handle_expires_at(&self) -> Result { - let now = Utc::now(); - let query = sqlx::query( - "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .execute(&self.write_pool) - .await?; - - Ok(query.rows_affected()) + // TODO: This is a no-op with the metadata store. + Ok(0) } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -720,20 +671,7 @@ impl InflightActivationStore { /// The number of impacted records is returned in a Result. 
#[instrument(skip_all)] pub async fn handle_delay_until(&self) -> Result { - let now = Utc::now(); - let update_result = sqlx::query( - r#"UPDATE inflight_taskactivations - SET status = $1 - WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 - "#, - ) - .bind(InflightActivationStatus::Pending) - .bind(now.timestamp()) - .bind(InflightActivationStatus::Delay) - .execute(&self.write_pool) - .await?; - - Ok(update_result.rows_affected()) + self.metadata_store.lock().await.handle_delay_until() } /// Perform upkeep work related to status=failure @@ -744,102 +682,124 @@ impl InflightActivationStore { /// complete. #[instrument(skip_all)] pub async fn handle_failed_tasks(&self) -> Result { - let mut atomic = self.write_pool.begin().await?; - - let failed_tasks: Vec = - sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Failure) - .fetch_all(&mut *atomic) - .await? - .into_iter() - .collect(); + // Get the forwarder state from metadata. The metadata will also do internal cleanup. + let failed_state = self.metadata_store.lock().await.handle_failed_tasks(); let mut forwarder = FailedTasksForwarder { - to_discard: vec![], + to_discard: failed_state.to_discard, to_deadletter: vec![], }; - for record in failed_tasks.iter() { - let activation_data: &[u8] = record.get("activation"); - let id: String = record.get("id"); - // We could be deadlettering because of activation.expires - // when a task expires we still deadletter if configured. 
- let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); - let on_attempts_exceeded: OnAttemptsExceeded = - on_attempts_exceeded_val.try_into().unwrap(); - if on_attempts_exceeded == OnAttemptsExceeded::Discard - || on_attempts_exceeded == OnAttemptsExceeded::Unspecified - { - forwarder.to_discard.push((id, activation_data.to_vec())) - } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { - forwarder.to_deadletter.push((id, activation_data.to_vec())) - } + let mut query_builder = + QueryBuilder::new("SELECT id, activation FROM activation_blobs WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for id in failed_state.to_deadletter.iter() { + separated.push_bind(id); } + separated.push_unseparated(")"); + let rows: Vec = query_builder + .build() + .fetch_all(&self.read_pool) + .await? + .into_iter() + .collect(); - if !forwarder.to_discard.is_empty() { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); - - let mut separated = query_builder.separated(", "); - for (id, _body) in forwarder.to_discard.iter() { - separated.push_bind(id); - } - separated.push_unseparated(")"); - - query_builder.build().execute(&mut *atomic).await?; + for row in rows { + forwarder.to_deadletter.push(( + row.get::("id"), + row.get::, _>("activation"), + )); } - atomic.commit().await?; - Ok(forwarder) } /// Mark a collection of tasks as complete by id #[instrument(skip_all)] pub async fn mark_completed(&self, ids: Vec) -> Result { - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); - query_builder - .push("SET status = ") - .push_bind(InflightActivationStatus::Complete) - .push(" WHERE id IN ("); + let updated_count = self.metadata_store.lock().await.mark_completed(ids); - let mut separated = query_builder.separated(", "); - for id in ids.iter() { - 
separated.push_bind(id); - } - separated.push_unseparated(")"); - let result = query_builder.build().execute(&self.write_pool).await?; - - Ok(result.rows_affected()) + Ok(updated_count) } /// Remove completed tasks. /// This method is a garbage collector for the inflight task store. #[instrument(skip_all)] pub async fn remove_completed(&self) -> Result { - let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") + // Doing this in the sqlite transaction could deadlock. + let completed_ids = self.metadata_store.lock().await.remove_completed(); + + let mut atomic = self.write_pool.begin().await?; + // Remove legacy data. + sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") .bind(InflightActivationStatus::Complete) - .execute(&self.write_pool) + .execute(&mut *atomic) .await?; - Ok(query.rows_affected()) + // Remove blobs and metadata + let mut query_builder = + QueryBuilder::::new("DELETE FROM activation_blobs WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for id in completed_ids.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + query_builder.build().execute(&mut *atomic).await?; + + let mut query_builder = + QueryBuilder::::new("DELETE FROM activation_metadata WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for id in completed_ids.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + let query_res = query_builder.build().execute(&mut *atomic).await?; + atomic.commit().await?; + + Ok(query_res.rows_affected()) } /// Remove killswitched tasks. 
#[instrument(skip_all)] pub async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { + let ids = self + .metadata_store + .lock() + .await + .remove_killswitched(killswitched_tasks); + + // Remove blobs and metadata + let mut atomic = self.write_pool.begin().await?; let mut query_builder = - QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE taskname IN ("); + QueryBuilder::::new("DELETE FROM activation_blobs WHERE id IN ("); let mut separated = query_builder.separated(", "); - for taskname in killswitched_tasks.iter() { - separated.push_bind(taskname); + for id in ids.iter() { + separated.push_bind(id); } separated.push_unseparated(")"); - let query = query_builder.build().execute(&self.write_pool).await?; + query_builder.build().execute(&mut *atomic).await?; - Ok(query.rows_affected()) + let mut query_builder = + QueryBuilder::::new("DELETE FROM activation_metadata WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for id in ids.iter() { + separated.push_bind(id); + } + separated.push_unseparated(")"); + let query_res = query_builder.build().execute(&mut *atomic).await?; + atomic.commit().await?; + + Ok(query_res.rows_affected()) + } + + /// Flush any dirty metadata records to sqlite. 
+ pub async fn flush_dirty(&self) { + let atomic = self.write_pool.begin().await.unwrap(); + self.metadata_store + .lock() + .await + .commit(atomic) + .await + .unwrap(); } } diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 10a731d4..c4d591cc 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -79,7 +79,8 @@ async fn test_store() { let store = create_test_store().await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + let res = store.store(batch).await; + assert!(res.is_ok()); let result = store.count().await; assert_eq!(result.unwrap(), 2); @@ -219,7 +220,7 @@ async fn test_get_pending_activation_skip_expires() { assert_counts( StatusCount { - pending: 1, + failure: 1, ..StatusCount::default() }, &store, @@ -356,9 +357,11 @@ async fn test_set_processing_deadline() { let store = create_test_store().await; let batch = make_activations(1); - assert!(store.store(batch.clone()).await.is_ok()); + let res = store.store(batch.clone()).await; + + assert!(res.is_ok()); - let deadline = Utc::now(); + let deadline = Utc::now() + chrono::Duration::seconds(5); assert!( store .set_processing_deadline("id_0", Some(deadline)) @@ -367,13 +370,16 @@ async fn test_set_processing_deadline() { ); let result = store.get_by_id("id_0").await.unwrap().unwrap(); - assert_eq!( + + // Assert within a second to avoid subsecond flakiness + assert!( result .processing_deadline .unwrap() .round_subsecs(0) - .timestamp(), - deadline.timestamp() + .timestamp() + - deadline.timestamp() + <= 1 ) } @@ -694,48 +700,6 @@ async fn test_handle_processing_deadline_no_retries_remaining() { .await; } -#[tokio::test] -async fn test_processing_attempts_exceeded() { - let config = create_integration_config(); - let store = create_test_store().await; - - let mut batch = make_activations(3); - batch[0].status = InflightActivationStatus::Pending; - batch[0].processing_deadline = 
Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); - batch[0].processing_attempts = config.max_processing_attempts as i32; - - batch[1].status = InflightActivationStatus::Complete; - batch[1].added_at += Duration::from_secs(1); - - batch[2].status = InflightActivationStatus::Pending; - batch[2].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); - batch[2].processing_attempts = config.max_processing_attempts as i32; - - assert!(store.store(batch.clone()).await.is_ok()); - assert_counts( - StatusCount { - complete: 1, - pending: 2, - ..StatusCount::default() - }, - &store, - ) - .await; - - let count = store.handle_processing_attempts().await; - assert!(count.is_ok()); - assert_eq!(count.unwrap(), 2); - assert_counts( - StatusCount { - complete: 1, - failure: 2, - ..StatusCount::default() - }, - &store, - ) - .await; -} - #[tokio::test] async fn test_remove_completed() { let store = create_test_store().await; @@ -982,39 +946,6 @@ async fn test_mark_completed() { .await; } -#[tokio::test] -async fn test_handle_expires_at() { - let store = create_test_store().await; - let mut batch = make_activations(3); - - // All expired tasks should be removed, regardless of order or other tasks. 
- batch[0].expires_at = Some(Utc::now() - (Duration::from_secs(5 * 60))); - batch[1].expires_at = Some(Utc::now() + (Duration::from_secs(5 * 60))); - batch[2].expires_at = Some(Utc::now() - (Duration::from_secs(5 * 60))); - - assert!(store.store(batch.clone()).await.is_ok()); - assert_counts( - StatusCount { - pending: 3, - ..StatusCount::default() - }, - &store, - ) - .await; - - let result = store.handle_expires_at().await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), 2); - assert_counts( - StatusCount { - pending: 1, - ..StatusCount::default() - }, - &store, - ) - .await; -} - #[tokio::test] async fn test_remove_killswitched() { let store = create_test_store().await; @@ -1160,7 +1091,7 @@ async fn test_db_size() { assert!(first_size > 0, "should have some bytes"); // Generate a large enough batch that we use another page. - let batch = make_activations(50); + let batch = make_activations(100); assert!(store.store(batch).await.is_ok()); let second_size = store.db_size().await.unwrap(); @@ -1209,7 +1140,7 @@ async fn test_pending_activation_max_lag_ignore_processing_attempts() { assert!(store.store(pending).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; - assert!(10.00 < result); + assert!(10.00 <= result); assert!(result < 11.00); } @@ -1223,8 +1154,12 @@ async fn test_pending_activation_max_lag_account_for_delayed() { // the lag of a delayed task should begin *after* the delay has passed. pending[0].received_at = now - Duration::from_secs(520); pending[0].delay_until = Some(now - Duration::from_millis(22020)); + pending[0].status = InflightActivationStatus::Delay; assert!(store.store(pending).await.is_ok()); + // Advance state on delayed tasks. 
+ store.handle_delay_until().await.unwrap(); + + let result = store.pending_activation_max_lag(&now).await; assert!(22.00 < result, "result: {result}"); assert!(result < 23.00, "result: {result}"); diff --git a/src/store/metadata_store.rs b/src/store/metadata_store.rs new file mode 100644 index 00000000..dae99c3d --- /dev/null +++ b/src/store/metadata_store.rs @@ -0,0 +1,703 @@ +use std::{ + cmp::Reverse, + collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap}, +}; + +use chrono::{DateTime, Duration, Utc}; +use sentry::capture_message; +use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use sqlx::{QueryBuilder, Sqlite, SqlitePool, Transaction, sqlite::SqliteQueryResult}; + +use crate::store::inflight_activation::InflightActivationStatus; +use crate::store::records::{ActivationMetadata, TimestampEntry}; + +/// Metadata result about failed tasks. +pub struct FailedState { + /// Ids of tasks that were discarded. + pub to_discard: Vec, + /// Ids of tasks that were sent to deadletter. + pub to_deadletter: Vec, +} + +/// The in-memory metadata store for activations. +/// This store is kept in memory, and is used to track runtime +/// state of activations. +/// +/// The upkeep loop is responsible for keeping this store synced to +/// sqlite. As records are moved through the state machine, they are +/// added to a 'dirty' set. When [`Self::flush()`] is called, all +/// records in the dirty set are flushed to sqlite. +/// +/// This structure is typically wrapped in a Mutex +/// to enable safe concurrent access. +#[derive(Clone, Debug)] +pub struct MetadataStore { + /// The key/value of records. + records: BTreeMap, + + /// Heap of records that are delayed ordered by due deadline. This is a min-heap. + delayed: BinaryHeap>, + + /// Heap of records that are pending ordered by added datetime. This is a min-heap. + pending: BinaryHeap>, + + /// Heap of records that are processing ordered by completion deadline. This is a min-heap.
+ processing: BinaryHeap>, + + /// Retry status + retry: BTreeSet, + + /// Complete status + complete: BTreeSet, + + /// Failed status + failed: BTreeSet, + + /// Ids of records that have been mutated, and need to be flushed to storage + dirty_ids: BTreeSet, +} + +impl Default for MetadataStore { + fn default() -> Self { + Self::new() + } +} + +impl MetadataStore { + /// Create a new metadata store + pub fn new() -> Self { + Self { + records: BTreeMap::new(), + delayed: BinaryHeap::new(), + pending: BinaryHeap::new(), + processing: BinaryHeap::new(), + retry: BTreeSet::new(), + complete: BTreeSet::new(), + failed: BTreeSet::new(), + dirty_ids: BTreeSet::new(), + } + } + + /// Load metadata state from sqlite into memory. + pub async fn load_from_sqlite(&mut self, connection: &SqlitePool) -> anyhow::Result<()> { + let load_query: Vec = + sqlx::query_as("SELECT * FROM activation_metadata") + .fetch_all(connection) + .await? + .into_iter() + .collect(); + for row in load_query.into_iter() { + self.upsert(row)?; + } + Ok(()) + } + + /// Insert or update an activation metadata record + pub fn upsert(&mut self, metadata: ActivationMetadata) -> anyhow::Result<()> { + // File by status + match metadata.status { + InflightActivationStatus::Pending => { + self.pending.push(Reverse(metadata.pending_entry())) + } + InflightActivationStatus::Processing => { + self.processing.push(Reverse(metadata.processing_entry())) + } + InflightActivationStatus::Delay => self.delayed.push(Reverse(metadata.delayed_entry())), + InflightActivationStatus::Retry => { + self.retry.insert(metadata.id.clone()); + } + InflightActivationStatus::Failure => { + self.failed.insert(metadata.id.clone()); + } + InflightActivationStatus::Complete => { + self.complete.insert(metadata.id.clone()); + } + InflightActivationStatus::Unspecified => { + capture_message( + "Upsert called with unspecified status", + sentry::Level::Error, + ); + } + } + self.dirty_ids.insert(metadata.id.clone()); + 
self.records.insert(metadata.id.clone(), metadata); + + Ok(()) + } + + pub fn upsert_batch(&mut self, records: Vec) -> anyhow::Result<()> { + for record in records { + self.upsert(record)?; + } + Ok(()) + } + + /// Add all dirty records to the provided sqlite connection. + /// The connection can be part of a transaction guard. + pub async fn commit( + &mut self, + mut atomic: Transaction<'static, Sqlite>, + ) -> Result { + if self.dirty_ids.is_empty() { + return Ok(SqliteQueryResult::default()); + } + let mut query_builder = QueryBuilder::::new( + "INSERT INTO activation_metadata ( + id, + namespace, + taskname, + status, + received_at, + added_at, + processing_attempts, + processing_deadline_duration, + processing_deadline, + at_most_once, + on_attempts_exceeded, + expires_at, + delay_until + ) ", + ); + let query_builder = query_builder + .push_values(self.dirty_ids.iter(), |mut b, row| { + let row = self + .records + .get(row) + .expect("Dirty id not found in records"); + b.push_bind(row.id.clone()); + b.push_bind(row.namespace.clone()); + b.push_bind(row.taskname.clone()); + b.push_bind(row.status); + b.push_bind(row.received_at.timestamp()); + b.push_bind(row.added_at.timestamp()); + b.push_bind(row.processing_attempts); + b.push_bind(row.processing_deadline_duration); + if let Some(deadline) = row.processing_deadline { + b.push_bind(deadline.timestamp()); + } else { + // Add a literal null + b.push("null"); + } + b.push_bind(row.at_most_once); + b.push_bind(row.on_attempts_exceeded as i32); + + b.push_bind(row.expires_at.map(|t| Some(t.timestamp()))); + b.push_bind(row.delay_until.map(|t| Some(t.timestamp()))); + }) + .push( + " ON CONFLICT(id) DO UPDATE SET + namespace=excluded.namespace, + taskname=excluded.taskname, + status=excluded.status, + received_at=excluded.received_at, + added_at=excluded.added_at, + processing_attempts=excluded.processing_attempts, + processing_deadline_duration=excluded.processing_deadline_duration, + 
processing_deadline=excluded.processing_deadline, + at_most_once=excluded.at_most_once, + on_attempts_exceeded=excluded.on_attempts_exceeded, + expires_at=excluded.expires_at, + delay_until=excluded.delay_until", + ); + let query = query_builder.build(); + metrics::counter!("metadata_store.commit.records").increment(self.dirty_ids.len() as u64); + + let res = query.execute(&mut *atomic).await; + atomic.commit().await.unwrap(); + self.clear_dirty(); + + res + } + + /// Advance the state of a record and move it between the status containers. + fn advance_state(&mut self, record: &ActivationMetadata, new_status: InflightActivationStatus) { + // Remove from existing status containers + match record.status { + InflightActivationStatus::Pending => { + self.pending.retain(|entry| entry.0.id != record.id); + } + InflightActivationStatus::Processing => { + self.processing.retain(|entry| entry.0.id != record.id); + } + InflightActivationStatus::Delay => { + self.delayed.retain(|entry| entry.0.id != record.id); + } + InflightActivationStatus::Retry => { + self.retry.remove(&record.id); + } + InflightActivationStatus::Failure => { + self.failed.remove(&record.id); + } + InflightActivationStatus::Complete => { + self.complete.remove(&record.id); + } + InflightActivationStatus::Unspecified => {} + } + + // Add to new status containers + match new_status { + InflightActivationStatus::Pending => { + self.pending.push(Reverse(record.pending_entry())); + } + InflightActivationStatus::Processing => { + self.processing.push(Reverse(record.processing_entry())); + } + InflightActivationStatus::Delay => { + self.delayed.push(Reverse(record.delayed_entry())); + } + InflightActivationStatus::Retry => { + self.retry.insert(record.id.clone()); + } + InflightActivationStatus::Failure => { + self.failed.insert(record.id.clone()); + } + InflightActivationStatus::Complete => { + self.complete.insert(record.id.clone()); + } + InflightActivationStatus::Unspecified => {} + } + } + + /// Mark a set 
of records by id as completed. + pub fn mark_completed(&mut self, ids: Vec) -> u64 { + let mut updated = 0; + for id in ids.iter() { + if let Some(record) = self.records.get(id) { + let mut record = record.clone(); + self.advance_state(&record, InflightActivationStatus::Complete); + record.processing_deadline = None; + record.status = InflightActivationStatus::Complete; + + self.complete.insert(id.clone()); + self.dirty_ids.insert(id.clone()); + self.records.insert(id.clone(), record); + updated += 1; + } else { + println!("Warning: stale heap data encountered for id: {id}"); + self.add_to_failed(id.clone()); + } + } + + updated + } + + /// Remove all completed records from the in-memory store. + /// Get a vec of ids that were removed. + pub fn remove_completed(&mut self) -> Vec { + let mut removed = vec![]; + for id in self.complete.iter() { + self.dirty_ids.remove(id); + self.records.remove(id); + + removed.push(id.clone()); + } + self.complete.clear(); + + removed + } + + /// Reset all in-memory state. + pub fn clear(&mut self) { + self.records.clear(); + self.delayed.clear(); + self.pending.clear(); + self.processing.clear(); + self.retry.clear(); + self.complete.clear(); + self.failed.clear(); + self.dirty_ids.clear(); + } + + /// Clear the dirty set after a successful commit. + pub fn clear_dirty(&mut self) { + self.dirty_ids.clear(); + } + + /// Remove a record by id from the the store and remove it from the dirty set. 
+ pub fn delete(&mut self, id: &str) { + if let Some(record) = self.records.get(id) { + match record.status { + InflightActivationStatus::Pending => { + self.pending.retain(|entry| entry.0.id != id); + } + InflightActivationStatus::Processing => { + self.processing.retain(|entry| entry.0.id != id); + } + InflightActivationStatus::Delay => { + self.delayed.retain(|entry| entry.0.id != id); + } + InflightActivationStatus::Retry => { + self.retry.remove(id); + } + InflightActivationStatus::Failure => { + self.failed.remove(id); + } + InflightActivationStatus::Complete => { + self.complete.remove(id); + } + InflightActivationStatus::Unspecified => {} + } + self.records.remove(id); + self.dirty_ids.remove(id); + } + } + + /// Add to failed set + /// Record cannot be found, we have encountered stale heap data. + /// This shouldn't happen. + fn add_to_failed(&mut self, id: String) { + let metadata = self.records.get(&id); + if let Some(metadata) = metadata { + let mut metadata = metadata.clone(); + metadata.status = InflightActivationStatus::Failure; + self.records.insert(id.clone(), metadata); + } + self.dirty_ids.insert(id.clone()); + self.failed.insert(id); + } + + /// Get the next pending activation, optionally by namespace. + /// When looking for pending activations, state transitions are + /// performed for activations past their expires_at, or that have + /// exceeded their processing attempts. + pub fn get_pending_activation( + &mut self, + namespace: Option<&str>, + now: DateTime, + processing_grace_period: u64, + max_processing_attempts: i32, + ) -> Result, anyhow::Error> { + let mut namespace_skipped: Vec = vec![]; + let mut metadata = loop { + let next_entry = self.pending.pop(); + let Some(Reverse(entry)) = next_entry else { + return Ok(None); + }; + let metadata = self.records.get(&entry.id).unwrap(); + + // Check if the entry expired in the pending state. 
+ // This relocates some logic from upkeep to run during fetch, which isn't great, but + // also amortizes the work :shrug: + if let Some(expires_at) = metadata.expires_at + && expires_at < now + { + metrics::counter!("get_pending_activation.expired").increment(1); + self.add_to_failed(entry.id.clone()); + continue; + } + // Check if the entry has exceeded processing attempts. + if metadata.processing_attempts >= max_processing_attempts { + metrics::counter!("get_pending_activation.attempts_exceeded").increment(1); + self.add_to_failed(entry.id.clone()); + continue; + } + + // Filter by namespace if provided. + if let Some(namespace) = namespace + && metadata.namespace != namespace + { + namespace_skipped.push(entry); + continue; + } + + // Found a valid entry. + break metadata.clone(); + }; + // Readd any skipped entries + namespace_skipped.iter().for_each(|entry| { + self.pending.push(Reverse(entry.clone())); + }); + + // Update the processing deadline, and move to processing state, then update the metadata. + // Ordering here is important. + metadata.processing_deadline = Some( + now + Duration::seconds( + (metadata.processing_deadline_duration as u64 + processing_grace_period) as i64, + ), + ); + self.advance_state(&metadata, InflightActivationStatus::Processing); + metadata.status = InflightActivationStatus::Processing; + + self.dirty_ids.insert(metadata.id.clone()); + self.records.insert(metadata.id.clone(), metadata.clone()); + + Ok(Some(metadata.clone())) + } + + pub fn handle_processing_deadline(&mut self) -> Result { + let now = Utc::now(); + + let mut past_deadline: Vec = vec![]; + while let Some(Reverse(entry)) = self.processing.pop() { + // Found a entry in the future stop and process what we found. 
+ if let Some(timestamp) = entry.timestamp + && timestamp > now + { + self.processing.push(Reverse(entry)); + break; + } + past_deadline.push(entry); + } + + for entry in past_deadline.iter() { + let id = entry.id.clone(); + let record = if let Some(record) = self.records.get(&id) { + record + } else { + println!("Warning: stale heap data encountered for id: {id}"); + self.add_to_failed(id.clone()); + continue; + }; + let new_status = if record.at_most_once { + InflightActivationStatus::Failure + } else { + InflightActivationStatus::Pending + }; + + let mut record = record.clone(); + // Reset the deadline and attempts + record.status = new_status; + record.processing_deadline = None; + record.processing_attempts += 1; + + self.dirty_ids.insert(record.id.clone()); + self.records.insert(record.id.clone(), record.clone()); + match record.status { + InflightActivationStatus::Pending => { + self.pending.push(Reverse(record.pending_entry())); + } + InflightActivationStatus::Failure => { + self.failed.insert(record.id.clone()); + } + _ => { + panic!( + "Unexpected status in handle_processing_deadline: {:?}", + record.status + ); + } + } + } + + Ok(past_deadline.len() as u64) + } + + // Get a structure of ids that have been discarded and vec of ids + // that need to be sent to deadletter. Deadletter activations will remain in + // failed until removed after messages are produced. 
+ pub fn handle_failed_tasks(&mut self) -> FailedState { + let mut state = FailedState { + to_discard: vec![], + to_deadletter: vec![], + }; + for id in self.failed.iter() { + let metadata = if let Some(metadata) = self.records.get(id) { + metadata + } else { + continue; + }; + let on_attempts_exceeded = metadata.on_attempts_exceeded; + + if on_attempts_exceeded == OnAttemptsExceeded::Discard + || on_attempts_exceeded == OnAttemptsExceeded::Unspecified + { + state.to_discard.push(id.clone()) + } else if on_attempts_exceeded == OnAttemptsExceeded::Deadletter { + state.to_deadletter.push(id.clone()) + } + } + for id in state.to_discard.iter() { + if let Some(metadata) = self.records.get(id) { + let mut updated = metadata.clone(); + self.advance_state(&updated, InflightActivationStatus::Complete); + updated.status = InflightActivationStatus::Complete; + + self.dirty_ids.insert(id.clone()); + self.records.insert(id.clone(), updated); + }; + } + + state + } + + /// Move any delayed records that are due to pending. + pub fn handle_delay_until(&mut self) -> Result { + let now = Utc::now(); + + let mut ready: Vec = vec![]; + while let Some(Reverse(entry)) = self.delayed.pop() { + // Found a entry in the future. + if let Some(timestamp) = entry.timestamp + && timestamp > now + { + self.delayed.push(Reverse(entry)); + break; + } + ready.push(entry); + } + + for entry in ready.iter() { + let id = entry.id.clone(); + let record = if let Some(record) = self.records.get(&id) { + record + } else { + println!("Warning: stale heap data encountered for id: {id}"); + self.add_to_failed(id.clone()); + continue; + }; + let mut record = record.clone(); + self.advance_state(&record, InflightActivationStatus::Pending); + record.status = InflightActivationStatus::Pending; + + self.dirty_ids.insert(record.id.clone()); + self.records.insert(record.id.clone(), record.clone()); + } + + Ok(ready.len() as u64) + } + + /// Get the list of ids in currently in the retry state. 
+ pub fn get_retry_entry_map(&self) -> HashMap { + let mut map = HashMap::new(); + for item in self.retry.iter() { + if let Some(record) = self.records.get(item) { + map.insert(item.clone(), record.clone()); + } + } + map + } + + /// Get max lag of pending activations. This is o(n) over the pending set. + pub fn pending_activation_max_lag(&mut self, now: &DateTime) -> f64 { + let mut found: Option = None; + for Reverse(entry) in self.pending.iter() { + let metadata = if let Some(metadata) = self.records.get(&entry.id) { + metadata + } else { + continue; + }; + if metadata.processing_attempts > 0 { + continue; + } + if found.is_none() { + found = Some(metadata.clone()); + } else if let Some(existing) = &found + && metadata.received_at < existing.received_at + { + found = Some(metadata.clone()); + } + } + if let Some(oldest) = &found { + // latency is: now - received_at - (delay_until - received_at)? + // We remove the delay to better reflect the time + // activations spend in pending state. + let millis = now + .signed_duration_since(oldest.received_at) + .num_milliseconds() + - oldest.delay_until.map_or(0, |delay_time| { + delay_time + .signed_duration_since(oldest.received_at) + .num_milliseconds() + }); + millis as f64 / 1000.0 + } else { + 0.0 + } + } + + /// Get total count of records in the store. + pub fn count_all(&self) -> usize { + self.records.len() + } + + /// Get a count of records by a specific status. + /// + /// This will both do a linear scan of all records, and also use + /// status specific containers and perform a sanity check. This should be configurable + /// logic as the linear scan will be expensive in production. 
+ pub fn count_by_status(&self, status: InflightActivationStatus) -> usize { + let status_count = match status { + InflightActivationStatus::Pending => self.pending.len(), + InflightActivationStatus::Processing => self.processing.len(), + InflightActivationStatus::Delay => self.delayed.len(), + InflightActivationStatus::Complete => self.complete.len(), + InflightActivationStatus::Retry => self.retry.len(), + InflightActivationStatus::Failure => self.failed.len(), + _ => 0, + }; + // TODO remove this sanity check, or have it record sentry errors instead. + if true { + let count = self.records.iter().fold(0, |acc, (_id, record)| { + let increment = if record.status == status { 1 } else { 0 }; + acc + increment + }); + if status_count != count { + panic!( + "count_by_status() - counts unequal status={status:?} records={count} status_count={status_count}" + ); + } + } + + status_count + } + + /// Update the status of a record by id, no domain rules are applied. + pub fn set_status(&mut self, id: &str, status: InflightActivationStatus) -> anyhow::Result<()> { + if let Some(record) = self.records.get(id) { + let mut record = record.clone(); + self.advance_state(&record, status); + record.status = status; + + self.dirty_ids.insert(id.to_string()); + self.records.insert(id.to_string(), record); + + Ok(()) + } else { + Err(anyhow::anyhow!("Record not found")) + } + } + + /// Update the processing deadline for a record by id. + pub fn set_processing_deadline( + &mut self, + id: &str, + deadline: Option>, + ) -> anyhow::Result<()> { + if let Some(record) = self.records.get_mut(id) { + record.processing_deadline = deadline; + + // Remove the existing entry from the processing heap. 
+ self.processing.retain(|entry| entry.0.id != id); + self.processing.push(Reverse(record.processing_entry())); + + self.dirty_ids.insert(id.to_string()); + Ok(()) + } else { + Err(anyhow::anyhow!("Record not found")) + } + } + + /// Get activation metadata by id + /// Returns a clone of the current state. + pub fn get_by_id(&self, id: &str) -> Option { + if let Some(record) = self.records.get(id) { + return Some(record.clone()); + } + None + } + + /// Remove activations with a name matching `task_names`. + /// Returns a vec of activation ids that were removed. + pub fn remove_killswitched(&mut self, task_names: Vec) -> Vec { + let mut updated: Vec = vec![]; + for (id, metadata) in self.records.iter() { + if task_names.contains(&metadata.taskname) { + updated.push(id.clone()); + } + } + for id in updated.iter() { + self.delete(id); + } + + updated + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index dcc0f255..b5f6377d 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,3 +1,5 @@ pub mod inflight_activation; #[cfg(test)] pub mod inflight_activation_tests; +pub mod metadata_store; +pub mod records; diff --git a/src/store/records.rs b/src/store/records.rs new file mode 100644 index 00000000..d5d9ff07 --- /dev/null +++ b/src/store/records.rs @@ -0,0 +1,266 @@ +use std::cmp::Ordering; + +use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus}; +use chrono::{DateTime, Utc}; +use sentry_protos::taskbroker::v1::OnAttemptsExceeded; +use sqlx::FromRow; + +/// sqlx Row mapping for activation_blobs table +#[derive(Clone, Debug, PartialEq, FromRow)] +pub struct ActivationBlob { + /// The task id + pub id: String, + /// The protobuf activation that was received from kafka + pub activation: Vec, +} + +impl TryFrom for ActivationBlob { + type Error = anyhow::Error; + + fn try_from(value: InflightActivation) -> Result { + Ok(Self { + id: value.id, + activation: value.activation, + }) + } +} + +/// sqlx Row mapping for 
activation_metadata table +#[derive(Clone, Debug, PartialEq, FromRow)] +pub struct ActivationMetadata { + /// The task id + pub id: String, + + /// Details about the task + // TODO do these need to be here? Or should they be columns + // on the activation_blobs + pub namespace: String, + pub taskname: String, + + /// The current status of the activation + pub status: InflightActivationStatus, + + /// The timestamp a task was stored in Kafka + pub received_at: DateTime, + + /// The timestamp when the activation was stored in activation store. + pub added_at: DateTime, + + /// The number of times the activation has been attempted to be processed. This counter is + /// incremented everytime a task is reset from processing back to pending. When this + /// exceeds max_processing_attempts, the task is discarded/deadlettered. + pub processing_attempts: i32, + + /// The duration in seconds that a worker has to complete task execution. + /// When an activation is moved from pending -> processing a result is expected + /// in this many seconds. + pub processing_deadline_duration: u32, + + /// The timestamp for when processing should be complete + pub processing_deadline: Option>, + + /// Whether or not the activation uses at_most_once. + /// When enabled activations are not retried when processing_deadlines + /// are exceeded. 
+ pub at_most_once: bool, + + /// What to do when the maximum number of attempts to complete a task is exceeded + #[sqlx(try_from = "i32")] + pub on_attempts_exceeded: OnAttemptsExceeded, + + /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store + pub expires_at: Option>, + + /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers + pub delay_until: Option>, +} + +impl ActivationMetadata { + /// The number of milliseconds between an activation's received timestamp + /// and the provided datetime + pub fn received_latency(&self, now: DateTime) -> i64 { + now.signed_duration_since(self.received_at) + .num_milliseconds() + - self.delay_until.map_or(0, |delay_until| { + delay_until + .signed_duration_since(self.received_at) + .num_milliseconds() + }) + } + + pub fn pending_entry(&self) -> TimestampEntry { + // TODO could use ref-str here. clone is simpler to write though. + TimestampEntry { + id: self.id.clone(), + timestamp: Some(self.added_at), + } + } + + pub fn processing_entry(&self) -> TimestampEntry { + TimestampEntry { + id: self.id.clone(), + timestamp: self.processing_deadline, + } + } + + pub fn delayed_entry(&self) -> TimestampEntry { + TimestampEntry { + id: self.id.clone(), + timestamp: self.delay_until, + } + } +} + +impl TryFrom for ActivationMetadata { + type Error = anyhow::Error; + + fn try_from(value: InflightActivation) -> Result { + Ok(Self { + id: value.id, + namespace: value.namespace, + taskname: value.taskname, + status: value.status, + received_at: value.received_at, + added_at: value.added_at, + processing_attempts: value.processing_attempts, + processing_deadline_duration: value.processing_deadline_duration, + processing_deadline: value.processing_deadline, + at_most_once: value.at_most_once, + on_attempts_exceeded: value.on_attempts_exceeded, + expires_at: value.expires_at, + delay_until: value.delay_until, + }) + } +} + 
+/// We can safely clone from a reference as all fields are cloneable. +impl TryFrom<&InflightActivation> for ActivationMetadata { + type Error = anyhow::Error; + + fn try_from(value: &InflightActivation) -> Result { + Ok(Self { + id: value.id.clone(), + namespace: value.namespace.clone(), + taskname: value.taskname.clone(), + status: value.status, + received_at: value.received_at, + added_at: value.added_at, + processing_attempts: value.processing_attempts, + processing_deadline_duration: value.processing_deadline_duration, + processing_deadline: value.processing_deadline, + at_most_once: value.at_most_once, + on_attempts_exceeded: value.on_attempts_exceeded, + expires_at: value.expires_at, + delay_until: value.delay_until, + }) + } +} + +/// A struct representing a metadata entry in a binary heap. +/// Typically used with ActivationMetadata +#[derive(Clone, Debug)] +pub struct TimestampEntry { + pub id: String, + pub timestamp: Option>, +} + +impl Ord for TimestampEntry { + fn cmp(&self, other: &Self) -> Ordering { + self.timestamp.cmp(&other.timestamp) + } +} + +impl PartialOrd for TimestampEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for TimestampEntry { + fn eq(&self, other: &Self) -> bool { + self.timestamp == other.timestamp + } +} + +impl Eq for TimestampEntry {} + +#[cfg(test)] +mod test { + use std::cmp::Ordering; + + use chrono::{Duration, Utc}; + use sentry_protos::taskbroker::v1::OnAttemptsExceeded; + + use crate::store::inflight_activation::InflightActivationStatus; + + use super::ActivationMetadata; + + #[test] + fn heap_entry_methods() { + let record = ActivationMetadata { + id: "id_0".into(), + namespace: "default".into(), + taskname: "do_stuff".into(), + received_at: Utc::now(), + added_at: Utc::now() + Duration::seconds(2), + processing_attempts: 0, + processing_deadline_duration: 60, + processing_deadline: Some(Utc::now() + Duration::seconds(30)), + at_most_once: false, + 
on_attempts_exceeded: OnAttemptsExceeded::Discard, + expires_at: None, + delay_until: None, + status: InflightActivationStatus::Pending, + }; + let entry = record.pending_entry(); + assert_eq!(record.id, entry.id); + assert_eq!(record.added_at, entry.timestamp.unwrap()); + + let entry = record.processing_entry(); + assert_eq!(record.id, entry.id); + assert_eq!(record.processing_deadline, entry.timestamp); + + let entry = record.delayed_entry(); + assert_eq!(record.id, entry.id); + assert_eq!(record.delay_until, entry.timestamp); + } + + #[test] + fn entry_cmp() { + let first = ActivationMetadata { + id: "id_0".into(), + namespace: "default".into(), + taskname: "do_stuff".into(), + received_at: Utc::now(), + added_at: Utc::now() + Duration::seconds(2), + processing_attempts: 0, + processing_deadline_duration: 60, + processing_deadline: Some(Utc::now() + Duration::seconds(30)), + at_most_once: false, + on_attempts_exceeded: OnAttemptsExceeded::Discard, + expires_at: None, + delay_until: None, + status: InflightActivationStatus::Pending, + }; + let second = ActivationMetadata { + id: "id_1".into(), + namespace: "default".into(), + taskname: "do_stuff".into(), + received_at: Utc::now(), + added_at: Utc::now() + Duration::seconds(3), + processing_attempts: 0, + processing_deadline_duration: 60, + processing_deadline: Some(Utc::now() + Duration::seconds(30)), + at_most_once: false, + on_attempts_exceeded: OnAttemptsExceeded::Discard, + expires_at: None, + delay_until: None, + status: InflightActivationStatus::Pending, + }; + let first_entry = first.pending_entry(); + let second_entry = second.pending_entry(); + assert!(first_entry < second_entry); + + assert_eq!(Ordering::Less, first_entry.cmp(&second_entry)); + } +} diff --git a/src/upkeep.rs b/src/upkeep.rs index 5afaff11..704628c0 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -194,6 +194,9 @@ pub async fn do_upkeep( metrics::counter!("upkeep.handle_processing_deadline.skipped").increment(1); } + // Don't run 
processing attempts with metadata store as it is a no-op and + // handled in store.get_pending_activation now. + /* // 5. Handle processing attempts exceeded let handle_processing_attempts_exceeded_start = Instant::now(); if let Ok(processing_attempts_exceeded) = store.handle_processing_attempts().await { @@ -201,13 +204,18 @@ pub async fn do_upkeep( } metrics::histogram!("upkeep.handle_processing_attempts_exceeded") .record(handle_processing_attempts_exceeded_start.elapsed()); + */ + // Don't run expires_at checks with metadata store as it is a no-op. + // Like processing attempts, expires_at are handled in store.get_pending_activation now. + /* // 6. Remove tasks that are past their expires_at deadline let handle_expires_at_start = Instant::now(); if let Ok(expired_count) = store.handle_expires_at().await { result_context.expired = expired_count; } metrics::histogram!("upkeep.handle_expires_at").record(handle_expires_at_start.elapsed()); + */ // 7. Handle tasks that are past their delay_until deadline let handle_delay_until_start = Instant::now(); @@ -268,6 +276,11 @@ pub async fn do_upkeep( } metrics::histogram!("upkeep.remove_completed").record(remove_completed_start.elapsed()); + // 11. Flush changes to metadata store? + let flush_dirty_start = Instant::now(); + store.flush_dirty().await; + metrics::histogram!("upkeep.flush_dirty").record(flush_dirty_start.elapsed()); + // 11. 
Remove killswitched tasks from store let runtime_config = runtime_config_manager.read().await; let killswitched_tasks = runtime_config.drop_task_killswitch.clone(); @@ -810,6 +823,7 @@ mod tests { .await; } + /* #[tokio::test] async fn test_processing_attempts_exceeded_discard() { let config = create_config(); @@ -860,6 +874,7 @@ mod tests { "complete tasks were removed" ); } + */ #[tokio::test] async fn test_remove_at_remove_failed_publish_to_kafka() { @@ -953,6 +968,7 @@ mod tests { ); } + /* #[tokio::test] async fn test_expired_discard() { let config = create_config(); @@ -983,7 +999,8 @@ mod tests { ) .await; - assert_eq!(result_context.expired, 2); // 0/2 removed as expired + // expiration happens in get_pending_activation now + assert_eq!(result_context.expired, 0); assert_eq!(result_context.completed, 1); // 1 complete assert_eq!( store @@ -1019,6 +1036,7 @@ mod tests { "fourth task should be kept" ); } + */ #[tokio::test] async fn test_delay_elapsed() {