@@ -3,19 +3,92 @@ extern crate log;
33
44use clap:: { App , Arg , SubCommand } ;
55use rsocket_rust:: prelude:: * ;
6- use rsocket_rust_transport_tcp:: { TcpClientTransport , TcpServerTransport } ;
7- use std:: error:: Error ;
6+ use rsocket_rust:: transport:: Connection ;
7+ use rsocket_rust_transport_tcp:: {
8+ TcpClientTransport , TcpServerTransport , UnixClientTransport , UnixServerTransport ,
9+ } ;
10+ use rsocket_rust_transport_websocket:: { WebsocketClientTransport , WebsocketServerTransport } ;
811use std:: fs;
912
13+ type Result < T > = rsocket_rust:: Result < T > ;
14+
1015enum RequestMode {
1116 FNF ,
1217 REQUEST ,
1318 STREAM ,
1419 CHANNEL ,
1520}
1621
22+ async fn serve < A , B > ( transport : A , mtu : usize ) -> Result < ( ) >
23+ where
24+ A : Send + Sync + ServerTransport < Item = B > + ' static ,
25+ B : Send + Sync + Transport + ' static ,
26+ {
27+ RSocketFactory :: receive ( )
28+ . transport ( transport)
29+ . fragment ( mtu)
30+ . acceptor ( Box :: new ( |setup, _socket| {
31+ info ! ( "accept setup: {:?}" , setup) ;
32+ Ok ( Box :: new ( EchoRSocket ) )
33+ // Or you can reject setup
34+ // Err(From::from("SETUP_NOT_ALLOW"))
35+ } ) )
36+ . on_start ( Box :: new ( || info ! ( "+++++++ echo server started! +++++++" ) ) )
37+ . serve ( )
38+ . await
39+ }
40+
41+ async fn connect < A , B > ( transport : A , mtu : usize , req : Payload , mode : RequestMode ) -> Result < ( ) >
42+ where
43+ A : Send + Sync + Transport < Conn = B > + ' static ,
44+ B : Send + Sync + Connection + ' static ,
45+ {
46+ let cli = RSocketFactory :: connect ( )
47+ . fragment ( mtu)
48+ . transport ( transport)
49+ . start ( )
50+ . await ?;
51+
52+ match mode {
53+ RequestMode :: FNF => {
54+ cli. fire_and_forget ( req) . await ;
55+ }
56+ RequestMode :: STREAM => {
57+ let mut results = cli. request_stream ( req) ;
58+ loop {
59+ match results. next ( ) . await {
60+ Some ( Ok ( v) ) => info ! ( "{:?}" , v) ,
61+ Some ( Err ( e) ) => {
62+ error ! ( "STREAM_RESPONSE FAILED: {:?}" , e) ;
63+ break ;
64+ }
65+ None => break ,
66+ }
67+ }
68+ }
69+ RequestMode :: CHANNEL => {
70+ let mut results = cli. request_channel ( Box :: pin ( futures:: stream:: iter ( vec ! [ Ok ( req) ] ) ) ) ;
71+ loop {
72+ match results. next ( ) . await {
73+ Some ( Ok ( v) ) => info ! ( "{:?}" , v) ,
74+ Some ( Err ( e) ) => {
75+ error ! ( "CHANNEL_RESPONSE FAILED: {:?}" , e) ;
76+ break ;
77+ }
78+ None => break ,
79+ }
80+ }
81+ }
82+ RequestMode :: REQUEST => {
83+ let res = cli. request_response ( req) . await . expect ( "Request failed!" ) ;
84+ info ! ( "{:?}" , res) ;
85+ }
86+ }
87+ Ok ( ( ) )
88+ }
89+
1790#[ tokio:: main]
18- async fn main ( ) -> Result < ( ) , Box < dyn Error + Send + Sync > > {
91+ async fn main ( ) -> Result < ( ) > {
1992 env_logger:: builder ( ) . format_timestamp_millis ( ) . init ( ) ;
2093
2194 let cli = App :: new ( "echo" )
@@ -106,18 +179,25 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
106179 . value_of ( "mtu" )
107180 . map ( |it| it. parse ( ) . expect ( "Invalid mtu string!" ) )
108181 . unwrap_or ( 0 ) ;
109- RSocketFactory :: receive ( )
110- . transport ( TcpServerTransport :: from ( addr) )
111- . fragment ( mtu)
112- . acceptor ( Box :: new ( |setup, _socket| {
113- info ! ( "accept setup: {:?}" , setup) ;
114- Ok ( Box :: new ( EchoRSocket ) )
115- // Or you can reject setup
116- // Err(From::from("SETUP_NOT_ALLOW"))
117- } ) )
118- . on_start ( Box :: new ( || info ! ( "+++++++ echo server started! +++++++" ) ) )
119- . serve ( )
120- . await
182+
183+ if addr. starts_with ( "ws://" ) {
184+ serve ( WebsocketServerTransport :: from ( addr) , mtu) . await
185+ } else if addr. starts_with ( "unix://" ) {
186+ let addr_owned = addr. to_owned ( ) ;
187+ tokio:: spawn ( async move {
188+ let _ = serve ( UnixServerTransport :: from ( addr_owned) , mtu) . await ;
189+ } ) ;
190+ let sockfile = addr. chars ( ) . skip ( 7 ) . collect :: < String > ( ) ;
191+ // Watch signal
192+ tokio:: signal:: ctrl_c ( ) . await ?;
193+ info ! ( "ctrl-c received!" ) ;
194+ if let Err ( e) = std:: fs:: remove_file ( & sockfile) {
195+ error ! ( "remove unix sock file failed: {}" , e) ;
196+ }
197+ Ok ( ( ) )
198+ } else {
199+ serve ( TcpServerTransport :: from ( addr) , mtu) . await
200+ }
121201 }
122202 ( "connect" , Some ( flags) ) => {
123203 let mut modes: Vec < RequestMode > = vec ! [ ] ;
@@ -147,12 +227,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
147227 . unwrap_or ( 0 ) ;
148228
149229 let addr = flags. value_of ( "URL" ) . expect ( "Missing URL" ) ;
150- let cli = RSocketFactory :: connect ( )
151- . fragment ( mtu)
152- . transport ( TcpClientTransport :: from ( addr) )
153- . start ( )
154- . await
155- . expect ( "Connect failed!" ) ;
156230 let mut bu = Payload :: builder ( ) ;
157231 if let Some ( data) = flags. value_of ( "input" ) {
158232 if data. starts_with ( "@" ) {
@@ -164,45 +238,14 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
164238 }
165239 }
166240 let req = bu. build ( ) ;
167-
168- match modes. pop ( ) . unwrap_or ( RequestMode :: REQUEST ) {
169- RequestMode :: FNF => {
170- cli. fire_and_forget ( req) . await ;
171- }
172- RequestMode :: STREAM => {
173- let mut results = cli. request_stream ( req) ;
174- loop {
175- match results. next ( ) . await {
176- Some ( Ok ( v) ) => info ! ( "{:?}" , v) ,
177- Some ( Err ( e) ) => {
178- error ! ( "STREAM_RESPONSE FAILED: {:?}" , e) ;
179- break ;
180- }
181- None => break ,
182- }
183- }
184- }
185- RequestMode :: CHANNEL => {
186- let mut results =
187- cli. request_channel ( Box :: pin ( futures:: stream:: iter ( vec ! [ Ok ( req) ] ) ) ) ;
188- loop {
189- match results. next ( ) . await {
190- Some ( Ok ( v) ) => info ! ( "{:?}" , v) ,
191- Some ( Err ( e) ) => {
192- error ! ( "CHANNEL_RESPONSE FAILED: {:?}" , e) ;
193- break ;
194- }
195- None => break ,
196- }
197- }
198- }
199- RequestMode :: REQUEST => {
200- let res = cli. request_response ( req) . await . expect ( "Request failed!" ) ;
201- info ! ( "{:?}" , res) ;
202- }
241+ let mode = modes. pop ( ) . unwrap_or ( RequestMode :: REQUEST ) ;
242+ if addr. starts_with ( "ws://" ) {
243+ connect ( WebsocketClientTransport :: from ( addr) , mtu, req, mode) . await
244+ } else if addr. starts_with ( "unix://" ) {
245+ connect ( UnixClientTransport :: from ( addr) , mtu, req, mode) . await
246+ } else {
247+ connect ( TcpClientTransport :: from ( addr) , mtu, req, mode) . await
203248 }
204-
205- Ok ( ( ) )
206249 }
207250 _ => Ok ( ( ) ) ,
208251 }
0 commit comments