Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions libs/opsqueue_python/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,19 @@ impl ProducerClient {
py.allow_threads(|| {
let prefix = uuid::Uuid::now_v7().to_string();
tracing::debug!("Uploading submission chunks to object store subfolder {prefix}...");
let chunk_count = Python::with_gil(|py| {
self.block_unless_interrupted(async {
let chunk_contents = chunk_contents.bind(py);
let stream = futures::stream::iter(chunk_contents)
.map(|item| item.and_then(|item| item.extract()).map_err(Into::into));
let chunk_count = self.block_unless_interrupted(async {
let chunk_contents = std::iter::from_fn(move || {
Python::with_gil(|py|
chunk_contents.bind(py).clone().next()
.map(|item| item.and_then(
|item| item.extract()).map_err(Into::into)))
});
let stream = futures::stream::iter(chunk_contents);
self.object_store_client
.store_chunks(&prefix, ChunkType::Input, stream)
.await
.map_err(|e| CError(R(L(e))))
})
})?;
})?;
let chunk_count = chunk::ChunkIndex::from(chunk_count);
tracing::debug!("Finished uploading to object store. {prefix} contains {chunk_count} chunks");

Expand Down
8 changes: 4 additions & 4 deletions libs/opsqueue_python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# print("A")
# multiprocessing.set_start_method('forkserver')

PROJECT_ROOT = Path(__file__).parents[3]

@dataclass
class OpsqueueProcess:
Expand All @@ -34,8 +35,8 @@ def opsqueue_bin_location() -> Path:
)
return Path(deriv_path) / "bin" / "opsqueue"
else:
subprocess.run(["cargo", "build", "--quiet", "--bin", "opsqueue"])
return Path(".", "target", "debug", "opsqueue")
subprocess.run(["cargo", "build", "--quiet", "--bin", "opsqueue"], cwd=PROJECT_ROOT, check=True)
return PROJECT_ROOT / Path("target", "debug", "opsqueue")


@pytest.fixture
Expand All @@ -62,12 +63,11 @@ def opsqueue_service(
"--database-filename",
temp_dbname,
]
cwd = "../../"
env = os.environ.copy() # We copy the env so e.g. RUST_LOG and other env vars are propagated from outside of the invocation of pytest
if env.get("RUST_LOG") is None:
env["RUST_LOG"] = "off"

with subprocess.Popen(command, cwd=cwd, env=env) as process:
with subprocess.Popen(command, cwd=PROJECT_ROOT, env=env) as process:
try:
wrapper = OpsqueueProcess(port=port, process=process)
yield wrapper
Expand Down
Loading