diff --git a/README.md b/README.md index a6222644..a2f27a19 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ The built-in blocks provided by Protoflow are listed below: | [`EncodeHex`] | Encodes a byte stream into hexadecimal form. | | [`EncodeJSON`] | Encodes messages into JSON format. | | [`Hash`] | Computes the cryptographic hash of a byte stream. | +| [`MapFrom`] | Maps a message from one type to another. | | [`Random`] | Generates and sends a random value. | | [`ReadDir`] | Reads file names from a file system directory. | | [`ReadEnv`] | Reads the value of an environment variable. | @@ -483,6 +484,24 @@ block-beta protoflow execute Hash algorithm=blake3 ``` +#### [`MapFrom`] + +A block to map a message from one type to another. + +```mermaid +block-beta + columns 7 + Source space:2 MapFrom space:2 Sink + Source-- "input" -->MapFrom + MapFrom-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class MapFrom block + class Source hidden + class Sink hidden +``` + #### [`Random`] A block for generating and sending a random value. @@ -809,6 +828,7 @@ To add a new block type implementation, make sure to examine and amend: [`EncodeHex`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeHex.html [`EncodeJSON`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeJson.html [`Hash`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Hash.html +[`MapFrom`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.MapFrom.html [`Random`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Random.html [`ReadDir`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadDir.html [`ReadEnv`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadEnv.html diff --git a/lib/protoflow-blocks/doc/core/map_from.mmd b/lib/protoflow-blocks/doc/core/map_from.mmd new file mode 100644 index 00000000..42d0e520 --- /dev/null +++ b/lib/protoflow-blocks/doc/core/map_from.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 MapFrom space:2 Sink + Source-- "input" -->MapFrom + MapFrom-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class MapFrom block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/core/map_from.seq.mmd b/lib/protoflow-blocks/doc/core/map_from.seq.mmd new file mode 100644 index 00000000..48cb5c7f --- /dev/null +++ b/lib/protoflow-blocks/doc/core/map_from.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant MapFrom.input as MapFrom.input port + participant MapFrom as MapFrom block + participant MapFrom.output as MapFrom.output port + participant BlockB as Another block + + BlockA-->>MapFrom: Connect + MapFrom-->>BlockB: Connect + + loop MapFrom process + BlockA->>MapFrom: Message + MapFrom->>MapFrom: Transform + MapFrom->>BlockB: Message + end + + BlockA-->>MapFrom: Disconnect + MapFrom-->>MapFrom.input: Close + MapFrom-->>MapFrom.output: Close + MapFrom-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/src/block_config.rs b/lib/protoflow-blocks/src/block_config.rs index a6c86827..744a351d 100644 --- a/lib/protoflow-blocks/src/block_config.rs +++ b/lib/protoflow-blocks/src/block_config.rs @@ -51,7 +51,7 @@ impl<'de> serde::Deserialize<'de> for BlockConfig { tag, value: Value::Mapping(_mapping), } => Ok(match tag.string.as_str() { - "Buffer" | "Const" | "Count" | "Delay" | "Drop" | "Random" => { + "Buffer" | "Const" | "Count" | "Delay" | "Drop" | "MapFrom" | "Random" => { CoreBlockConfig::deserialize(value.clone()) .map(BlockConfig::Core) .unwrap() diff --git a/lib/protoflow-blocks/src/block_tag.rs b/lib/protoflow-blocks/src/block_tag.rs index 546a3f46..f0ec2c6b 100644 --- a/lib/protoflow-blocks/src/block_tag.rs +++ b/lib/protoflow-blocks/src/block_tag.rs @@ -16,6 +16,7 @@ pub enum BlockTag { Count, Delay, Drop, + MapFrom, Random, // FlowBlocks // HashBlocks @@ -77,6 +78,7 @@ impl BlockTag { Count => "Count", Delay => "Delay", Drop => "Drop", + MapFrom => "MapFrom", Random => "Random", #[cfg(any( feature = "hash-blake3", @@ -128,6 +130,7 @@ impl FromStr for BlockTag { "Count" => Count, "Delay" => Delay, "Drop" => Drop, + "MapFrom" => MapFrom, "Random" => Random, #[cfg(any( feature = "hash-blake3", @@ -204,6 +207,7 @@ impl BlockInstantiation for BlockTag { Encode => Box::new(super::Encode::::with_system(system, None)), EncodeHex => Box::new(super::EncodeHex::with_system(system)), EncodeJson => Box::new(super::EncodeJson::with_system(system)), + MapFrom => Box::new(super::MapFrom::::with_system(system)), #[cfg(feature = "std")] ReadDir => Box::new(super::ReadDir::with_system(system)), #[cfg(feature = "std")] diff --git a/lib/protoflow-blocks/src/blocks/core.rs b/lib/protoflow-blocks/src/blocks/core.rs index f6f4e760..f5f81759 100644 --- a/lib/protoflow-blocks/src/blocks/core.rs +++ b/lib/protoflow-blocks/src/blocks/core.rs @@ -34,6 +34,10 @@ pub mod core { fn drop(&mut self) -> Drop; + fn map_from + 'static>( + &mut self, + ) -> MapFrom; + fn random(&mut self) -> Random; fn random_seeded(&mut self, seed: Option) -> Random; @@ -47,6 +51,7 @@ pub mod core { Count, Delay, Drop, + MapFrom, Random, } @@ -78,6 +83,11 @@ pub mod core { input: InputPortName, }, + MapFrom { + input: InputPortName, + output: OutputPortName, + }, + Random { output: OutputPortName, seed: Option, @@ -93,6 +103,7 @@ pub mod core { Count { .. } => "Count", Delay { .. } => "Delay", Drop { .. } => "Drop", + MapFrom { .. } => "MapFrom", Random { .. } => "Random", }) } @@ -109,6 +120,7 @@ pub mod core { } Delay { output, .. } => vec![("output", Some(output.clone()))], Drop { .. } => vec![], + MapFrom { output, .. } => vec![("output", Some(output.clone()))], Random { output, .. } => vec![("output", Some(output.clone()))], } } @@ -135,6 +147,10 @@ pub mod core { // TODO: Delay::with_system(system, Some(delay.clone()))) } Drop { .. } => Box::new(super::Drop::new(system.input_any())), // TODO: Drop::with_system(system) + MapFrom { .. } => Box::new(super::MapFrom::with_params( + system.input_any(), + system.output_any(), + )), Random { seed, .. } => { Box::new(super::Random::with_params(system.output::(), *seed)) // TODO: Random::with_system(system, *seed)) @@ -158,6 +174,9 @@ pub mod core { mod drop; pub use drop::*; + mod map_from; + pub use map_from::*; + mod random; pub use random::*; } diff --git a/lib/protoflow-blocks/src/blocks/core/map_from.rs b/lib/protoflow-blocks/src/blocks/core/map_from.rs new file mode 100644 index 00000000..8252ef52 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/core/map_from.rs @@ -0,0 +1,97 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{prelude::Bytes, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// A block to map a message from one type to another. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/core/map_from.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/core/map_from.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// // TODO +/// }); +/// # } +/// ``` +/// +#[derive(Block, Clone)] +pub struct MapFrom> { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, +} + +impl> MapFrom { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self::with_params(input, output) + } +} + +impl> MapFrom { + pub fn with_params(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } +} + +impl + 'static> MapFrom { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::with_params(system.input(), system.output()) + } +} + +impl> Block for MapFrom { + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { + while let Some(input) = self.input.recv()? { + let output: Output = From::from(input); + self.output.send(&output)?; + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl> StdioSystem for MapFrom { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let map = s.block(MapFrom::::with_system(s)); + s.connect(&stdin.output, &map.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::MapFrom; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(MapFrom::::with_params(s.input(), s.output())); + }); + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 41d1decd..62ff3469 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -58,6 +58,7 @@ pub fn build_stdio_system( "Count" => Count::::build_system(config)?, "Delay" => Delay::::build_system(config)?, "Drop" => Drop::::build_system(config)?, + "MapFrom" => MapFrom::::build_system(config)?, "Random" => Random::::build_system(config)?, // FlowBlocks // HashBlocks diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 8d0da65a..513560a1 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -7,8 +7,8 @@ use crate::{ types::{DelayType, Encoding}, AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks, - TextBlocks, WriteFile, WriteStderr, WriteStdout, + IoBlocks, MapFrom, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, + SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout, }; #[cfg(all(feature = "std", feature = "serde"))] use crate::{ReadSocket, WriteSocket}; @@ -169,6 +169,12 @@ impl CoreBlocks for System { self.0.block(Drop::::with_system(self)) } + fn map_from + 'static>( + &mut self, + ) -> MapFrom { + self.0.block(MapFrom::::with_system(self)) + } + fn random(&mut self) -> Random { self.0.block(Random::::with_system(self, None)) }