From 972f4d28104d378a353651a90091231523879c7a Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Tue, 17 Dec 2024 13:38:01 +0200 Subject: [PATCH 1/3] added Clock block --- README.md | 34 ++++ lib/protoflow-blocks/Cargo.toml | 4 +- lib/protoflow-blocks/doc/core/clock.mmd | 9 + lib/protoflow-blocks/doc/core/clock.seq.mmd | 12 ++ lib/protoflow-blocks/src/blocks/core.rs | 22 +++ lib/protoflow-blocks/src/blocks/core/clock.rs | 168 ++++++++++++++++++ lib/protoflow-blocks/src/lib.rs | 1 + lib/protoflow-blocks/src/system.rs | 12 +- 8 files changed, 257 insertions(+), 5 deletions(-) create mode 100644 lib/protoflow-blocks/doc/core/clock.mmd create mode 100644 lib/protoflow-blocks/doc/core/clock.seq.mmd create mode 100644 lib/protoflow-blocks/src/blocks/core/clock.rs diff --git a/README.md b/README.md index a6222644..610e3f9d 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,7 @@ The built-in blocks provided by Protoflow are listed below: | Block | Description | |:------------------|:-------------------------------------------------------------------------------------------------------------------------------| | [`Buffer`] | Stores all messages it receives. | +| [`Clock`] | Periodically sends current timestamp. | | [`ConcatStrings`] | Concatenates the received string messages, with an optional delimiter string inserted between each message. | | [`Const`] | Sends a constant value. | | [`Count`] | Counts the number of messages it receives, while optionally passing them through. | @@ -159,6 +160,38 @@ block-beta protoflow execute Buffer ``` +#### [`Clock`] + +A block that periodically sends current timestamp. + +```mermaid +block-beta + columns 4 + Clock space:2 Sink + Clock-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Clock block + class Sink hidden +``` + +```bash +protoflow execute Clock fixed=2 +``` + +```bash +protoflow execute Clock fixed=0.5 +``` + +```bash +protoflow execute Clock random=1..5 +``` + +```bash +protoflow execute Clock random=0.5..1.5 +``` + #### [`ConcatStrings`] A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message @@ -795,6 +828,7 @@ To add a new block type implementation, make sure to examine and amend: [`examples`]: lib/protoflow/examples [`Buffer`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Buffer.html +[`Clock`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Clock.html [`ConcatStrings`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ConcatStrings.html [`Const`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Const.html [`Count`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Count.html diff --git a/lib/protoflow-blocks/Cargo.toml b/lib/protoflow-blocks/Cargo.toml index 52c54740..02786628 100644 --- a/lib/protoflow-blocks/Cargo.toml +++ b/lib/protoflow-blocks/Cargo.toml @@ -25,8 +25,9 @@ hash-sha2 = ["dep:sha2"] rand = ["protoflow-core/rand"] std = [ - "blake3?/std", "protoflow-core/std", + "chrono/std", + "blake3?/std", "serde?/std", "sysml-model?/std", "tracing?/std", @@ -67,6 +68,7 @@ struson = "0.5" sysml-model = { version = "=0.2.3", default-features = false, optional = true } ubyte = { version = "0.10", default-features = false } csv = "1.3.1" +chrono = { version = "0.4.39", default-features = false, features = ["now"] } [dev-dependencies] bytes = "1.8.0" diff --git a/lib/protoflow-blocks/doc/core/clock.mmd b/lib/protoflow-blocks/doc/core/clock.mmd new file mode 100644 index 00000000..37a38b9c --- /dev/null +++ b/lib/protoflow-blocks/doc/core/clock.mmd @@ -0,0 +1,9 @@ +block-beta + columns 4 + Clock space:2 Sink + Clock-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Clock block + class Sink hidden diff --git a/lib/protoflow-blocks/doc/core/clock.seq.mmd b/lib/protoflow-blocks/doc/core/clock.seq.mmd new file mode 100644 index 00000000..6a6be6ab --- /dev/null +++ b/lib/protoflow-blocks/doc/core/clock.seq.mmd @@ -0,0 +1,12 @@ +sequenceDiagram + autonumber + participant Clock as Clock block + participant Clock.output as Clock.output port + participant BlockA as Another block + + Clock-->>BlockA: Connect + + Clock->>BlockA: Message + + Clock-->>Clock.output: Close + Clock-->>BlockA: Disconnect diff --git a/lib/protoflow-blocks/src/blocks/core.rs b/lib/protoflow-blocks/src/blocks/core.rs index 15557ba0..0420837b 100644 --- a/lib/protoflow-blocks/src/blocks/core.rs +++ b/lib/protoflow-blocks/src/blocks/core.rs @@ -14,6 +14,16 @@ pub mod core { pub trait CoreBlocks { fn buffer + 'static>(&mut self) -> Buffer; + fn clock(&mut self, delay: DelayType) -> Clock; + + fn clock_fixed(&mut self, delay: Duration) -> Clock { + self.clock(DelayType::Fixed(delay)) + } + + fn clock_random(&mut self, delay: Range) -> Clock { + self.clock(DelayType::Random(delay)) + } + fn const_string(&mut self, value: impl ToString) -> Const; fn count(&mut self) -> Count; @@ -41,6 +51,7 @@ pub mod core { #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum CoreBlockTag { Buffer, + Clock, Const, Count, Delay, @@ -55,6 +66,11 @@ pub mod core { input: InputPortName, }, + Clock { + output: OutputPortName, + delay: DelayType, + }, + Const { output: OutputPortName, value: String, @@ -87,6 +103,7 @@ pub mod core { use CoreBlockConfig::*; Cow::Borrowed(match self { Buffer { .. } => "Buffer", + Clock { .. } => "Clock", Const { .. } => "Const", Count { .. } => "Count", Delay { .. } => "Delay", @@ -101,6 +118,7 @@ pub mod core { use CoreBlockConfig::*; match self { Buffer { .. } => vec![], + Clock { output, .. } => vec![("output", Some(output.clone()))], Const { output, .. } => vec![("output", Some(output.clone()))], Count { output, count, .. } => { vec![("output", output.clone()), ("count", Some(count.clone()))] @@ -118,6 +136,7 @@ pub mod core { use CoreBlockConfig::*; match self { Buffer { .. } => Box::new(super::Buffer::new(system.input_any())), // TODO: Buffer::with_system(system) + Clock { delay, .. } => Box::new(super::Clock::with_system(system, delay.clone())), Const { value, .. } => Box::new(super::Const::with_system(system, value.clone())), Count { .. } => Box::new(super::Count::new( system.input_any(), @@ -144,6 +163,9 @@ pub mod core { mod buffer; pub use buffer::*; + mod clock; + pub use clock::*; + mod r#const; pub use r#const::*; diff --git a/lib/protoflow-blocks/src/blocks/core/clock.rs b/lib/protoflow-blocks/src/blocks/core/clock.rs new file mode 100644 index 00000000..5f1a98ab --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/core/clock.rs @@ -0,0 +1,168 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{ + prelude::{vec, String}, + types::DelayType, + StdioConfig, StdioError, StdioSystem, System, +}; +use protoflow_core::{Block, BlockResult, BlockRuntime, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// A block that periodically sends current timestamp. +/// +/// This block sends current timestamp on its output port, with interval specified by the parameter. +/// +/// The timestamp is a Unix UTC timestamp in microseconds passed as a [`i64`] value. +/// +/// The block waits for the output port to be connected before sending the value. +/// +/// The block does not have any input ports nor state. +/// +/// # Block Diagram +// #[doc = mermaid!("../../../doc/core/clock.mmd")] +/// +/// # Sequence Diagram +// #[doc = mermaid!("../../../doc/core/clock.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # use std::time::Duration; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.clock_fixed(Duration::from_secs(1)); +/// let encode_lines = s.encode_lines(); +/// let stdout = s.write_stdout(); +/// s.connect(&stdin.output, &encode_lines.input); +/// s.connect(&encode_lines.output, &stdout.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Clock fixed=2 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock fixed=0.5 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock random=1..5 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock random=0.5..1.5 +/// ``` +/// +#[derive(Block, Clone)] +pub struct Clock { + /// The port to send the timestamp on. + #[output] + pub output: OutputPort, + + /// A delay between outputs. + #[parameter] + pub delay: DelayType, +} + +impl Clock { + pub fn new(output: OutputPort, delay: DelayType) -> Self { + Self::with_params(output, delay) + } +} + +impl Clock { + pub fn with_params(output: OutputPort, delay: DelayType) -> Self { + Self { output, delay } + } +} + +impl Clock { + pub fn with_system(system: &System, delay: DelayType) -> Self { + use crate::SystemBuilding; + Self::with_params(system.output(), delay) + } +} + +impl Block for Clock { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + runtime.wait_for(&self.output)?; + + loop { + let now = chrono::Utc::now().timestamp_micros(); + self.output.send(&now)?; + + let duration = match self.delay { + DelayType::Fixed(duration) => duration, + DelayType::Random(ref range) => runtime.random_duration(range.clone()), + }; + runtime.sleep_for(duration)?; + } + } +} + +fn parse_range(range_str: &String) -> Option<(f64, f64)> { + use crate::prelude::Duration; + + if let Some(range_str) = range_str.split_once("..") { + match (range_str.0.parse::(), range_str.1.parse::()) { + (Ok(range0), Ok(range1)) => Some((range0, range1)), + _ => None, + } + } else { + None + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Clock { + fn build_system(config: StdioConfig) -> Result { + use crate::{prelude::Duration, CoreBlocks, IoBlocks, SysBlocks, SystemBuilding}; + + let delay_type = if let Some(delay) = config.get_opt::("fixed")? { + DelayType::Fixed(Duration::from_secs_f64(delay)) + } else if let Some(delay) = config.get_opt::("random")? { + if let Some(range) = parse_range(&delay) { + DelayType::Random( + (Duration::from_secs_f64(range.0)..Duration::from_secs_f64(range.1)), + ) + } else { + return Err(StdioError::InvalidParameter("random")); + } + } else { + return Err(StdioError::MissingParameter("fixed or random")); + }; + + Ok(System::build(|s| { + let stdin = s.clock(delay_type); + let encode_lines = s.encode_lines(); + let stdout = s.write_stdout(); + s.connect(&stdin.output, &encode_lines.input); + s.connect(&encode_lines.output, &stdout.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::Clock; + use crate::{prelude::Duration, DelayType, System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Clock::with_params( + s.output(), + DelayType::Fixed(Duration::from_secs(1)), + )); + }); + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 30050a0b..8d773f29 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -54,6 +54,7 @@ pub fn build_stdio_system( Ok(match system_name.as_ref() { // CoreBlocks "Buffer" => Buffer::::build_system(config)?, + "Clock" => Clock::build_system(config)?, "Const" => Const::::build_system(config)?, "Count" => Count::::build_system(config)?, "Delay" => Delay::::build_system(config)?, diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 695ccfba..6c551204 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -5,10 +5,10 @@ use crate::{ prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString}, types::{DelayType, Encoding}, - AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, - DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadSocket, ReadStdin, SplitString, - SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, WriteStdout, + AllBlocks, Buffer, Clock, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, + DecodeHex, DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, + HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadSocket, ReadStdin, + SplitString, SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, WriteStdout, }; use protoflow_core::{ Block, BlockID, BlockResult, BoxedBlockType, InputPort, Message, OutputPort, PortID, @@ -133,6 +133,10 @@ impl CoreBlocks for System { self.0.block(Buffer::::with_system(self)) } + fn clock(&mut self, delay: DelayType) -> Clock { + self.0.block(Clock::with_system(self, delay)) + } + fn const_string(&mut self, value: impl ToString) -> Const { self.0 .block(Const::::with_system(self, value.to_string())) From 60616947c802d382116bafa892d07a1483c4d2d1 Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Wed, 18 Dec 2024 09:06:53 +0200 Subject: [PATCH 2/3] fix warnings --- lib/protoflow-blocks/src/blocks/core/clock.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/lib/protoflow-blocks/src/blocks/core/clock.rs b/lib/protoflow-blocks/src/blocks/core/clock.rs index 5f1a98ab..d5a87f56 100644 --- a/lib/protoflow-blocks/src/blocks/core/clock.rs +++ b/lib/protoflow-blocks/src/blocks/core/clock.rs @@ -1,11 +1,7 @@ // This is free and unencumbered software released into the public domain. -use crate::{ - prelude::{vec, String}, - types::DelayType, - StdioConfig, StdioError, StdioSystem, System, -}; -use protoflow_core::{Block, BlockResult, BlockRuntime, Message, OutputPort}; +use crate::{prelude::String, types::DelayType, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, OutputPort}; use protoflow_derive::Block; use simple_mermaid::mermaid; @@ -109,8 +105,6 @@ impl Block for Clock { } fn parse_range(range_str: &String) -> Option<(f64, f64)> { - use crate::prelude::Duration; - if let Some(range_str) = range_str.split_once("..") { match (range_str.0.parse::(), range_str.1.parse::()) { (Ok(range0), Ok(range1)) => Some((range0, range1)), @@ -131,7 +125,7 @@ impl StdioSystem for Clock { } else if let Some(delay) = config.get_opt::("random")? { if let Some(range) = parse_range(&delay) { DelayType::Random( - (Duration::from_secs_f64(range.0)..Duration::from_secs_f64(range.1)), + Duration::from_secs_f64(range.0)..Duration::from_secs_f64(range.1), ) } else { return Err(StdioError::InvalidParameter("random")); From 9b90bd8b598490c7fd33d8262cea3259df5889ae Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Wed, 18 Dec 2024 09:07:15 +0200 Subject: [PATCH 3/3] fix mermaid --- lib/protoflow-blocks/src/blocks/core/clock.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/protoflow-blocks/src/blocks/core/clock.rs b/lib/protoflow-blocks/src/blocks/core/clock.rs index d5a87f56..e9aae696 100644 --- a/lib/protoflow-blocks/src/blocks/core/clock.rs +++ b/lib/protoflow-blocks/src/blocks/core/clock.rs @@ -16,10 +16,10 @@ use simple_mermaid::mermaid; /// The block does not have any input ports nor state. /// /// # Block Diagram -// #[doc = mermaid!("../../../doc/core/clock.mmd")] +#[doc = mermaid!("../../../doc/core/clock.mmd")] /// /// # Sequence Diagram -// #[doc = mermaid!("../../../doc/core/clock.seq.mmd" framed)] +#[doc = mermaid!("../../../doc/core/clock.seq.mmd" framed)] /// /// # Examples ///