diff --git a/CHANGELOG.md b/CHANGELOG.md index 13664ea0..3141b920 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 **Note:** Version 0 of Semantic Versioning is handled differently from version 1 and above. The minor version will be incremented upon a breaking change and the patch version will be incremented for features. -# 2026-03-31 +## 2026-03-31 +- yellowstone-grpc-client-13.0.0 - yellowstone-grpc-client-nodejs-5.0.7 +- yellowstone-grpc-client-simple-12.2.0 +- yellowstone-grpc-geyser-12.2.0 +- yellowstone-grpc-proto-12.1.0 + + +### Breaking + +Introduce API breaking that should facilitate maintenance and compilation errors. + +- Removed generic interceptor types from `GeyserGrpcClient` +- Replaced `impl Sink` with `SubscribeRequestSink` type. +- Replaced `impl Sink` with `SubscribeDeshredRequestSink` type. +- Replaced `impl Stream>` with `GeyserStream` type. +- Replaced `impl Stream>` with `DeshredStream` type +- `SubscribeRequestSink`now returns `SubscribeRequestSinkError` instead of `mpsc::SendError`. +- Removed `GeyserGrpcClientError::SubscribeSendError` variant as no code branch could raise this error. + +Referenced PR(s) :[721](https://github.com/rpcpool/yellowstone-grpc/pull/721) + + +# 2026-03-31 + - yellowstone-grpc-client-12.2.0 +- yellowstone-grpc-client-nodejs-5.0.7 - yellowstone-grpc-client-simple-12.2.0 - yellowstone-grpc-geyser-12.2.0 - yellowstone-grpc-proto-12.1.0 diff --git a/Cargo.lock b/Cargo.lock index 96495f0d..eb9da97c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5311,7 +5311,7 @@ checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" [[package]] name = "yellowstone-grpc-client" -version = "12.2.0" +version = "13.0.0" dependencies = [ "bytes", "futures", diff --git a/Cargo.toml b/Cargo.toml index f0fc17d4..c08c573e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,7 +95,7 @@ solana-transaction-error = "3.0.0" spl-token-2022-interface = "2.0.0" # Yellowstone -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "12.2.0" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "13.0.0" } yellowstone-grpc-geyser = { path = "yellowstone-grpc-geyser", version = "12.2.0" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "12.1.0", default-features = false } diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index b6685bb7..f2550459 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -21,9 +21,7 @@ use { }, tokio::{fs, sync::Mutex}, tonic::transport::{channel::ClientTlsConfig, Certificate}, - yellowstone_grpc_client::{ - GeyserGrpcBuilder, GeyserGrpcClient, GeyserGrpcClientError, Interceptor, - }, + yellowstone_grpc_client::{GeyserGrpcBuilder, GeyserGrpcClient}, yellowstone_grpc_geyser::plugin::{convert_from, filter::message::FilteredUpdate}, yellowstone_grpc_proto::{ geyser::SlotStatus, @@ -659,7 +657,7 @@ impl Action { } async fn run_action( - mut client: GeyserGrpcClient, + mut client: GeyserGrpcClient, action: &Action, commitment: Option, ) -> anyhow::Result<()> { @@ -782,7 +780,7 @@ async fn main() -> anyhow::Result<()> { .await } -async fn geyser_health_watch(mut client: GeyserGrpcClient) -> anyhow::Result<()> { +async fn geyser_health_watch(mut client: GeyserGrpcClient) -> anyhow::Result<()> { let mut stream = client.health_watch().await?; info!("stream opened"); while let Some(message) = stream.next().await { @@ -793,7 +791,7 @@ async fn geyser_health_watch(mut client: GeyserGrpcClient) -> } async fn geyser_subscribe( - mut client: GeyserGrpcClient, + mut client: GeyserGrpcClient, request: SubscribeRequest, resub: usize, stats: bool, @@ -1047,8 +1045,7 @@ async fn geyser_subscribe( ping: None, from_slot: None, }) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + .await?; } } info!("stream closed"); @@ -1056,7 +1053,7 @@ async fn geyser_subscribe( } async fn geyser_subscribe_deshred( - mut client: GeyserGrpcClient, + mut client: GeyserGrpcClient, request: SubscribeDeshredRequest, stats: bool, ) -> anyhow::Result<()> { diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index a8c80c33..d77ee98f 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "12.2.0" +version = "13.0.0" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 04671bd5..1e7e40d8 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -5,6 +5,7 @@ use { channel::mpsc, sink::{Sink, SinkExt}, stream::Stream, + StreamExt, }, std::{path::PathBuf, time::Duration}, tokio::net::UnixStream, @@ -53,19 +54,20 @@ impl Interceptor for InterceptorXToken { pub enum GeyserGrpcClientError { #[error("gRPC status: {0}")] TonicStatus(#[from] Status), - #[error("Failed to send subscribe request: {0}")] - SubscribeSendError(#[from] mpsc::SendError), } pub type GeyserGrpcClientResult = Result; +/// +/// See [`GeyserGrpcBuilder`] for constructing a client with custom options. +/// #[derive(Clone)] -pub struct GeyserGrpcClient { - pub health: HealthClient>, - pub geyser: GeyserClient>, +pub struct GeyserGrpcClient { + pub health: HealthClient>, + pub geyser: GeyserClient>, } -impl GeyserGrpcClient<()> { +impl GeyserGrpcClient { pub fn build_from_shared( endpoint: impl Into, ) -> GeyserGrpcBuilderResult { @@ -77,10 +79,145 @@ impl GeyserGrpcClient<()> { } } -impl GeyserGrpcClient { +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct SubscribeRequestSinkError(#[from] mpsc::SendError); + +/// +/// A sink returned by the [`GeyserGrpcClient::subscribe`]. +/// +/// The sink is used to send [`SubscribeRequest`] updates to the server. +/// +#[derive(Clone)] +pub struct SubscribeRequestSink { + inner: mpsc::UnboundedSender, +} + +impl Sink for SubscribeRequestSink { + type Error = SubscribeRequestSinkError; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn start_send( + mut self: std::pin::Pin<&mut Self>, + item: SubscribeRequest, + ) -> Result<(), Self::Error> { + self.inner.start_send_unpin(item).map_err(Into::into) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_flush_unpin(cx).map_err(Into::into) + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_close_unpin(cx).map_err(Into::into) + } +} + +/// +/// Streams returned by the [`GeyserGrpcClient::subscribe`]. +/// +/// The stream yields [`SubscribeUpdate`] from the server. +/// +pub struct GeyserStream { + inner: Streaming, +} + +impl Stream for GeyserStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +} + +/// +/// Streams returned by the [`GeyserGrpcClient::subscribe_deshred`]. +/// +/// The stream yields [`SubscribeUpdateDeshred`] from the server. +/// +pub struct DeshredStream { + inner: Streaming, +} + +impl Stream for DeshredStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +} + +/// +/// Errors returns by the [`SubscribeDeshredRequestSink`] when sending subscription updates to the server. +/// +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct SubscribeDeshredRequestSinkError(#[from] mpsc::SendError); + +/// +/// Sinks returned by the [`GeyserGrpcClient::subscribe_deshred`]. +/// +/// The sink is used to send [`SubscribeDeshredRequest`] updates to the server. +/// +pub struct SubscribeDeshredRequestSink { + inner: mpsc::UnboundedSender, +} + +impl Sink for SubscribeDeshredRequestSink { + type Error = SubscribeDeshredRequestSinkError; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn start_send( + mut self: std::pin::Pin<&mut Self>, + item: SubscribeDeshredRequest, + ) -> Result<(), Self::Error> { + self.inner.start_send_unpin(item).map_err(Into::into) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_flush_unpin(cx).map_err(Into::into) + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_close_unpin(cx).map_err(Into::into) + } +} + +impl GeyserGrpcClient { + // TODO: check if we need to make this function `pub(crate` instead as users of the lib should use the builder to construct a client anyway. pub const fn new( - health: HealthClient>, - geyser: GeyserClient>, + health: HealthClient>, + geyser: GeyserClient>, ) -> Self { Self { health, geyser } } @@ -104,35 +241,70 @@ impl GeyserGrpcClient { Ok(response.into_inner()) } - // Subscribe + /// + /// Establish a subscription to the gRPC server without an initial request. + /// + /// If you don't plan to change the [`SubscribeRequest`] after the initial subscription, consider using [`GeyserGrpcClient::subscribe_once`] instead for a simpler API. + /// + /// # Returns + /// + /// A tuple of ([`SubscribeRequestSink`], [`GeyserStream`]): + /// + /// - [`SubscribeRequestSink`]: a sink to send `SubscribeRequest` updates to the server. + /// The server will update the subscription based on the latest request received from the sink. + /// request received from the sink. + /// - [`GeyserStream`]: a stream of `SubscribeUpdate` from the server. + /// The stream will yield updates based on the latest request received from the sink. + /// + /// # Lifecyle and dropping rules + /// + /// The subscription will remain active until the stream is dropped or the server closes the connection. + /// + /// # Initial [`SubscribeRequest`] + /// + /// You have to provide a [`SubscribeRequest`] to the server to start receiving updates. + /// pub async fn subscribe( &mut self, - ) -> GeyserGrpcClientResult<( - impl Sink, - impl Stream>, - )> { + ) -> GeyserGrpcClientResult<(SubscribeRequestSink, GeyserStream)> { self.subscribe_with_request(None).await } + /// + /// Similar to [`GeyserGrpcClient::subscribe`] but allows you to provide an initial [`SubscribeRequest`] to the server. + /// pub async fn subscribe_with_request( &mut self, request: Option, - ) -> GeyserGrpcClientResult<( - impl Sink + use, - impl Stream> + use, - )> { + ) -> GeyserGrpcClientResult<(SubscribeRequestSink, GeyserStream)> { let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); if let Some(request) = request { - subscribe_tx - .send(request) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + match subscribe_tx.send(request).await { + Ok(_) => (), + Err(e) => unreachable!( + "channel cannot be disconnected or full at this point, got error: {e}" + ), + } } let response: Response> = self.geyser.subscribe(subscribe_rx).await?; - Ok((subscribe_tx, response.into_inner())) + Ok(( + SubscribeRequestSink { + inner: subscribe_tx, + }, + GeyserStream { + inner: response.into_inner(), + }, + )) } + /// + /// Subscribe to updates with an initial request. + /// + /// Unlike [`GeyserGrpcClient::subscribe`], it does not return the sink to send subsequent requests, + /// so it is only useful for one-off subscription that does not need to update the request after + /// the initial subscription. + /// pub async fn subscribe_once( &mut self, request: SubscribeRequest, @@ -142,35 +314,63 @@ impl GeyserGrpcClient { .map(|(_sink, stream)| stream) } - // Subscribe Deshred + /// + /// Subscribe to deshred (only transactions right now). + /// + /// Deshredded updates are events happening before any replay, they are not guaranteed to be valid or even to be included in the ledger, + /// but they are emitted at the earliest possible time with the most information available. + /// + /// # Deshred vs Processed + /// + /// Deshredded transactions have not been replayed yet, so they do not contains any replayed metadata, such as status, log messages, or compute units used. + /// pub async fn subscribe_deshred( &mut self, - ) -> GeyserGrpcClientResult<( - impl Sink, - impl Stream>, - )> { + ) -> GeyserGrpcClientResult<(SubscribeDeshredRequestSink, DeshredStream)> { self.subscribe_deshred_with_request(None).await } + /// + /// See [`GeyserGrpcClient::subscribe_deshred`] for more details. + /// + /// # Arguments + /// + /// * `request`: an optional initial [`SubscribeDeshredRequest`] to send to the server when establishing the subscription. + /// If provided, the server will start sending deshredded updates based on the request immediately after the subscription is established. + /// If not provided, the server will wait for the first request from the sink before sending any updates. + /// pub async fn subscribe_deshred_with_request( &mut self, request: Option, - ) -> GeyserGrpcClientResult<( - impl Sink + use, - impl Stream> + use, - )> { + ) -> GeyserGrpcClientResult<(SubscribeDeshredRequestSink, DeshredStream)> { let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); if let Some(request) = request { - subscribe_tx - .send(request) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + match subscribe_tx.send(request).await { + Ok(_) => (), + Err(e) => unreachable!( + "channel cannot be disconnected or full at this point, got error: {e}" + ), + } } let response: Response> = self.geyser.subscribe_deshred(subscribe_rx).await?; - Ok((subscribe_tx, response.into_inner())) + Ok(( + SubscribeDeshredRequestSink { + inner: subscribe_tx, + }, + DeshredStream { + inner: response.into_inner(), + }, + )) } + /// + /// Subscribe to deshred updates with an initial request. + /// + /// Unlike [`GeyserGrpcClient::subscribe_deshred`], it does not return the sink to send subsequent requests, + /// so it is only useful for one-off subscription that does not need to update the request after + /// the initial subscription. + /// pub async fn subscribe_deshred_once( &mut self, request: SubscribeDeshredRequest, @@ -252,14 +452,26 @@ impl GeyserGrpcClient { #[derive(Debug, thiserror::Error)] pub enum GeyserGrpcBuilderError { + /// + /// Raised when invalid x-token is provided, such as empty string or string with non-ASCII characters. #[error("Failed to parse x-token: {0}")] MetadataValueError(#[from] InvalidMetadataValue), + /// + /// Raised when there is an error in the underlying gRPC transport, such as invalid URI, connection failure, TLS configuration error, etc. + /// #[error("gRPC transport error: {0}")] TonicError(#[from] tonic::transport::Error), } pub type GeyserGrpcBuilderResult = Result; +/// +/// The builder for constructing a [`GeyserGrpcClient`] with custom options. +/// +/// The builder provides a fluent API to configure both the gRPC transport options and the Geyser client options. +/// For transport options, it exposes the similar configuration as [`Endpoint`] builder since it is used to construct. +/// +/// Use [`GeyserGrpcBuilder::connect`] or [`GeyserGrpcBuilder::connect_lazy`] to create a [`GeyserGrpcClient`] from configured builder. #[derive(Debug)] pub struct GeyserGrpcBuilder { pub endpoint: Endpoint, @@ -294,10 +506,7 @@ impl GeyserGrpcBuilder { } // Create client - fn build( - self, - channel: Channel, - ) -> GeyserGrpcBuilderResult> { + fn build(self, channel: Channel) -> GeyserGrpcBuilderResult { let interceptor = InterceptorXToken { x_token: self.x_token, x_request_snapshot: self.x_request_snapshot, @@ -323,16 +532,19 @@ impl GeyserGrpcBuilder { )) } - pub async fn connect( - self, - ) -> GeyserGrpcBuilderResult> { + /// + /// Builds an instance of [`GeyserGrpcClient`] by connecting to the gRPC server. + /// + pub async fn connect(self) -> GeyserGrpcBuilderResult { let channel = self.endpoint.connect().await?; self.build(channel) } - pub fn connect_lazy( - self, - ) -> GeyserGrpcBuilderResult> { + /// + /// Builds an instance of [`GeyserGrpcClient`] without actually connecting to the gRPC server. + /// This will wait for the first gRPC call to trigger the connection to the server, and it will use the configured options in the builder for that connection. + /// + pub fn connect_lazy(self) -> GeyserGrpcBuilderResult { let channel = self.endpoint.connect_lazy(); self.build(channel) } @@ -345,7 +557,7 @@ impl GeyserGrpcBuilder { pub async fn connect_uds( self, path: impl Into, - ) -> GeyserGrpcBuilderResult> { + ) -> GeyserGrpcBuilderResult { let path = path.into(); // tonic needs an Endpoint to hang config off of, but the URI is ignored @@ -363,7 +575,8 @@ impl GeyserGrpcBuilder { self.build(channel) } - // Set x-token + /// + /// Sets `x-token` credentials for the client. The token will be included in the metadata of every gRPC request sent by the client. pub fn x_token(self, x_token: Option) -> GeyserGrpcBuilderResult where T: TryInto, @@ -374,7 +587,8 @@ impl GeyserGrpcBuilder { }) } - // Include `x-request-snapshot` + /// + /// Sets the `x-request-snapshot` flag for the client. This flag will be included in the metadata of every gRPC request sent by the client. pub fn set_x_request_snapshot(self, value: bool) -> Self { Self { x_request_snapshot: value, @@ -382,7 +596,9 @@ impl GeyserGrpcBuilder { } } - // Endpoint options + /// + /// Sets endpoint options + /// pub fn connect_timeout(self, dur: Duration) -> Self { Self { endpoint: self.endpoint.connect_timeout(dur), diff --git a/yellowstone-grpc-geyser/src/plugin/filter/message.rs b/yellowstone-grpc-geyser/src/plugin/filter/message.rs index 642a0988..95dcd6ed 100644 --- a/yellowstone-grpc-geyser/src/plugin/filter/message.rs +++ b/yellowstone-grpc-geyser/src/plugin/filter/message.rs @@ -1022,23 +1022,31 @@ impl FilteredUpdateEntry { #[cfg(any(test, feature = "bench"))] pub mod tests { + #[cfg(test)] + use super::{FilteredUpdate, FilteredUpdateOneof}; + #[cfg(test)] + use crate::plugin::{ + filter::{ + encoder::{AccountEncoder, TransactionEncoder}, + message::{FilteredUpdateAccount, FilteredUpdateTransaction}, + }, + message::{MessageSlot, MessageTransaction, SlotStatus}, + }; + #[cfg(test)] + use prost::Message as _; + #[cfg(test)] + use yellowstone_grpc_proto::geyser::SubscribeUpdate; use { - super::{FilteredUpdate, FilteredUpdateBlock, FilteredUpdateFilters, FilteredUpdateOneof}, + super::{FilteredUpdateBlock, FilteredUpdateFilters}, crate::plugin::{ convert_to, - filter::{ - encoder::{AccountEncoder, TransactionEncoder}, - message::{FilteredUpdateAccount, FilteredUpdateTransaction}, - name::FilterName, - FilterAccountsDataSlice, - }, + filter::{name::FilterName, FilterAccountsDataSlice}, message::{ - MessageAccount, MessageAccountInfo, MessageBlockMeta, MessageEntry, MessageSlot, - MessageTransaction, MessageTransactionInfo, SlotStatus, + MessageAccount, MessageAccountInfo, MessageBlockMeta, MessageEntry, + MessageTransactionInfo, }, }, bytes::Bytes, - prost::Message as _, prost_011::Message as _, prost_types::Timestamp, solana_hash::Hash, @@ -1054,7 +1062,7 @@ pub mod tests { sync::{Arc, OnceLock}, time::SystemTime, }, - yellowstone_grpc_proto::geyser::{SubscribeUpdate, SubscribeUpdateBlockMeta}, + yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta, }; pub fn create_message_filters(names: &[&str]) -> FilteredUpdateFilters { @@ -1273,6 +1281,7 @@ pub mod tests { .collect() } + #[cfg(test)] fn encode_decode_cmp(filters: &[&str], message: FilteredUpdateOneof) { let msg = FilteredUpdate { filters: create_message_filters(filters),