diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index ca837f4..2f1e8f3 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -217,17 +217,19 @@ impl ProducerClient { py.allow_threads(|| { let prefix = uuid::Uuid::now_v7().to_string(); tracing::debug!("Uploading submission chunks to object store subfolder {prefix}..."); - let chunk_count = Python::with_gil(|py| { - self.block_unless_interrupted(async { - let chunk_contents = chunk_contents.bind(py); - let stream = futures::stream::iter(chunk_contents) - .map(|item| item.and_then(|item| item.extract()).map_err(Into::into)); + let chunk_count = self.block_unless_interrupted(async { + let chunk_contents = std::iter::from_fn(move || { + Python::with_gil(|py| + chunk_contents.bind(py).clone().next() + .map(|item| item.and_then( + |item| item.extract()).map_err(Into::into))) + }); + let stream = futures::stream::iter(chunk_contents); self.object_store_client .store_chunks(&prefix, ChunkType::Input, stream) .await .map_err(|e| CError(R(L(e)))) - }) - })?; + })?; let chunk_count = chunk::ChunkIndex::from(chunk_count); tracing::debug!("Finished uploading to object store. {prefix} contains {chunk_count} chunks"); diff --git a/libs/opsqueue_python/tests/conftest.py b/libs/opsqueue_python/tests/conftest.py index 978fb4e..3c3d6c1 100644 --- a/libs/opsqueue_python/tests/conftest.py +++ b/libs/opsqueue_python/tests/conftest.py @@ -19,6 +19,7 @@ # print("A") # multiprocessing.set_start_method('forkserver') +PROJECT_ROOT = Path(__file__).parents[3] @dataclass class OpsqueueProcess: @@ -34,8 +35,8 @@ def opsqueue_bin_location() -> Path: ) return Path(deriv_path) / "bin" / "opsqueue" else: - subprocess.run(["cargo", "build", "--quiet", "--bin", "opsqueue"]) - return Path(".", "target", "debug", "opsqueue") + subprocess.run(["cargo", "build", "--quiet", "--bin", "opsqueue"], cwd=PROJECT_ROOT, check=True) + return PROJECT_ROOT / Path("target", "debug", "opsqueue") @pytest.fixture @@ -62,12 +63,11 @@ def opsqueue_service( "--database-filename", temp_dbname, ] - cwd = "../../" env = os.environ.copy() # We copy the env so e.g. RUST_LOG and other env vars are propagated from outside of the invocation of pytest if env.get("RUST_LOG") is None: env["RUST_LOG"] = "off" - with subprocess.Popen(command, cwd=cwd, env=env) as process: + with subprocess.Popen(command, cwd=PROJECT_ROOT, env=env) as process: try: wrapper = OpsqueueProcess(port=port, process=process) yield wrapper