1
1
use async_compat:: Compat ;
2
2
use futures:: future:: { self } ;
3
3
use futures_util:: FutureExt ;
4
+ use http_body_util:: Empty ;
5
+ use hyper:: body:: Bytes ;
6
+ use hyper_util:: rt:: TokioIo ;
4
7
use nginx_sys:: { ngx_http_core_loc_conf_t, NGX_LOG_ERR } ;
5
8
use ngx:: async_:: resolver:: Resolver ;
6
9
use ngx:: async_:: { spawn, Task } ;
@@ -12,13 +15,14 @@ use std::ptr::{addr_of, addr_of_mut, NonNull};
12
15
use std:: sync:: atomic:: { AtomicPtr , Ordering } ;
13
16
use std:: task:: Poll ;
14
17
use std:: time:: Instant ;
18
+ use tokio:: net:: TcpStream ;
15
19
16
20
use ngx:: core:: { self , Pool , Status } ;
17
21
use ngx:: ffi:: {
18
22
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_http_handler_pt,
19
23
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
20
- ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t,
21
- NGX_CONF_TAKE1 , NGX_HTTP_LOC_CONF , NGX_HTTP_LOC_CONF_OFFSET , NGX_HTTP_MODULE , NGX_LOG_EMERG ,
24
+ ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1 , NGX_HTTP_LOC_CONF ,
25
+ NGX_HTTP_LOC_CONF_OFFSET , NGX_HTTP_MODULE , NGX_LOG_EMERG ,
22
26
} ;
23
27
use ngx:: http:: { self , HTTPStatus , HttpModule , MergeConfigError , Request } ;
24
28
use ngx:: http:: { HttpModuleLocationConf , HttpModuleMainConf , NgxHttpCoreModule } ;
@@ -167,16 +171,54 @@ async fn resolve_something(
167
171
)
168
172
}
169
173
170
- async fn request_something ( uri : & str ) -> ( String , String ) {
174
+ async fn reqwest_something ( ) -> ( String , String ) {
171
175
let start = Instant :: now ( ) ;
172
- let _ = reqwest:: get ( uri )
176
+ let _ = reqwest:: get ( "https://example.com" )
173
177
. await
174
178
. expect ( "response" )
175
179
. text ( )
176
180
. await
177
181
. expect ( "body" ) ;
178
182
(
179
- format ! ( "X-Request-Time-{uri}" ) ,
183
+ "X-Reqwest-Time" . to_string ( ) ,
184
+ start. elapsed ( ) . as_millis ( ) . to_string ( ) ,
185
+ )
186
+ }
187
+
188
+ async fn hyper_something ( ) -> ( String , String ) {
189
+ let start = Instant :: now ( ) ;
190
+ let url = "http://httpbin.org/ip" . parse :: < hyper:: Uri > ( ) . expect ( "uri" ) ;
191
+ let host = url. host ( ) . expect ( "uri has no host" ) ;
192
+ let port = url. port_u16 ( ) . unwrap_or ( 80 ) ;
193
+
194
+ let address = format ! ( "{}:{}" , host, port) ;
195
+
196
+ let stream = TcpStream :: connect ( address) . await . expect ( "connect" ) ;
197
+
198
+ let io = TokioIo :: new ( stream) ;
199
+
200
+ // Create the Hyper client
201
+ let ( mut sender, conn) = hyper:: client:: conn:: http1:: handshake ( io)
202
+ . await
203
+ . expect ( "handshake" ) ;
204
+ // Spawn a task to poll the connection, driving the HTTP state
205
+ let http_task = spawn ( async move {
206
+ if let Err ( err) = conn. await {
207
+ println ! ( "Connection failed: {:?}" , err) ;
208
+ }
209
+ } ) ;
210
+ let authority = url. authority ( ) . unwrap ( ) . clone ( ) ;
211
+ let req = hyper:: Request :: builder ( )
212
+ . uri ( url)
213
+ . header ( hyper:: header:: HOST , authority. as_str ( ) )
214
+ . body ( Empty :: < Bytes > :: new ( ) )
215
+ . expect ( "body" ) ;
216
+ let _ = sender. send_request ( req) . await . expect ( "response" ) ;
217
+
218
+ http_task. cancel ( ) . await ;
219
+
220
+ (
221
+ "X-Hyper-Time" . to_string ( ) ,
180
222
start. elapsed ( ) . as_millis ( ) . to_string ( ) ,
181
223
)
182
224
}
@@ -197,8 +239,9 @@ async fn async_access(request: &mut Request) -> Status {
197
239
// yield_now
198
240
Box :: pin( waste_yield( ) ) ,
199
241
// reqwest
200
- Box :: pin( request_something( "https://example.com" ) ) ,
201
- Box :: pin( request_something( "https://example.org" ) ) ,
242
+ Box :: pin( reqwest_something( ) ) ,
243
+ // hyper
244
+ Box :: pin( hyper_something( ) ) ,
202
245
] ;
203
246
for ( header, value) in futures:: future:: join_all ( futs) . await {
204
247
request. add_header_out ( & header, & value) ;
0 commit comments