From 938b6a7ce869ff7ef8a110eefdec3beb5ca34e11 Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Wed, 18 Dec 2024 13:11:58 +0200 Subject: [PATCH 1/4] rework Buffer block --- lib/protoflow-blocks/doc/core/buffer.mmd | 11 +++- lib/protoflow-blocks/doc/core/buffer.seq.mmd | 27 +++++++-- lib/protoflow-blocks/src/blocks/core.rs | 6 +- .../src/blocks/core/buffer.rs | 55 +++++++++++++------ 4 files changed, 75 insertions(+), 24 deletions(-) diff --git a/lib/protoflow-blocks/doc/core/buffer.mmd b/lib/protoflow-blocks/doc/core/buffer.mmd index 4e43746e..c64a9777 100644 --- a/lib/protoflow-blocks/doc/core/buffer.mmd +++ b/lib/protoflow-blocks/doc/core/buffer.mmd @@ -1,9 +1,16 @@ block-beta - columns 4 - Source space:2 Buffer + columns 7 + Source space:2 Count space:2 Sink + space:7 + space:7 + space:3 Pulse space:3 Source-- "input" -->Buffer + Pulse-- "trigger" -->Buffer + Buffer-- "output" -->Sink classDef block height:48px,padding:8px; classDef hidden visibility:none; class Buffer block class Source hidden + class Sink hidden + class Pulse hidden diff --git a/lib/protoflow-blocks/doc/core/buffer.seq.mmd b/lib/protoflow-blocks/doc/core/buffer.seq.mmd index cb4b1779..59031aa9 100644 --- a/lib/protoflow-blocks/doc/core/buffer.seq.mmd +++ b/lib/protoflow-blocks/doc/core/buffer.seq.mmd @@ -1,15 +1,32 @@ sequenceDiagram autonumber - participant BlockA as Another block + participant BlockA as Another block (input source) participant Buffer.input as Buffer.input port participant Buffer as Buffer block + participant Buffer.output as Buffer.output port + participant BlockB as Another block (downstream sink) + participant Buffer.trigger as Buffer.trigger port + participant BlockC as Another block (trigger source) - BlockA-->>Buffer: Connect + BlockA-->>Buffer: Connect (input) + BlockC-->>Buffer: Connect (trigger) + Buffer-->>BlockB: Connect (output) - loop Buffer process + loop Storing messages BlockA->>Buffer: Message - Buffer->>Buffer: Store message + Buffer->>Buffer: Store message internally end - BlockA-->>Buffer: Disconnect + BlockC->>Buffer: Trigger + loop Releasing messages + Buffer->>BlockB: Stored Message + end + + BlockA-->>Buffer: Disconnect (input) Buffer-->>Buffer.input: Close + + BlockC-->>Buffer: Disconnect (trigger) + Buffer-->>Buffer.trigger: Close + + Buffer-->>BlockB: Disconnect (output) + Buffer-->>Buffer.output: Close diff --git a/lib/protoflow-blocks/src/blocks/core.rs b/lib/protoflow-blocks/src/blocks/core.rs index f6f4e760..5675f1dd 100644 --- a/lib/protoflow-blocks/src/blocks/core.rs +++ b/lib/protoflow-blocks/src/blocks/core.rs @@ -119,7 +119,11 @@ pub mod core { use super::SystemBuilding; use CoreBlockConfig::*; match self { - Buffer { .. } => Box::new(super::Buffer::new(system.input_any())), // TODO: Buffer::with_system(system) + Buffer { .. } => Box::new(super::Buffer::<_, ()>::new( + system.input_any(), + system.input(), + system.output_any(), + )), // TODO: Buffer::with_system(system) Const { value, .. } => Box::new(super::Const::with_system(system, value.clone())), Count { .. } => Box::new(super::Count::new( system.input_any(), diff --git a/lib/protoflow-blocks/src/blocks/core/buffer.rs b/lib/protoflow-blocks/src/blocks/core/buffer.rs index aef0fc9d..8334537b 100644 --- a/lib/protoflow-blocks/src/blocks/core/buffer.rs +++ b/lib/protoflow-blocks/src/blocks/core/buffer.rs @@ -1,7 +1,9 @@ // This is free and unencumbered software released into the public domain. -use crate::{prelude::VecDeque, StdioConfig, StdioError, StdioSystem, System}; -use protoflow_core::{types::Any, Block, BlockResult, BlockRuntime, InputPort, Message}; +use crate::{prelude::Vec, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; use protoflow_derive::Block; use simple_mermaid::mermaid; @@ -35,47 +37,68 @@ use simple_mermaid::mermaid; /// ``` /// #[derive(Block, Clone)] -pub struct Buffer { +pub struct Buffer { /// The input message stream. #[input] - pub input: InputPort, + pub input: InputPort, + + /// The trigger port. + #[input] + pub trigger: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, /// The internal state storing the messages received. #[state] - messages: VecDeque, + messages: Vec, } -impl Buffer { - pub fn new(input: InputPort) -> Self { +impl Buffer { + pub fn new( + input: InputPort, + trigger: InputPort, + output: OutputPort, + ) -> Self { Self { input, - messages: VecDeque::new(), + trigger, + output, + messages: Vec::new(), } } - pub fn messages(&self) -> &VecDeque { + pub fn messages(&self) -> &Vec { &self.messages } } -impl Buffer { +impl Buffer { pub fn with_system(system: &System) -> Self { use crate::SystemBuilding; - Self::new(system.input()) + Self::new(system.input(), system.input(), system.output()) } } -impl Block for Buffer { - fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { +impl Block for Buffer { + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { while let Some(message) = self.input.recv()? { - self.messages.push_back(message); + self.messages.push(message); } + + while let Some(_) = self.trigger.recv()? { + for message in &self.messages { + self.output.send(message)?; + } + } + Ok(()) } } #[cfg(feature = "std")] -impl StdioSystem for Buffer { +impl StdioSystem for Buffer { fn build_system(config: StdioConfig) -> Result { use crate::{CoreBlocks, SystemBuilding}; @@ -98,7 +121,7 @@ mod tests { fn instantiate_block() { // Check that the block is constructible: let _ = System::build(|s| { - let _ = s.block(Buffer::::new(s.input())); + let _ = s.block(Buffer::::new(s.input(), s.input(), s.output())); }); } } From 63d3ae624a39017afc1448676f15af4e3b5dff7c Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Wed, 18 Dec 2024 13:30:50 +0200 Subject: [PATCH 2/4] fix description --- lib/protoflow-blocks/src/blocks/core/buffer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/protoflow-blocks/src/blocks/core/buffer.rs b/lib/protoflow-blocks/src/blocks/core/buffer.rs index 8334537b..ffda13f0 100644 --- a/lib/protoflow-blocks/src/blocks/core/buffer.rs +++ b/lib/protoflow-blocks/src/blocks/core/buffer.rs @@ -7,7 +7,11 @@ use protoflow_core::{ use protoflow_derive::Block; use simple_mermaid::mermaid; -/// A block that simply stores all messages it receives. +/// A block that stores all messages it receives, +/// and sends them downstream when triggered. +/// +/// When triggered, the block will send all messages it received so far, +/// _WITHOUT_ clearing the internal buffer. /// /// # Block Diagram #[doc = mermaid!("../../../doc/core/buffer.mmd")] From b6b0fb3fca09ba65459a50eccbce8a305cbb4a8f Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Thu, 19 Dec 2024 12:39:42 +0200 Subject: [PATCH 3/4] fix CoreBlocks trait --- lib/protoflow-blocks/src/blocks/core.rs | 4 +++- lib/protoflow-blocks/src/blocks/core/buffer.rs | 4 ++-- lib/protoflow-blocks/src/lib.rs | 2 +- lib/protoflow-blocks/src/system.rs | 6 ++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/protoflow-blocks/src/blocks/core.rs b/lib/protoflow-blocks/src/blocks/core.rs index 5675f1dd..aec6f4be 100644 --- a/lib/protoflow-blocks/src/blocks/core.rs +++ b/lib/protoflow-blocks/src/blocks/core.rs @@ -12,7 +12,9 @@ pub mod core { use protoflow_core::{Block, Message}; pub trait CoreBlocks { - fn buffer + 'static>(&mut self) -> Buffer; + fn buffer + 'static, Trigger: Message + 'static>( + &mut self, + ) -> Buffer; fn const_bytes>(&mut self, value: T) -> Const; diff --git a/lib/protoflow-blocks/src/blocks/core/buffer.rs b/lib/protoflow-blocks/src/blocks/core/buffer.rs index ffda13f0..f0b82919 100644 --- a/lib/protoflow-blocks/src/blocks/core/buffer.rs +++ b/lib/protoflow-blocks/src/blocks/core/buffer.rs @@ -102,7 +102,7 @@ impl Block for Buffer { } #[cfg(feature = "std")] -impl StdioSystem for Buffer { +impl StdioSystem for Buffer { fn build_system(config: StdioConfig) -> Result { use crate::{CoreBlocks, SystemBuilding}; @@ -110,7 +110,7 @@ impl StdioSystem for Buffer { Ok(System::build(|s| { let stdin = config.read_stdin(s); - let buffer = s.buffer(); + let buffer = s.buffer::<_, ()>(); s.connect(&stdin.output, &buffer.input); })) } diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 41d1decd..6753517e 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -53,7 +53,7 @@ pub fn build_stdio_system( use prelude::String; Ok(match system_name.as_ref() { // CoreBlocks - "Buffer" => Buffer::::build_system(config)?, + "Buffer" => Buffer::build_system(config)?, "Const" => Const::::build_system(config)?, "Count" => Count::::build_system(config)?, "Delay" => Delay::::build_system(config)?, diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 1ac658c0..18838f99 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -131,8 +131,10 @@ impl SystemBuilding for System { impl AllBlocks for System {} impl CoreBlocks for System { - fn buffer + 'static>(&mut self) -> Buffer { - self.0.block(Buffer::::with_system(self)) + fn buffer + 'static, Trigger: Message + 'static>( + &mut self, + ) -> Buffer { + self.0.block(Buffer::::with_system(self)) } fn const_bytes>(&mut self, value: T) -> Const { From d28052d1e2fa8513741dddd47c87bd6a8395ff2b Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Thu, 19 Dec 2024 13:13:18 +0200 Subject: [PATCH 4/4] change to proper example --- lib/protoflow-blocks/src/blocks/core/buffer.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/protoflow-blocks/src/blocks/core/buffer.rs b/lib/protoflow-blocks/src/blocks/core/buffer.rs index f0b82919..e171c5f8 100644 --- a/lib/protoflow-blocks/src/blocks/core/buffer.rs +++ b/lib/protoflow-blocks/src/blocks/core/buffer.rs @@ -28,8 +28,14 @@ use simple_mermaid::mermaid; /// # fn main() { /// System::build(|s| { /// let stdin = s.read_stdin(); +/// let hello = s.const_string("Hello, World!"); +/// let encode = s.encode_lines(); /// let buffer = s.buffer(); -/// s.connect(&stdin.output, &buffer.input); +/// let stdout = s.write_stdout(); +/// s.connect(&hello.output, &encode.input); +/// s.connect(&encode.output, &buffer.input); +/// s.connect(&stdin.output, &buffer.trigger); +/// s.connect(&buffer.output, &stdout.input); /// }); /// # } /// ```