Skip to content

Commit 7cfcf7b

Browse files
committed
Address review
1 parent e6afa5f commit 7cfcf7b

File tree

10 files changed

+39
-44
lines changed

10 files changed

+39
-44
lines changed

grpc/Cargo.toml

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,7 @@ authors = ["gRPC Authors"]
66
license = "MIT"
77

88
[package.metadata.cargo_check_external_types]
9-
allowed_external_types = [
10-
"tonic::*",
11-
"futures_core::stream::Stream",
12-
"tokio::sync::oneshot::Sender",
13-
]
9+
allowed_external_types = ["tonic::*", "tokio::sync::oneshot::Sender"]
1410

1511
[features]
1612
default = ["dns", "_runtime-tokio"]
@@ -22,16 +18,13 @@ dns = ["dep:hickory-resolver", "_runtime-tokio"]
2218
_runtime-tokio = [
2319
"tokio/rt",
2420
"tokio/net",
25-
"dep:futures",
21+
"tokio/time",
2622
"dep:socket2",
2723
"dep:tower",
2824
]
2925

3026
[dependencies]
3127
bytes = "1.10.1"
32-
futures = { version = "0.3.31", optional = true }
33-
futures-core = "0.3.31"
34-
futures-util = "0.3.31"
3528
hickory-resolver = { version = "0.25.1", optional = true }
3629
http = "1.1.0"
3730
http-body = "1.0.1"
@@ -43,8 +36,8 @@ rand = "0.9"
4336
serde = { version = "1.0.219", features = ["derive"] }
4437
serde_json = "1.0.140"
4538
socket2 = { version = "0.5.10", optional = true }
46-
tokio = { version = "1.37.0", features = ["sync", "time", "macros"] }
47-
tokio-stream = "0.1.17"
39+
tokio = { version = "1.37.0", features = ["sync", "macros"] }
40+
tokio-stream = { version = "0.1.17", default-features = false }
4841
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [
4942
"codegen",
5043
] }

grpc/examples/inmemory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::any::Any;
22

3-
use futures_util::stream::StreamExt;
43
use grpc::service::{Message, Request, Response, Service};
54
use grpc::{client::ChannelOptions, inmemory};
5+
use tokio_stream::StreamExt;
66
use tonic::async_trait;
77

88
struct Handler {}

grpc/examples/multiaddr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::any::Any;
22

3-
use futures_util::StreamExt;
43
use grpc::service::{Message, Request, Response, Service};
54
use grpc::{client::ChannelOptions, inmemory};
5+
use tokio_stream::StreamExt;
66
use tonic::async_trait;
77

