Skip to content

Commit a725c55

Browse files
committed
WIP
1 parent 7961a28 commit a725c55

File tree

4 files changed

+35
-11
lines changed

4 files changed

+35
-11
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libs/opsqueue_python/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ thiserror = "1.0.65"
3131

3232
# Python FFI:
3333
pyo3 = { version = "0.25.1", features = ["chrono", "experimental-async"] }
34-
# pyo3-async-runtimes = { version = "0.23.0", features = ["tokio-runtime", "unstable-streams"] }
34+
pyo3-async-runtimes = { version = "0.25.0", features = ["tokio-runtime"] }
3535

3636
# Logging/tracing:
3737
pyo3-log = "0.12.1"

libs/opsqueue_python/src/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ pub async fn check_signals_in_background() -> FatalPythonException {
456456
/// allowing us to keep (GIL-less) tasks alive in the background
457457
/// even when returning back to Python
458458
pub fn start_runtime() -> Arc<tokio::runtime::Runtime> {
459-
let runtime = tokio::runtime::Builder::new_multi_thread()
459+
let runtime = tokio::runtime::Builder::new_current_thread()
460460
.worker_threads(1)
461461
.enable_all()
462462
.build()

libs/opsqueue_python/src/producer.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -539,20 +539,30 @@ impl PyChunksAsyncIter {
539539

540540
// async fn __anext__(slf: PyRef<'_, Self>) -> PyResult<usize> {
541541
// async fn __anext__(mut _pyself: PyRefMut<'_, Self>) -> PyResult<i32> {
542-
fn __anext__(&self) -> Option<usize> {
543-
todo!()
542+
// fn __anext__(&self) -> Option<usize> {
543+
fn __anext__<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
544544
// println!("A");
545545

546546
// println!("B");
547547
// let stream = self.stream.clone();
548548
// println!("C");
549-
550-
// let res = AsyncAllowThreads(self.runtime.spawn(async move { stream.lock().await.next().await})).await.expect("Top level spawn to succeed");
551-
// match res {
552-
// None => Ok(None),
553-
// Some(Ok(val)) => Ok(Some(val)),
554-
// Some(Err(e)) => Err(e.into()),
555-
// }
549+
let stream = self.stream.clone();
550+
let runtime = self.runtime.clone();
551+
pyo3_async_runtimes::tokio::future_into_py(
552+
py,
553+
AsyncAllowThreads(Box::pin(async move {
554+
todo!();
555+
let res = runtime
556+
.spawn(async move { stream.lock().await.next().await })
557+
.await
558+
.expect("Top level spawn to succeed");
559+
match res {
560+
None => Err(PyStopAsyncIteration::new_err(())),
561+
Some(Ok(val)) => Ok(Some(val)),
562+
Some(Err(e)) => Err(e.into()),
563+
}
564+
})),
565+
)
556566
// pyo3_async_runtimes::generic::future_into_py::<TokioRuntimeThatIsInScope, _, _>(
557567
// slf.py(),
558568
// async move {

0 commit comments

Comments
 (0)