|  | 
|  | 1 | +use crate::prelude::{ProtonClient, ProtonClientError, Result}; | 
|  | 2 | +use clickhouse::query::RowCursor; | 
|  | 3 | +use clickhouse::Row; | 
|  | 4 | +use serde::Deserialize; | 
|  | 5 | + | 
|  | 6 | +impl ProtonClient { | 
|  | 7 | +    /// | 
|  | 8 | +    /// Executes a streaming query, returning a [`RowCursor`] to obtain results | 
|  | 9 | +    /// as they become available from the stream. The key difference compared to fetch is that, | 
|  | 10 | +    /// for streaming query, the returned result is a unbounded stream. Also, | 
|  | 11 | +    /// a fetch_stream query will keep running continuously returning fresh data | 
|  | 12 | +    /// until the application terminates.. | 
|  | 13 | +    /// | 
|  | 14 | +    /// # Example | 
|  | 15 | +    /// | 
|  | 16 | +    /// ```no_run | 
|  | 17 | +    ///  use proton_client::ProtonClient; | 
|  | 18 | +    ///  use proton_client::prelude::Result; | 
|  | 19 | +    /// | 
|  | 20 | +    ///  async fn example() -> Result<()> { | 
|  | 21 | +    /// | 
|  | 22 | +    /// #[derive(Debug, clickhouse::Row, serde::Deserialize)] | 
|  | 23 | +    /// struct MyRow { | 
|  | 24 | +    ///     no: u32, | 
|  | 25 | +    ///     name: String, | 
|  | 26 | +    /// } | 
|  | 27 | +    /// | 
|  | 28 | +    /// let client = ProtonClient::new("http://localhost:3218"); | 
|  | 29 | +    /// | 
|  | 30 | +    ///  let mut cursor = client | 
|  | 31 | +    ///     .fetch_stream::<MyRow>("SELECT ?fields from (test_stream) WHERE no BETWEEN 500 AND 504") | 
|  | 32 | +    ///     .await | 
|  | 33 | +    ///     .expect("[main/fetch]: Failed to fetch stream data"); | 
|  | 34 | +    /// | 
|  | 35 | +    /// while let Some(MyRow { name, no }) = cursor.next().await.expect("[main/fetch]: Failed to fetch data") { | 
|  | 36 | +    ///     println!("{name}: {no}"); | 
|  | 37 | +    /// } | 
|  | 38 | +    /// # Ok(()) } | 
|  | 39 | +    /// ``` | 
|  | 40 | +    pub async fn fetch_stream<T>(&self, query: &str) -> Result<RowCursor<T>> | 
|  | 41 | +    where | 
|  | 42 | +        T: Row + for<'b> Deserialize<'b>, | 
|  | 43 | +    { | 
|  | 44 | +        // Here we use the client without compression. For details, see: | 
|  | 45 | +        // https://github.com/timeplus-io/proton-rust-client/issues/6 | 
|  | 46 | +        match self.client_without_compression.query(query).fetch::<T>() { | 
|  | 47 | +            Ok(cursor) => Ok(cursor), | 
|  | 48 | +            Err(e) => Err(ProtonClientError::FetchFailed(e.to_string())), | 
|  | 49 | +        } | 
|  | 50 | +    } | 
|  | 51 | +} | 
0 commit comments