Skip to content

Commit 80e2d16

Browse files
committed
add redis example.
1 parent 49c74b2 commit 80e2d16

File tree

4 files changed

+75
-1
lines changed

4 files changed

+75
-1
lines changed

examples/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ env_logger = "0.7.1"
1111
futures = "0.3.5"
1212
clap = "2.33.1"
1313
tokio-postgres = "0.5.4"
14+
redis = "0.16.0"
1415

1516
[dev-dependencies.rsocket_rust]
1617
path = "../rsocket"
@@ -46,3 +47,7 @@ path = "qps.rs"
4647
[[example]]
4748
name = "postgres"
4849
path = "postgres.rs"
50+
51+
[[example]]
52+
name = "redis"
53+
path = "redis.rs"

examples/docker-compose.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ services:
1010
- 5432:5432
1111
volumes:
1212
- pgdata:/var/lib/postgresql/data
13+
redis:
14+
image: "redis:3-alpine"
15+
ports:
16+
- "6379:6379"
17+
command: ["redis-server", "--appendonly", "yes"]
1318

1419
volumes:
1520
pgdata:
1621
driver: local
17-

examples/redis.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use redis::Client as RedisClient;
2+
use rsocket_rust::{error::RSocketError, prelude::*};
3+
use rsocket_rust_transport_tcp::TcpServerTransport;
4+
use std::error::Error;
5+
use std::str::FromStr;
6+
7+
#[derive(Clone)]
8+
struct RedisDao {
9+
inner: RedisClient,
10+
}
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
14+
let dao = RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!");
15+
RSocketFactory::receive()
16+
.acceptor(Box::new(move |_setup, _socket| Ok(Box::new(dao.clone()))))
17+
.transport(TcpServerTransport::from("127.0.0.1:7878"))
18+
.serve()
19+
.await
20+
}
21+
22+
impl FromStr for RedisDao {
23+
type Err = redis::RedisError;
24+
25+
fn from_str(s: &str) -> Result<Self, Self::Err> {
26+
let client = redis::Client::open(s)?;
27+
Ok(RedisDao { inner: client })
28+
}
29+
}
30+
31+
impl RSocket for RedisDao {
32+
fn request_response(&self, req: Payload) -> Mono<Result<Payload, RSocketError>> {
33+
let client = self.inner.clone();
34+
35+
Box::pin(async move {
36+
let mut conn: redis::aio::Connection = client
37+
.get_async_connection()
38+
.await
39+
.expect("Connect redis failed!");
40+
let value: String = redis::cmd("GET")
41+
.arg(&[req.data_utf8()])
42+
.query_async(&mut conn)
43+
.await
44+
.unwrap_or("<nil>".to_owned());
45+
Ok(Payload::builder().set_data_utf8(&value).build())
46+
})
47+
}
48+
49+
fn metadata_push(&self, _req: Payload) -> Mono<()> {
50+
unimplemented!()
51+
}
52+
fn fire_and_forget(&self, _req: Payload) -> Mono<()> {
53+
unimplemented!()
54+
}
55+
fn request_stream(&self, _req: Payload) -> Flux<Result<Payload, RSocketError>> {
56+
unimplemented!()
57+
}
58+
fn request_channel(
59+
&self,
60+
_reqs: Flux<Result<Payload, RSocketError>>,
61+
) -> Flux<Result<Payload, RSocketError>> {
62+
unimplemented!()
63+
}
64+
}

rsocket/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ log = "0.4.8"
1414
bytes = "0.5.4"
1515
futures = "0.3.5"
1616
lazy_static = "1.4.0"
17+
async-trait = "0.1.33"
1718

1819
[dependencies.tokio]
1920
version = "0.2.21"

0 commit comments

Comments
 (0)