diff --git a/.github/workflows/validate-examples.yml b/.github/workflows/validate-examples.yml index 129c2c4a..4efc8d3a 100644 --- a/.github/workflows/validate-examples.yml +++ b/.github/workflows/validate-examples.yml @@ -144,7 +144,7 @@ jobs: fail-fast: false matrix: examples: - [ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ] + [ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "resiliency/instance", "resiliency/simple", "secrets-bulk" ] steps: - name: Check out code uses: actions/checkout@v4 @@ -210,3 +210,5 @@ jobs: run: | cd examples ./validate.sh ${{ matrix.examples }} + + diff --git a/Cargo.toml b/Cargo.toml index 4d06b997..25ceca92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ axum = "0.7.4" tokio = { version = "1.29", features = ["sync"] } tokio-util = { version = "0.7.10", features = ["io"] } chrono = "0.4.24" +backon = "0.4.4" [build-dependencies] tonic-build = "0.11.0" @@ -98,6 +99,14 @@ path = "examples/query_state/query1.rs" name = "query_state_q2" path = "examples/query_state/query2.rs" +[[example]] +name = "resiliency-instance" +path = "examples/resiliency/instance/main.rs" + +[[example]] +name = "resiliency-simple" +path = "examples/resiliency/simple/main.rs" + [[example]] name = "secrets-bulk" path = "examples/secrets-bulk/app.rs" diff --git a/examples/actors/client.rs b/examples/actors/client.rs index 11eec37e..5a18dafb 100644 --- a/examples/actors/client.rs +++ b/examples/actors/client.rs @@ -12,10 +12,6 @@ pub struct MyRequest { #[tokio::main] async fn main() -> Result<(), Box> { - // TODO: Handle this issue in the sdk - // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); - // Define the Dapr address let addr = "https://127.0.0.1".to_string(); diff --git a/examples/client/client.rs b/examples/client/client.rs index b34d4244..fc048d69 100644 --- a/examples/client/client.rs +++ b/examples/client/client.rs @@ -1,9 +1,5 @@ #[tokio::main] async fn main() -> Result<(), Box> { - // TODO: Handle this issue in the sdk - // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); - // Set the Dapr address let addr = "https://127.0.0.1".to_string(); diff --git a/examples/configuration/main.rs b/examples/configuration/main.rs index 0413b5a2..61ce325d 100644 --- a/examples/configuration/main.rs +++ b/examples/configuration/main.rs @@ -5,10 +5,6 @@ type DaprClient = dapr::Client; #[tokio::main] async fn main() -> Result<(), Box> { - // TODO: Handle this issue in the sdk - // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); - // Set the Dapr address let addr = "https://127.0.0.1".to_string(); diff --git a/examples/crypto/main.rs b/examples/crypto/main.rs index 0fe42fb3..3c4332cd 100644 --- a/examples/crypto/main.rs +++ b/examples/crypto/main.rs @@ -1,13 +1,11 @@ use std::fs; use tokio::fs::File; -use tokio::time::sleep; use dapr::client::ReaderStream; #[tokio::main] async fn main() -> Result<(), Box> { - sleep(std::time::Duration::new(2, 0)).await; let addr = "https://127.0.0.1".to_string(); let mut client = dapr::Client::::connect(addr).await?; diff --git a/examples/invoke/grpc-proxying/client.rs b/examples/invoke/grpc-proxying/client.rs index 99172962..c84f8915 100644 --- a/examples/invoke/grpc-proxying/client.rs +++ b/examples/invoke/grpc-proxying/client.rs @@ -1,5 +1,3 @@ -use std::{thread, time::Duration}; - use hello_world::{greeter_client::GreeterClient, HelloRequest}; use tonic::metadata::MetadataValue; @@ -10,9 +8,6 @@ pub mod hello_world { #[tokio::main] async fn main() -> Result<(), Box> { - // Sleep to allow for the server to become available - thread::sleep(Duration::from_secs(5)); - // Get the Dapr port and create a connection let port: u16 = std::env::var("DAPR_GRPC_PORT").unwrap().parse().unwrap(); let address = format!("https://127.0.0.1:{}", port); diff --git a/examples/invoke/grpc/client.rs b/examples/invoke/grpc/client.rs index d3d3cff2..d0d6df70 100644 --- a/examples/invoke/grpc/client.rs +++ b/examples/invoke/grpc/client.rs @@ -1,5 +1,3 @@ -use std::{thread, time::Duration}; - use hello_world::{HelloReply, HelloRequest}; use prost::Message; @@ -11,9 +9,6 @@ type DaprClient = dapr::Client; #[tokio::main] async fn main() -> Result<(), Box> { - // Sleep to allow for the server to become available - thread::sleep(Duration::from_secs(5)); - // Set the Dapr address let address = "https://127.0.0.1".to_string(); diff --git a/examples/pubsub/publisher.rs b/examples/pubsub/publisher.rs index 543e6b28..328c8fd3 100644 --- a/examples/pubsub/publisher.rs +++ b/examples/pubsub/publisher.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, thread, time::Duration}; +use std::{collections::HashMap, time::Duration}; use dapr::serde::{Deserialize, Serialize}; use dapr::serde_json; @@ -17,10 +17,6 @@ struct Refund { #[tokio::main] async fn main() -> Result<(), Box> { - // TODO: Handle this issue in the sdk - // Introduce delay so that dapr grpc port is assigned before app tries to connect - thread::sleep(Duration::from_secs(2)); - // Set address for Dapr connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/query_state/query1.rs b/examples/query_state/query1.rs index dc6859f8..3fa60fd7 100644 --- a/examples/query_state/query1.rs +++ b/examples/query_state/query1.rs @@ -2,9 +2,6 @@ use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { - // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(5, 0)); - // Set the Dapr address and create a connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/query_state/query2.rs b/examples/query_state/query2.rs index e8e0c7cb..15a0a3ad 100644 --- a/examples/query_state/query2.rs +++ b/examples/query_state/query2.rs @@ -2,9 +2,6 @@ use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { - // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(5, 0)); - // Set the Dapr address and create a connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/resiliency/instance/README.md b/examples/resiliency/instance/README.md new file mode 100644 index 00000000..8c945546 --- /dev/null +++ b/examples/resiliency/instance/README.md @@ -0,0 +1,107 @@ +This example validates the resiliency of the instantiated client and does not +demonstrate any extra functionality. It is based off the configuration example +to connect to the sidecar and make a call for a configuration item stored in +redis. + +1. Insert a key with the value `hello` to redis using the following command: + + + + +```bash +docker exec dapr_redis redis-cli MSET hello "world" +``` + + + +2. Run the example without the sidecar + + + +```bash +cargo run --example resiliency-instance +``` + + + +3. Run the Dapr sidecar + + + +```bash +dapr run --app-id=rustapp --resources-path ../../components --dapr-grpc-port 3500 +``` + + + +4. Update the hello key with the value `world2` to redis using the following command: + + + + +```bash +docker exec dapr_redis redis-cli MSET hello "world2" +``` + + + +5. Run the Dapr sidecar (for the second time) + + + +```bash +dapr run --app-id=rustapp --resources-path ../../components --dapr-grpc-port 3500 +``` + + +The example app should make contact with the Dapr sidecar and the result should +be returned from the configuration request successfully. + +``` +Configuration value: ConfigurationItem { value: "world", version: "", metadata: {} } +``` diff --git a/examples/resiliency/instance/main.rs b/examples/resiliency/instance/main.rs new file mode 100644 index 00000000..3cdf60b5 --- /dev/null +++ b/examples/resiliency/instance/main.rs @@ -0,0 +1,47 @@ +use std::{ + thread, + time::{Duration, Instant}, +}; + +const CONFIGSTORE_NAME: &str = "configstore"; +type DaprClient = dapr::Client; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Set the Dapr address + let addr = "https://127.0.0.1".to_string(); + + // Create the client + let start_time = Instant::now(); + let mut client = match DaprClient::connect(addr).await { + Ok(client) => { + println!("connected to dapr sidecar"); + client + } + Err(error) => { + panic!("failed to connect to dapr sidecar: {:?}", error) + } + }; + let client_start_duration = start_time.elapsed(); + println!("Client connection took: {:?}", client_start_duration); + + let key = String::from("hello"); + + // get key-value pair in the state store + let response = client + .get_configuration(CONFIGSTORE_NAME, vec![(&key)], None) + .await?; + let val = response.items.get("hello").unwrap(); + println!("Configuration value: {val:?}"); + + thread::sleep(Duration::from_secs(10)); + println!("app slept for 15 seconds"); + + let response = client + .get_configuration(CONFIGSTORE_NAME, vec![(&key)], None) + .await?; + let val = response.items.get("hello").unwrap(); + println!("Configuration value: {val:?}"); + + Ok(()) +} diff --git a/examples/resiliency/simple/README.md b/examples/resiliency/simple/README.md new file mode 100644 index 00000000..b1112786 --- /dev/null +++ b/examples/resiliency/simple/README.md @@ -0,0 +1,98 @@ +This example validates the resiliency and does not demonstrate any extra +functionality. It is based off the configuration example to connect to the +sidecar and make a call for a configuration item stored in redis. + +1. Insert a key with the value `hello` to redis using the following command: + + + + +```bash +docker exec dapr_redis redis-cli MSET hello "world" +``` + + + +2. Run the example without the sidecar using the following command: + + + +```bash +cargo run --example resiliency-simple +``` + + + +The result should be that the request will fail. + +3. Run the example without the sidecar (this time in the background) + + + +```bash +cargo run --example resiliency-simple +``` + + + + + +4. Run the Dapr sidecar + + + +```bash +dapr run --app-id=rustapp --resources-path ../../components --dapr-grpc-port 3500 +``` + + + +The example app should make contact with the Dapr sidecar and the result should +be returned from the configuration request successfully. + +``` +Configuration value: ConfigurationItem { value: "world", version: "", metadata: {} } +``` diff --git a/examples/resiliency/simple/main.rs b/examples/resiliency/simple/main.rs new file mode 100644 index 00000000..c3e2a2f6 --- /dev/null +++ b/examples/resiliency/simple/main.rs @@ -0,0 +1,35 @@ +use std::time::Instant; + +const CONFIGSTORE_NAME: &str = "configstore"; +type DaprClient = dapr::Client; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Set the Dapr address + let addr = "https://127.0.0.1".to_string(); + + // Create the client + let start_time = Instant::now(); + let mut client = match DaprClient::connect(addr).await { + Ok(client) => { + println!("connected to dapr sidecar"); + client + } + Err(error) => { + panic!("failed to connect to dapr sidecar: {:?}", error) + } + }; + let client_start_duration = start_time.elapsed(); + println!("Client connection took: {client_start_duration:?}"); + + let key = String::from("hello"); + + // get key-value pair in the state store + let response = client + .get_configuration(CONFIGSTORE_NAME, vec![(&key)], None) + .await?; + let val = response.items.get("hello").unwrap(); + println!("Configuration value: {val:?}"); + + Ok(()) +} diff --git a/src/client.rs b/src/client.rs index 7ba101bf..59030406 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,6 @@ +use backon::{ExponentialBuilder, Retryable}; use serde_json::Value; -use std::collections::HashMap; +use std::{collections::HashMap, env, time::Duration}; use async_trait::async_trait; use futures::StreamExt; @@ -552,7 +553,22 @@ pub trait DaprInterface: Sized { #[async_trait] impl DaprInterface for dapr_v1::dapr_client::DaprClient { async fn connect(addr: String) -> Result { - Ok(dapr_v1::dapr_client::DaprClient::connect(addr).await?) + const ENV_DAPR_API_MAX_RETRIES: &str = "DAPR_API_MAX_RETRIES"; + let dapr_api_max_retries = + env::var(ENV_DAPR_API_MAX_RETRIES).unwrap_or_else(|_| "0".to_owned()); + + let retry_policy = ExponentialBuilder::default() + .with_jitter() + .with_min_delay(Duration::from_secs(1)) + .with_max_delay(Duration::from_secs(5)) + .with_max_times(dapr_api_max_retries.parse::().unwrap()); + + let client = (|| dapr_v1::dapr_client::DaprClient::connect(addr.to_owned())) + .retry(&retry_policy) + .await + .expect("client connection"); + + Ok(client) } async fn invoke_service(