88
struct Handler {

grpc/src/client/load_balancing/pick_first.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::{
44
time::Duration,
55
};
66

7-
use tokio::time::sleep;
87
use tonic::metadata::MetadataMap;
98

109
use crate::{
@@ -75,9 +74,10 @@ impl LbPolicy for PickFirstPolicy {
7574

7675
self.next_addresses = addresses;
7776
let work_scheduler = self.work_scheduler.clone();
77+
let runtime = self.runtime.clone();
7878
// TODO: Implement Drop that cancels this task.
7979
self.runtime.spawn(Box::pin(async move {
80-
sleep(Duration::from_millis(200)).await;
80+
runtime.sleep(Duration::from_millis(200)).await;
8181
work_scheduler.schedule_work();
8282
}));
8383
// TODO: return a picker that queues RPCs.

grpc/src/client/subchannel.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,15 @@ use crate::{
1515
service::{Request, Response, Service},
1616
};
1717
use core::panic;
18+
use std::time::{Duration, Instant};
1819
use std::{
1920
collections::BTreeMap,
2021
error::Error,
2122
fmt::{Debug, Display},
2223
ops::Sub,
2324
sync::{Arc, Mutex, RwLock, Weak},
2425
};
25-
use tokio::{
26-
sync::{mpsc, oneshot, watch, Notify},
27-
time::{Duration, Instant},
28-
};
26+
use tokio::sync::{mpsc, oneshot, watch, Notify};
2927
use tonic::async_trait;
3028

3129
type SharedService = Arc<dyn Service>;
@@ -358,7 +356,7 @@ impl InternalSubchannel {
358356

359357
let connect_task = self.runtime.spawn(Box::pin(async move {
360358
tokio::select! {
361-
_ = tokio::time::sleep(min_connect_timeout) => {
359+
_ = runtime.sleep(min_connect_timeout) => {
362360
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTimedOut);
363361
}
364362
result = transport.connect(address.to_string().clone(), runtime, &transport_opts) => {
@@ -400,7 +398,7 @@ impl InternalSubchannel {
400398
// terminated? But what can we do with that error other than logging
401399
// it, which the transport can do as well?
402400
if let Err(e) = closed_rx.await {
403-
eprintln!("Transport closed with error: {}", e.to_string())
401+
eprintln!("Transport closed with error: {e}",)
404402
};
405403
let _ = state_machine_tx.send(SubchannelStateMachineEvent::ConnectionTerminated);
406404
}));
@@ -430,8 +428,11 @@ impl InternalSubchannel {
430428

431429
let backoff_interval = self.backoff.backoff_until();
432430
let state_machine_tx = self.state_machine_event_sender.clone();
431+
let runtime = self.runtime.clone();
433432
let backoff_task = self.runtime.spawn(Box::pin(async move {
434-
tokio::time::sleep_until(backoff_interval).await;
433+
runtime
434+
.sleep(backoff_interval.saturating_duration_since(Instant::now()))
435+
.await;
435436
let _ = state_machine_tx.send(SubchannelStateMachineEvent::BackoffExpired);
436437
}));
437438
let mut inner = self.inner.lock().unwrap();

grpc/src/client/transport/mod.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ pub(crate) struct ConnectedTransport {
2424
// can hold config relevant to a particular transport.
2525
#[derive(Default)]
2626
pub(crate) struct TransportOptions {
27-
pub init_stream_window_size: Option<u32>,
28-
pub init_connection_window_size: Option<u32>,
29-
pub http2_keep_alive_interval: Option<Duration>,
30-
pub http2_keep_alive_timeout: Option<Duration>,
31-
pub http2_keep_alive_while_idle: Option<bool>,
32-
pub http2_max_header_list_size: Option<u32>,
33-
pub http2_adaptive_window: Option<bool>,
34-
pub concurrency_limit: Option<usize>,
35-
pub rate_limit: Option<(u64, Duration)>,
36-
pub tcp_keepalive: Option<Duration>,
37-
pub tcp_nodelay: bool,
38-
pub connect_deadline: Option<Instant>,
27+
pub(crate) init_stream_window_size: Option<u32>,
28+
pub(crate) init_connection_window_size: Option<u32>,
29+
pub(crate) http2_keep_alive_interval: Option<Duration>,
30+
pub(crate) http2_keep_alive_timeout: Option<Duration>,
31+
pub(crate) http2_keep_alive_while_idle: Option<bool>,
32+
pub(crate) http2_max_header_list_size: Option<u32>,
33+
pub(crate) http2_adaptive_window: Option<bool>,
34+
pub(crate) concurrency_limit: Option<usize>,
35+
pub(crate) rate_limit: Option<(u64, Duration)>,
36+
pub(crate) tcp_keepalive: Option<Duration>,
37+
pub(crate) tcp_nodelay: bool,
38+
pub(crate) connect_deadline: Option<Instant>,
3939
}
4040

4141
#[async_trait]

grpc/src/client/transport/registry.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ use std::{
1010
/// the address type they are intended to handle.
1111
#[derive(Default, Clone)]
1212
pub(crate) struct TransportRegistry {
13-
m: Arc<Mutex<HashMap<String, Arc<dyn Transport>>>>,
13+
inner: Arc<Mutex<HashMap<String, Arc<dyn Transport>>>>,
1414
}
1515

1616
impl Debug for TransportRegistry {
1717
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18-
let m = self.m.lock().unwrap();
18+
let m = self.inner.lock().unwrap();
1919
for key in m.keys() {
2020
write!(f, "k: {key:?}")?
2121
}
@@ -31,15 +31,15 @@ impl TransportRegistry {
3131

3232
/// Add a transport into the registry.
3333
pub(crate) fn add_transport(&self, address_type: &str, transport: impl Transport + 'static) {
34-
self.m
34+
self.inner
3535
.lock()
3636
.unwrap()
3737
.insert(address_type.to_string(), Arc::new(transport));
3838
}
3939

4040
/// Retrieve a name resolver from the registry, or None if not found.
4141
pub(crate) fn get_transport(&self, address_type: &str) -> Result<Arc<dyn Transport>, String> {
42-
self.m
42+
self.inner
4343
.lock()
4444
.unwrap()
4545
.get(address_type)

grpc/src/client/transport/tonic/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ use crate::service::Message;
88
use crate::service::Request as GrpcRequest;
99
use crate::service::Response as GrpcResponse;
1010
use bytes::Bytes;
11-
use futures::stream::StreamExt;
12-
use futures::Stream;
1311
use http::uri::PathAndQuery;
1412
use http::Request as HttpRequest;
1513
use http::Response as HttpResponse;
@@ -27,6 +25,8 @@ use std::{
2725
sync::Arc,
2826
task::{Context, Poll},
2927
};
28+
use tokio_stream::Stream;
29+
use tokio_stream::StreamExt;
3030
use tonic::Request as TonicRequest;
3131
use tonic::Response as TonicResponse;
3232
use tonic::Streaming;
@@ -95,14 +95,14 @@ impl Service for TonicTransport {
9595

9696
/// Helper function to create an error response stream.
9797
fn create_error_response(status: Status) -> GrpcResponse {
98-
let stream = futures::stream::once(async { Err(status) });
98+
let stream = tokio_stream::once(Err(status));
9999
TonicResponse::new(Box::pin(stream))
100100
}
101101

102102
fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = Bytes> + Send>>> {
103103
let (metadata, extensions, stream) = req.into_parts();
104104

105-
let bytes_stream = Box::pin(stream.filter_map(|msg| async {
105+
let bytes_stream = Box::pin(stream.filter_map(|msg| {
106106
if let Ok(bytes) = (msg as Box<dyn Any>).downcast::<Bytes>() {
107107
Some(*bytes)
108108
} else {
@@ -119,7 +119,7 @@ fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> Grp
119119
let response = match res {
120120
Ok(s) => s,
121121
Err(e) => {
122-
let stream = futures::stream::once(async { Err(e) });
122+
let stream = tokio_stream::once(Err(e));
123123
return TonicResponse::new(Box::pin(stream));
124124
}
125125
};

grpc/src/inmemory/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ static INMEMORY_NETWORK_TYPE: &str = "inmemory";
128128

129129
pub fn reg() {
130130
GLOBAL_TRANSPORT_REGISTRY.add_transport(INMEMORY_NETWORK_TYPE, ClientTransport::new());
131+
global_registry().add_builder(Box::new(InMemoryResolverBuilder));
131132
}
132133

133134
struct InMemoryResolverBuilder;

grpc/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
use std::{any::Any, pin::Pin};
2626

27-
use futures_core::Stream;
27+
use tokio_stream::Stream;
2828
use tonic::{async_trait, Request as TonicRequest, Response as TonicResponse, Status};
2929

3030
pub type Request = TonicRequest<Pin<Box<dyn Stream<Item = Box<dyn Message>> + Send + Sync>>>;

0 commit comments

Comments
 (0)