Skip to content

Commit 38901f4

Browse files
committed
Fail build if tokio spawn is used outside runtime
1 parent e893238 commit 38901f4

File tree

8 files changed

+100
-33
lines changed

8 files changed

+100
-33
lines changed

grpc/Cargo.toml

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,46 @@ allowed_external_types = [
1313
]
1414

1515
[features]
16-
default = ["dns"]
16+
default = ["dns", "_runtime-tokio"]
1717
dns = ["dep:hickory-resolver"]
18+
# The following feature is used to ensure all modules use the runtime
19+
# abstraction instead of using tokio directly.
20+
# Using tower/buffer enables tokio's rt feature even though it's possible to
21+
# create Buffers with a user provided executor.
22+
_runtime-tokio = [
23+
"tokio/rt",
24+
"tokio/net",
25+
"dep:futures",
26+
"dep:socket2",
27+
"dep:tower",
28+
]
1829

1930
[dependencies]
2031
bytes = "1.10.1"
21-
futures = "0.3.31"
32+
futures = { version = "0.3.31", optional = true }
2233
futures-core = "0.3.31"
2334
futures-util = "0.3.31"
2435
hickory-resolver = { version = "0.25.1", optional = true }
2536
http = "1.1.0"
2637
http-body = "1.0.1"
2738
hyper = { version = "1.6.0", features = ["client", "http2"] }
28-
hyper-util = "0.1.14"
2939
once_cell = "1.19.0"
3040
parking_lot = "0.12.4"
3141
pin-project-lite = "0.2.16"
3242
rand = "0.9"
3343
serde = { version = "1.0.219", features = ["derive"] }
3444
serde_json = "1.0.140"
35-
socket2 = "0.5.10"
36-
tokio = { version = "1.37.0", features = [
37-
"sync",
38-
"rt",
39-
"net",
40-
"time",
41-
"macros",
42-
] }
45+
socket2 = { version = "0.5.10", optional = true }
46+
tokio = { version = "1.37.0", features = ["sync", "time", "macros"] }
4347
tokio-stream = "0.1.17"
4448
tonic = { version = "0.14.0", path = "../tonic", default-features = false, features = [
4549
"codegen",
46-
"transport",
4750
] }
48-
tower = { version = "0.5.2", features = ["buffer", "limit", "util"] }
51+
tower = { version = "0.5.2", features = [
52+
"limit",
53+
"util",
54+
"buffer",
55+
], optional = true }
4956
tower-service = "0.3.3"
5057
url = "2.5.0"
5158

grpc/src/client/channel.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,16 @@ use std::{
3737
};
3838

3939
use tokio::sync::{mpsc, oneshot, watch, Notify};
40-
use tokio::task::AbortHandle;
4140

4241
use serde_json::json;
4342
use tonic::async_trait;
4443
use url::Url; // NOTE: http::Uri requires non-empty authority portion of URI
4544

46-
use crate::credentials::Credentials;
45+
use crate::attributes::Attributes;
4746
use crate::rt;
4847
use crate::service::{Request, Response, Service};
49-
use crate::{attributes::Attributes, rt::tokio::TokioRuntime};
5048
use crate::{client::ConnectivityState, rt::Runtime};
49+
use crate::{credentials::Credentials, rt::default_runtime};
5150

