Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
576 changes: 246 additions & 330 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions toad-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ nb = "1"
no-std-net = "0.6"
tinyvec = {version = "1.5", default_features = false, features = ["alloc", "rustc_1_55"]}
toad-msg = "0.18.1"
toad = "0.19.1"
toad = { path = "../toad" }
jni = {version = "0.21.1", features = ["invocation"]}
toad-stem = {version = "0.1.0", default_features = false}
toad-len = {version = "0.1.3", default_features = false}
toad-array = {version = "0.5.0", default_features = false}
log = "0.4"

embedded-time = "0.12"
2 changes: 1 addition & 1 deletion toad-jni/src/java/io/io_exception.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<StepError> toad::platform::PlatformError<StepError, Throwable> for IOExcept
Self::new_caused_by(&mut java::env(), "", e)
}

fn clock(e: embedded_time::clock::Error) -> Self {
fn clock(e: toad::time::ClockError) -> Self {
Self::new(&mut java::env(), format!("{:?}", e))
}
}
1 change: 0 additions & 1 deletion toad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ toad-macros = "0.2.0"
log = "0.4"
tinyvec = { version = "1.5", default_features = false, features = ["alloc", "rustc_1_55"] }
no-std-net = "0.6"
embedded-time = "0.12"
nb = "1"
rand = { version = "0.9", default_features = false }
rand_chacha = { version = "0.9", default_features = false }
Expand Down
12 changes: 5 additions & 7 deletions toad/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#![allow(dead_code)]

use embedded_time::duration::Milliseconds;

use crate::retry::{Attempts, Strategy};
use crate::time::Millis;
use crate::time::{Millis, Milliseconds};

