@@ -32,12 +32,11 @@ async fn main() -> Result<()> {
3232 . transport (TcpServerTransport :: from (" 127.0.0.1:7878" ))
3333 . acceptor (Box :: new (| setup , _socket | {
3434 println! (" accept setup: {:?}" , setup );
35- // Use EchoRSocket as example RSocket, you can implement RSocket trait.
3635 Ok (Box :: new (EchoRSocket ))
3736 // Or you can reject setup
3837 // Err(From::from("SETUP_NOT_ALLOW"))
3938 }))
40- . on_start (|| println! (" +++++++ echo server started! +++++++" ))
39+ . on_start (Box :: new ( || println! (" +++++++ echo server started! +++++++" ) ))
4140 . serve ()
4241 . await
4342}
@@ -56,6 +55,7 @@ async fn main() -> Result<()> {
5655 . transport (TcpClientTransport :: from (" 127.0.0.1:7878" ))
5756 . setup (Payload :: from (" READY!" ))
5857 . mime_type (" text/plain" , " text/plain" )
58+ . on_close (Box :: new (|| println! (" connection closed" )))
5959 . start ()
6060 . await ? ;
6161 let req = Payload :: builder ()
@@ -64,8 +64,76 @@ async fn main() -> Result<()> {
6464 . build ();
6565 let res = cli . request_response (req ). await ? ;
6666 println! (" got: {:?}" , res );
67- cli . close ();
67+
68+ // If you want to block until socket disconnected.
69+ cli . wait_for_close (). await ;
70+
6871 Ok (())
6972}
73+ ```
74+
75+ ### Implement RSocket trait
76+
77+ Example for access Redis([ crates] ( https://crates.io/crates/redis ) ):
78+
79+ > NOTICE: add dependency in Cargo.toml => redis = { version = "0.19.0", features = [ "aio" ] }
80+
81+ ``` rust
82+ use std :: str :: FromStr ;
83+
84+ use redis :: Client as RedisClient ;
85+ use rsocket_rust :: async_trait;
86+ use rsocket_rust :: prelude :: * ;
87+ use rsocket_rust :: Result ;
88+
89+ #[derive(Clone )]
90+ pub struct RedisDao {
91+ inner : RedisClient ,
92+ }
93+
94+ // Create RedisDao from str.
95+ // Example: RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!");
96+ impl FromStr for RedisDao {
97+ type Err = redis :: RedisError ;
98+
99+ fn from_str (s : & str ) -> std :: result :: Result <Self , Self :: Err > {
100+ let client = redis :: Client :: open (s )? ;
101+ Ok (RedisDao { inner : client })
102+ }
103+ }
104+
105+ #[async_trait]
106+ impl RSocket for RedisDao {
107+ async fn request_response (& self , req : Payload ) -> Result <Option <Payload >> {
108+ let client = self . inner. clone ();
109+ let mut conn = client . get_async_connection (). await ? ;
110+ let value : redis :: RedisResult <Option <String >> = redis :: cmd (" GET" )
111+ . arg (& [req . data_utf8 ()])
112+ . query_async (& mut conn )
113+ . await ;
114+ match value {
115+ Ok (Some (value )) => Ok (Some (Payload :: builder (). set_data_utf8 (& value ). build ())),
116+ Ok (None ) => Ok (None ),
117+ Err (e ) => Err (e . into ()),
118+ }
119+ }
120+
121+ async fn metadata_push (& self , _req : Payload ) -> Result <()> {
122+ todo! ()
123+ }
124+
125+ async fn fire_and_forget (& self , _req : Payload ) -> Result <()> {
126+ todo! ()
127+ }
128+
129+ fn request_stream (& self , _req : Payload ) -> Flux <Result <Payload >> {
130+ todo! ()
131+ }
132+
133+ fn request_channel (& self , _reqs : Flux <Result <Payload >>) -> Flux <Result <Payload >> {
134+ todo! ()
135+ }
136+ }
70137
71138```
139+
0 commit comments