5251
use super::service_config::ServiceConfig;
5352
use super::transport::{TransportRegistry, GLOBAL_TRANSPORT_REGISTRY};
@@ -156,7 +155,7 @@ impl Channel {
156155
inner: Arc::new(PersistentChannel::new(
157156
target,
158157
credentials,
159-
Arc::new(rt::tokio::TokioRuntime {}),
158+
default_runtime(),
160159
options,
161160
)),
162161
}
@@ -280,7 +279,7 @@ impl ActiveChannel {
280279
let resolver_opts = name_resolution::ResolverOptions {
281280
authority,
282281
work_scheduler,
283-
runtime: Arc::new(TokioRuntime {}),
282+
runtime: runtime.clone(),
284283
};
285284
let resolver = rb.build(&target, resolver_opts);
286285

@@ -373,7 +372,7 @@ impl InternalChannelController {
373372
connectivity_state: Arc<Watcher<ConnectivityState>>,
374373
runtime: Arc<dyn Runtime>,
375374
) -> Self {
376-
let lb = Arc::new(GracefulSwitchBalancer::new(wqtx.clone()));
375+
let lb = Arc::new(GracefulSwitchBalancer::new(wqtx.clone(), runtime.clone()));
377376

378377
Self {
379378
lb,
@@ -459,6 +458,7 @@ pub(super) struct GracefulSwitchBalancer {
459458
policy_builder: Mutex<Option<Arc<dyn LbPolicyBuilder>>>,
460459
work_scheduler: WorkQueueTx,
461460
pending: Mutex<bool>,
461+
runtime: Arc<dyn Runtime>,
462462
}
463463

464464
impl WorkScheduler for GracefulSwitchBalancer {
@@ -483,12 +483,13 @@ impl WorkScheduler for GracefulSwitchBalancer {
483483
}
484484

485485
impl GracefulSwitchBalancer {
486-
fn new(work_scheduler: WorkQueueTx) -> Self {
486+
fn new(work_scheduler: WorkQueueTx, runtime: Arc<dyn Runtime>) -> Self {
487487
Self {
488488
policy_builder: Mutex::default(),
489489
policy: Mutex::default(), // new(None::<Box<dyn LbPolicy>>),
490490
work_scheduler,
491491
pending: Mutex::default(),
492+
runtime,
492493
}
493494
}
494495

@@ -506,6 +507,7 @@ impl GracefulSwitchBalancer {
506507
let builder = GLOBAL_LB_REGISTRY.get_policy(policy_name).unwrap();
507508
let newpol = builder.build(LbPolicyOptions {
508509
work_scheduler: self.clone(),
510+
runtime: self.runtime.clone(),
509511
});
510512
*self.policy_builder.lock().unwrap() = Some(builder);
511513
*p = Some(newpol);

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::client::load_balancing::{
3838
WeakSubchannel, WorkScheduler,
3939
};
4040
use crate::client::name_resolution::{Address, ResolverUpdate};
41+
use crate::rt::Runtime;
4142

4243
use super::{Subchannel, SubchannelState};
4344

@@ -47,6 +48,7 @@ pub struct ChildManager<T> {
4748
children: Vec<Child<T>>,
4849
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
4950
pending_work: Arc<Mutex<HashSet<usize>>>,
51+
runtime: Arc<dyn Runtime>,
5052
}
5153

5254
struct Child<T> {
@@ -81,12 +83,16 @@ pub trait ResolverUpdateSharder<T>: Send {
8183
impl<T> ChildManager<T> {
8284
/// Creates a new ChildManager LB policy. shard_update is called whenever a
8385
/// resolver_update operation occurs.
84-
pub fn new(update_sharder: Box<dyn ResolverUpdateSharder<T>>) -> Self {
86+
pub fn new(
87+
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
88+
runtime: Arc<dyn Runtime>,
89+
) -> Self {
8590
Self {
8691
update_sharder,
8792
subchannel_child_map: Default::default(),
8893
children: Default::default(),
8994
pending_work: Default::default(),
95+
runtime,
9096
}
9197
}
9298

@@ -197,6 +203,7 @@ impl<T: PartialEq + Hash + Eq + Send + Sync + 'static> LbPolicy for ChildManager
197203
});
198204
let policy = builder.build(LbPolicyOptions {
199205
work_scheduler: work_scheduler.clone(),
206+
runtime: self.runtime.clone(),
200207
});
201208
let state = LbState::initial();
202209
self.children.push(Child {

grpc/src/client/load_balancing/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use tonic::{metadata::MetadataMap, Status};
4141

4242
use crate::{
4343
client::channel::WorkQueueTx,
44+
rt::Runtime,
4445
service::{Request, Response, Service},
4546
};
4647

@@ -64,6 +65,7 @@ pub struct LbPolicyOptions {
6465
/// A hook into the channel's work scheduler that allows the LbPolicy to
6566
/// request the ability to perform operations on the ChannelController.
6667
pub work_scheduler: Arc<dyn WorkScheduler>,
68+
pub runtime: Arc<dyn Runtime>,
6769
}
6870

6971
/// Used to asynchronously request a call into the LbPolicy's work method if

grpc/src/client/load_balancing/pick_first.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
name_resolution::{Address, ResolverUpdate},
1414
subchannel, ConnectivityState,
1515
},
16+
rt::Runtime,
1617
service::Request,
1718
};
1819

@@ -31,6 +32,7 @@ impl LbPolicyBuilder for Builder {
3132
work_scheduler: options.work_scheduler,
3233
subchannel: None,
3334
next_addresses: Vec::default(),
35+
runtime: options.runtime,
3436
})
3537
}
3638

@@ -47,6 +49,7 @@ struct PickFirstPolicy {
4749
work_scheduler: Arc<dyn WorkScheduler>,
4850
subchannel: Option<Arc<dyn Subchannel>>,
4951
next_addresses: Vec<Address>,
52+
runtime: Arc<dyn Runtime>,
5053
}
5154

5255
impl LbPolicy for PickFirstPolicy {
@@ -73,10 +76,10 @@ impl LbPolicy for PickFirstPolicy {
7376
self.next_addresses = addresses;
7477
let work_scheduler = self.work_scheduler.clone();
7578
// TODO: Implement Drop that cancels this task.
76-
tokio::task::spawn(async move {
79+
self.runtime.spawn(Box::pin(async move {
7780
sleep(Duration::from_millis(200)).await;
7881
work_scheduler.schedule_work();
79-
});
82+
}));
8083
// TODO: return a picker that queues RPCs.
8184
Ok(())
8285
}

grpc/src/client/subchannel.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::{
2424
};
2525
use tokio::{
2626
sync::{mpsc, oneshot, watch, Notify},
27-
task::{AbortHandle, JoinHandle},
2827
time::{Duration, Instant},
2928
};
3029
use tonic::async_trait;
@@ -66,7 +65,7 @@ struct InternalSubchannelReadyState {
6665
}
6766

6867
struct InternalSubchannelTransientFailureState {
69-
abort_handle: Option<AbortHandle>,
68+
task_handle: Option<Box<dyn TaskHandle>>,
7069
error: String,
7170
}
7271

@@ -168,7 +167,7 @@ impl Drop for InternalSubchannelState {
168167
}
169168
}
170169
Self::TransientFailure(st) => {
171-
if let Some(ah) = &st.abort_handle {
170+
if let Some(ah) = &st.task_handle {
172171
ah.abort();
173172
}
174173
}
@@ -189,8 +188,8 @@ pub(crate) struct InternalSubchannel {
189188
struct InnerSubchannel {
190189
state: InternalSubchannelState,
191190
watchers: Vec<Arc<SubchannelStateWatcher>>, // TODO(easwars): Revisit the choice for this data structure.
192-
backoff_task: Option<JoinHandle<()>>,
193-
disconnect_task: Option<JoinHandle<()>>,
191+
backoff_task: Option<Box<dyn TaskHandle>>,
192+
disconnect_task: Option<Box<dyn TaskHandle>>,
194193
}
195194

196195
#[async_trait]
@@ -417,7 +416,7 @@ impl InternalSubchannel {
417416
let mut inner = self.inner.lock().unwrap();
418417
inner.state = InternalSubchannelState::TransientFailure(
419418
InternalSubchannelTransientFailureState {
420-
abort_handle: None,
419+
task_handle: None,
421420
error: err.clone(),
422421
},
423422
);
@@ -431,14 +430,14 @@ impl InternalSubchannel {
431430

432431
let backoff_interval = self.backoff.backoff_until();
433432
let state_machine_tx = self.state_machine_event_sender.clone();
434-
let backoff_task = tokio::task::spawn(async move {
433+
let backoff_task = self.runtime.spawn(Box::pin(async move {
435434
tokio::time::sleep_until(backoff_interval).await;
436435
let _ = state_machine_tx.send(SubchannelStateMachineEvent::BackoffExpired);
437-
});
436+
}));
438437
let mut inner = self.inner.lock().unwrap();
439438
inner.state =
440439
InternalSubchannelState::TransientFailure(InternalSubchannelTransientFailureState {
441-
abort_handle: Some(backoff_task.abort_handle()),
440+
task_handle: Some(backoff_task),
442441
error: err.clone(),
443442
});
444443
}

grpc/src/client/transport/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ use crate::{rt::Runtime, service::Service};
22
use std::{sync::Arc, time::Duration};
33

44
mod registry;
5+
6+
// Using tower/buffer enables tokio's rt feature even though it's possible to
7+
// create Buffers with a user provided executor.
8+
#[cfg(feature = "_runtime-tokio")]
59
mod tonic;
610

711
use ::tonic::async_trait;

grpc/src/rt/mod.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
*/
2424

2525
use ::tokio::io::{AsyncRead, AsyncWrite};
26-
use std::{future::Future, net::SocketAddr, pin::Pin, time::Duration};
26+
use std::{future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
2727

2828
pub(crate) mod hyper_wrapper;
29+
#[cfg(feature = "_runtime-tokio")]
2930
pub(crate) mod tokio;
3031

3132
/// An abstraction over an asynchronous runtime.
@@ -91,3 +92,45 @@ pub(crate) struct TcpOptions {
9192
}
9293

9394
pub(crate) trait TcpStream: AsyncRead + AsyncWrite + Send + Unpin {}
95+
96+
/// A fake runtime to satisfy the compiler when no runtime is enabled. This will
97+
///
98+
/// # Panics
99+
///
100+
/// Panics if any of its functions are called.
101+
#[derive(Default)]
102+
pub(crate) struct NoOpRuntime {}
103+
104+
impl Runtime for NoOpRuntime {
105+
fn spawn(
106+
&self,
107+
task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
108+
) -> Box<dyn TaskHandle> {
109+
unimplemented!()
110+
}
111+
112+
fn get_dns_resolver(&self, opts: ResolverOptions) -> Result<Box<dyn DnsResolver>, String> {
113+
unimplemented!()
114+
}
115+
116+
fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
117+
unimplemented!()
118+
}
119+
120+
fn tcp_stream(
121+
&self,
122+
target: SocketAddr,
123+
opts: TcpOptions,
124+
) -> Pin<Box<dyn Future<Output = Result<Box<dyn TcpStream>, String>> + Send>> {
125+
unimplemented!()
126+
}
127+
}
128+
129+
pub(crate) fn default_runtime() -> Arc<dyn Runtime> {
130+
#[cfg(feature = "_runtime-tokio")]
131+
{
132+
return Arc::new(tokio::TokioRuntime {});
133+
}
134+
#[allow(unreachable_code)]
135+
Arc::new(NoOpRuntime::default())
136+
}

0 commit comments

Comments
 (0)