/// Bytes / Second
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand All @@ -17,9 +15,9 @@ pub struct Con {
///
/// Defaults to an exponential retry strategy:
/// ```
/// use embedded_time::duration::Milliseconds;
/// use toad::config::Con;
/// use toad::retry::Strategy;
/// use toad::time::Milliseconds;
///
/// assert_eq!(Con::default().unacked_retry_strategy,
/// Strategy::Exponential { init_min: Milliseconds(500),
Expand All @@ -35,9 +33,9 @@ pub struct Con {
///
/// Defaults to a lazy exponential retry strategy:
/// ```
/// use embedded_time::duration::Milliseconds;
/// use toad::config::Con;
/// use toad::retry::Strategy;
/// use toad::time::Milliseconds;
///
/// assert_eq!(Con::default().acked_retry_strategy,
/// Strategy::Exponential { init_min: Milliseconds(1_000),
Expand Down Expand Up @@ -68,9 +66,9 @@ pub struct Non {
///
/// Defaults to a pessimistic exponential retry strategy:
/// ```
/// use embedded_time::duration::Milliseconds;
/// use toad::config::Non;
/// use toad::retry::Strategy;
/// use toad::time::Milliseconds;
///
/// assert_eq!(Non::default().retry_strategy,
/// Strategy::Exponential { init_min: Milliseconds(250),
Expand Down Expand Up @@ -146,8 +144,8 @@ pub struct Msg {
/// Defaults to 5000 milliseconds.
///
/// ```
/// use embedded_time::duration::Milliseconds;
/// use toad::config::Msg;
/// use toad::time::Milliseconds;
///
/// assert_eq!(Msg::default().multicast_response_leisure,
/// Milliseconds(5000u64));
Expand Down
17 changes: 7 additions & 10 deletions toad/src/platform.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use core::fmt::Debug;

use ::toad_msg::{Id, OptNumber, OptValue, OptionMap, Token, TryIntoBytes};
use embedded_time::Instant;
use naan::prelude::MonadOnce;
use no_std_net::SocketAddr;
#[cfg(feature = "alloc")]
Expand All @@ -13,7 +12,7 @@ use crate::net::{Addrd, Socket};
use crate::req::Req;
use crate::resp::Resp;
use crate::step::Step;
use crate::time::Clock;
use crate::time::{Clock, Instant};
use crate::todo::String;

/// Default [`PlatformError`] implementation
Expand All @@ -23,7 +22,7 @@ pub enum Error<Step, Socket> {
MessageToBytes(::toad_msg::to_bytes::MessageToBytesError),
Step(Step),
Socket(Socket),
Clock(embedded_time::clock::Error),
Clock(crate::time::ClockError),
}

impl<Step, Socket> PlatformError<Step, Socket> for Error<Step, Socket>
Expand All @@ -42,7 +41,7 @@ impl<Step, Socket> PlatformError<Step, Socket> for Error<Step, Socket>
Self::Socket(e)
}

fn clock(e: embedded_time::clock::Error) -> Self {
fn clock(e: crate::time::ClockError) -> Self {
Self::Clock(e)
}
}
Expand All @@ -59,7 +58,7 @@ pub trait PlatformError<StepError, SocketError>: Sized + core::fmt::Debug {
fn socket(e: SocketError) -> Self;

/// Convert a clock error to PlatformError
fn clock(e: embedded_time::clock::Error) -> Self;
fn clock(e: crate::time::ClockError) -> Self;
}

/// The runtime component of the `Platform` abstraction
Expand Down Expand Up @@ -89,8 +88,6 @@ pub trait Platform<Steps>
/// including the system time and datagrams currently
/// in the network socket
fn snapshot(&self) -> Result<Snapshot<Self::Types>, Self::Error> {
use embedded_time::Clock;

self.socket()
.poll()
.map_err(Self::Error::socket)
Expand Down Expand Up @@ -321,7 +318,7 @@ pub trait PlatformTypes: Sized + 'static + core::fmt::Debug {
#[non_exhaustive]
pub struct Snapshot<P: PlatformTypes> {
/// The current system time at the start of the step pipe
pub time: Instant<P::Clock>,
pub time: Instant,

/// A UDP datagram received from somewhere
pub recvd_dgram: Option<Addrd<<P::Socket as Socket>::Dgram>>,
Expand Down Expand Up @@ -405,9 +402,9 @@ impl<P: PlatformTypes> PartialEq for Effect<P> {
/// we've attempted to send this request and whether we
/// should consider it poisoned.
#[derive(Debug, Clone, Copy)]
pub struct Retryable<P: PlatformTypes, T>(pub T, pub crate::retry::RetryTimer<P::Clock>);
pub struct Retryable<T>(pub T, pub crate::retry::RetryTimer);

impl<P: PlatformTypes, T> Retryable<P, T> {
impl<T> Retryable<T> {
/// Gets the data, discarding the retry timer
pub fn unwrap(self) -> T {
self.0
Expand Down
69 changes: 20 additions & 49 deletions toad/src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use core::ops::{Add, Mul, RangeInclusive, Sub};

use embedded_time::duration::Milliseconds;
use embedded_time::Instant;
use naan::prelude::Monad;
use rand::{Rng, SeedableRng};

use crate::time::{Clock, Millis};
use crate::time::{Instant, Millis, Milliseconds};

/// A non-blocking timer that allows a fixed-delay or exponential-backoff retry,
/// that lives alongside some operation to retry.
Expand All @@ -14,9 +11,8 @@ use crate::time::{Clock, Millis};
/// we don't have the luxury of a memory allocator :)
///
/// ```
/// use embedded_time::clock::Clock;
/// use embedded_time::duration::Milliseconds;
/// use toad::retry;
/// use toad::time::{Clock, Milliseconds};
///
/// # main();
/// fn main() {
Expand Down Expand Up @@ -46,31 +42,25 @@ use crate::time::{Clock, Millis};
/// }
/// }
/// ```
#[derive(Debug)]
pub struct RetryTimer<C: Clock> {
start: Instant<C>,
last_attempted_at: Option<Instant<C>>,
#[derive(Debug, Clone, Copy)]
pub struct RetryTimer {
start: Instant,
last_attempted_at: Option<Instant>,
init: Millis,
strategy: Strategy,
attempts: Attempts,
max_attempts: Attempts,
}

impl<C> RetryTimer<C> where C: Clock
{
impl RetryTimer {
/// Create a new retrier
pub fn new(start: Instant<C>, strategy: Strategy, max_attempts: Attempts) -> Self {
pub fn new(start: Instant, strategy: Strategy, max_attempts: Attempts) -> Self {
Self { start,
strategy,
last_attempted_at: None,
init: if strategy.has_jitter() {
let mut rand =
Ok(start.duration_since_epoch()).bind(Millis::try_from)
.map(|Milliseconds(ms)| {
rand_chacha::ChaCha8Rng::seed_from_u64(ms)
})
.unwrap();

let Milliseconds(ms) = start.duration_since_epoch();
let mut rand = rand_chacha::ChaCha8Rng::seed_from_u64(ms);
Milliseconds(rand.random_range(strategy.range()))
} else {
Milliseconds(*strategy.range().start())
Expand All @@ -85,7 +75,7 @@ impl<C> RetryTimer<C> where C: Clock
/// Returns `nb::Error::WouldBlock` when we have not yet
/// waited the appropriate amount of time to retry.
pub fn what_should_i_do(&mut self,
now: Instant<C>)
now: Instant)
-> nb::Result<YouShould, core::convert::Infallible> {
if self.attempts >= self.max_attempts {
Ok(YouShould::Cry)
Expand All @@ -101,18 +91,18 @@ impl<C> RetryTimer<C> where C: Clock
}

/// Get the instant this retry timer was first attempted
pub fn first_attempted_at(&self) -> Instant<C> {
pub fn first_attempted_at(&self) -> Instant {
self.start
}

/// Get the instant this retry timer was last attempted (if at all)
pub fn last_attempted_at(&self) -> Instant<C> {
pub fn last_attempted_at(&self) -> Instant {
self.last_attempted_at
.unwrap_or_else(|| self.first_attempted_at())
}

/// Get the next time at which this should be retried
pub fn next_attempt_at(&self) -> Instant<C> {
pub fn next_attempt_at(&self) -> Instant {
let after_start = match self.strategy {
| Strategy::Delay { .. } => Milliseconds(self.init.0 * (self.attempts.0 as u64)),
| Strategy::Exponential { .. } => {
Expand All @@ -124,21 +114,7 @@ impl<C> RetryTimer<C> where C: Clock
}
}

impl<C> Copy for RetryTimer<C> where C: Clock {}
impl<C> Clone for RetryTimer<C> where C: Clock
{
fn clone(&self) -> Self {
Self { start: self.start,
init: self.init,
last_attempted_at: self.last_attempted_at,
strategy: self.strategy,
attempts: self.attempts,
max_attempts: self.max_attempts }
}
}

impl<C> PartialEq for RetryTimer<C> where C: Clock
{
impl PartialEq for RetryTimer {
fn eq(&self, other: &Self) -> bool {
self.start == other.start
&& self.init == other.init
Expand All @@ -149,7 +125,7 @@ impl<C> PartialEq for RetryTimer<C> where C: Clock
}
}

impl<C> Eq for RetryTimer<C> where C: Clock {}
impl Eq for RetryTimer {}

/// A number of attempts
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down Expand Up @@ -258,10 +234,8 @@ impl Strategy {

#[cfg(test)]
mod test {
use embedded_time::rate::Fraction;
use embedded_time::Clock;

use super::*;
use crate::time::{Clock, ClockError, Instant};

#[derive(Debug)]
pub struct FakeClock(pub *const u64);
Expand All @@ -271,12 +245,9 @@ mod test {
}
}

impl embedded_time::Clock for FakeClock {
type T = u64;

const SCALING_FACTOR: Fraction = Fraction::new(1, 1000);

fn try_now(&self) -> Result<Instant<Self>, embedded_time::clock::Error> {
impl Clock for FakeClock {
fn try_now(&self) -> Result<Instant, ClockError> {
// FakeClock stores raw millisecond values
unsafe { Ok(Instant::new(*self.0)) }
}
}
Expand Down
17 changes: 5 additions & 12 deletions toad/src/std/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#![allow(clippy::many_single_char_names)]

use embedded_time::rate::Fraction;

/// Networking! woohoo!
pub mod net;
use core::marker::PhantomData;
Expand Down Expand Up @@ -88,7 +86,7 @@ impl<StepError, SocketError> PlatformError<StepError, SocketError> for io::Error
io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
}

fn clock(e: embedded_time::clock::Error) -> Self {
fn clock(e: crate::time::ClockError) -> Self {
io::Error::new(io::ErrorKind::Other, format!("{:?}", e))
}
}
Expand Down Expand Up @@ -172,7 +170,7 @@ impl<Sec, Steps> crate::platform::Platform<Steps> for Platform<Sec, Steps>
}
}

/// Implement [`embedded_time::Clock`] using [`std::time`] primitives
/// Implement [`crate::time::Clock`] using [`std::time`] primitives
#[derive(Debug, Clone, Copy)]
pub struct Clock(std::time::Instant);

Expand All @@ -189,15 +187,10 @@ impl Clock {
}
}

impl embedded_time::Clock for Clock {
type T = u64;

// microseconds
const SCALING_FACTOR: Fraction = Fraction::new(1, 1_000_000);

fn try_now(&self) -> Result<embedded_time::Instant<Self>, embedded_time::clock::Error> {
impl crate::time::Clock for Clock {
fn try_now(&self) -> Result<crate::time::Instant, crate::time::ClockError> {
let now = std::time::Instant::now();
let elapsed = now.duration_since(self.0);
Ok(embedded_time::Instant::new(elapsed.as_micros() as u64))
Ok(crate::time::Instant::new(elapsed.as_millis() as u64))
}
}
9 changes: 3 additions & 6 deletions toad/src/step/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub mod runtime {
#[allow(missing_docs)]
pub type HandleAcks<M, S> = handle_acks::HandleAcks<S, Map<M, Addrd<Token>, ()>>;
#[allow(missing_docs)]
pub type Retry<P, A, S> = retry::Retry<S, Array<A, (retry::State<Clock<P>>, Addrd<Message<P>>)>>;
pub type Retry<P, A, S> = retry::Retry<S, Array<A, (retry::State, Addrd<Message<P>>)>>;
#[allow(missing_docs)]
pub type BufferResponses<P, M, S> =
buffer_responses::BufferResponses<S,
Expand All @@ -41,9 +41,7 @@ pub mod runtime {
pub type ProvisionIds<P, M, A, S> =
provision_ids::ProvisionIds<P,
S,
Map<M,
SocketAddrWithDefault,
Array<A, Stamped<Clock<P>, IdWithDefault>>>>;
Map<M, SocketAddrWithDefault, Array<A, Stamped<IdWithDefault>>>>;
#[allow(missing_docs)]
pub type Observe<P, A, S> = observe::Observe<S,
Array<A, observe::Sub<P>>,
Expand Down Expand Up @@ -457,14 +455,13 @@ impl<P: PlatformTypes> Step<P> for () {
#[cfg(test)]
#[allow(missing_docs)]
pub mod test {
use embedded_time::Clock;

use super::*;
use crate::test;
use crate::test::ClockMock;

pub fn default_snapshot() -> platform::Snapshot<test::Platform> {
platform::Snapshot { time: ClockMock::new().try_now().unwrap(),
platform::Snapshot { time: ClockMock::instant(0),
recvd_dgram: Some(crate::net::Addrd(Default::default(),
crate::test::dummy_addr())),
config: crate::config::Config::default() }
Expand Down
Loading
Loading