Skip to content

Commit f7e55e8

Browse files
committed
Merge #220: Remove / fix leftover TODO comments
Approved-by: Qqwy Priority: Normal Auto-deploy: false
2 parents cc9948a + 766f1da commit f7e55e8

File tree

14 files changed

+52
-143
lines changed

14 files changed

+52
-143
lines changed

libs/opsqueue_python/opsqueue_python.nix

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,9 @@ buildPythonPackage rec {
1919
name = "opsqueue_python";
2020
src = ../../.;
2121

22-
# TODO: I couldn't get it to work with the filter enabled.
23-
# We're now copying slightly too much to the Nix store.
24-
# Re-enable properly.
25-
26-
# srcWhitelist = [
27-
# "Cargo.toml"
28-
# "Cargo.lock"
29-
# "libs/opsqueue_python"
30-
# "libs/opsqueue_python/Cargo.toml"
31-
# "libs/opsqueue_python/pyproject.toml"
32-
# "libs/opsqueue_python/src(/.*)?"
33-
# "libs/opsqueue_python/python(/.*)?"
34-
35-
# # Opsqueue is a dependency, so that needs to be included too
36-
# "opsqueue"
37-
# "opsqueue/Cargo.toml"
38-
# "opsqueue/.cargo(/.*)?"
39-
# "opsqueue/build\.rs"
40-
# "opsqueue/opsqueue_example_database_schema\.db"
41-
# # "opsqueue/app(/.*)?"
42-
# "opsqueue/migrations(/.*)?"
43-
# "opsqueue/src(/.*)?"
44-
# ];
45-
22+
# We're copying slightly too much to the Nix store here,
23+
# but using the more granular file filter was very error-prone.
24+
# This is one thing that could be improved a little in the future.
4625
srcGlobalWhitelist = [
4726
".py"
4827
".pyi"

libs/opsqueue_python/python/opsqueue/exceptions.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ class SubmissionFailedError(Exception):
1414
1515
This means that part of the submission is missing and therefore
1616
it can never be 'completed' anymore.
17-
18-
TODO include chunk error in error message,
19-
and get at info about failed chunk.
2017
"""
2118

2219
def __init__(
@@ -86,7 +83,7 @@ class ChunkCountIsZeroError(IncorrectUsageError):
8683
"""
8784
Raised when making an empty submission.
8885
89-
TODO it is likely that empty submissoins should not be a user error
86+
TODO it is likely that empty submissions should not be a user error
9087
but rather be handled transparently by Opsqueue.
9188
"""
9289

libs/opsqueue_python/python/opsqueue/producer.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ def insert_submission(
142142
143143
Raises:
144144
- `ChunkSizeIsZeroError` if passing an incorrect chunk size of zero;
145-
- `ChunkCountIsZeroError` if passing an empty list of operations;
146145
- `InternalProducerClientError` if there is a low-level internal error.
147146
"""
148147
return self.insert_submission_chunks(
@@ -164,7 +163,8 @@ def blocking_stream_completed_submission(
164163
165164
Raises:
166165
- `InternalProducerClientError` if there is a low-level internal error.
167-
- TODO special exception for when the submission fails.
166+
- `SubmissionFailedError` if the submission failed permanently
167+
(after retrying a consumer kept failing on one of the chunks)
168168
"""
169169
return _unchunk_iterator(
170170
self.blocking_stream_completed_submission_chunks(submission_id),
@@ -205,9 +205,9 @@ def run_submission_chunks(
205205
(If opsqueue or the object storage cannot be reached, exceptions will also be raised).
206206
207207
Raises:
208-
- `ChunkCountIsZeroError` if passing an empty list of operations;
209208
- `InternalProducerClientError` if there is a low-level internal error.
210-
- TODO special exception for when the submission fails.
209+
- `SubmissionFailedError` if the submission failed permanently
210+
(after retrying a consumer kept failing on one of the chunks)
211211
"""
212212
submission_id = self.insert_submission_chunks(
213213
chunk_contents,
@@ -225,7 +225,10 @@ async def async_run_submission_chunks(
225225
strategic_metadata: None | dict[str, str | int] = None,
226226
chunk_size: None | int = None,
227227
) -> AsyncIterator[bytes]:
228-
# TODO: the insertion is not async yet.
228+
# NOTE: the insertion is not currently async.
229+
# Why? Simplicity. This is unlikely to be the bottleneck
230+
# for most async apps.
231+
# If it does cause a problem in the future this can be revisited
229232
submission_id = self.insert_submission_chunks(
230233
chunk_contents,
231234
metadata=metadata,
@@ -248,7 +251,6 @@ def insert_submission_chunks(
248251
returning an ID you can use to track the submission's progress afterwards.
249252
250253
Raises:
251-
- `ChunkCountIsZeroError` if passing an empty list of operations;
252254
- `InternalProducerClientError` if there is a low-level internal error.
253255
"""
254256
otel_trace_carrier = tracing.current_opentelemetry_tracecontext_to_carrier()
@@ -270,7 +272,8 @@ def blocking_stream_completed_submission_chunks(
270272
271273
Raises:
272274
- `InternalProducerClientError` if there is a low-level internal error.
273-
- TODO special exception for when the submission fails.
275+
- `SubmissionFailedError` if the submission failed permanently
276+
(after retrying a consumer kept failing on one of the chunks)
274277
"""
275278
return self.inner.blocking_stream_completed_submission_chunks(submission_id) # type: ignore[no-any-return]
276279

libs/opsqueue_python/src/consumer.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,6 @@ impl ConsumerClient {
230230
E<ChunkRetrievalError, E<InternalConsumerClientError, IncorrectUsage<LimitIsZero>>>,
231231
>,
232232
> {
233-
// TODO: Currently we do short-polling here if there are no chunks available.
234-
// This is quite suboptimal; long-polling would be much nicer.
235233
const POLL_INTERVAL: Duration = Duration::from_millis(500);
236234
loop {
237235
let res = self.block_unless_interrupted(async {

libs/opsqueue_python/src/errors.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ use opsqueue::common::chunk::ChunkId;
66
use opsqueue::common::errors::{
77
ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E,
88
};
9-
use opsqueue::common::NonZeroIsZero;
10-
use pyo3::exceptions::{PyBaseException, PyException};
9+
use pyo3::exceptions::PyBaseException;
1110
use pyo3::{import_exception, Bound, PyErr, Python};
1211

1312
use crate::common::{ChunkIndex, SubmissionId};
@@ -20,7 +19,6 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError);
2019
import_exception!(opsqueue.exceptions, TryFromIntError);
2120
import_exception!(opsqueue.exceptions, ChunkNotFoundError);
2221
import_exception!(opsqueue.exceptions, SubmissionNotFoundError);
23-
import_exception!(opsqueue.exceptions, ChunkCountIsZeroError);
2422
import_exception!(opsqueue.exceptions, NewObjectStoreClientError);
2523
import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError);
2624

@@ -119,12 +117,6 @@ impl From<CError<opsqueue::object_store::ChunkStorageError>> for PyErr {
119117
}
120118
}
121119

122-
impl From<CError<NonZeroIsZero<opsqueue::common::chunk::ChunkIndex>>> for PyErr {
123-
fn from(value: CError<NonZeroIsZero<opsqueue::common::chunk::ChunkIndex>>) -> Self {
124-
ChunkCountIsZeroError::new_err(value.0.to_string())
125-
}
126-
}
127-
128120
impl<T: Error> From<CError<IncorrectUsage<T>>> for PyErr {
129121
fn from(value: CError<IncorrectUsage<T>>) -> Self {
130122
IncorrectUsageError::new_err(value.0.to_string())
@@ -180,14 +172,6 @@ impl From<CError<opsqueue::object_store::NewObjectStoreClientError>> for PyErr {
180172
}
181173
}
182174

183-
// TODO: Only temporary. We want to get rid of all usage of anyhow
184-
// in the boundary to PyO3
185-
impl From<CError<anyhow::Error>> for PyErr {
186-
fn from(value: CError<anyhow::Error>) -> Self {
187-
PyException::new_err(value.0.to_string())
188-
}
189-
}
190-
191175
impl From<CError<UnexpectedOpsqueueConsumerServerResponse>> for PyErr {
192176
fn from(value: CError<UnexpectedOpsqueueConsumerServerResponse>) -> Self {
193177
UnexpectedOpsqueueConsumerServerResponseError::new_err(value.0.to_string())

libs/opsqueue_python/src/producer.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@ use pyo3::{
88
};
99

1010
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
11+
use opsqueue::{
12+
common::errors::E::{self, L, R},
13+
object_store::{ChunksStorageError, NewObjectStoreClientError},
14+
producer::client::{Client as ActualClient, InternalProducerClientError},
15+
};
1116
use opsqueue::{
1217
common::{chunk, submission, StrategicMetadataMap},
1318
object_store::{ChunkRetrievalError, ChunkType},
1419
producer::common::ChunkContents,
1520
tracing::CarrierMap,
1621
E,
1722
};
18-
use opsqueue::{
19-
common::{
20-
errors::E::{self, L, R},
21-
NonZeroIsZero,
22-
},
23-
object_store::{ChunksStorageError, NewObjectStoreClientError},
24-
producer::client::{Client as ActualClient, InternalProducerClientError},
25-
};
2623
use ux_serde::u63;
2724

2825
use crate::{
@@ -163,6 +160,9 @@ impl ProducerClient {
163160
})
164161
}
165162

163+
/// Directly inserts a submission without sending the chunks to GCS
164+
/// (but immediately embedding them in the DB).
165+
/// NOTE: This does not support StrategicMetadata currently
166166
#[pyo3(signature = (chunk_contents, metadata=None, chunk_size=None, otel_trace_carrier=CarrierMap::default()))]
167167
pub fn insert_submission_direct(
168168
&self,
@@ -172,7 +172,6 @@ impl ProducerClient {
172172
chunk_size: Option<u64>,
173173
otel_trace_carrier: CarrierMap,
174174
) -> CPyResult<SubmissionId, E<FatalPythonException, InternalProducerClientError>> {
175-
// TODO
176175
let strategic_metadata = Default::default();
177176

178177
py.allow_threads(|| {
@@ -208,7 +207,6 @@ impl ProducerClient {
208207
SubmissionId,
209208
E![
210209
FatalPythonException,
211-
NonZeroIsZero<chunk::ChunkIndex>,
212210
ChunksStorageError,
213211
InternalProducerClientError,
214212
],
@@ -227,7 +225,7 @@ impl ProducerClient {
227225
self.object_store_client
228226
.store_chunks(&prefix, ChunkType::Input, stream)
229227
.await
230-
.map_err(|e| CError(R(R(L(e)))))
228+
.map_err(|e| CError(R(L(e))))
231229
})
232230
})?;
233231
let chunk_count = chunk::ChunkIndex::from(chunk_count);
@@ -250,7 +248,7 @@ impl ProducerClient {
250248
tracing::debug!("Submitting finished; Submission ID {submission_id} assigned to subfolder {prefix}");
251249
submission_id.into()
252250
})
253-
.map_err(|e| R(R(R(e))).into())
251+
.map_err(|e| R(R(e)).into())
254252
})
255253
})
256254
}
@@ -269,7 +267,6 @@ impl ProducerClient {
269267
InternalProducerClientError,
270268
],
271269
> {
272-
// TODO: Use CPyResult instead
273270
py.allow_threads(|| {
274271
self.block_unless_interrupted(async move {
275272
match self
@@ -299,7 +296,6 @@ impl ProducerClient {
299296
E![
300297
FatalPythonException,
301298
errors::SubmissionFailed,
302-
NonZeroIsZero<chunk::ChunkIndex>,
303299
ChunksStorageError,
304300
InternalProducerClientError,
305301
],
@@ -325,7 +321,7 @@ impl ProducerClient {
325321
CError(match e {
326322
L(e) => L(e),
327323
R(L(e)) => R(L(e)),
328-
R(R(e)) => R(R(R(R(e)))),
324+
R(R(e)) => R(R(R(e))),
329325
})
330326
})?;
331327
Ok(res)

libs/opsqueue_python/tests/test_rountrip.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,26 @@ def run_consumer() -> None:
4747
assert res == sum(range(1, 101))
4848

4949

50+
def test_empty_submission(
51+
opsqueue: OpsqueueProcess, any_consumer_strategy: Strategy
52+
) -> None:
53+
"""
54+
Empty submissions ought to be supported without problems.
55+
Opsqueue should immediately consider these 'completed'
56+
and no errors should be thrown.
57+
"""
58+
producer_client = ProducerClient(
59+
f"localhost:{opsqueue.port}", "file:///tmp/opsqueue/test_empty_submission"
60+
)
61+
62+
input_iter: list[int] = []
63+
output_iter: Iterator[int] = producer_client.run_submission(
64+
input_iter, chunk_size=20
65+
)
66+
res = sum(output_iter)
67+
assert res == 0
68+
69+
5070
def test_roundtrip_explicit_serialization_format(
5171
opsqueue: OpsqueueProcess,
5272
any_consumer_strategy: Strategy,

opsqueue/src/common/chunk.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use ux_serde::u63;
55

66
use super::errors::TryFromIntError;
77
use super::submission::SubmissionId;
8-
use super::MayBeZero;
98

109
/// Index of this particular chunk in a submission.
1110
#[derive(
@@ -64,12 +63,6 @@ impl ChunkIndex {
6463
}
6564
}
6665

67-
impl MayBeZero for ChunkIndex {
68-
fn is_zero(&self) -> bool {
69-
self.0 == u63::new(0)
70-
}
71-
}
72-
7366
impl PartialEq<u64> for ChunkIndex {
7467
fn eq(&self, other: &u64) -> bool {
7568
u64::from(*self) == *other

opsqueue/src/common/mod.rs

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,9 @@
1-
use std::fmt::Debug;
2-
31
use rustc_hash::FxHashMap;
42

53
pub mod chunk;
64
pub mod errors;
75
pub mod submission;
86

9-
pub trait MayBeZero {
10-
fn is_zero(&self) -> bool;
11-
}
12-
13-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize)]
14-
pub struct NonZero<T>(T);
15-
16-
impl<'de, T: MayBeZero + Debug + serde::Deserialize<'de>> serde::Deserialize<'de> for NonZero<T> {
17-
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
18-
where
19-
D: serde::Deserializer<'de>,
20-
{
21-
let inner = T::deserialize(deserializer)?;
22-
NonZero::try_from(inner).map_err(|e| {
23-
serde::de::Error::invalid_value(
24-
serde::de::Unexpected::Other(&format!("zero (0)-value {e:?}")),
25-
&&*format!("a non-zero value of type{}", std::any::type_name::<T>()),
26-
)
27-
})
28-
}
29-
}
30-
31-
impl<T> NonZero<T> {
32-
pub fn inner(&self) -> &T {
33-
&self.0
34-
}
35-
pub fn into_inner(self) -> T {
36-
self.0
37-
}
38-
}
39-
40-
#[derive(thiserror::Error, Debug)]
41-
#[error("Given a zero-value where a non-zero value is expected: {0:?}")]
42-
pub struct NonZeroIsZero<T>(T);
43-
44-
impl<T: MayBeZero> NonZero<T> {
45-
/// builds a new NonZero from a given value.
46-
///
47-
/// Nicer would be to implement TryFrom but we cannot
48-
/// because of the blanket implementation `TryFrom<T, U: Into<T>> ...` :-(
49-
pub fn try_from(value: T) -> Result<Self, NonZeroIsZero<T>> {
50-
if value.is_zero() {
51-
Err(NonZeroIsZero(value))
52-
} else {
53-
Ok(Self(value))
54-
}
55-
}
56-
}
57-
587
/// As values, we support the largest number value SQLite supports by itself,
598
/// which should be sufficient for most 'ID' fields, which is what this feature is intended for.
609
///

0 commit comments

Comments
 (0)