Skip to content

Commit 49c74b2

Browse files
committed
remove Sync trait for Mono and Flux.
1 parent 4a4d2cd commit 49c74b2

File tree

3 files changed

+27
-29
lines changed

3 files changed

+27
-29
lines changed

examples/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ log = "0.4.8"
1010
env_logger = "0.7.1"
1111
futures = "0.3.5"
1212
clap = "2.33.1"
13-
postgres = "0.17.3"
14-
r2d2_postgres = "0.16.0"
13+
tokio-postgres = "0.5.4"
1514

1615
[dev-dependencies.rsocket_rust]
1716
path = "../rsocket"

examples/postgres.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,36 @@
11
#[macro_use]
22
extern crate log;
33

4-
use postgres::NoTls;
5-
use r2d2_postgres::{
6-
r2d2::{self, Pool},
7-
PostgresConnectionManager,
8-
};
94
use rsocket_rust::{error::RSocketError, prelude::*};
105
use rsocket_rust_transport_tcp::TcpServerTransport;
116
use std::error::Error;
7+
use std::sync::Arc;
8+
use tokio_postgres::{Client as PgClient, NoTls};
129

1310
#[tokio::main]
1411
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1512
env_logger::builder().format_timestamp_millis().init();
16-
let dao = Dao::try_new().expect("Connect failed!");
13+
let dao = Dao::try_new().await?;
1714
RSocketFactory::receive()
18-
.acceptor(Box::new(move |_, _| {
19-
info!("accept new socket!");
20-
Ok(Box::new(dao.clone()))
21-
}))
15+
.acceptor(Box::new(move |_, _| Ok(Box::new(dao.clone()))))
2216
.on_start(Box::new(|| info!("server start success!!!")))
2317
.transport(TcpServerTransport::from("127.0.0.1:7878"))
2418
.serve()
2519
.await
2620
}
2721

28-
#[derive(Clone, Debug)]
22+
#[derive(Clone)]
2923
struct Dao {
30-
pool: Pool<PostgresConnectionManager<NoTls>>,
24+
client: Arc<PgClient>,
3125
}
3226

3327
impl RSocket for Dao {
3428
fn request_response(&self, _: Payload) -> Mono<Result<Payload, RSocketError>> {
35-
let pool = self.pool.clone();
29+
let client = self.client.clone();
3630
Box::pin(async move {
37-
// TODO: something wrong here!!!
38-
let mut client = pool.get().expect("Get client from pool failed!");
3931
let row = client
4032
.query_one("SELECT 'world' AS hello", &[])
33+
.await
4134
.expect("Execute SQL failed!");
4235
let result: String = row.get("hello");
4336
Ok(Payload::builder().set_data_utf8(&result).build())
@@ -65,16 +58,22 @@ impl RSocket for Dao {
6558
}
6659

6760
impl Dao {
68-
fn try_new() -> Result<Dao, Box<dyn Error + Sync + Send>> {
69-
let manager: PostgresConnectionManager<NoTls> = PostgresConnectionManager::new(
70-
"host=localhost user=postgres password=postgres"
71-
.parse()
72-
.unwrap(),
73-
NoTls,
74-
);
75-
let pool: Pool<PostgresConnectionManager<NoTls>> =
76-
r2d2::Pool::new(manager).expect("Create pool failed!");
61+
async fn try_new() -> Result<Dao, Box<dyn Error + Sync + Send>> {
62+
let (client, connection) =
63+
tokio_postgres::connect("host=localhost user=postgres password=postgres", NoTls)
64+
.await?;
65+
66+
// The connection object performs the actual communication with the database,
67+
// so spawn it off to run on its own.
68+
tokio::spawn(async move {
69+
if let Err(e) = connection.await {
70+
eprintln!("connection error: {}", e);
71+
}
72+
});
73+
7774
info!("==> create postgres pool success!");
78-
Ok(Dao { pool })
75+
Ok(Dao {
76+
client: Arc::new(client),
77+
})
7978
}
8079
}

rsocket/src/spi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ pub type ServerResponder = Box<
1818
+ Fn(SetupPayload, Box<dyn RSocket>) -> Result<Box<dyn RSocket>, Box<dyn Error>>,
1919
>;
2020

21-
pub type Mono<T> = Pin<Box<dyn Send + Sync + Future<Output = T>>>;
22-
pub type Flux<T> = Pin<Box<dyn Send + Sync + Stream<Item = T>>>;
21+
pub type Mono<T> = Pin<Box<dyn Send + Future<Output = T>>>;
22+
pub type Flux<T> = Pin<Box<dyn Send + Stream<Item = T>>>;
2323

2424
pub trait RSocket: Sync + Send {
2525
fn metadata_push(&self, req: Payload) -> Mono<()>;

0 commit comments

Comments
 (0)