diff --git a/README.md b/README.md index a6222644..2a360a50 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,9 @@ The built-in blocks provided by Protoflow are listed below: | Block | Description | |:------------------|:-------------------------------------------------------------------------------------------------------------------------------| +| [`Batch`] | Batches input strem into chunks of a specified size. | | [`Buffer`] | Stores all messages it receives. | +| [`Concat`] | Concatenates multiple input message streams into a single output stream. | | [`ConcatStrings`] | Concatenates the received string messages, with an optional delimiter string inserted between each message. | | [`Const`] | Sends a constant value. | | [`Count`] | Counts the number of messages it receives, while optionally passing them through. | @@ -121,24 +123,52 @@ The built-in blocks provided by Protoflow are listed below: | [`DecodeHex`] | Decodes hexadecimal stream to byte stream. | | [`DecodeJSON`] | Decodes JSON messages from a byte stream. | | [`Delay`] | Passes messages through while delaying them by a fixed or random duration. | +| [`Distinct`] | Removes duplicate values from the input stream. | | [`Drop`] | Discards all messages it receives. | | [`Encode`] | Encodes messages to a byte stream. | | [`EncodeCSV`] | Encodes the provided header and rows, given as `prost_types::Value`, into a CSV-formatted byte stream. | | [`EncodeHex`] | Encodes a byte stream into hexadecimal form. | | [`EncodeJSON`] | Encodes messages into JSON format. | | [`Hash`] | Computes the cryptographic hash of a byte stream. | +| [`MapInto`] | Maps a message from one type to another via Into trait. | +| [`Merge`] | Merges multiple input message streams into a single output stream by interleaving messages as they arrive. | | [`Random`] | Generates and sends a random value. | | [`ReadDir`] | Reads file names from a file system directory. | | [`ReadEnv`] | Reads the value of an environment variable. | | [`ReadFile`] | Reads bytes from the contents of a file. | | [`ReadSocket`] | Reads bytes from a TCP socket. | | [`ReadStdin`] | Reads bytes from standard input (aka stdin). | +| [`Replicate`] | Duplicates a single input message stream into multiple identical output streams. | +| [`Sort`] | Sorts a single input message stream in ascending order. | +| [`Split`] | Divides a single input message stream into multiple output streams using a round-robin approach. | | [`SplitString`] | Splits the received input message, with an optional delimiter string parameter. | | [`WriteFile`] | Writes or appends bytes to the contents of a file. | | [`WriteSocket`] | Writes bytes to a TCP socket | | [`WriteStderr`] | Writes bytes to standard error (aka stderr). | | [`WriteStdout`] | Writes bytes to standard output (aka stdout). | +#### [`Batch`] + +A block that simply stores all messages it receives. + +```mermaid +block-beta + columns 7 + Source space:2 Batch space:2 Sink + Source-- "input" -->Batch + Batch-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Batch block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Batch +``` + #### [`Buffer`] A block that simply stores all messages it receives. @@ -159,6 +189,32 @@ block-beta protoflow execute Buffer ``` +#### [`Concat`] + +Concatenates multiple input message streams into a single output stream. + +```mermaid +block-beta + columns 7 + space:1 Source1 space:5 + space:3 Concat space:1 Sink space:1 + space:1 Source2 space:5 + Source1-- "input" -->Concat + Source2-- "input" -->Concat + Concat-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Concat block + class Source1 hidden + class Source2 hidden + class Sink hidden +``` + +```bash +protoflow execute Concat +``` + #### [`ConcatStrings`] A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message @@ -250,6 +306,28 @@ block-beta protoflow execute Decode encoding=text ``` +#### [`Distinct`] + +Removes duplicate values from the input stream. + +```mermaid +block-beta + columns 7 + Source space:2 Distinct space:2 Sink + Source-- "input" -->Distinct + Distinct-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Distinct block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Distinct +``` + #### [`DecodeCSV`] A block that decodes CSV files from a byte stream into a header and rows represented as `prost_types::Value` @@ -483,6 +561,54 @@ block-beta protoflow execute Hash algorithm=blake3 ``` +#### [`MapInto`] + +Maps a message from one type to another via Into trait. + +```mermaid +block-beta + columns 7 + Source space:2 MapInto space:2 Sink + Source-- "input" -->MapInto + MapInto-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class MapInto block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute MapInto +``` + +#### [`Merge`] + +Combines multiple input message streams into a single output stream by interleaving messages as they arrive. + +```mermaid +block-beta + columns 7 + space:1 Source1 space:5 + space:3 Merge space:1 Sink space:1 + space:1 Source2 space:5 + Source1-- "input" -->Merge + Source2-- "input" -->Merge + Merge-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Merge block + class Source1 hidden + class Source2 hidden + class Sink hidden +``` + +```bash +protoflow execute Merge +``` + #### [`Random`] A block for generating and sending a random value. @@ -618,6 +744,83 @@ block-beta protoflow execute ReadStdin < input.txt ``` +#### [`Replicate`] + +Duplicates a single input message stream into multiple identical output streams. + +```mermaid +block-beta + columns 7 + space:5 Sink1 space:1 + space:1 Source space:1 Replicate space:3 + space:5 Sink2 space:1 + + Source-- "input" -->Replicate + Replicate-- "output" -->Sink1 + Replicate-- "output" -->Sink2 + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Replicate block + class Source1 hidden + class Source2 hidden + class Sink1 hidden + class Sink2 hidden +``` + +```bash +protoflow execute Replicate +``` + +#### [`Sort`] + +Sorts a single input message stream in ascending order. + +```mermaid +block-beta + columns 7 + Source space:2 Sort space:2 Sink + Source-- "input" -->Sort + Sort-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Sort block + class Source hidden + class Sink hidden + class Sink2 hidden +``` + +```bash +protoflow execute Sort +``` + +#### [`Split`] + +Divides a single input message stream into multiple output streams using a round-robin approach. + +```mermaid +block-beta + columns 7 + space:5 Sink1 space:1 + space:1 Source space:1 Split space:3 + space:5 Sink2 space:1 + Source-- "input" -->Split + Split-- "output_1" -->Sink1 + Split-- "output_2" -->Sink2 + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Split block + class Source hidden + class Sink1 hidden + class Sink2 hidden +``` + +```bash +protoflow execute Split +``` + #### [`SplitString`] A block that splits the received input message, with an optional delimiter string parameter @@ -794,7 +997,9 @@ To add a new block type implementation, make sure to examine and amend: [`echo_lines`]: lib/protoflow/examples/echo_lines [`examples`]: lib/protoflow/examples +[`Batch`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Batch.html [`Buffer`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Buffer.html +[`Concat`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Concat.html [`ConcatStrings`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ConcatStrings.html [`Const`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Const.html [`Count`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Count.html @@ -803,18 +1008,24 @@ To add a new block type implementation, make sure to examine and amend: [`DecodeHex`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.DecodeHex.html [`DecodeJSON`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.DecodeJson.html [`Delay`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Delay.html +[`Distinct`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Distinct.html [`Drop`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Drop.html [`Encode`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Encode.html [`EncodeCSV`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeCsv.html [`EncodeHex`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeHex.html [`EncodeJSON`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeJson.html [`Hash`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Hash.html +[`MapInto`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.MapInto.html +[`Merge`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Merge.html [`Random`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Random.html [`ReadDir`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadDir.html [`ReadEnv`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadEnv.html [`ReadFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadFile.html [`ReadSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadSocket.html [`ReadStdin`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadStdin.html +[`Replicate`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Replicate.html +[`Sort`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Sort.html +[`Split`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Split.html [`SplitString`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.SplitString.html [`WriteFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteFile.html [`WriteSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteSocket.html diff --git a/lib/protoflow-blocks/doc/flow/batch.mmd b/lib/protoflow-blocks/doc/flow/batch.mmd new file mode 100644 index 00000000..6e702128 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/batch.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Batch space:2 Sink + Source-- "input" -->Batch + Batch-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Batch block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/batch.seq.mmd b/lib/protoflow-blocks/doc/flow/batch.seq.mmd new file mode 100644 index 00000000..0a28ee2e --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/batch.seq.mmd @@ -0,0 +1,26 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Batch.input as Batch.input port + participant Batch as Batch block + participant Batch.output as Batch.output port + participant BlockB as Another block + + BlockA-->>Batch: Connect + Batch-->>BlockB: Connect + + loop Batch process + loop loop Until batch size is reached + BlockA->>Batch: Message + Batch->>Batch: Store Batch messages + end + loop Send buffered messages + Batch->>BlockB: Message + end + end + + + BlockA-->>Batch: Disconnect + Batch-->>Batch.input: Close + Batch-->>Batch.output: Close + Batch-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/concat.mmd b/lib/protoflow-blocks/doc/flow/concat.mmd new file mode 100644 index 00000000..a5108805 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/concat.mmd @@ -0,0 +1,15 @@ +block-beta + columns 7 + space:1 Source1 space:5 + space:3 Concat space:1 Sink space:1 + space:1 Source2 space:5 + Source1-- "input" -->Concat + Source2-- "input" -->Concat + Concat-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Concat block + class Source1 hidden + class Source2 hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/concat.seq.mmd b/lib/protoflow-blocks/doc/flow/concat.seq.mmd new file mode 100644 index 00000000..bde6827e --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/concat.seq.mmd @@ -0,0 +1,28 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant BlockA2 as Another block + participant Concat.input as Concat.input port + participant Concat.input2 as Concat.input port + participant Concat as Concat block + participant Concat.output as Concat.output port + participant BlockB as Another block + + BlockA-->>Concat: Connect + BlockA2-->>Concat: Connect + Concat-->>BlockB: Connect + + loop Concat process + BlockA->>Concat: Message + Concat->>Concat: Store message + BlockA2->>Concat: Message + Concat->>Concat: Store message + end + Concat->>Concat: Concat messages + Concat->>BlockB: Message + BlockA-->>Concat: Disconnect + BlockA2-->>Concat: Disconnect + Concat-->>Concat.input: Close + Concat-->>Concat.input2: Close + Concat-->>Concat.output: Close + Concat-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/distinct.mmd b/lib/protoflow-blocks/doc/flow/distinct.mmd new file mode 100644 index 00000000..a9468026 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/distinct.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Distinct space:2 Sink + Source-- "input" -->Distinct + Distinct-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Distinct block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/distinct.seq.mmd b/lib/protoflow-blocks/doc/flow/distinct.seq.mmd new file mode 100644 index 00000000..c8a33715 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/distinct.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Distinct.input as Distinct.input port + participant Distinct as Distinct block + participant Distinct.output as Distinct.output port + participant BlockB as Another block + + BlockA-->>Distinct: Connect + Distinct-->>BlockB: Connect + + loop Distinct process + BlockA->>Distinct: Message + Distinct->>Distinct: Store distinct messages + end + + Distinct->>BlockB: Message + BlockA-->>Distinct: Disconnect + Distinct-->>Distinct.input: Close + Distinct-->>Distinct.output: Close + Distinct-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/map_into.mmd b/lib/protoflow-blocks/doc/flow/map_into.mmd new file mode 100644 index 00000000..735bbcef --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/map_into.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 MapInto space:2 Sink + Source-- "input" -->MapInto + MapInto-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class MapInto block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/map_into.seq.mmd b/lib/protoflow-blocks/doc/flow/map_into.seq.mmd new file mode 100644 index 00000000..b9d2c99f --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/map_into.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant MapInto.input as MapInto.input port + participant MapInto as MapInto block + participant MapInto.output as MapInto.output port + participant BlockB as Another block + + BlockA-->>MapInto: Connect + MapInto-->>BlockB: Connect + + loop MapInto process + BlockA->>MapInto: Message + MapInto->>MapInto: Transform + MapInto->>BlockB: Message + end + + BlockA-->>MapInto: Disconnect + MapInto-->>MapInto.input: Close + MapInto-->>MapInto.output: Close + MapInto-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/merge.mmd b/lib/protoflow-blocks/doc/flow/merge.mmd new file mode 100644 index 00000000..90caf8e8 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/merge.mmd @@ -0,0 +1,15 @@ +block-beta + columns 7 + space:1 Source1 space:5 + space:3 Merge space:1 Sink space:1 + space:1 Source2 space:5 + Source1-- "input" -->Merge + Source2-- "input" -->Merge + Merge-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Merge block + class Source1 hidden + class Source2 hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/merge.seq.mmd b/lib/protoflow-blocks/doc/flow/merge.seq.mmd new file mode 100644 index 00000000..73d0f467 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/merge.seq.mmd @@ -0,0 +1,26 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant BlockA2 as Another block + participant Merge.input as Merge.input port + participant Merge.input2 as Merge.input port + participant Merge as Merge block + participant Merge.output as Merge.output port + participant BlockB as Another block + + BlockA-->>Merge: Connect + BlockA2-->>Merge: Connect + Merge-->>BlockB: Connect + + loop Merge process + BlockA->>Merge: Message + Merge->>BlockB: Message + BlockA2->>Merge: Message + Merge->>BlockB: Message + end + BlockA-->>Merge: Disconnect + BlockA2-->>Merge: Disconnect + Merge-->>Merge.input: Close + Merge-->>Merge.input2: Close + Merge-->>Merge.output: Close + Merge-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/replicate.mmd b/lib/protoflow-blocks/doc/flow/replicate.mmd new file mode 100644 index 00000000..e2c629ec --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/replicate.mmd @@ -0,0 +1,17 @@ +block-beta + columns 7 + space:5 Sink1 space:1 + space:1 Source space:1 Replicate space:3 + space:5 Sink2 space:1 + + Source-- "input" -->Replicate + Replicate-- "output" -->Sink1 + Replicate-- "output" -->Sink2 + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Replicate block + class Source1 hidden + class Source2 hidden + class Sink1 hidden + class Sink2 hidden diff --git a/lib/protoflow-blocks/doc/flow/replicate.seq.mmd b/lib/protoflow-blocks/doc/flow/replicate.seq.mmd new file mode 100644 index 00000000..5966892b --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/replicate.seq.mmd @@ -0,0 +1,27 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Replicate.input as Replicate.input port + participant Replicate as Replicate block + participant Replicate.output_1 as Replicate.output port + participant BlockB as Another block + participant Replicate.output_2 as Replicate.output port + participant BlockC as Another block + + BlockA-->>Replicate: Connect + Replicate-->>BlockB: Connect + Replicate-->>BlockC: Connect + + loop Replicate process + BlockA->>Replicate: Message + Replicate->>BlockB: Message + Replicate->>BlockC: Message + + end + + BlockA-->>Replicate: Disconnect + Replicate-->>Replicate.input: Close + Replicate-->>Replicate.output_1: Close + Replicate-->>BlockB: Disconnect + Replicate-->>Replicate.output_2: Close + Replicate-->>BlockC: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/sort.mmd b/lib/protoflow-blocks/doc/flow/sort.mmd new file mode 100644 index 00000000..c182da61 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/sort.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Sort space:2 Sink + Source-- "input" -->Sort + Sort-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Sort block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/flow/sort.seq.mmd b/lib/protoflow-blocks/doc/flow/sort.seq.mmd new file mode 100644 index 00000000..7269070f --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/sort.seq.mmd @@ -0,0 +1,22 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Sort.input as Sort.input port + participant Sort as Sort block + participant Sort.output as Sort.output port + participant BlockB as Another block + + BlockA-->>Sort: Connect + Sort-->>BlockB: Connect + + loop Sort process + BlockA->>Sort: Message + Sort->>Sort: Store message + end + + Sort->>Sort: Sort messages + Sort->>BlockB: Message + BlockA-->>Sort: Disconnect + Sort-->>Sort.input: Close + Sort-->>Sort.output: Close + Sort-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/flow/split.mmd b/lib/protoflow-blocks/doc/flow/split.mmd new file mode 100644 index 00000000..55bb0aad --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/split.mmd @@ -0,0 +1,15 @@ +block-beta + columns 7 + space:5 Sink1 space:1 + space:1 Source space:1 Split space:3 + space:5 Sink2 space:1 + Source-- "input" -->Split + Split-- "output_1" -->Sink1 + Split-- "output_2" -->Sink2 + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Split block + class Source hidden + class Sink1 hidden + class Sink2 hidden diff --git a/lib/protoflow-blocks/doc/flow/split.seq.mmd b/lib/protoflow-blocks/doc/flow/split.seq.mmd new file mode 100644 index 00000000..3f61f282 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/split.seq.mmd @@ -0,0 +1,29 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Split.input as Split.input port + participant Split as Split block + participant Split.output_1 as Split.output_1 port + participant BlockB as Another block + participant Split.output_2 as Split.output_2 port + participant BlockC as Another block + + BlockA-->>Split: Connect + Split-->>BlockB: Connect + Split-->>BlockC: Connect + + loop Split process + BlockA->>Split: Message + Split->>Split: Decide the next output port + Split->>BlockB: Message + BlockA->>Split: Message + Split->>Split: Decide the next output port + Split->>BlockC: Message + end + + BlockA-->>Split: Disconnect + Split-->>Split.input: Close + Split-->>Split.output_1: Close + Split-->>BlockB: Disconnect + Split-->>Split.output_2: Close + Split-->>BlockC: Disconnect diff --git a/lib/protoflow-blocks/src/block_config.rs b/lib/protoflow-blocks/src/block_config.rs index a6c86827..d4c400e4 100644 --- a/lib/protoflow-blocks/src/block_config.rs +++ b/lib/protoflow-blocks/src/block_config.rs @@ -57,6 +57,11 @@ impl<'de> serde::Deserialize<'de> for BlockConfig { .unwrap() } + "Batch" | "Concat" | "Distinct" | "MapInto" | "Merge" | "Replicate" | "Sort" + | "Split" => FlowBlockConfig::deserialize(value.clone()) + .map(BlockConfig::Flow) + .unwrap(), + #[cfg(any( feature = "hash-blake3", feature = "hash-md5", diff --git a/lib/protoflow-blocks/src/block_tag.rs b/lib/protoflow-blocks/src/block_tag.rs index 546a3f46..93bf614c 100644 --- a/lib/protoflow-blocks/src/block_tag.rs +++ b/lib/protoflow-blocks/src/block_tag.rs @@ -5,7 +5,7 @@ use crate::{ BlockInstantiation, System, }; use enum_iterator::Sequence; -use protoflow_core::{types::Any, Block}; +use protoflow_core::{types::Any, Block, ComparableAny}; #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Sequence)] @@ -18,6 +18,14 @@ pub enum BlockTag { Drop, Random, // FlowBlocks + Batch, + Concat, + Distinct, + MapInto, + Merge, + Replicate, + Sort, + Split, // HashBlocks #[cfg(any( feature = "hash-blake3", @@ -72,7 +80,9 @@ impl BlockTag { pub fn as_str(&self) -> &'static str { use BlockTag::*; match self { + Batch => "Batch", Buffer => "Buffer", + Concat => "Concat", Const => "Const", Count => "Count", Delay => "Delay", @@ -88,9 +98,12 @@ impl BlockTag { Decode => "Decode", DecodeHex => "DecodeHex", DecodeJson => "DecodeJSON", + Distinct => "Distinct", Encode => "Encode", EncodeHex => "EncodeHex", EncodeJson => "EncodeJSON", + MapInto => "MapInto", + Merge => "Merge", #[cfg(feature = "std")] ReadDir => "ReadDir", #[cfg(feature = "std")] @@ -101,6 +114,9 @@ impl BlockTag { ReadSocket => "ReadSocket", #[cfg(feature = "std")] ReadStdin => "ReadStdin", + Replicate => "Replicate", + Sort => "Sort", + Split => "Split", #[cfg(feature = "std")] WriteFile => "WriteFile", #[cfg(all(feature = "std", feature = "serde"))] @@ -123,10 +139,13 @@ impl FromStr for BlockTag { fn from_str(input: &str) -> Result { use BlockTag::*; Ok(match input { + "Batch" => Batch, "Buffer" => Buffer, + "Concat" => Concat, "Const" => Const, "Count" => Count, "Delay" => Delay, + "Distinct" => Distinct, "Drop" => Drop, "Random" => Random, #[cfg(any( @@ -142,6 +161,8 @@ impl FromStr for BlockTag { "Encode" => Encode, "EncodeHex" => EncodeHex, "EncodeJSON" => EncodeJson, + "MapInto" => MapInto, + "Merge" => Merge, #[cfg(feature = "std")] "ReadDir" => ReadDir, #[cfg(feature = "std")] @@ -152,6 +173,9 @@ impl FromStr for BlockTag { "ReadSocket" => ReadSocket, #[cfg(feature = "std")] "ReadStdin" => ReadStdin, + "Replicate" => Replicate, + "Sort" => Sort, + "Split" => Split, #[cfg(feature = "std")] "WriteFile" => WriteFile, #[cfg(all(feature = "std", feature = "serde"))] @@ -185,7 +209,9 @@ impl BlockInstantiation for BlockTag { fn instantiate(&self, system: &mut System) -> Box { use BlockTag::*; match self { + Batch => Box::new(super::Batch::::with_system(system, None)), Buffer => Box::new(super::Buffer::::with_system(system)), + Concat => Box::new(super::Concat::::with_system(system)), Const => Box::new(super::Const::::with_system(system, String::new())), Count => Box::new(super::Count::::with_system(system)), Delay => Box::new(super::Delay::::with_system(system, None)), @@ -201,9 +227,12 @@ impl BlockInstantiation for BlockTag { Decode => Box::new(super::Decode::::with_system(system, None)), DecodeHex => Box::new(super::DecodeHex::with_system(system)), DecodeJson => Box::new(super::DecodeJson::with_system(system)), + Distinct => Box::new(super::Distinct::::with_system(system)), Encode => Box::new(super::Encode::::with_system(system, None)), EncodeHex => Box::new(super::EncodeHex::with_system(system)), EncodeJson => Box::new(super::EncodeJson::with_system(system)), + MapInto => Box::new(super::MapInto::::with_system(system)), + Merge => Box::new(super::Merge::::with_system(system)), #[cfg(feature = "std")] ReadDir => Box::new(super::ReadDir::with_system(system)), #[cfg(feature = "std")] @@ -214,6 +243,9 @@ impl BlockInstantiation for BlockTag { ReadSocket => Box::new(super::ReadSocket::with_system(system, None)), #[cfg(feature = "std")] ReadStdin => Box::new(super::ReadStdin::with_system(system, None)), + Replicate => Box::new(super::Replicate::::with_system(system)), + Sort => Box::new(super::Sort::::with_system(system)), + Split => Box::new(super::Split::::with_system(system)), #[cfg(feature = "std")] WriteFile => Box::new(super::WriteFile::with_system(system, None)), #[cfg(all(feature = "std", feature = "serde"))] diff --git a/lib/protoflow-blocks/src/blocks/flow.rs b/lib/protoflow-blocks/src/blocks/flow.rs index 21496e25..b676021b 100644 --- a/lib/protoflow-blocks/src/blocks/flow.rs +++ b/lib/protoflow-blocks/src/blocks/flow.rs @@ -1,30 +1,203 @@ // This is free and unencumbered software released into the public domain. pub mod flow { + use crate::{InputPortName, OutputPortName}; + use super::{ - prelude::{Cow, Named}, - BlockConnections, BlockInstantiation, + prelude::{vec, Box, Cow, Named, Vec}, + BlockConnections, BlockInstantiation, System, }; - pub trait FlowBlocks {} + use protoflow_core::{Block, ComparableAny, Message}; + + pub trait FlowBlocks { + fn batch + 'static>(&mut self, batch_size: usize) -> Batch; + fn concat + 'static>(&mut self) -> Concat; + fn distinct + PartialEq + 'static>(&mut self) -> Distinct; + fn map_into + 'static, Output: Message + 'static>( + &mut self, + ) -> MapInto; + fn merge + 'static>(&mut self) -> Merge; + fn replicate + 'static>(&mut self) -> Replicate; + fn sort + PartialOrd + 'static>(&mut self) -> Sort; + fn split + 'static>(&mut self) -> Split; + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] - pub enum FlowBlockTag {} + pub enum FlowBlockTag { + Batch, + Concat, + Distinct, + MapInto, + Merge, + Replicate, + Sort, + Split, + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug)] - pub enum FlowBlockConfig {} + pub enum FlowBlockConfig { + Batch { + input: InputPortName, + output: OutputPortName, + }, + Concat { + input_1: InputPortName, + input_2: InputPortName, + output: OutputPortName, + }, + Distinct { + input: InputPortName, + output: OutputPortName, + }, + MapInto { + input: InputPortName, + output: OutputPortName, + }, + Merge { + input_1: InputPortName, + input_2: InputPortName, + output: OutputPortName, + }, + Replicate { + input: InputPortName, + output_1: OutputPortName, + output_2: OutputPortName, + }, + Sort { + input: InputPortName, + output: OutputPortName, + }, + Split { + input: InputPortName, + output_1: OutputPortName, + output_2: OutputPortName, + }, + } impl Named for FlowBlockConfig { fn name(&self) -> Cow { - unreachable!() + use FlowBlockConfig::*; + Cow::Borrowed(match self { + Batch { .. } => "Batch", + Concat { .. } => "Concat", + Distinct { .. } => "Distinct", + MapInto { .. } => "MapInto", + Merge { .. } => "Merge", + Replicate { .. } => "Replicate", + Sort { .. } => "Sort", + Split { .. } => "Split", + }) } } - impl BlockConnections for FlowBlockConfig {} + impl BlockConnections for FlowBlockConfig { + fn output_connections(&self) -> Vec<(&'static str, Option)> { + use FlowBlockConfig::*; + match self { + Batch { output, .. } => { + vec![("output", Some(output.clone()))] + } + Concat { output, .. } => { + vec![("output", Some(output.clone()))] + } + Distinct { output, .. } => { + vec![("output", Some(output.clone()))] + } + MapInto { output, .. } => { + vec![("output", Some(output.clone()))] + } + Merge { output, .. } => { + vec![("output", Some(output.clone()))] + } + Replicate { + output_1, output_2, .. + } => { + vec![ + ("output_1", Some(output_1.clone())), + ("output_2", Some(output_2.clone())), + ] + } + Sort { output, .. } => { + vec![("output", Some(output.clone()))] + } + Split { + output_1, output_2, .. + } => { + vec![ + ("output_1", Some(output_1.clone())), + ("output_2", Some(output_2.clone())), + ] + } + } + } + } + + impl BlockInstantiation for FlowBlockConfig { + fn instantiate(&self, system: &mut System) -> Box { + use super::SystemBuilding; + use FlowBlockConfig::*; + match self { + Batch { .. } => Box::new(super::Batch::new(system.input_any(), system.output())), + Concat { .. } => Box::new(super::Concat::new( + system.input_any(), + system.input_any(), + system.output(), + )), + Distinct { .. } => { + Box::new(super::Distinct::new(system.input_any(), system.output())) + } + MapInto { .. } => { + Box::new(super::MapInto::new(system.input_any(), system.output_any())) + } + Merge { .. } => Box::new(super::Merge::new( + system.input_any(), + system.input_any(), + system.output(), + )), + Replicate { .. } => Box::new(super::Replicate::new( + system.input_any(), + system.output(), + system.output(), + )), + Sort { .. } => Box::new(super::Sort::new( + system.input::(), + system.output(), + )), + Split { .. } => Box::new(super::Split::new( + system.input_any(), + system.output(), + system.output(), + )), + } + } + } + + mod batch; + pub use batch::*; + + mod concat; + pub use concat::*; + + mod distinct; + pub use distinct::*; + + mod map_into; + pub use map_into::*; + + mod merge; + pub use merge::*; + + mod replicate; + pub use replicate::*; + + mod sort; + pub use sort::*; - impl BlockInstantiation for FlowBlockConfig {} + mod split; + pub use split::*; } pub use flow::*; diff --git a/lib/protoflow-blocks/src/blocks/flow/batch.rs b/lib/protoflow-blocks/src/blocks/flow/batch.rs new file mode 100644 index 00000000..502f3c61 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/batch.rs @@ -0,0 +1,162 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + info, + prelude::{vec, Vec}, + types::Any, + Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Batches input strem into chunks of a specified size. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/batch.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/batch.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// let batch = s.batch(2); +/// s.connect(&stdin.output, &batch.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Batch +/// ``` +/// +#[derive(Block, Clone)] +pub struct Batch { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, + + /// Batch size + #[parameter] + pub batch_size: usize, + + /// The internal state storing the messages received. + #[state] + messages: Vec, +} + +impl Batch { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self::with_params(input, output, None) + } + pub fn with_params( + input: InputPort, + output: OutputPort, + batch_size: Option, + ) -> Self { + Self { + input, + output, + batch_size: batch_size.unwrap_or(1), + messages: Vec::new(), + } + } + pub fn messages(&self) -> &Vec { + &self.messages + } +} + +impl Batch { + pub fn with_system(system: &System, batch_size: Option) -> Self { + use crate::SystemBuilding; + Self::with_params(system.input(), system.output(), batch_size) + } +} + +impl Block for Batch { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + while let Some(message) = self.input.recv()? { + info!("Buffered one message"); + self.messages.push(message); + + if self.batch_size == self.messages().len() { + info!("Sending messages"); + for message in self.messages.drain(..) { + self.output.send(&message)? + } + } + } + + //send remaining messages + info!("Sending remaining messages"); + for message in self.messages.drain(..) { + self.output.send(&message)? + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Batch { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + + config.allow_only(vec!["batch-size"])?; + + Ok(System::build(|s| { + let batch_size = config.get::("batch-size").unwrap_or(1); + let stdin = config.read_stdin(s); + let batch = Batch::with_system(&s, Some(batch_size)); + s.connect(&stdin.output, &batch.input); + })) + } +} + +#[cfg(test)] +mod tests { + use protoflow_core::error; + + use super::Batch; + use crate::{FlowBlocks, System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Batch::::new(s.input(), s.output())); + }); + } + + #[test] + #[ignore = "requires stdin"] + fn run_batch_stdout() { + use super::*; + use crate::SysBlocks; + use protoflow_core::SystemBuilding; + + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + let batch = s.batch(2); + s.connect(&stdin.output, &batch.input); + + let stdout_1 = s.write_stdout(); + s.connect(&batch.output, &stdout_1.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/concat.rs b/lib/protoflow-blocks/src/blocks/flow/concat.rs new file mode 100644 index 00000000..b9a202a6 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/concat.rs @@ -0,0 +1,190 @@ +// This is free and unencumbered software released into the public domain. +extern crate std; + +use crate::{ + prelude::{Arc, Vec}, + FlowBlocks, StdioConfig, StdioError, StdioSystem, SysBlocks, System, +}; +use protoflow_core::{ + error, info, types::Any, Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, + OutputPort, PortError, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Combines multiple input streams into a single output stream in sequence. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/concat.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/concat.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// +/// let replicate = s.replicate(); +/// s.connect(&stdin.output, &replicate.input); +/// +/// let concat = s.block(Concat::new(s.input(), s.input(), s.output())); +/// s.connect(&replicate.output_1, &concat.input_1); +/// s.connect(&replicate.output_2, &concat.input_2); +/// +/// let stdout_1 = s.write_stdout(); +/// s.connect(&concat.output, &stdout_1.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Concat +/// ``` +/// +#[derive(Block, Clone)] +pub struct Concat { + /// The input message stream. + #[input] + pub input_1: InputPort, + #[output] + pub input_2: InputPort, + #[output] + pub output: OutputPort, +} + +impl Concat { + pub fn new(input_1: InputPort, input_2: InputPort, output: OutputPort) -> Self { + Self { + input_1, + input_2, + output, + } + } +} +impl Concat { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.input(), system.output()) + } +} + +impl Block for Concat { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + // Ensure the output channel is ready + runtime.wait_for(&self.output)?; + + let input1 = Arc::new(self.input_1.clone()); + let input2 = Arc::new(self.input_2.clone()); + + // Helper function to buffer messages from an input + fn buffer_input( + input: Arc>, + input_name: &str, + ) -> Result, PortError> { + let mut buffer = Vec::new(); + while let Ok(Some(message)) = input.recv() { + buffer.push(message); + } + info!("{} processed {} messages", input_name, buffer.len()); + Ok(buffer) + } + + // Spawn threads to process and buffer messages from both inputs + let handle1 = std::thread::spawn({ + let input1 = Arc::clone(&input1); + move || buffer_input(input1, "input1") + }); + + let handle2 = std::thread::spawn({ + let input2 = Arc::clone(&input2); + move || buffer_input(input2, "input2") + }); + + // Collect and handle thread results + let buffer1 = match handle1.join() { + Ok(result) => result.map_err(|e| { + error!("Error processing input1: {:?}", e); + BlockError::Other("Failed to process input1".into()) + })?, + Err(_) => { + error!("Thread for input1 panicked"); + return Err(BlockError::Other("Thread for input1 panicked".into())); + } + }; + + let buffer2 = match handle2.join() { + Ok(result) => result.map_err(|e| { + error!("Error processing input2: {:?}", e); + BlockError::Other("Failed to process input2".into()) + })?, + Err(_) => { + error!("Thread for input2 panicked"); + return Err(BlockError::Other("Thread for input2 panicked".into())); + } + }; + + // Concatenate and send messages to the output sequentially + info!( + "Concatenating {} messages from input1 with {} messages from input2", + buffer1.len(), + buffer2.len() + ); + + for message in buffer1.iter().chain(buffer2.iter()) { + if let Err(err) = self.output.send(message) { + error!("Failed to send message: {:?}", err); + return Err(err.into()); + } + } + + info!("All messages successfully sent to the output."); + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Concat { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = s.read_stdin(); + + let replicate = s.replicate(); + s.connect(&stdin.output, &replicate.input); + + let concat = s.block(Concat::new(s.input(), s.input(), s.output())); + + s.connect(&replicate.output_1, &concat.input_1); + s.connect(&replicate.output_2, &concat.input_2); + + let stdout_1 = s.write_stdout(); + s.connect(&concat.output, &stdout_1.input); + })) + } +} + +#[cfg(test)] +mod concat_tests { + use crate::{FlowBlocks, System}; + use protoflow_core::prelude::String; + + extern crate std; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.concat::(); + }); + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/distinct.rs b/lib/protoflow-blocks/src/blocks/flow/distinct.rs new file mode 100644 index 00000000..056334bf --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/distinct.rs @@ -0,0 +1,140 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + info, prelude::Vec, types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, + OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Removes duplicate values from the input stream. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/distinct.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/distinct.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// let distinct = s.distinct(); +/// s.connect(&stdin.output, &distinct.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Distinct +/// ``` +/// +#[derive(Block, Clone)] +pub struct Distinct { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, + + /// The internal state storing the messages received. + #[state] + messages: Vec, +} + +impl Distinct { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { + input, + output, + messages: Vec::new(), + } + } + + pub fn messages(&self) -> &Vec { + &self.messages + } +} + +impl Distinct { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Distinct { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + while let Some(message) = self.input.recv()? { + info!("Buffered one message"); + if !self.messages.contains(&message) { + self.messages.push(message); + } + } + + info!("Sending messages"); + for message in self.messages.drain(..) { + self.output.send(&message)?; + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Distinct { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let distinct = s.block(Distinct::new(s.input(), s.output())); + s.connect(&stdin.output, &distinct.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::Distinct; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Distinct::::new(s.input(), s.output())); + }); + } + + #[test] + #[ignore = "requires stdin"] + fn run_distinct_stdout() { + use super::*; + use crate::SysBlocks; + use protoflow_core::{error, SystemBuilding}; + + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + let distinct = s.block(Distinct::new(s.input(), s.output())); + s.connect(&stdin.output, &distinct.input); + + let stdout_1 = s.write_stdout(); + s.connect(&distinct.output, &stdout_1.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/map_into.rs b/lib/protoflow-blocks/src/blocks/flow/map_into.rs new file mode 100644 index 00000000..b11190be --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/map_into.rs @@ -0,0 +1,98 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{prelude::Bytes, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{info, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// A block to map a message from one type to another. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/map_into.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/map_into.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// // TODO +/// }); +/// # } +/// ``` +/// +#[derive(Block, Clone)] +pub struct MapInto, Output: Message> { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, +} + +impl, Output: Message> MapInto { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self::with_params(input, output) + } +} + +impl, Output: Message> MapInto { + pub fn with_params(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } +} + +impl + 'static, Output: Message + 'static> MapInto { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::with_params(system.input(), system.output()) + } +} + +impl, Output: Message> Block for MapInto { + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { + while let Some(input) = self.input.recv()? { + info!("Received input: {:?}", input); + let output: Output = Into::into(input); + info!("Sending output: {:?}", output); + self.output.send(&output)?; + } + + Ok(()) + } +} +#[cfg(feature = "std")] +impl, Output: Message> StdioSystem for MapInto { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let map = s.block(MapInto::::with_system(s)); + s.connect(&stdin.output, &map.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::MapInto; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(MapInto::::with_params(s.input(), s.output())); + }); + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/merge.rs b/lib/protoflow-blocks/src/blocks/flow/merge.rs new file mode 100644 index 00000000..903b03c4 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/merge.rs @@ -0,0 +1,183 @@ +// This is free and unencumbered software released into the public domain. +extern crate std; + +use crate::{ + prelude::{format, Arc}, + FlowBlocks, StdioConfig, StdioError, StdioSystem, SysBlocks, System, +}; +use protoflow_core::{ + error, types::Any, Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Combines multiple input message streams into a single output stream by interleaving messages as they arrive. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/merge.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/merge.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// +/// let replicate = s.replicate(); +/// s.connect(&stdin.output, &replicate.input); +/// +/// let merge = s.block(Merge::new(s.input(), s.input(), s.output())); +/// s.connect(&replicate.output_1, &merge.input_1); +/// s.connect(&replicate.output_2, &merge.input_2); +/// +/// let stdout_1 = s.write_stdout(); +/// s.connect(&merge.output, &stdout_1.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Merge +/// ``` +/// +#[derive(Block, Clone)] +pub struct Merge { + /// The input message stream. + #[input] + pub input_1: InputPort, + #[output] + pub input_2: InputPort, + #[output] + pub output: OutputPort, +} + +impl Merge { + pub fn new(input_1: InputPort, input_2: InputPort, output: OutputPort) -> Self { + Self { + input_1, + input_2, + output, + } + } +} +impl Merge { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.input(), system.output()) + } +} + +impl Block for Merge { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + runtime.wait_for(&self.output)?; + + let input1 = Arc::new(self.input_1.clone()); + let input2 = Arc::new(self.input_2.clone()); + let output = Arc::new(self.output.clone()); + + fn process_port( + input: Arc>, + output: Arc>, + ) -> Result<(), BlockError> { + while let Ok(Some(message)) = input.recv() { + if let Err(err) = output.send(&message) { + error!("Error sending message: {}", err); + return Err(BlockError::Other(format!("Error sending message: {}", err))); + } + } + Ok(()) + } + + let input1_thread = { + let input1_clone = Arc::clone(&input1); + let output_clone = Arc::clone(&output); + std::thread::spawn(move || process_port(input1_clone, output_clone)) + }; + + let input2_thread = { + let input2_clone = Arc::clone(&input2); + let output_clone = Arc::clone(&output); + std::thread::spawn(move || process_port(input2_clone, output_clone)) + }; + + if let Err(_) = input1_thread.join() { + error!("Thread for input1 panicked"); + return Err(BlockError::Other("Thread for input1 panicked".into())); + } + + if let Err(_) = input2_thread.join() { + error!("Thread for input2 panicked"); + return Err(BlockError::Other("Thread for input2 panicked".into())); + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Merge { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = s.read_stdin(); + + let replicate = s.replicate(); + s.connect(&stdin.output, &replicate.input); + + let merge = s.block(Merge::new(s.input(), s.input(), s.output())); + + s.connect(&replicate.output_1, &merge.input_1); + s.connect(&replicate.output_2, &merge.input_2); + + let stdout_1 = s.write_stdout(); + s.connect(&merge.output, &stdout_1.input); + })) + } +} + +#[cfg(test)] +mod merge_tests { + use super::Merge; + use crate::{FlowBlocks, SysBlocks, System}; + use protoflow_core::{error, prelude::String, SystemBuilding}; + + extern crate std; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.merge::(); + }); + } + #[test] + #[ignore = "requires stdin"] + fn run_block() { + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + + let replicate = s.replicate(); + s.connect(&stdin.output, &replicate.input); + + let merge = s.block(Merge::new(s.input(), s.input(), s.output())); + + s.connect(&replicate.output_1, &merge.input_1); + s.connect(&replicate.output_2, &merge.input_2); + + let stdout_1 = s.write_stdout(); + s.connect(&merge.output, &stdout_1.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/replicate.rs b/lib/protoflow-blocks/src/blocks/flow/replicate.rs new file mode 100644 index 00000000..8379927f --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/replicate.rs @@ -0,0 +1,123 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{FlowBlocks, StdioConfig, StdioError, StdioSystem, SysBlocks, System}; +use protoflow_core::{ + info, types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Duplicates a single input message stream into multiple identical output streams. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/replicate.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/replicate.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// +/// let replicate = s.replicate(); +/// s.connect(&stdin.output, &replicate.input); +/// +/// let stdout_1 = s.write_stdout(); +/// s.connect(&replicate.output_1, &stdout_1.input); +/// +/// let stdout_2 = s.write_stdout(); +/// s.connect(&replicate.output_2, &stdout_2.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Replicate +/// ``` +/// +#[derive(Block, Clone)] +pub struct Replicate { + /// The input message stream. + #[input] + pub input: InputPort, + #[output] + pub output_1: OutputPort, + #[output] + pub output_2: OutputPort, +} + +impl Replicate { + pub fn new(input: InputPort, output_1: OutputPort, output_2: OutputPort) -> Self { + Self { + input, + output_1, + output_2, + } + } +} +impl Replicate { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output(), system.output()) + } +} + +impl Block for Replicate { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + runtime.wait_for(&self.output_1)?; + runtime.wait_for(&self.output_2)?; + + while let Some(message) = self.input.recv()? { + info!("Sending message"); + self.output_1.send(&message)?; + self.output_2.send(&message)?; + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Replicate { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = s.read_stdin(); + + let replicate = s.replicate(); + s.connect(&stdin.output, &replicate.input); + + let stdout_1 = s.write_stdout(); + s.connect(&replicate.output_1, &stdout_1.input); + + let stdout_2 = s.write_stdout(); + s.connect(&replicate.output_2, &stdout_2.input); + })) + } +} + +#[cfg(test)] +mod replicate_tests { + use crate::{FlowBlocks, System}; + use protoflow_core::prelude::String; + + extern crate std; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.replicate::(); + }); + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/sort.rs b/lib/protoflow-blocks/src/blocks/flow/sort.rs new file mode 100644 index 00000000..5dbc7956 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/sort.rs @@ -0,0 +1,148 @@ +// This is free and unencumbered software released into the public domain. + +use core::cmp::Ordering; + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + error, info, prelude::Vec, types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, + OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Sorts a single input message stream in ascending order. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/sort.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/sort.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// let sort = s.sort(); +/// s.connect(&stdin.output, &sort.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Sort +/// ``` +/// +#[derive(Block, Clone)] +pub struct Sort { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, + + /// The internal state storing the messages received. + #[state] + messages: Vec, +} + +impl Sort { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { + input, + output, + messages: Vec::new(), + } + } + + pub fn messages(&self) -> &Vec { + &self.messages + } +} + +impl Sort { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Sort { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + while let Some(message) = self.input.recv()? { + self.messages.push(message); + } + + info!("Sorting messages"); + self.messages.sort_by(|x, y| { + if let Some(ordering) = x.partial_cmp(y) { + ordering + } else { + error!("Incomparable values: {:?} and {:?}", x, y); + Ordering::Equal + } + }); + + for message in self.messages.drain(..) { + self.output.send(&message)?; + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Sort { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let sort = s.block(Sort::new(s.input(), s.output())); + s.connect(&stdin.output, &sort.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::Sort; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Sort::::new(s.input(), s.output())); + }); + } + + #[test] + #[ignore = "requires stdin"] + fn run_sort_stdout() { + use super::*; + use crate::SysBlocks; + use protoflow_core::{error, SystemBuilding}; + + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + let sort = s.block(Sort::new(s.input(), s.output())); + s.connect(&stdin.output, &sort.input); + + let stdout_1 = s.write_stdout(); + s.connect(&sort.output, &stdout_1.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/blocks/flow/split.rs b/lib/protoflow-blocks/src/blocks/flow/split.rs new file mode 100644 index 00000000..8175325e --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/split.rs @@ -0,0 +1,171 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{FlowBlocks, StdioConfig, StdioError, StdioSystem, SysBlocks, System}; +use protoflow_core::{ + types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// Divides a single input message stream into multiple output streams using a round-robin approach. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/split.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/split.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// let split = s.split(); +/// s.connect(&stdin.output, &split.input); +/// let stdout_1 = s.write_stdout(); +/// s.connect(&split.output_1, &stdout_1.input); +/// let stdout_2 = s.write_stdout(); +/// s.connect(&split.output_2, &stdout_2.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Split +/// ``` +/// +#[derive(Block, Clone)] +pub struct Split { + /// The input message stream. + #[input] + pub input: InputPort, + /// The output message stream + #[output] + pub output_1: OutputPort, + /// The output message stream + #[output] + pub output_2: OutputPort, + /// The internal state for keeping total number of messages + #[state] + pub message_count: u128, +} + +impl Split { + pub fn new(input: InputPort, output_1: OutputPort, output_2: OutputPort) -> Self { + Self { + input, + output_1, + output_2, + message_count: 0, + } + } +} +impl Split { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output(), system.output()) + } +} + +impl Block for Split { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + runtime.wait_for(&self.output_1)?; + runtime.wait_for(&self.output_2)?; + while let Some(message) = self.input.recv()? { + match self.message_count % 2 { + 0 => self.output_1.send(&message)?, + 1 => self.output_2.send(&message)?, + _ => {} + } + self.message_count += 1; + } + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Split { + fn build_system(config: StdioConfig) -> Result { + use crate::SystemBuilding; + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = s.read_stdin(); + + let split = s.split(); + s.connect(&stdin.output, &split.input); + + let stdout_1 = s.write_stdout(); + s.connect(&split.output_1, &stdout_1.input); + + let stdout_2 = s.write_stdout(); + s.connect(&split.output_2, &stdout_2.input); + })) + } +} + +#[cfg(test)] +mod split_tests { + use crate::{CoreBlocks, FlowBlocks, SysBlocks, System}; + use protoflow_core::{error, prelude::String}; + extern crate std; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.split::(); + }); + } + + #[test] + #[ignore = "requires stdin"] + fn run_split_stdout_and_file() { + use super::*; + use protoflow_core::SystemBuilding; + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + let split = s.split(); + s.connect(&stdin.output, &split.input); + + let stdout_1 = s.write_stdout(); + s.connect(&split.output_1, &stdout_1.input); + + let file = s.const_string("text.txt"); + let write_file = s.write_file().with_flags(crate::WriteFlags { + create: true, + append: true, + }); + s.connect(&file.output, &write_file.path); + s.connect(&split.output_2, &write_file.input); + }) { + error!("{}", e) + } + } + + #[test] + #[ignore = "requires stdin"] + fn run_split_to_stdout() { + use protoflow_core::SystemBuilding; + if let Err(e) = System::run(|s| { + let stdin = s.read_stdin(); + + let split = s.split(); + s.connect(&stdin.output, &split.input); + + let stdout_1 = s.write_stdout(); + s.connect(&split.output_1, &stdout_1.input); + + let stdout_2 = s.write_stdout(); + s.connect(&split.output_2, &stdout_2.input); + }) { + error!("{}", e) + } + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 41d1decd..7d44d869 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -60,6 +60,14 @@ pub fn build_stdio_system( "Drop" => Drop::::build_system(config)?, "Random" => Random::::build_system(config)?, // FlowBlocks + "Batch" => Batch::::build_system(config)?, + "Concat" => Concat::::build_system(config)?, + "Distinct" => Distinct::::build_system(config)?, + "MapInto" => MapInto::::build_system(config)?, + "Merge" => Merge::::build_system(config)?, + "Replicate" => Replicate::::build_system(config)?, + "Sort" => Sort::::build_system(config)?, + "Split" => Split::::build_system(config)?, // HashBlocks #[cfg(any( feature = "hash-blake3", diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 8d0da65a..633bb065 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -5,10 +5,11 @@ use crate::{ prelude::{fmt, Arc, Box, Bytes, FromStr, Rc, String, ToString}, types::{DelayType, Encoding}, - AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, - DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks, - TextBlocks, WriteFile, WriteStderr, WriteStdout, + AllBlocks, Batch, Buffer, Concat, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, + DecodeHex, DecodeJson, Delay, Distinct, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, + FlowBlocks, HashBlocks, IoBlocks, MapInto, MathBlocks, Merge, Random, ReadDir, ReadEnv, + ReadFile, ReadStdin, Replicate, Sort, Split, SplitString, SysBlocks, TextBlocks, WriteFile, + WriteStderr, WriteStdout, }; #[cfg(all(feature = "std", feature = "serde"))] use crate::{ReadSocket, WriteSocket}; @@ -178,7 +179,41 @@ impl CoreBlocks for System { } } -impl FlowBlocks for System {} +impl FlowBlocks for System { + fn batch + 'static>(&mut self, batch_size: usize) -> Batch { + self.0 + .block(Batch::::with_system(self, Some(batch_size))) + } + fn concat + 'static>(&mut self) -> Concat { + self.0.block(Concat::::with_system(self)) + } + + fn distinct + PartialEq + 'static>(&mut self) -> Distinct { + self.0.block(Distinct::::with_system(self)) + } + + fn map_into + 'static, Output: Message + 'static>( + &mut self, + ) -> MapInto { + self.0.block(MapInto::::with_system(self)) + } + + fn merge + 'static>(&mut self) -> Merge { + self.0.block(Merge::::with_system(self)) + } + + fn replicate + 'static>(&mut self) -> Replicate { + self.0.block(Replicate::::with_system(self)) + } + + fn sort + PartialOrd + 'static>(&mut self) -> Sort { + self.0.block(Sort::::with_system(self)) + } + + fn split + 'static>(&mut self) -> Split { + self.0.block(Split::::with_system(self)) + } +} #[cfg(not(any( feature = "hash-blake3", diff --git a/lib/protoflow-core/src/comparable_any.rs b/lib/protoflow-core/src/comparable_any.rs new file mode 100644 index 00000000..9e9e4847 --- /dev/null +++ b/lib/protoflow-core/src/comparable_any.rs @@ -0,0 +1,87 @@ +// This is free and unencumbered software released into the public domain. + +use core::{ + cmp::Ordering, + ops::{Deref, DerefMut}, +}; + +use crate::{prelude::prost_types::Any, Message}; + +#[derive(Debug, Clone, Default)] +pub struct ComparableAny(pub Any); + +impl PartialOrd for ComparableAny { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.0.value.cmp(&other.0.value)) + } +} + +impl PartialEq for ComparableAny { + fn eq(&self, other: &Self) -> bool { + self.0.value == other.0.value + } +} + +impl From for ComparableAny { + fn from(any: Any) -> Self { + ComparableAny(any) + } +} + +impl From for Any { + fn from(comparable: ComparableAny) -> Self { + comparable.0 + } +} + +impl AsRef for ComparableAny { + fn as_ref(&self) -> &Any { + &self.0 + } +} + +impl AsMut for ComparableAny { + fn as_mut(&mut self) -> &mut Any { + &mut self.0 + } +} + +impl Deref for ComparableAny { + type Target = Any; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for ComparableAny { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl prost::Message for ComparableAny { + fn encode_raw(&self, buf: &mut impl bytes::BufMut) { + self.0.encode_raw(buf); + } + + fn merge_field( + &mut self, + tag: u32, + wire_type: prost::encoding::WireType, + buf: &mut impl bytes::Buf, + ctx: prost::encoding::DecodeContext, + ) -> Result<(), prost::DecodeError> { + self.0.merge_field(tag, wire_type, buf, ctx) + } + + fn encoded_len(&self) -> usize { + self.0.encoded_len() + } + + fn clear(&mut self) { + self.0.clear(); + } +} + +impl Message for ComparableAny {} diff --git a/lib/protoflow-core/src/lib.rs b/lib/protoflow-core/src/lib.rs index 5dba7a95..a6bf160a 100644 --- a/lib/protoflow-core/src/lib.rs +++ b/lib/protoflow-core/src/lib.rs @@ -25,6 +25,9 @@ pub use block_error::*; mod block_runtime; pub use block_runtime::*; +mod comparable_any; +pub use comparable_any::*; + mod function_block; pub use function_block::*;