Skip to content

Commit e17d605

Browse files
committed
WIP
1 parent 6724dd1 commit e17d605

File tree

3 files changed

+34
-21
lines changed

3 files changed

+34
-21
lines changed

libs/opsqueue_python/src/common.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -443,13 +443,12 @@ pub async fn check_signals_in_background() -> FatalPythonException {
443443

444444
/// Sets up a Tokio runtime to use for a client.
445445
///
446-
/// Rather than the current-thread scheduler,
447-
/// we use a (single extra!) background thread,
448-
/// allowing us to keep (GIL-less) tasks alive in the background
449-
/// even when returning back to Python
446+
/// We use a current_thread runtime since the Python GIL
447+
/// together with a multithreaded runtime and pyo3_log is a dangerous combination:
448+
/// - https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks
449+
/// - https://docs.rs/pyo3-log/latest/pyo3_log/#interaction-with-python-gil
450450
pub fn start_runtime() -> Arc<tokio::runtime::Runtime> {
451-
let runtime = tokio::runtime::Builder::new_multi_thread()
452-
.worker_threads(1)
451+
let runtime = tokio::runtime::Builder::new_current_thread()
453452
.enable_all()
454453
.build()
455454
.expect("Failed to create Tokio runtime in opsqueue client");

libs/opsqueue_python/src/producer.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,17 +217,19 @@ impl ProducerClient {
217217
py.allow_threads(|| {
218218
let prefix = uuid::Uuid::now_v7().to_string();
219219
tracing::debug!("Uploading submission chunks to object store subfolder {prefix}...");
220-
let chunk_count = Python::with_gil(|py| {
221-
self.block_unless_interrupted(async {
222-
let chunk_contents = chunk_contents.bind(py);
223-
let stream = futures::stream::iter(chunk_contents)
224-
.map(|item| item.and_then(|item| item.extract()).map_err(Into::into));
220+
let chunk_count = self.block_unless_interrupted(async {
221+
let chunk_contents = std::iter::from_fn(move || {
222+
Python::with_gil(|py|
223+
chunk_contents.bind(py).clone().next()
224+
.map(|item| item.and_then(
225+
|item| item.extract()).map_err(Into::into)))
226+
});
227+
let stream = futures::stream::iter(chunk_contents);
225228
self.object_store_client
226229
.store_chunks(&prefix, ChunkType::Input, stream)
227230
.await
228231
.map_err(|e| CError(R(L(e))))
229-
})
230-
})?;
232+
})?;
231233
let chunk_count = chunk::ChunkIndex::from(chunk_count);
232234
tracing::debug!("Finished uploading to object store. {prefix} contains {chunk_count} chunks");
233235

opsqueue/src/consumer/client.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
collections::HashMap,
33
str::FromStr,
44
sync::{atomic::AtomicBool, Arc},
5+
thread,
56
time::Duration,
67
};
78

@@ -200,14 +201,25 @@ impl Client {
200201
}
201202

202203
let healthy = Arc::new(AtomicBool::new(true));
203-
tokio::spawn(Self::background_task(
204-
cancellation_token.clone(),
205-
healthy.clone(),
206-
in_flight_requests.clone(),
207-
ws_stream,
208-
ws_sink.clone(),
209-
config,
210-
));
204+
205+
let bg_cancellation_token = cancellation_token.clone();
206+
let bg_healthy = healthy.clone();
207+
let bg_in_flight_requests = in_flight_requests.clone();
208+
let bg_ws_sink = ws_sink.clone();
209+
thread::spawn(move || {
210+
tokio::runtime::Builder::new_current_thread()
211+
.enable_all()
212+
.build()
213+
.expect("Failed to create Tokio runtime for background client")
214+
.block_on(Self::background_task(
215+
bg_cancellation_token,
216+
bg_healthy,
217+
bg_in_flight_requests,
218+
ws_stream,
219+
bg_ws_sink,
220+
config,
221+
))
222+
});
211223

212224
let me = Self {
213225
in_flight_requests,

0 commit comments

Comments
 (0)