From 9d6679eb8354446993bda755793a090b9ee0f3e0 Mon Sep 17 00:00:00 2001 From: Skiba Jan Date: Tue, 20 Jan 2026 10:51:43 +0100 Subject: [PATCH] beam-lib: add socket callback api --- beam-lib/src/http_util.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/beam-lib/src/http_util.rs b/beam-lib/src/http_util.rs index 78b05dce..efb2b231 100644 --- a/beam-lib/src/http_util.rs +++ b/beam-lib/src/http_util.rs @@ -24,16 +24,16 @@ pub enum BeamError { #[error("The following receivers had invalid certificates which is why the request has been canceld: {0:?}")] InvalidReceivers(Vec), #[error("Other handler specific error: {0}")] - Other(Box) + Other(Box), } impl BeamError { - fn other>>(e: T) -> Self { + pub fn other>>(e: T) -> Self { Self::Other(e.into()) } } -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// Long polling blocking options /// @@ -263,6 +263,25 @@ impl BeamClient { .map_err(Into::into) } } + + #[cfg(feature = "sockets")] + pub async fn handle_sockets(&self, cb: F) -> Result<()> + where + F: Fn(SocketTask, reqwest::Upgraded) -> Fut, + Fut: Future>, + { + loop { + let Some(socket_task) = self + .get_socket_tasks(&BlockingOptions::from_count(1)) + .await? + .pop() + else { + continue; + }; + let socket = self.connect_socket(&socket_task.id).await?; + cb(socket_task, socket).await?; + } + } } impl HandleInvalidReceiversExt for Response {