diff --git a/broker/src/serve_tasks.rs b/broker/src/serve_tasks.rs index 6d047a99..0bb07a1e 100644 --- a/broker/src/serve_tasks.rs +++ b/broker/src/serve_tasks.rs @@ -40,6 +40,7 @@ pub(crate) fn router() -> Router { let state = TasksState::default(); Router::new() .route("/v1/tasks", get(get_tasks).post(post_task)) + .route("/v1/tasks/{task_id}", get(get_task_by_id)) .route("/v1/tasks/{task_id}/results", get(get_results_for_task)) .route("/v1/tasks/{task_id}/results/{app_id}", put(put_result)) .with_state(state) @@ -212,6 +213,31 @@ async fn get_tasks( }) } +async fn get_task_by_id( + State(state): State, + Path(task_id): Path, + mut block: HowLongToBlock, + msg: MsgSigned, +) -> Result { + if !(block.wait_count.is_none() || block.wait_count == Some(1)) { + return Err(StatusCode::BAD_REQUEST); + } + block.wait_count = Some(1); + let Some(task) = state.task_manager + .wait_for_tasks(&block, |task| task.id() == &task_id && (msg.get_from() == task.get_from() || task.get_to().contains(msg.get_from()))) + .await? + .next() + else { + return Err(StatusCode::NOT_FOUND); + }; + let body = serde_json::to_vec(&*task) + .map_err(|e| { + warn!("Failed to serialize task: {e}"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + Ok(([(header::CONTENT_TYPE, HeaderValue::from_static("application/json"))], body).into_response()) +} + trait MsgFilterTrait { // fn new() -> Self; fn from(&self) -> Option<&AppOrProxyId>; diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index 6c01e10d..7e9f6594 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -39,6 +39,7 @@ pub(crate) fn router(client: &SamplyHttpClient) -> Router { Router::new() // We need both path variants so the server won't send us into a redirect loop (/tasks, /tasks/, ...) .route("/v1/tasks", get(handler_task).post(handler_task)) + .route("/v1/tasks/{task_id}", get(handler_task)) .route("/v1/tasks/{task_id}/results", get(handler_task)) .route("/v1/tasks/{task_id}/results/{app_id}", put(handler_task)) .with_state(state)