diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ac0fa12c9..2d1cca1612 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -273,6 +273,7 @@ set(TARGET_CORE_SOURCES src/device/CalibrationHandler.cpp src/device/Version.cpp src/pipeline/Pipeline.cpp + src/pipeline/PipelineStateApi.cpp src/pipeline/AssetManager.cpp src/pipeline/MessageQueue.cpp src/pipeline/Node.cpp @@ -281,6 +282,8 @@ set(TARGET_CORE_SOURCES src/pipeline/ThreadedHostNode.cpp src/pipeline/DeviceNode.cpp src/pipeline/DeviceNodeGroup.cpp + src/pipeline/node/internal/PipelineEventAggregation.cpp + src/pipeline/node/internal/PipelineStateMerge.cpp src/pipeline/node/internal/XLinkIn.cpp src/pipeline/node/internal/XLinkOut.cpp src/pipeline/node/ColorCamera.cpp @@ -356,6 +359,9 @@ set(TARGET_CORE_SOURCES src/pipeline/datatype/PointCloudConfig.cpp src/pipeline/datatype/ObjectTrackerConfig.cpp src/pipeline/datatype/PointCloudData.cpp + src/pipeline/datatype/PipelineEvent.cpp + src/pipeline/datatype/PipelineState.cpp + src/pipeline/datatype/PipelineEventAggregationConfig.cpp src/pipeline/datatype/RGBDData.cpp src/pipeline/datatype/MessageGroup.cpp src/pipeline/datatype/ThermalConfig.cpp @@ -372,6 +378,8 @@ set(TARGET_CORE_SOURCES src/utility/Initialization.cpp src/utility/Resources.cpp src/utility/Platform.cpp + src/utility/PipelineEventDispatcher.cpp + src/utility/PipelineImplHelper.cpp src/utility/RecordReplay.cpp src/utility/McapImpl.cpp src/utility/Environment.cpp diff --git a/PipelineDebugging.md b/PipelineDebugging.md new file mode 100644 index 0000000000..899e886830 --- /dev/null +++ b/PipelineDebugging.md @@ -0,0 +1,211 @@ +# Pipeline Debugging + +## Notes + +- Pipeline debugging on RVC2 is very limited compared to RVC4. +- States are calculated for individual nodes, not for node groups (e.g. instead of the `DetectionNetwork` state, `NeuralNetwork` and `DetectionParser` states are calculated). 
+ +## API + +The pipeline state can be retrieved from the pipeline object by calling the `getPipelineState()` method which exposes methods for selecting which data to retrieve: + +```python +pipelineState = pipeline.getPipelineState().nodes().detailed() +``` + +The method `nodes()` takes an optional argument to select which nodes to include in the state by their IDs. If no argument or multiple IDs are provided, the following methods can be used to select the desired data: + +- `summary() -> PipelineState`: returns data pertaining to the main loop and I/O blocks only. Statistics for individual inputs, outputs and other timings are not included. +- `detailed() -> PipelineState`: returns all data including statistics for individual inputs, outputs and other timings. +- `outputs() -> map(NodeId, map(str, OutputQueueState))`: returns state of all outputs per node. +- `inputs() -> map(NodeId, map(str, InputQueueState))`: returns state of all inputs per node. +- `otherTimings() -> map(NodeId, map(str, Timing))`: returns other timing statistics per node. + +Specifying a single node ID to the `nodes()` method allows further filtering of the data: + +- `summary() -> NodeState`: returns summary data for the specified node similarly to above. +- `detailed() -> NodeState`: returns detailed data for the specified node similarly to above. +- `outputs() -> map(str, OutputQueueState)`: returns states of all or specific outputs of the node. If only one output name is provided, the return type is `OutputQueueState`. +- `inputs() -> map(str, InputQueueState)`: returns states of all or specific inputs of the node. If only one input name is provided, the return type is `InputQueueState`. +- `otherTimings() -> map(str, Timing)`: returns other timing statistics of the node. If only one timing name is provided, the return type is `Timing`. 
+ +## Pipeline Events + +### Class + +```cpp +class PipelineEvent : public Buffer { + public: + enum class Type : std::int32_t { + CUSTOM = 0, + LOOP = 1, + INPUT = 2, + OUTPUT = 3, + INPUT_BLOCK = 4, + OUTPUT_BLOCK = 5, + }; + enum class Interval : std::int32_t { NONE = 0, START = 1, END = 2 }; + + int64_t nodeId = -1; + int32_t status = 0; + std::optional<uint32_t> queueSize; + Interval interval = Interval::NONE; + Type type = Type::CUSTOM; + std::string source; +}; +``` + +### Description + +`PipelineEvent` can have different types depending on what kind of event is being reported: + +- `CUSTOM` events can be defined by the node developers to report and track relevant information (e.g. timing of specific operations). These need to be manually added. +- `LOOP` events track the main processing loop timings. They can only track one loop per node. These can be generated by using the `mainLoop` method in the main `while` loop for simple nodes, or by using tracked events in more complex threaded nodes. +- `INPUT` events track input queue operations and states. These are automatically added. +- `OUTPUT` events track output queue operations and states. These are automatically added. +- `INPUT_BLOCK` events track a group of input operations. Only one input block can be tracked per node. These need to be manually added. +- `OUTPUT_BLOCK` events track a group of output operations. Only one output block can be tracked per node. These need to be manually added. + +The `PipelineEvent` also contains the source node ID, the status code used to indicate success or failure of the operation (for `tryGet` and `trySend`), the queue size (if applicable - inputs only), the source or name of the event, the timestamp, the sequence number, and the interval (start, end or none). The interval is none when the end and start events are the same (e.g. simple main loop). + +Pipeline events are generated using the `PipelineEventDispatcher` of a node. 
They can be created manually by using the `startEvent` and `endEvent` methods or by using the `BlockPipelineEvent` helper class (created using the `blockEvent` method) to automatically create start (in constructor) and end (in destructor) events for a block of code. + +## Pipeline State + +The pipeline state contains a map of `NodeState` objects by node ID. + +### NodeState Class + +```cpp +class NodeState { + public: + struct DurationEvent { + PipelineEvent startEvent; + uint64_t durationUs; + }; + struct TimingStats { + uint64_t minMicros = -1; + uint64_t maxMicros = 0; + uint64_t averageMicrosRecent = 0; + uint64_t stdDevMicrosRecent = 0; + uint64_t minMicrosRecent = -1; + uint64_t maxMicrosRecent = 0; + uint64_t medianMicrosRecent = 0; + }; + struct Timing { + float fps = 0.0f; + TimingStats durationStats; + }; + struct QueueStats { + uint32_t maxQueued = 0; + uint32_t minQueuedRecent = 0; + uint32_t maxQueuedRecent = 0; + uint32_t medianQueuedRecent = 0; + }; + struct InputQueueState { + enum class State : std::int32_t { + IDLE = 0, + WAITING = 1, + BLOCKED = 2 + } state = State::IDLE; + uint32_t numQueued = 0; + Timing timing; + QueueStats queueStats; + }; + struct OutputQueueState { + enum class State : std::int32_t { IDLE = 0, SENDING = 1 } state = State::IDLE; + Timing timing; + }; + enum class State : std::int32_t { IDLE = 0, GETTING_INPUTS = 1, PROCESSING = 2, SENDING_OUTPUTS = 3 }; + + State state = State::IDLE; + std::vector<DurationEvent> events; + std::unordered_map<std::string, OutputQueueState> outputStates; + std::unordered_map<std::string, InputQueueState> inputStates; + Timing inputsGetTiming; + Timing outputsSendTiming; + Timing mainLoopTiming; + std::unordered_map<std::string, Timing> otherTimings; +}; +``` + +### NodeState Description + +The `NodeState` class contains information about the state of the node, optional list of recent events (the number of events stored is limited), output and input queue states, timings for getting inputs, sending outputs, main loop timing, and other timings added by the node developer. 
+ +#### Node State + +The node can be in one of the following states: + +- `IDLE`: the node is not currently processing anything. This is only possible before the node has entered its main loop. +- `GETTING_INPUTS`: the node is currently trying to get inputs (in the input block). +- `PROCESSING`: the node is currently processing data (it is not in the input or the output block). +- `SENDING_OUTPUTS`: the node is currently trying to send outputs (in the output block). + +#### Duration Event + +The `DurationEvent` merges a start and end `PipelineEvent` by storing the start event and the calculated duration in microseconds. + +#### Timing + +The `Timing` struct contains the calculated frames per second and the duration statistics in microseconds: + +- `minMicros`: minimum duration recorded. +- `maxMicros`: maximum duration recorded. +- `averageMicrosRecent`: average duration over recent events. +- `stdDevMicrosRecent`: standard deviation of duration over recent events. +- `minMicrosRecent`: minimum duration over recent events. +- `maxMicrosRecent`: maximum duration over recent events. +- `medianMicrosRecent`: median duration over recent events. + +The timing is invalid if the minimum duration is larger than the maximum duration. This struct provides an `isValid()` method to check for validity. + +#### Queue Stats + +The `QueueStats` struct contains statistics about the queue sizes: + +- `maxQueued`: maximum number of input messages queued. +- `minQueuedRecent`: minimum number of input messages queued over recent events. +- `maxQueuedRecent`: maximum number of input messages queued over recent events. +- `medianQueuedRecent`: median number of input messages queued over recent events. + +#### Input Queue State + +The `InputQueueState` struct contains the current number of queued messages, the timing information, the queue statistics and the current state of the input: + +- `IDLE`: the input is waiting to get data and there are no outputs waiting to send data. 
+- `WAITING`: the input is waiting for a message to arrive (the input queue is empty). +- `BLOCKED`: the input queue is full and an output is attempting to send data. + +#### Output Queue State + +The `OutputQueueState` struct contains the timing information and the current state of the output: + +- `IDLE`: the output is not currently sending data. +- `SENDING`: the output is currently sending data. If the output is blocked due to a full queue on the input the state will be `SENDING`, otherwise the send should be instantaneous and the state will return to `IDLE`. + +## Operation Overview + +### Schema + +![Pipeline Debugging Graph](./images/pipeline_debugging_graph.png) + +### Description + +Each node in the pipeline has a `pipelineEventOutput` output that emits `PipelineEvent` events related to the node's operation. These events are created and sent using the `PipelineEventDispatcher` object in each node. The event output is linked to one of the `PipelineEventAggregation` nodes, depending on where the node is running (by default events do not get sent from device to host, it is however possible to subscribe to the events of a node by simply creating an output queue). + +The `PipelineEventAggregation` node collects events from the nodes running on the same device and merges them into a `PipelineState` object by calculating duration statistics, events per second, and various states. The state is then sent to the `StateMerge` node which runs on host and merges device and host states into a single `PipelineState` object. + +## Pipeline Event Aggregation + +The `PipelineEventAggregation` node collects `PipelineEvent` events in a separate thread. To improve performance, it does not update the state for every event, but rather collects events into a buffer and processes them in batches at a fixed interval. The node can be configured to adjust the processing interval and the maximum number of stored recent events per node. 
+The events are processed by their type and source in pairs (start and end). Interval events (where the interval is not `NONE`) are matched by their sequence numbers. Ping events (where the interval is `NONE`) are matched by looking for the event with the previous sequence number. + +The state is filtered before sending according to the input config. By default, the node waits for a request before sending the state. If the `repeat` flag in the config is set to true, the state is sent at regular intervals. + +## State Merge + +The `StateMerge` node collects `PipelineState` objects from multiple `PipelineEventAggregation` nodes and merges them into a single `PipelineState` object. The merging is done by combining the node states from each device into a single map of node states - it is expected that node IDs are unique. + +The node waits for the input config and forwards it to connected `PipelineEventAggregation` nodes. The returned states are matched by the config sequence number to ensure that they correspond to the same request. 
diff --git a/bindings/python/CMakeLists.txt b/bindings/python/CMakeLists.txt index b0b01eed73..434c10438c 100644 --- a/bindings/python/CMakeLists.txt +++ b/bindings/python/CMakeLists.txt @@ -98,7 +98,8 @@ set(SOURCE_LIST src/pipeline/node/UVCBindings.cpp src/pipeline/node/ToFBindings.cpp src/pipeline/node/PointCloudBindings.cpp src/pipeline/node/SyncBindings.cpp + src/pipeline/node/PipelineEventAggregationBindings.cpp src/pipeline/node/MessageDemuxBindings.cpp src/pipeline/node/HostNodeBindings.cpp src/pipeline/node/RecordBindings.cpp @@ -139,6 +140,8 @@ set(SOURCE_LIST src/pipeline/datatype/PointCloudConfigBindings.cpp src/pipeline/datatype/ObjectTrackerConfigBindings.cpp src/pipeline/datatype/PointCloudDataBindings.cpp + src/pipeline/datatype/PipelineEventBindings.cpp + src/pipeline/datatype/PipelineStateBindings.cpp src/pipeline/datatype/TransformDataBindings.cpp src/pipeline/datatype/ImageAlignConfigBindings.cpp src/pipeline/datatype/ImgAnnotationsBindings.cpp @@ -152,6 +155,7 @@ set(SOURCE_LIST src/remote_connection/RemoteConnectionBindings.cpp src/utility/EventsManagerBindings.cpp + src/utility/PipelineEventDispatcherBindings.cpp ) if(DEPTHAI_MERGED_TARGET) list(APPEND SOURCE_LIST @@ -415,4 +419,4 @@ endif() ######################## if (DEPTHAI_PYTHON_ENABLE_EXAMPLES) add_subdirectory(../../examples/python ${CMAKE_BINARY_DIR}/examples/python) -endif() \ No newline at end of file +endif() diff --git a/bindings/python/src/DatatypeBindings.cpp b/bindings/python/src/DatatypeBindings.cpp index d5c92c840f..2661dbe2a3 100644 --- a/bindings/python/src/DatatypeBindings.cpp +++ b/bindings/python/src/DatatypeBindings.cpp @@ -31,6 +31,8 @@ void bind_tracklets(pybind11::module& m, void* pCallstack); void bind_benchmarkreport(pybind11::module& m, void* pCallstack); void bind_pointcloudconfig(pybind11::module& m, void* pCallstack); void bind_pointclouddata(pybind11::module& m, void* pCallstack); +void bind_pipelineevent(pybind11::module& m, void* pCallstack); +void 
bind_pipelinestate(pybind11::module& m, void* pCallstack); void bind_transformdata(pybind11::module& m, void* pCallstack); void bind_rgbddata(pybind11::module& m, void* pCallstack); void bind_imagealignconfig(pybind11::module& m, void* pCallstack); @@ -73,6 +75,8 @@ void DatatypeBindings::addToCallstack(std::deque& callstack) { callstack.push_front(bind_benchmarkreport); callstack.push_front(bind_pointcloudconfig); callstack.push_front(bind_pointclouddata); + callstack.push_front(bind_pipelineevent); + callstack.push_front(bind_pipelinestate); callstack.push_front(bind_transformdata); callstack.push_front(bind_imagealignconfig); callstack.push_front(bind_imageannotations); @@ -134,6 +138,8 @@ void DatatypeBindings::bind(pybind11::module& m, void* pCallstack) { .value("ImageAlignConfig", DatatypeEnum::ImageAlignConfig) .value("ImgAnnotations", DatatypeEnum::ImgAnnotations) .value("RGBDData", DatatypeEnum::RGBDData) + .value("PipelineEvent", DatatypeEnum::PipelineEvent) + .value("PipelineState", DatatypeEnum::PipelineState) .value("ImageFiltersConfig", DatatypeEnum::ImageFiltersConfig) .value("ToFDepthConfidenceFilterConfig", DatatypeEnum::ToFDepthConfidenceFilterConfig) .value("DynamicCalibrationControl", DatatypeEnum::DynamicCalibrationControl) diff --git a/bindings/python/src/pipeline/PipelineBindings.cpp b/bindings/python/src/pipeline/PipelineBindings.cpp index 5652ce0f18..c33aa6e2e6 100644 --- a/bindings/python/src/pipeline/PipelineBindings.cpp +++ b/bindings/python/src/pipeline/PipelineBindings.cpp @@ -1,4 +1,3 @@ - #include "PipelineBindings.hpp" #include @@ -66,6 +65,9 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { py::class_ globalProperties(m, "GlobalProperties", DOC(dai, GlobalProperties)); py::class_ recordConfig(m, "RecordConfig", DOC(dai, RecordConfig)); py::class_ recordVideoConfig(recordConfig, "VideoEncoding", DOC(dai, RecordConfig, VideoEncoding)); + py::class_ pipelineStateApi(m, "PipelineStateApi", DOC(dai, 
PipelineStateApi)); + py::class_ nodesStateApi(m, "NodesStateApi", DOC(dai, NodesStateApi)); + py::class_ nodeStateApi(m, "NodeStateApi", DOC(dai, NodeStateApi)); py::class_ pipeline(m, "Pipeline", DOC(dai, Pipeline, 2)); /////////////////////////////////////////////////////////////////////// @@ -104,6 +106,59 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { .def_readwrite("videoEncoding", &RecordConfig::videoEncoding, DOC(dai, RecordConfig, videoEncoding)) .def_readwrite("compressionLevel", &RecordConfig::compressionLevel, DOC(dai, RecordConfig, compressionLevel)); + pipelineStateApi.def("nodes", static_cast(&PipelineStateApi::nodes), DOC(dai, PipelineStateApi, nodes)) + .def("nodes", + static_cast&)>(&PipelineStateApi::nodes), + py::arg("nodeIds"), + DOC(dai, PipelineStateApi, nodes, 2)) + .def("nodes", + static_cast(&PipelineStateApi::nodes), + py::arg("nodeId"), + DOC(dai, PipelineStateApi, nodes, 3)); + + nodesStateApi.def("summary", &NodesStateApi::summary, DOC(dai, NodesStateApi, summary)) + .def("detailed", &NodesStateApi::detailed, DOC(dai, NodesStateApi, detailed)) + .def("outputs", &NodesStateApi::outputs, DOC(dai, NodesStateApi, outputs)) + .def("inputs", &NodesStateApi::inputs, DOC(dai, NodesStateApi, inputs)) + .def("otherTimings", &NodesStateApi::otherTimings, DOC(dai, NodesStateApi, otherTimings)); + + nodeStateApi.def("summary", &NodeStateApi::summary, DOC(dai, NodeStateApi, summary)) + .def("detailed", &NodeStateApi::detailed, DOC(dai, NodeStateApi, detailed)) + .def("outputs", + static_cast (NodeStateApi::*)()>(&NodeStateApi::outputs), + DOC(dai, NodeStateApi, outputs)) + .def("outputs", + static_cast (NodeStateApi::*)(const std::vector&)>( + &NodeStateApi::outputs), + py::arg("outputNames"), + DOC(dai, NodeStateApi, outputs, 2)) + .def("outputs", + static_cast(&NodeStateApi::outputs), + py::arg("outputName"), + DOC(dai, NodeStateApi, outputs, 3)) + .def("inputs", + static_cast (NodeStateApi::*)()>(&NodeStateApi::inputs), + 
DOC(dai, NodeStateApi, inputs)) + .def("inputs", + static_cast (NodeStateApi::*)(const std::vector&)>(&NodeStateApi::inputs), + py::arg("inputNames"), + DOC(dai, NodeStateApi, inputs, 2)) + .def("inputs", + static_cast(&NodeStateApi::inputs), + py::arg("inputName"), + DOC(dai, NodeStateApi, inputs, 3)) + .def("otherTimings", + static_cast (NodeStateApi::*)()>(&NodeStateApi::otherTimings), + DOC(dai, NodeStateApi, otherTimings)) + .def("otherTimings", + static_cast (NodeStateApi::*)(const std::vector&)>(&NodeStateApi::otherTimings), + py::arg("timingNames"), + DOC(dai, NodeStateApi, otherTimings, 2)) + .def("otherTimings", + static_cast(&NodeStateApi::otherTimings), + py::arg("timingName"), + DOC(dai, NodeStateApi, otherTimings)); + // bind pipeline pipeline .def(py::init([](bool createImplicitDevice) { @@ -257,6 +312,8 @@ void PipelineBindings::bind(pybind11::module& m, void* pCallstack) { .def("isRunning", &Pipeline::isRunning) .def("processTasks", &Pipeline::processTasks, py::arg("waitForTasks") = false, py::arg("timeoutSeconds") = -1.0) .def("enableHolisticRecord", &Pipeline::enableHolisticRecord, py::arg("recordConfig"), DOC(dai, Pipeline, enableHolisticRecord)) - .def("enableHolisticReplay", &Pipeline::enableHolisticReplay, py::arg("recordingPath"), DOC(dai, Pipeline, enableHolisticReplay)); + .def("enableHolisticReplay", &Pipeline::enableHolisticReplay, py::arg("recordingPath"), DOC(dai, Pipeline, enableHolisticReplay)) + .def("enablePipelineDebugging", &Pipeline::enablePipelineDebugging, py::arg("enable") = true, DOC(dai, Pipeline, enablePipelineDebugging)) + .def("getPipelineState", &Pipeline::getPipelineState, DOC(dai, Pipeline, getPipelineState)); ; } diff --git a/bindings/python/src/pipeline/datatype/PipelineEventBindings.cpp b/bindings/python/src/pipeline/datatype/PipelineEventBindings.cpp new file mode 100644 index 0000000000..23435044f5 --- /dev/null +++ b/bindings/python/src/pipeline/datatype/PipelineEventBindings.cpp @@ -0,0 +1,59 @@ +#include 
+#include + +#include "DatatypeBindings.hpp" +#include "pipeline/CommonBindings.hpp" + +// depthai +#include "depthai/pipeline/datatype/PipelineEvent.hpp" + +// pybind +#include +#include + +// #include "spdlog/spdlog.h" + +void bind_pipelineevent(pybind11::module& m, void* pCallstack) { + using namespace dai; + + py::class_, Buffer, std::shared_ptr> pipelineEvent(m, "PipelineEvent", DOC(dai, PipelineEvent)); + py::enum_ pipelineEventType(pipelineEvent, "Type", DOC(dai, PipelineEvent, Type)); + py::enum_ pipelineEventInterval(pipelineEvent, "Interval", DOC(dai, PipelineEvent, Interval)); + + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + // Call the rest of the type defines, then perform the actual bindings + Callstack* callstack = (Callstack*)pCallstack; + auto cb = callstack->top(); + callstack->pop(); + cb(m, pCallstack); + // Actual bindings + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + + pipelineEventType.value("CUSTOM", PipelineEvent::Type::CUSTOM) + .value("LOOP", PipelineEvent::Type::LOOP) + .value("INPUT", PipelineEvent::Type::INPUT) + .value("OUTPUT", PipelineEvent::Type::OUTPUT); + pipelineEventInterval.value("NONE", PipelineEvent::Interval::NONE) + .value("START", PipelineEvent::Interval::START) + .value("END", PipelineEvent::Interval::END); + + // Message + pipelineEvent.def(py::init<>()) + .def("__repr__", &PipelineEvent::str) + .def_readwrite("nodeId", &PipelineEvent::nodeId, DOC(dai, PipelineEvent, nodeId)) + .def_readwrite("status", &PipelineEvent::status, DOC(dai, PipelineEvent, status)) + .def_readwrite("queueSize", &PipelineEvent::queueSize, DOC(dai, PipelineEvent, queueSize)) + 
.def_readwrite("interval", &PipelineEvent::interval, DOC(dai, PipelineEvent, interval)) + .def_readwrite("type", &PipelineEvent::type, DOC(dai, PipelineEvent, type)) + .def_readwrite("source", &PipelineEvent::source, DOC(dai, PipelineEvent, source)) + .def("getTimestamp", &PipelineEvent::Buffer::getTimestamp, DOC(dai, Buffer, getTimestamp)) + .def("getTimestampDevice", &PipelineEvent::Buffer::getTimestampDevice, DOC(dai, Buffer, getTimestampDevice)) + .def("getSequenceNum", &PipelineEvent::Buffer::getSequenceNum, DOC(dai, Buffer, getSequenceNum)) + .def("setTimestamp", &PipelineEvent::setTimestamp, DOC(dai, Buffer, setTimestamp)) + .def("setTimestampDevice", &PipelineEvent::setTimestampDevice, DOC(dai, Buffer, setTimestampDevice)) + .def("setSequenceNum", &PipelineEvent::setSequenceNum, DOC(dai, Buffer, setSequenceNum)); +} diff --git a/bindings/python/src/pipeline/datatype/PipelineStateBindings.cpp b/bindings/python/src/pipeline/datatype/PipelineStateBindings.cpp new file mode 100644 index 0000000000..328fa9cb7f --- /dev/null +++ b/bindings/python/src/pipeline/datatype/PipelineStateBindings.cpp @@ -0,0 +1,115 @@ +#include + +#include "DatatypeBindings.hpp" + +// depthai +#include "depthai/pipeline/datatype/PipelineState.hpp" + +// pybind +#include +#include + +// #include "spdlog/spdlog.h" + +void bind_pipelinestate(pybind11::module& m, void* pCallstack) { + using namespace dai; + + py::class_ nodeState(m, "NodeState", DOC(dai, NodeState)); + py::class_ durationEvent(nodeState, "DurationEvent", DOC(dai, NodeState, DurationEvent)); + py::class_ nodeStateTimingStats(nodeState, "TimingStats", DOC(dai, NodeState, TimingStats)); + py::class_ nodeStateTiming(nodeState, "Timing", DOC(dai, NodeState, Timing)); + py::class_ nodeStateQueueStats(nodeState, "QueueStats", DOC(dai, NodeState, QueueStats)); + py::class_ nodeStateInputQueueState(nodeState, "InputQueueState", DOC(dai, NodeState, InputQueueState)); + py::class_ nodeStateOutputQueueState(nodeState, 
"OutputQueueState", DOC(dai, NodeState, OutputQueueState)); + py::class_, Buffer, std::shared_ptr> pipelineState(m, "PipelineState", DOC(dai, PipelineState)); + + py::enum_(nodeStateInputQueueState, "State", DOC(dai, NodeState, InputQueueState, State)) + .value("IDLE", NodeState::InputQueueState::State::IDLE) + .value("WAITING", NodeState::InputQueueState::State::WAITING) + .value("BLOCKED", NodeState::InputQueueState::State::BLOCKED); + py::enum_(nodeStateOutputQueueState, "State", DOC(dai, NodeState, OutputQueueState, State)) + .value("IDLE", NodeState::OutputQueueState::State::IDLE) + .value("SENDING", NodeState::OutputQueueState::State::SENDING); + py::enum_(nodeState, "State", DOC(dai, NodeState, State)) + .value("IDLE", NodeState::State::IDLE) + .value("GETTING_INPUTS", NodeState::State::GETTING_INPUTS) + .value("PROCESSING", NodeState::State::PROCESSING) + .value("SENDING_OUTPUTS", NodeState::State::SENDING_OUTPUTS); + + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + // Call the rest of the type defines, then perform the actual bindings + Callstack* callstack = (Callstack*)pCallstack; + auto cb = callstack->top(); + callstack->pop(); + cb(m, pCallstack); + // Actual bindings + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + + durationEvent.def(py::init<>()) + .def("__repr__", &NodeState::DurationEvent::str) + .def_readwrite("startEvent", &NodeState::DurationEvent::startEvent, DOC(dai, NodeState, DurationEvent, startEvent)) + .def_readwrite("durationUs", &NodeState::DurationEvent::durationUs, DOC(dai, NodeState, DurationEvent, durationUs)); + + nodeStateTimingStats.def(py::init<>()) + .def("__repr__", 
&NodeState::TimingStats::str) + .def_readwrite("minMicros", &NodeState::TimingStats::minMicros, DOC(dai, NodeState, TimingStats, minMicros)) + .def_readwrite("maxMicros", &NodeState::TimingStats::maxMicros, DOC(dai, NodeState, TimingStats, maxMicros)) + .def_readwrite("averageMicrosRecent", &NodeState::TimingStats::averageMicrosRecent, DOC(dai, NodeState, TimingStats, averageMicrosRecent)) + .def_readwrite("stdDevMicrosRecent", &NodeState::TimingStats::stdDevMicrosRecent, DOC(dai, NodeState, TimingStats, stdDevMicrosRecent)) + .def_readwrite("minMicrosRecent", &NodeState::TimingStats::minMicrosRecent, DOC(dai, NodeState, TimingStats, minMicrosRecent)) + .def_readwrite("maxMicrosRecent", &NodeState::TimingStats::maxMicrosRecent, DOC(dai, NodeState, TimingStats, maxMicrosRecent)) + .def_readwrite("medianMicrosRecent", &NodeState::TimingStats::medianMicrosRecent, DOC(dai, NodeState, TimingStats, medianMicrosRecent)); + + nodeStateTiming.def(py::init<>()) + .def("__repr__", &NodeState::Timing::str) + .def_readwrite("fps", &NodeState::Timing::fps, DOC(dai, NodeState, Timing, fps)) + .def_readwrite("durationStats", &NodeState::Timing::durationStats, DOC(dai, NodeState, Timing, durationStats)) + .def("isValid", &NodeState::Timing::isValid, DOC(dai, NodeState, Timing, isValid)); + + nodeStateQueueStats.def(py::init<>()) + .def("__repr__", &NodeState::QueueStats::str) + .def_readwrite("maxQueued", &NodeState::QueueStats::maxQueued, DOC(dai, NodeState, QueueStats, maxQueued)) + .def_readwrite("minQueuedRecent", &NodeState::QueueStats::minQueuedRecent, DOC(dai, NodeState, QueueStats, minQueuedRecent)) + .def_readwrite("maxQueuedRecent", &NodeState::QueueStats::maxQueuedRecent, DOC(dai, NodeState, QueueStats, maxQueuedRecent)) + .def_readwrite("medianQueuedRecent", &NodeState::QueueStats::medianQueuedRecent, DOC(dai, NodeState, QueueStats, medianQueuedRecent)); + + nodeStateInputQueueState.def(py::init<>()) + .def("__repr__", &NodeState::InputQueueState::str) + 
.def_readwrite("state", &NodeState::InputQueueState::state, DOC(dai, NodeState, InputQueueState, state)) + .def_readwrite("numQueued", &NodeState::InputQueueState::numQueued, DOC(dai, NodeState, InputQueueState, numQueued)) + .def_readwrite("timing", &NodeState::InputQueueState::timing, DOC(dai, NodeState, InputQueueState, timing)) + .def_readwrite("queueStats", &NodeState::InputQueueState::queueStats, DOC(dai, NodeState, InputQueueState, queueStats)) + .def("isValid", &NodeState::InputQueueState::isValid, DOC(dai, NodeState, InputQueueState, isValid)); + + nodeStateOutputQueueState.def(py::init<>()) + .def("__repr__", &NodeState::OutputQueueState::str) + .def_readwrite("state", &NodeState::OutputQueueState::state, DOC(dai, NodeState, OutputQueueState, state)) + .def_readwrite("timing", &NodeState::OutputQueueState::timing, DOC(dai, NodeState, OutputQueueState, timing)) + .def("isValid", &NodeState::OutputQueueState::isValid, DOC(dai, NodeState, OutputQueueState, isValid)); + + nodeState.def(py::init<>()) + .def("__repr__", &NodeState::str) + .def_readwrite("state", &NodeState::state, DOC(dai, NodeState, state)) + .def_readwrite("events", &NodeState::events, DOC(dai, NodeState, events)) + .def_readwrite("inputStates", &NodeState::inputStates, DOC(dai, NodeState, inputStates)) + .def_readwrite("outputStates", &NodeState::outputStates, DOC(dai, NodeState, outputStates)) + .def_readwrite("inputsGetTiming", &NodeState::inputsGetTiming, DOC(dai, NodeState, inputsGetTiming)) + .def_readwrite("outputsSendTiming", &NodeState::outputsSendTiming, DOC(dai, NodeState, outputsSendTiming)) + .def_readwrite("mainLoopTiming", &NodeState::mainLoopTiming, DOC(dai, NodeState, mainLoopTiming)) + .def_readwrite("otherTimings", &NodeState::otherTimings, DOC(dai, NodeState, otherTimings)); + + // Message + pipelineState.def(py::init<>()) + .def("__repr__", &PipelineState::str) + .def_readwrite("nodeStates", &PipelineState::nodeStates, DOC(dai, PipelineState, nodeStates)) + 
.def("getTimestamp", &PipelineState::Buffer::getTimestamp, DOC(dai, Buffer, getTimestamp)) + .def("getTimestampDevice", &PipelineState::Buffer::getTimestampDevice, DOC(dai, Buffer, getTimestampDevice)) + .def("getSequenceNum", &PipelineState::Buffer::getSequenceNum, DOC(dai, Buffer, getSequenceNum)) + .def("setTimestamp", &PipelineState::setTimestamp, DOC(dai, Buffer, setTimestamp)) + .def("setTimestampDevice", &PipelineState::setTimestampDevice, DOC(dai, Buffer, setTimestampDevice)) + .def("setSequenceNum", &PipelineState::setSequenceNum, DOC(dai, Buffer, setSequenceNum)); +} diff --git a/bindings/python/src/pipeline/node/NodeBindings.cpp b/bindings/python/src/pipeline/node/NodeBindings.cpp index 0f1f67190a..230e48ba12 100644 --- a/bindings/python/src/pipeline/node/NodeBindings.cpp +++ b/bindings/python/src/pipeline/node/NodeBindings.cpp @@ -153,6 +153,7 @@ void bind_uvc(pybind11::module& m, void* pCallstack); void bind_thermal(pybind11::module& m, void* pCallstack); void bind_tof(pybind11::module& m, void* pCallstack); void bind_pointcloud(pybind11::module& m, void* pCallstack); +void bind_pipelineeventaggregation(pybind11::module& m, void* pCallstack); void bind_sync(pybind11::module& m, void* pCallstack); void bind_messagedemux(pybind11::module& m, void* pCallstack); void bind_hostnode(pybind11::module& m, void* pCallstack); @@ -204,6 +205,7 @@ void NodeBindings::addToCallstack(std::deque& callstack) { callstack.push_front(bind_thermal); callstack.push_front(bind_tof); callstack.push_front(bind_pointcloud); + callstack.push_front(bind_pipelineeventaggregation); callstack.push_front(bind_sync); callstack.push_front(bind_messagedemux); callstack.push_front(bind_hostnode); @@ -449,6 +451,8 @@ void NodeBindings::bind(pybind11::module& m, void* pCallstack) { .def("error", [](dai::ThreadedNode& node, const std::string& msg) { node.pimpl->logger->error(msg); }) .def("critical", [](dai::ThreadedNode& node, const std::string& msg) { node.pimpl->logger->critical(msg); }) 
.def("isRunning", &ThreadedNode::isRunning, DOC(dai, ThreadedNode, isRunning)) + .def("mainLoop", &ThreadedNode::mainLoop, DOC(dai, ThreadedNode, mainLoop)) .def("setLogLevel", &ThreadedNode::setLogLevel, DOC(dai, ThreadedNode, setLogLevel)) - .def("getLogLevel", &ThreadedNode::getLogLevel, DOC(dai, ThreadedNode, getLogLevel)); + .def("getLogLevel", &ThreadedNode::getLogLevel, DOC(dai, ThreadedNode, getLogLevel)) + .def_readonly("pipelineEventOutput", &ThreadedNode::pipelineEventOutput, DOC(dai, ThreadedNode, pipelineEventOutput)); } diff --git a/bindings/python/src/pipeline/node/PipelineEventAggregationBindings.cpp b/bindings/python/src/pipeline/node/PipelineEventAggregationBindings.cpp new file mode 100644 index 0000000000..3f8fa27479 --- /dev/null +++ b/bindings/python/src/pipeline/node/PipelineEventAggregationBindings.cpp @@ -0,0 +1,37 @@ +#include "Common.hpp" +#include "depthai/pipeline/node/internal/PipelineEventAggregation.hpp" +#include "depthai/properties/internal/PipelineEventAggregationProperties.hpp" + +void bind_pipelineeventaggregation(pybind11::module& m, void* pCallstack) { + // using namespace dai; + // using namespace dai::node::internal; + // + // // Node and Properties declare upfront + // py::class_ pipelineEventAggregationProperties( + // m, "PipelineEventAggregationProperties", DOC(dai, PipelineEventAggregationProperties)); + // auto pipelineEventAggregation = ADD_NODE(PipelineEventAggregation); + // + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + // Call the rest of the type defines, then perform the actual bindings + Callstack* callstack = (Callstack*)pCallstack; + auto cb = callstack->top(); + callstack->pop(); + cb(m, pCallstack); + // Actual bindings + /////////////////////////////////////////////////////////////////////// + 
/////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + // + // // Properties + // pipelineEventAggregationProperties.def_readwrite("aggregationWindowSize", &PipelineEventAggregationProperties::aggregationWindowSize) + // .def_readwrite("eventBatchSize", &PipelineEventAggregationProperties::eventBatchSize); + // + // // Node + // pipelineEventAggregation.def_readonly("out", &PipelineEventAggregation::out, DOC(dai, node, PipelineEventAggregation, out)) + // .def_readonly("inputs", &PipelineEventAggregation::inputs, DOC(dai, node, PipelineEventAggregation, inputs)) + // .def("setRunOnHost", &PipelineEventAggregation::setRunOnHost, py::arg("runOnHost"), DOC(dai, node, PipelineEventAggregation, setRunOnHost)) + // .def("runOnHost", &PipelineEventAggregation::runOnHost, DOC(dai, node, PipelineEventAggregation, runOnHost)); + // daiNodeModule.attr("PipelineEventAggregation").attr("Properties") = pipelineEventAggregationProperties; +} diff --git a/bindings/python/src/py_bindings.cpp b/bindings/python/src/py_bindings.cpp index 122d030163..cbbb4400e1 100644 --- a/bindings/python/src/py_bindings.cpp +++ b/bindings/python/src/py_bindings.cpp @@ -37,6 +37,7 @@ #include "pipeline/node/NodeBindings.hpp" #include "remote_connection/RemoteConnectionBindings.hpp" #include "utility/EventsManagerBindings.hpp" +#include "utility/PipelineEventDispatcherBindings.hpp" #ifdef DEPTHAI_HAVE_OPENCV_SUPPORT #include #endif @@ -87,6 +88,7 @@ PYBIND11_MODULE(depthai, m) callstack.push_front(&CalibrationHandlerBindings::bind); callstack.push_front(&ZooBindings::bind); callstack.push_front(&EventsManagerBindings::bind); + callstack.push_front(&PipelineEventDispatcherBindings::bind); callstack.push_front(&RemoteConnectionBindings::bind); callstack.push_front(&FilterParamsBindings::bind); // end of the callstack diff --git a/bindings/python/src/pybind11_common.hpp b/bindings/python/src/pybind11_common.hpp 
index ed09b4879c..e0f3db10f4 100644 --- a/bindings/python/src/pybind11_common.hpp +++ b/bindings/python/src/pybind11_common.hpp @@ -1,6 +1,6 @@ #pragma once -#if(_MSC_VER >= 1910) || !defined(_MSC_VER) +#if (_MSC_VER >= 1910) || !defined(_MSC_VER) #ifndef HAVE_SNPRINTF #define HAVE_SNPRINTF #endif diff --git a/bindings/python/src/utility/PipelineEventDispatcherBindings.cpp b/bindings/python/src/utility/PipelineEventDispatcherBindings.cpp new file mode 100644 index 0000000000..930eea849c --- /dev/null +++ b/bindings/python/src/utility/PipelineEventDispatcherBindings.cpp @@ -0,0 +1,30 @@ +#include "PipelineEventDispatcherBindings.hpp" + +#include "depthai/utility/PipelineEventDispatcher.hpp" + +void PipelineEventDispatcherBindings::bind(pybind11::module& m, void* pCallstack) { + using namespace dai; + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + // Call the rest of the type defines, then perform the actual bindings + Callstack* callstack = (Callstack*)pCallstack; + auto cb = callstack->top(); + callstack->pop(); + cb(m, pCallstack); + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + /////////////////////////////////////////////////////////////////////// + + using namespace dai::utility; + auto pipelineEventDispatcher = py::class_(m, "PipelineEventDispatcher"); + + pipelineEventDispatcher.def(py::init(), py::arg("output")) + .def("setNodeId", &PipelineEventDispatcher::setNodeId, py::arg("id"), DOC(dai, utility, PipelineEventDispatcher, setNodeId)) + .def("startCustomEvent", &PipelineEventDispatcher::startCustomEvent, py::arg("source"), DOC(dai, utility, PipelineEventDispatcher, startCustomEvent)) + .def("endCustomEvent", &PipelineEventDispatcher::endCustomEvent, py::arg("source"), DOC(dai, 
utility, PipelineEventDispatcher, endCustomEvent)) + .def("pingCustomEvent", &PipelineEventDispatcher::pingCustomEvent, py::arg("source"), DOC(dai, utility, PipelineEventDispatcher, pingCustomEvent)) + .def("inputBlockEvent", &PipelineEventDispatcher::inputBlockEvent, DOC(dai, utility, PipelineEventDispatcher, inputBlockEvent)) + .def("outputBlockEvent", &PipelineEventDispatcher::outputBlockEvent, DOC(dai, utility, PipelineEventDispatcher, outputBlockEvent)) + .def("customBlockEvent", &PipelineEventDispatcher::customBlockEvent, py::arg("source"), DOC(dai, utility, PipelineEventDispatcher, customBlockEvent)); +} diff --git a/bindings/python/src/utility/PipelineEventDispatcherBindings.hpp b/bindings/python/src/utility/PipelineEventDispatcherBindings.hpp new file mode 100644 index 0000000000..36e074193e --- /dev/null +++ b/bindings/python/src/utility/PipelineEventDispatcherBindings.hpp @@ -0,0 +1,8 @@ +#pragma once + +// pybind +#include "pybind11_common.hpp" + +struct PipelineEventDispatcherBindings { + static void bind(pybind11::module& m, void* pCallstack); +}; diff --git a/bindings/python/tests/messsage_queue_test.py b/bindings/python/tests/messsage_queue_test.py index 120c87b2e9..0334ca52f6 100644 --- a/bindings/python/tests/messsage_queue_test.py +++ b/bindings/python/tests/messsage_queue_test.py @@ -387,7 +387,7 @@ def __init__(self, name: str): self.output = self.createOutput() def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = self.input.get() self.output.send(buffer) @@ -399,7 +399,7 @@ def __init__(self, name: str): self.output = self.createOutput() def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = dai.Buffer() self.output.send(buffer) time.sleep(0.001) diff --git a/cmake/Depthai/DepthaiDeviceRVC4Config.cmake b/cmake/Depthai/DepthaiDeviceRVC4Config.cmake index 3dc31c417e..a48a3120db 100644 --- a/cmake/Depthai/DepthaiDeviceRVC4Config.cmake +++ b/cmake/Depthai/DepthaiDeviceRVC4Config.cmake @@ -3,4 +3,4 @@ 
set(DEPTHAI_DEVICE_RVC4_MATURITY "snapshot") # "version if applicable" -set(DEPTHAI_DEVICE_RVC4_VERSION "0.0.1+f9afe9de6d5ad2b5e774419fd8cf55e4e5324873") +set(DEPTHAI_DEVICE_RVC4_VERSION "0.0.1+c0b8b2ade4a80da127684ad795ccf37cd334ffd9") diff --git a/cmake/Depthai/DepthaiDeviceSideConfig.cmake b/cmake/Depthai/DepthaiDeviceSideConfig.cmake index c24530edc6..2d5ed5327b 100644 --- a/cmake/Depthai/DepthaiDeviceSideConfig.cmake +++ b/cmake/Depthai/DepthaiDeviceSideConfig.cmake @@ -2,7 +2,7 @@ set(DEPTHAI_DEVICE_SIDE_MATURITY "snapshot") # "full commit hash of device side binary" -set(DEPTHAI_DEVICE_SIDE_COMMIT "dd6bf31e9ae1e4f4d75804f077f575b967d1b757") +set(DEPTHAI_DEVICE_SIDE_COMMIT "f98bd6c417b92eae6dcd4757abf8eedf375d8bbd") # "version if applicable" set(DEPTHAI_DEVICE_SIDE_VERSION "") diff --git a/examples/cpp/AprilTags/april_tags_replay.cpp b/examples/cpp/AprilTags/april_tags_replay.cpp index c03958fa4c..71779178d5 100644 --- a/examples/cpp/AprilTags/april_tags_replay.cpp +++ b/examples/cpp/AprilTags/april_tags_replay.cpp @@ -27,7 +27,7 @@ class ImageReplay : public dai::NodeCRTP { return; } - while(isRunning()) { + while(mainLoop()) { // Read the frame from the camera cv::Mat frame; if(!cap.read(frame)) { @@ -96,4 +96,4 @@ int main() { pipeline.wait(); return 0; -} \ No newline at end of file +} diff --git a/examples/cpp/HostNodes/image_manip_host.cpp b/examples/cpp/HostNodes/image_manip_host.cpp index ba1d5df8c5..db37c81972 100644 --- a/examples/cpp/HostNodes/image_manip_host.cpp +++ b/examples/cpp/HostNodes/image_manip_host.cpp @@ -5,11 +5,14 @@ #include "depthai/pipeline/node/host/Display.hpp" #include "depthai/pipeline/node/host/Replay.hpp" +#ifndef VIDEO_PATH + #define VIDEO_PATH "" +#endif + int main(int argc, char** argv) { - if(argc <= 1) { - std::cout << "Video parameter is missing" << std::endl; - std::cout << "Usage: ./image_manip_host video_path" << std::endl; - return -1; + std::string videoFile = VIDEO_PATH; + if(argc > 1) { + videoFile = argv[1]; } 
dai::Pipeline pipeline(false); @@ -27,7 +30,7 @@ int main(int argc, char** argv) { manip->initialConfig->addFlipVertical(); manip->initialConfig->setFrameType(dai::ImgFrame::Type::RGB888p); - replay->setReplayVideoFile(argv[1]); + replay->setReplayVideoFile(videoFile); replay->setOutFrameType(dai::ImgFrame::Type::NV12); replay->setFps(30); replay->setSize(1280, 720); diff --git a/examples/cpp/HostNodes/threaded_host_node.cpp b/examples/cpp/HostNodes/threaded_host_node.cpp index 90b95560cd..60c02c0080 100644 --- a/examples/cpp/HostNodes/threaded_host_node.cpp +++ b/examples/cpp/HostNodes/threaded_host_node.cpp @@ -10,7 +10,7 @@ class TestPassthrough : public dai::node::CustomThreadedNode { Output output = dai::Node::Output{*this, {}}; void run() override { - while(isRunning()) { + while(mainLoop()) { auto buffer = input.get(); if(buffer) { std::cout << "The passthrough node received a buffer!" << std::endl; @@ -25,7 +25,7 @@ class TestSink : public dai::node::CustomThreadedNode { Input input = dai::Node::Input{*this, {}}; void run() override { - while(isRunning()) { + while(mainLoop()) { auto buffer = input.get(); if(buffer) { std::cout << "The sink node received a buffer!" << std::endl; @@ -39,7 +39,7 @@ class TestSource : public dai::node::CustomThreadedNode { Output output = dai::Node::Output{*this, {}}; void run() override { - while(isRunning()) { + while(mainLoop()) { auto buffer = std::make_shared(); std::cout << "The source node is sending a buffer!" 
<< std::endl; output.send(buffer); diff --git a/examples/cpp/Misc/CMakeLists.txt b/examples/cpp/Misc/CMakeLists.txt index 9e42fa359b..ba18767501 100644 --- a/examples/cpp/Misc/CMakeLists.txt +++ b/examples/cpp/Misc/CMakeLists.txt @@ -2,4 +2,5 @@ project(misc_examples) cmake_minimum_required(VERSION 3.10) add_subdirectory(AutoReconnect) -add_subdirectory(Projectors) \ No newline at end of file +add_subdirectory(Projectors) +add_subdirectory(PipelineDebugging) diff --git a/examples/cpp/Misc/PipelineDebugging/CMakeLists.txt b/examples/cpp/Misc/PipelineDebugging/CMakeLists.txt new file mode 100644 index 0000000000..99af9ea6cb --- /dev/null +++ b/examples/cpp/Misc/PipelineDebugging/CMakeLists.txt @@ -0,0 +1,8 @@ +project(pipeline_debugging_examples) +cmake_minimum_required(VERSION 3.10) + +## function: dai_add_example(example_name example_src enable_test use_pcl) +## function: dai_set_example_test_labels(example_name ...) + +dai_add_example(get_pipeline_state get_pipeline_state.cpp ON OFF) +dai_add_example(node_pipeline_events node_pipeline_events.cpp ON OFF) diff --git a/examples/cpp/Misc/PipelineDebugging/get_pipeline_state.cpp b/examples/cpp/Misc/PipelineDebugging/get_pipeline_state.cpp new file mode 100644 index 0000000000..29f0df73fe --- /dev/null +++ b/examples/cpp/Misc/PipelineDebugging/get_pipeline_state.cpp @@ -0,0 +1,107 @@ +#include +#include + +#include "depthai/depthai.hpp" + +int main() { + dai::Pipeline pipeline; + pipeline.enablePipelineDebugging(); + + auto monoLeft = pipeline.create()->build(dai::CameraBoardSocket::CAM_B); + auto monoRight = pipeline.create()->build(dai::CameraBoardSocket::CAM_C); + + auto stereo = pipeline.create(); + + auto monoLeftOut = monoLeft->requestFullResolutionOutput(); + auto monoRightOut = monoRight->requestFullResolutionOutput(); + + monoLeftOut->link(stereo->left); + monoRightOut->link(stereo->right); + + stereo->setRectification(true); + stereo->setExtendedDisparity(true); + stereo->setLeftRightCheck(true); + + auto 
disparityQueue = stereo->disparity.createOutputQueue(); + + double maxDisparity = 1.0; + pipeline.start(); + while(pipeline.isRunning()) { + auto disparity = disparityQueue->get(); + cv::Mat npDisparity = disparity->getFrame(); + + double minVal, curMax; + cv::minMaxLoc(npDisparity, &minVal, &curMax); + maxDisparity = std::max(maxDisparity, curMax); + + // Normalize the disparity image to an 8-bit scale. + cv::Mat normalized; + npDisparity.convertTo(normalized, CV_8UC1, 255.0 / maxDisparity); + + cv::Mat colorizedDisparity; + cv::applyColorMap(normalized, colorizedDisparity, cv::COLORMAP_JET); + + // Set pixels with zero disparity to black. + colorizedDisparity.setTo(cv::Scalar(0, 0, 0), normalized == 0); + + cv::imshow("disparity", colorizedDisparity); + + int key = cv::waitKey(1); + if(key == 'q') { + break; + } else if(key == 's') { + auto state = pipeline.getPipelineState().nodes().detailed(); + try { + // Assuming these APIs exist similarly in C++ + auto pipelineState = pipeline.getPipelineState().nodes().detailed(); + + for(const auto& [nodeId, nodeState] : pipelineState.nodeStates) { + std::string nodeName = pipeline.getNode(nodeId)->getName(); + + std::cout << "\n# State for node " << nodeName << " (" << nodeId << "):\n"; + + std::cout << "## State: " << (int)nodeState.state << "\n"; + + std::cout << "## mainLoopTiming: " << (nodeState.mainLoopTiming.isValid() ? "" : "invalid") << "\n"; + if(nodeState.mainLoopTiming.isValid()) { + std::cout << "-----\n" << nodeState.mainLoopTiming.str() << "\n-----\n"; + } + + std::cout << "## inputsGetTiming: " << (nodeState.inputsGetTiming.isValid() ? "" : "invalid") << "\n"; + if(nodeState.inputsGetTiming.isValid()) { + std::cout << "-----\n" << nodeState.inputsGetTiming.str() << "\n-----\n"; + } + + std::cout << "## outputsSendTiming: " << (nodeState.outputsSendTiming.isValid() ? 
"" : "invalid") << "\n"; + if(nodeState.outputsSendTiming.isValid()) { + std::cout << "-----\n" << nodeState.outputsSendTiming.str() << "\n-----\n"; + } + + std::cout << "## inputStates: " << (nodeState.inputStates.empty() ? "empty" : "") << "\n"; + for(const auto& [inputName, inputState] : nodeState.inputStates) { + if(inputState.isValid()) + std::cout << "### " << inputName << ":\n-----" << inputState.str() << "\n-----\n"; + else + std::cout << "### " << inputName << ": invalid\n"; + } + + std::cout << "## outputStates: " << (nodeState.outputStates.empty() ? "empty" : "") << "\n"; + for(const auto& [outputName, outputState] : nodeState.outputStates) { + std::cout << "### " << outputName << ":\n-----" << outputState.str() << "\n-----\n"; + } + + std::cout << "## otherTimings: " << (nodeState.otherTimings.empty() ? "empty" : "") << "\n"; + for(const auto& [otherName, otherTiming] : nodeState.otherTimings) { + std::cout << "### " << otherName << ":\n-----" << otherTiming.str() << "\n-----\n"; + } + } + } catch(const std::runtime_error& e) { + std::cerr << "Error getting pipeline state: " << e.what() << "\n"; + } + } + } + + pipeline.stop(); + + return 0; +} diff --git a/examples/cpp/Misc/PipelineDebugging/node_pipeline_events.cpp b/examples/cpp/Misc/PipelineDebugging/node_pipeline_events.cpp new file mode 100644 index 0000000000..e19dee459f --- /dev/null +++ b/examples/cpp/Misc/PipelineDebugging/node_pipeline_events.cpp @@ -0,0 +1,63 @@ +#include +#include + +#include "depthai/depthai.hpp" + +int main() { + dai::Pipeline pipeline; + pipeline.enablePipelineDebugging(); + + auto monoLeft = pipeline.create()->build(dai::CameraBoardSocket::CAM_B); + auto monoRight = pipeline.create()->build(dai::CameraBoardSocket::CAM_C); + + auto stereo = pipeline.create(); + + auto monoLeftOut = monoLeft->requestFullResolutionOutput(); + auto monoRightOut = monoRight->requestFullResolutionOutput(); + + monoLeftOut->link(stereo->left); + monoRightOut->link(stereo->right); + + 
stereo->setRectification(true); + stereo->setExtendedDisparity(true); + stereo->setLeftRightCheck(true); + + auto disparityQueue = stereo->disparity.createOutputQueue(); + auto monoLeftEventQueue = monoLeft->pipelineEventOutput.createOutputQueue(1, false); + + double maxDisparity = 1.0; + pipeline.start(); + while(pipeline.isRunning()) { + auto disparity = disparityQueue->get(); + auto latestNodeEvent = monoLeftEventQueue->tryGet(); + + cv::Mat npDisparity = disparity->getFrame(); + + double minVal, curMax; + cv::minMaxLoc(npDisparity, &minVal, &curMax); + maxDisparity = std::max(maxDisparity, curMax); + + // Normalize the disparity image to an 8-bit scale. + cv::Mat normalized; + npDisparity.convertTo(normalized, CV_8UC1, 255.0 / maxDisparity); + + cv::Mat colorizedDisparity; + cv::applyColorMap(normalized, colorizedDisparity, cv::COLORMAP_JET); + + // Set pixels with zero disparity to black. + colorizedDisparity.setTo(cv::Scalar(0, 0, 0), normalized == 0); + + cv::imshow("disparity", colorizedDisparity); + + std::cout << "Latest event from MonoLeft camera node: " << (latestNodeEvent ? 
latestNodeEvent->str() : "No event"); + + int key = cv::waitKey(1); + if(key == 'q') { + break; + } + } + + pipeline.stop(); + + return 0; +} diff --git a/examples/cpp/RGBD/rgbd.cpp b/examples/cpp/RGBD/rgbd.cpp index 59369862ab..d4f73f2734 100644 --- a/examples/cpp/RGBD/rgbd.cpp +++ b/examples/cpp/RGBD/rgbd.cpp @@ -15,7 +15,7 @@ class RerunNode : public dai::NodeCRTP { const auto rec = rerun::RecordingStream("rerun"); rec.spawn().exit_on_failure(); rec.log_static("world", rerun::ViewCoordinates::RDF); - while(isRunning()) { + while(mainLoop()) { auto pclIn = inputPCL.get(); auto rgbdIn = inputRGBD.get(); if(pclIn != nullptr) { diff --git a/examples/cpp/RGBD/rgbd_pcl_processing.cpp b/examples/cpp/RGBD/rgbd_pcl_processing.cpp index c869aec804..155543f723 100644 --- a/examples/cpp/RGBD/rgbd_pcl_processing.cpp +++ b/examples/cpp/RGBD/rgbd_pcl_processing.cpp @@ -15,7 +15,7 @@ class CustomPCLProcessingNode : public dai::NodeCRTP(); auto pclOut = std::make_shared(); if(pclIn != nullptr) { diff --git a/examples/cpp/RVC2/VSLAM/rerun_node.hpp b/examples/cpp/RVC2/VSLAM/rerun_node.hpp index dcea0fd91b..0c7fd1a41e 100644 --- a/examples/cpp/RVC2/VSLAM/rerun_node.hpp +++ b/examples/cpp/RVC2/VSLAM/rerun_node.hpp @@ -41,7 +41,7 @@ class RerunNode : public dai::NodeCRTP { rec.spawn().exit_on_failure(); rec.log_static("world", rerun::ViewCoordinates::FLU); rec.log("world/ground", rerun::Boxes3D::from_half_sizes({{3.f, 3.f, 0.00001f}})); - while(isRunning()) { + while(mainLoop()) { std::shared_ptr transData = inputTrans.get(); auto imgFrame = inputImg.get(); if(!intrinsicsSet) { @@ -107,4 +107,4 @@ class RerunNode : public dai::NodeCRTP { float fx = 400.0; float fy = 400.0; bool intrinsicsSet = false; -}; \ No newline at end of file +}; diff --git a/examples/python/AprilTags/april_tags_replay.py b/examples/python/AprilTags/april_tags_replay.py index 01cae83a97..fe6dec7f26 100644 --- a/examples/python/AprilTags/april_tags_replay.py +++ b/examples/python/AprilTags/april_tags_replay.py 
@@ -24,7 +24,7 @@ def __init__(self): imgFrame.setType(dai.ImgFrame.Type.GRAY8) self.imgFrame = imgFrame def run(self): - while self.isRunning(): + while self.mainLoop(): self.output.send(self.imgFrame) time.sleep(0.03) diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index cd9f379f4d..590f15599f 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -221,6 +221,12 @@ dai_set_example_test_labels(image_manip_remap ondevice rvc2_all rvc4 rvc4rgb ci) add_python_example(reconnect_callback Misc/AutoReconnect/reconnect_callback.py) dai_set_example_test_labels(reconnect_callback ondevice rvc2_all rvc4 rvc4rgb ci) +add_python_example(get_pipeline_state Misc/PipelineDebugging/get_pipeline_state.py) +dai_set_example_test_labels(get_pipeline_state ondevice rvc2_all rvc4 rvc4rgb ci) + +add_python_example(node_pipeline_events Misc/PipelineDebugging/node_pipeline_events.py) +dai_set_example_test_labels(node_pipeline_events ondevice rvc2_all rvc4 rvc4rgb ci) + ## SystemLogger add_python_example(system_logger RVC2/SystemLogger/system_information.py) dai_set_example_test_labels(system_logger ondevice rvc2_all ci) diff --git a/examples/python/HostNodes/host_camera.py b/examples/python/HostNodes/host_camera.py index fd458b84b8..83dd78089a 100644 --- a/examples/python/HostNodes/host_camera.py +++ b/examples/python/HostNodes/host_camera.py @@ -13,7 +13,7 @@ def run(self): if not cap.isOpened(): p.stop() raise RuntimeError("Error: Couldn't open host camera") - while self.isRunning(): + while self.mainLoop(): # Read the frame from the camera ret, frame = cap.read() if not ret: @@ -40,4 +40,4 @@ def run(self): key = cv2.waitKey(1) if key == ord('q'): p.stop() - break \ No newline at end of file + break diff --git a/examples/python/HostNodes/threaded_host_nodes.py b/examples/python/HostNodes/threaded_host_nodes.py index 7b3c508279..4118fd7887 100644 --- a/examples/python/HostNodes/threaded_host_nodes.py +++ 
b/examples/python/HostNodes/threaded_host_nodes.py @@ -31,7 +31,7 @@ def onStop(self): print("Goodbye from", self.name) def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = self.input.get() print("The passthrough node received a buffer!") self.output.send(buffer) @@ -47,7 +47,7 @@ def onStart(self): print("Hello, this is", self.name) def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = self.input.get() del buffer print(f"{self.name} node received a buffer!") @@ -59,7 +59,7 @@ def __init__(self, name: str): self.output = self.createOutput() def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = dai.Buffer() print(f"{self.name} node is sending a buffer!") self.output.send(buffer) @@ -77,7 +77,7 @@ def __init__(self, name: str): self.passthrough1.output.link(self.passthrough2.input) def run(self): - while self.isRunning(): + while self.mainLoop(): buffer = self.input.get() self.output.send(buffer) @@ -105,4 +105,4 @@ def run(self): p.start() while p.isRunning(): time.sleep(1) - print("Pipeline is running...") \ No newline at end of file + print("Pipeline is running...") diff --git a/examples/python/Misc/PipelineDebugging/get_pipeline_state.py b/examples/python/Misc/PipelineDebugging/get_pipeline_state.py new file mode 100644 index 0000000000..8022d49427 --- /dev/null +++ b/examples/python/Misc/PipelineDebugging/get_pipeline_state.py @@ -0,0 +1,79 @@ +import depthai as dai +import numpy as np +import cv2 + +with dai.Pipeline() as pipeline: + # Pipeline debugging is disabled by default. 
+ # You can also enable it by setting the DEPTHAI_PIPELINE_DEBUGGING environment variable to '1' + pipeline.enablePipelineDebugging(True) + + monoLeft = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) + monoRight = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C) + stereo = pipeline.create(dai.node.StereoDepth) + + # Linking + monoLeftOut = monoLeft.requestFullResolutionOutput() + monoRightOut = monoRight.requestFullResolutionOutput() + monoLeftOut.link(stereo.left) + monoRightOut.link(stereo.right) + + stereo.setRectification(True) + stereo.setExtendedDisparity(True) + stereo.setLeftRightCheck(True) + + disparityQueue = stereo.disparity.createOutputQueue() + + colorMap = cv2.applyColorMap(np.arange(256, dtype=np.uint8), cv2.COLORMAP_JET) + colorMap[0] = [0, 0, 0] # to make zero-disparity pixels black + + with pipeline: + pipeline.start() + maxDisparity = 1 + while pipeline.isRunning(): + disparity = disparityQueue.get() + assert isinstance(disparity, dai.ImgFrame) + npDisparity = disparity.getFrame() + maxDisparity = max(maxDisparity, np.max(npDisparity)) + colorizedDisparity = cv2.applyColorMap(((npDisparity / maxDisparity) * 255).astype(np.uint8), colorMap) + cv2.imshow("disparity", colorizedDisparity) + key = cv2.waitKey(1) + if key == ord('q'): + pipeline.stop() + break + elif key == ord('s'): + try: + # If pipeline debugging is disabled, this will raise an exception + pipelineState = pipeline.getPipelineState().nodes().detailed() + for nodeId, nodeState in pipelineState.nodeStates.items(): + nodeName = pipeline.getNode(nodeId).getName() + print(f"\n# State for node {pipeline.getNode(nodeId).getName()} ({nodeId}):") + print(f"## State: {nodeState.state}") + print(f"## mainLoopTiming: {'invalid' if not nodeState.mainLoopTiming.isValid() else ''}") + if(nodeState.mainLoopTiming.isValid()): + print("-----") + print(nodeState.mainLoopTiming) + print("-----") + print(f"## inputsGetTiming: {'invalid' if not 
nodeState.inputsGetTiming.isValid() else ''}") + if(nodeState.inputsGetTiming.isValid()): + print("-----") + print(nodeState.inputsGetTiming) + print("-----") + print(f"## outputsSendTiming: {'invalid' if not nodeState.outputsSendTiming.isValid() else ''}") + if(nodeState.outputsSendTiming.isValid()): + print("-----") + print(nodeState.outputsSendTiming) + print("-----") + print(f"## inputStates: {'empty' if not nodeState.inputStates else ''}") + for inputName, inputState in nodeState.inputStates.items(): + if inputState.isValid(): + print(f"### {inputName}:\n-----{inputState}\n-----") + else: + print(f"### {inputName}: invalid") + print(f"## outputStates: {'empty' if not nodeState.outputStates else ''}") + for outputName, outputState in nodeState.outputStates.items(): + print(f"### {outputName}:\n-----{outputState}\n-----") + print(f"## otherTimings: {'empty' if not nodeState.otherTimings else ''}") + for otherName, otherTiming in nodeState.otherTimings.items(): + print(f"### {otherName}:\n-----{otherTiming}\n-----") + except Exception as e: + print("Error getting pipeline state:", e) diff --git a/examples/python/Misc/PipelineDebugging/node_pipeline_events.py b/examples/python/Misc/PipelineDebugging/node_pipeline_events.py new file mode 100644 index 0000000000..bd896fdefa --- /dev/null +++ b/examples/python/Misc/PipelineDebugging/node_pipeline_events.py @@ -0,0 +1,45 @@ +import depthai as dai +import numpy as np +import cv2 + +with dai.Pipeline() as pipeline: + # Pipeline debugging is disabled by default. 
+ # You can also enable it by setting the DEPTHAI_PIPELINE_DEBUGGING environment variable to '1' + pipeline.enablePipelineDebugging(True) + + monoLeft = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B) + monoRight = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C) + stereo = pipeline.create(dai.node.StereoDepth) + + # Linking + monoLeftOut = monoLeft.requestFullResolutionOutput() + monoRightOut = monoRight.requestFullResolutionOutput() + monoLeftOut.link(stereo.left) + monoRightOut.link(stereo.right) + + stereo.setRectification(True) + stereo.setExtendedDisparity(True) + stereo.setLeftRightCheck(True) + + disparityQueue = stereo.disparity.createOutputQueue() + monoLeftEventQueue = monoLeft.pipelineEventOutput.createOutputQueue() + + colorMap = cv2.applyColorMap(np.arange(256, dtype=np.uint8), cv2.COLORMAP_JET) + colorMap[0] = [0, 0, 0] # to make zero-disparity pixels black + + with pipeline: + pipeline.start() + maxDisparity = 1 + while pipeline.isRunning(): + disparity = disparityQueue.get() + latestEvent = monoLeftEventQueue.tryGet() + assert isinstance(disparity, dai.ImgFrame) + npDisparity = disparity.getFrame() + maxDisparity = max(maxDisparity, np.max(npDisparity)) + colorizedDisparity = cv2.applyColorMap(((npDisparity / maxDisparity) * 255).astype(np.uint8), colorMap) + cv2.imshow("disparity", colorizedDisparity) + print(f"Latest event from MonoLeft camera node: {latestEvent if latestEvent is not None else 'No event'}") + key = cv2.waitKey(1) + if key == ord('q'): + pipeline.stop() + break diff --git a/examples/python/RGBD/rgbd.py b/examples/python/RGBD/rgbd.py index 4f22020971..5e15d796d6 100644 --- a/examples/python/RGBD/rgbd.py +++ b/examples/python/RGBD/rgbd.py @@ -21,7 +21,7 @@ def run(self): rr.init("", spawn=True) rr.log("world", rr.ViewCoordinates.RDF) rr.log("world/ground", rr.Boxes3D(half_sizes=[3.0, 3.0, 0.00001])) - while self.isRunning(): + while self.mainLoop(): try: inPointCloud = self.inputPCL.get() 
except dai.MessageQueue.QueueException: diff --git a/examples/python/RGBD/rgbd_o3d.py b/examples/python/RGBD/rgbd_o3d.py index 0f7d2787f7..5ba5140155 100644 --- a/examples/python/RGBD/rgbd_o3d.py +++ b/examples/python/RGBD/rgbd_o3d.py @@ -33,7 +33,7 @@ def key_callback(vis, action, mods): ) vis.add_geometry(coordinateFrame) first = True - while self.isRunning(): + while self.mainLoop(): try: inPointCloud = self.inputPCL.tryGet() except dai.MessageQueue.QueueException: diff --git a/examples/python/RGBD/rgbd_pcl_processing.py b/examples/python/RGBD/rgbd_pcl_processing.py index 804d83e77c..0b1930bace 100644 --- a/examples/python/RGBD/rgbd_pcl_processing.py +++ b/examples/python/RGBD/rgbd_pcl_processing.py @@ -11,7 +11,7 @@ def __init__(self): self.thresholdDistance = 3000.0 def run(self): - while self.isRunning(): + while self.mainLoop(): try: inPointCloud = self.inputPCL.get() except dai.MessageQueue.QueueException: diff --git a/examples/python/RVC2/VSLAM/rerun_node.py b/examples/python/RVC2/VSLAM/rerun_node.py index 4211b7ef1e..caefda601a 100644 --- a/examples/python/RVC2/VSLAM/rerun_node.py +++ b/examples/python/RVC2/VSLAM/rerun_node.py @@ -36,7 +36,7 @@ def run(self): rr.init("", spawn=True) rr.log("world", rr.ViewCoordinates.FLU) rr.log("world/ground", rr.Boxes3D(half_sizes=[3.0, 3.0, 0.00001])) - while self.isRunning(): + while self.mainLoop(): transData = self.inputTrans.get() imgFrame = self.inputImg.get() if not self.intrinsicsSet: @@ -63,4 +63,4 @@ def run(self): points, colors = pclGrndData.getPointsRGB() rr.log("world/ground_pcl", rr.Points3D(points, colors=colors, radii=[0.01])) if mapData is not None: - rr.log("map", rr.Image(mapData.getCvFrame())) \ No newline at end of file + rr.log("map", rr.Image(mapData.getCvFrame())) diff --git a/examples/python/Visualizer/visualizer_encoded.py b/examples/python/Visualizer/visualizer_encoded.py index 9c6ab195af..78d0eca6fe 100644 --- a/examples/python/Visualizer/visualizer_encoded.py +++ 
b/examples/python/Visualizer/visualizer_encoded.py @@ -20,7 +20,7 @@ def __init__(self): def setLabelMap(self, labelMap): self.labelMap = labelMap def run(self): - while self.isRunning(): + while self.mainLoop(): nnData = self.inputDet.get() detections = nnData.detections imgAnnt = dai.ImgAnnotations() diff --git a/images/pipeline_debugging_graph.png b/images/pipeline_debugging_graph.png new file mode 100644 index 0000000000..e4340cd99d Binary files /dev/null and b/images/pipeline_debugging_graph.png differ diff --git a/include/depthai/pipeline/MessageQueue.hpp b/include/depthai/pipeline/MessageQueue.hpp index e7667e6238..00d2ce98db 100644 --- a/include/depthai/pipeline/MessageQueue.hpp +++ b/include/depthai/pipeline/MessageQueue.hpp @@ -7,6 +7,7 @@ // project #include "depthai/pipeline/datatype/ADatatype.hpp" #include "depthai/utility/LockingQueue.hpp" +#include "depthai/utility/PipelineEventDispatcherInterface.hpp" // shared namespace dai { @@ -37,26 +38,37 @@ class MessageQueue : public std::enable_shared_from_this { private: void callCallbacks(std::shared_ptr msg); + std::shared_ptr pipelineEventDispatcher; public: // DataOutputQueue constructor explicit MessageQueue(unsigned int maxSize = 16, bool blocking = true); - explicit MessageQueue(std::string name, unsigned int maxSize = 16, bool blocking = true); + explicit MessageQueue(std::string name, + unsigned int maxSize = 16, + bool blocking = true, + std::shared_ptr pipelineEventDispatcher = nullptr); MessageQueue(const MessageQueue& c) - : enable_shared_from_this(c), queue(c.queue), name(c.name), callbacks(c.callbacks), uniqueCallbackId(c.uniqueCallbackId){}; + : enable_shared_from_this(c), + queue(c.queue), + name(c.name), + callbacks(c.callbacks), + uniqueCallbackId(c.uniqueCallbackId), + pipelineEventDispatcher(c.pipelineEventDispatcher) {}; MessageQueue(MessageQueue&& m) noexcept : enable_shared_from_this(m), queue(std::move(m.queue)), name(std::move(m.name)), callbacks(std::move(m.callbacks)), - 
uniqueCallbackId(m.uniqueCallbackId){}; + uniqueCallbackId(m.uniqueCallbackId), + pipelineEventDispatcher(m.pipelineEventDispatcher) {}; MessageQueue& operator=(const MessageQueue& c) { queue = c.queue; name = c.name; callbacks = c.callbacks; uniqueCallbackId = c.uniqueCallbackId; + pipelineEventDispatcher = c.pipelineEventDispatcher; return *this; } @@ -65,6 +77,7 @@ class MessageQueue : public std::enable_shared_from_this { name = std::move(m.name); callbacks = std::move(m.callbacks); uniqueCallbackId = m.uniqueCallbackId; + pipelineEventDispatcher = m.pipelineEventDispatcher; return *this; } @@ -200,8 +213,23 @@ class MessageQueue : public std::enable_shared_from_this { throw QueueException(CLOSED_QUEUE_MESSAGE); } std::shared_ptr val = nullptr; - if(!queue.tryPop(val)) return nullptr; - return std::dynamic_pointer_cast(val); + auto getInput = [this, &val]() -> std::shared_ptr { + if(!queue.tryPop(val)) { + return nullptr; + } + return std::dynamic_pointer_cast(val); + }; + if(pipelineEventDispatcher) { + auto blockEvent = pipelineEventDispatcher->blockEvent(PipelineEvent::Type::INPUT, name); + blockEvent.setQueueSize(getSize()); + auto result = getInput(); + if(!result) { + blockEvent.cancel(); + } + return result; + } else { + return getInput(); + } } /** @@ -221,8 +249,17 @@ class MessageQueue : public std::enable_shared_from_this { template std::shared_ptr get() { std::shared_ptr val = nullptr; - if(!queue.waitAndPop(val)) { - throw QueueException(CLOSED_QUEUE_MESSAGE); + auto getInput = [this, &val]() { + if(!queue.waitAndPop(val)) { + throw QueueException(CLOSED_QUEUE_MESSAGE); + } + }; + if(pipelineEventDispatcher) { + auto blockEvent = pipelineEventDispatcher->blockEvent(PipelineEvent::Type::INPUT, name); + blockEvent.setQueueSize(getSize()); + getInput(); + } else { + getInput(); } return std::dynamic_pointer_cast(val); } diff --git a/include/depthai/pipeline/Node.hpp b/include/depthai/pipeline/Node.hpp index cd6ef073ef..bc114a5304 100644 --- 
a/include/depthai/pipeline/Node.hpp +++ b/include/depthai/pipeline/Node.hpp @@ -20,6 +20,7 @@ #include "depthai/capabilities/Capability.hpp" #include "depthai/pipeline/datatype/DatatypeEnum.hpp" #include "depthai/properties/Properties.hpp" +#include "depthai/utility/PipelineEventDispatcherInterface.hpp" // libraries #include @@ -28,6 +29,9 @@ namespace dai { // fwd declare Pipeline class Pipeline; class PipelineImpl; +namespace utility { +class PipelineImplHelper; +} // fwd declare input queue class class InputQueue; @@ -38,6 +42,7 @@ class InputQueue; class Node : public std::enable_shared_from_this { friend class Pipeline; friend class PipelineImpl; + friend class utility::PipelineImplHelper; friend class Device; public: @@ -65,7 +70,9 @@ class Node : public std::enable_shared_from_this { static constexpr auto DEFAULT_NAME = ""; #define DEFAULT_TYPES \ { \ - { DatatypeEnum::Buffer, true } \ + { \ + DatatypeEnum::Buffer, true \ + } \ } static constexpr auto DEFAULT_BLOCKING = true; static constexpr auto DEFAULT_QUEUE_SIZE = 3; @@ -83,6 +90,8 @@ class Node : public std::enable_shared_from_this { std::vector inputMapRefs; std::vector*> nodeRefs; + std::shared_ptr pipelineEventDispatcher; + // helpers for setting refs void setOutputRefs(std::initializer_list l); void setOutputRefs(Output* outRef); @@ -129,11 +138,12 @@ class Node : public std::enable_shared_from_this { std::vector queueConnections; Type type = Type::MSender; // Slave sender not supported yet OutputDescription desc; + std::shared_ptr pipelineEventDispatcher; public: // std::vector possibleCapabilities; - Output(Node& par, OutputDescription desc, bool ref = true) : parent(par), desc(std::move(desc)) { + Output(Node& par, OutputDescription desc, bool ref = true) : parent(par), desc(std::move(desc)), pipelineEventDispatcher(par.pipelineEventDispatcher) { // Place oneself to the parents references if(ref) { par.setOutputRefs(this); @@ -346,16 +356,14 @@ class Node : public std::enable_shared_from_this { 
public: std::vector possibleDatatypes; explicit Input(Node& par, InputDescription desc, bool ref = true) - : MessageQueue(std::move(desc.name), desc.queueSize, desc.blocking), + : MessageQueue(desc.name.empty() ? par.createUniqueInputName() : desc.name, desc.queueSize, desc.blocking, par.pipelineEventDispatcher), parent(par), waitForMessage(desc.waitForMessage), + group(desc.group), possibleDatatypes(std::move(desc.types)) { if(ref) { par.setInputRefs(this); } - if(getName().empty()) { - setName(par.createUniqueInputName()); - } } /** diff --git a/include/depthai/pipeline/NodeObjInfo.hpp b/include/depthai/pipeline/NodeObjInfo.hpp index da1c87a0d1..8b4f931841 100644 --- a/include/depthai/pipeline/NodeObjInfo.hpp +++ b/include/depthai/pipeline/NodeObjInfo.hpp @@ -15,6 +15,8 @@ struct NodeObjInfo { std::string name; std::string alias; + std::string deviceId; + bool deviceNode = true; std::vector properties; @@ -27,6 +29,6 @@ struct NodeObjInfo { std::unordered_map, NodeIoInfo, IoInfoKey> ioInfo; }; -DEPTHAI_SERIALIZE_EXT(NodeObjInfo, id, parentId, name, alias, properties, logLevel, ioInfo); +DEPTHAI_SERIALIZE_EXT(NodeObjInfo, id, parentId, name, alias, deviceId, deviceNode, properties, logLevel, ioInfo); } // namespace dai diff --git a/include/depthai/pipeline/Pipeline.hpp b/include/depthai/pipeline/Pipeline.hpp index 83457a05f9..c5c5d03805 100644 --- a/include/depthai/pipeline/Pipeline.hpp +++ b/include/depthai/pipeline/Pipeline.hpp @@ -12,14 +12,18 @@ #include "AssetManager.hpp" #include "DeviceNode.hpp" #include "Node.hpp" +#include "PipelineStateApi.hpp" #include "depthai/device/CalibrationHandler.hpp" #include "depthai/device/Device.hpp" #include "depthai/openvino/OpenVINO.hpp" +#include "depthai/pipeline/datatype/PipelineEventAggregationConfig.hpp" #include "depthai/utility/AtomicBool.hpp" // shared #include "depthai/device/BoardConfig.hpp" +#include "depthai/pipeline/InputQueue.hpp" #include "depthai/pipeline/PipelineSchema.hpp" +#include 
"depthai/pipeline/datatype/PipelineState.hpp" #include "depthai/properties/GlobalProperties.hpp" #include "depthai/utility/RecordReplay.hpp" @@ -31,6 +35,7 @@ class PipelineImpl : public std::enable_shared_from_this { friend class Pipeline; friend class Node; friend class DeviceBase; + friend class utility::PipelineImplHelper; public: PipelineImpl(Pipeline& pipeline, bool createImplicitDevice = true) : assetManager("/pipeline/"), parent(pipeline) { @@ -45,6 +50,28 @@ class PipelineImpl : public std::enable_shared_from_this { PipelineImpl& operator=(PipelineImpl&&) = delete; ~PipelineImpl(); +protected: + // Record and Replay + RecordConfig recordConfig; + bool enableHolisticRecordReplay = false; + std::unordered_map recordReplayFilenames; + bool removeRecordReplayFiles = true; + std::string defaultDeviceId; + // Is the pipeline building on host? Some steps should be skipped when building on device + bool buildingOnHost = true; + + // Pipeline events + bool enablePipelineDebugging = false; + std::shared_ptr pipelineStateOut; + std::shared_ptr pipelineStateRequest; + std::shared_ptr pipelineStateTraceOut; + std::shared_ptr pipelineStateTraceRequest; + + // Access to nodes + std::vector> getAllNodes() const; + std::shared_ptr getNode(Node::Id id) const; + std::vector> getSourceNodes(); + private: // static functions static bool isSamePipeline(const Node::Output& out, const Node::Input& in); @@ -52,7 +79,8 @@ class PipelineImpl : public std::enable_shared_from_this { // Functions Node::Id getNextUniqueId(); - PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const; + PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE, bool includePipelineDebugging = true) const; + PipelineSchema getDevicePipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE, bool includePipelineDebugging = true) const; Device::Config getDeviceConfig() const; void setCameraTuningBlobPath(const fs::path& path); void 
setXLinkChunkSize(int sizeBytes); @@ -63,11 +91,6 @@ class PipelineImpl : public std::enable_shared_from_this { void setBoardConfig(BoardConfig board); BoardConfig getBoardConfig() const; - // Access to nodes - std::vector> getAllNodes() const; - std::shared_ptr getNode(Node::Id id) const; - std::vector> getSourceNodes(); - void serialize(PipelineSchema& schema, Assets& assets, std::vector& assetStorage, SerializationType type = DEFAULT_SERIALIZATION_TYPE) const; nlohmann::json serializeToJson(bool includeAssets) const; void remove(std::shared_ptr node); @@ -85,6 +108,9 @@ class PipelineImpl : public std::enable_shared_from_this { bool isHostOnly() const; bool isDeviceOnly() const; + // Pipeline state getters + PipelineStateApi getPipelineState(); + // Must be incremented and unique for each node Node::Id latestId = 0; // Pipeline asset manager @@ -97,6 +123,7 @@ class PipelineImpl : public std::enable_shared_from_this { // using NodeMap = std::unordered_map>; // NodeMap nodeMap; std::vector> nodes; + std::vector> xlinkBridges; // TODO(themarpe) - refactor, connections are now carried by nodes instead using NodeConnectionMap = std::unordered_map>; @@ -108,13 +135,6 @@ class PipelineImpl : public std::enable_shared_from_this { // Board configuration BoardConfig board; - // Record and Replay - RecordConfig recordConfig; - bool enableHolisticRecordReplay = false; - std::unordered_map recordReplayFilenames; - bool removeRecordReplayFiles = true; - std::string defaultDeviceId; - // Output queues std::vector> outputQueues; @@ -288,7 +308,12 @@ class Pipeline { /** * @returns Pipeline schema */ - PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE) const; + PipelineSchema getPipelineSchema(SerializationType type = DEFAULT_SERIALIZATION_TYPE, bool includePipelineDebugging = true) const; + + /** + * @returns Device pipeline schema (without host only nodes and connections) + */ + PipelineSchema getDevicePipelineSchema(SerializationType type = 
DEFAULT_SERIALIZATION_TYPE, bool includePipelineDebugging = true) const; // void loadAssets(AssetManager& assetManager); void serialize(PipelineSchema& schema, Assets& assets, std::vector& assetStorage) const { @@ -477,6 +502,10 @@ class Pipeline { void build() { impl()->build(); } + void buildDevice() { + impl()->buildingOnHost = false; + impl()->build(); + } void start() { impl()->start(); } @@ -506,6 +535,12 @@ class Pipeline { /// Record and Replay void enableHolisticRecord(const RecordConfig& config); void enableHolisticReplay(const std::string& pathToRecording); + + /// Pipeline debugging + void enablePipelineDebugging(bool enable = true); + + // Pipeline state getters + PipelineStateApi getPipelineState(); }; } // namespace dai diff --git a/include/depthai/pipeline/PipelineSchema.hpp b/include/depthai/pipeline/PipelineSchema.hpp index b8cd497d0d..a9fee1adaf 100644 --- a/include/depthai/pipeline/PipelineSchema.hpp +++ b/include/depthai/pipeline/PipelineSchema.hpp @@ -14,8 +14,9 @@ struct PipelineSchema { std::vector connections; GlobalProperties globalProperties; std::unordered_map nodes; + std::vector> bridges; }; -DEPTHAI_SERIALIZE_EXT(PipelineSchema, connections, globalProperties, nodes); +DEPTHAI_SERIALIZE_EXT(PipelineSchema, connections, globalProperties, nodes, bridges); } // namespace dai diff --git a/include/depthai/pipeline/PipelineStateApi.hpp b/include/depthai/pipeline/PipelineStateApi.hpp new file mode 100644 index 0000000000..b2d1e61bc5 --- /dev/null +++ b/include/depthai/pipeline/PipelineStateApi.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include "Node.hpp" +#include "depthai/pipeline/datatype/PipelineEventAggregationConfig.hpp" +#include "depthai/pipeline/datatype/PipelineState.hpp" + +namespace dai { + +/** + * pipeline.getState().nodes({nodeId1}).summary() -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).detailed() -> std::unordered_map; + * pipeline.getState().nodes(nodeId1).detailed() -> NodeState; + * 
pipeline.getState().nodes({nodeId1}).outputs() -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).outputs({outputName1}) -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).outputs(outputName) -> TimingStats; + * pipeline.getState().nodes({nodeId1}).events(); + * pipeline.getState().nodes({nodeId1}).inputs() -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).inputs({inputName1}) -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).inputs(inputName) -> QueueState; + * pipeline.getState().nodes({nodeId1}).otherStats() -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).otherStats({statName1}) -> std::unordered_map; + * pipeline.getState().nodes({nodeId1}).outputs(statName) -> TimingStats; + */ +class NodesStateApi { + std::vector nodeIds; + + std::shared_ptr pipelineStateOut; + std::shared_ptr pipelineStateRequest; + + public: + explicit NodesStateApi(std::vector nodeIds, std::shared_ptr pipelineStateOut, std::shared_ptr pipelineStateRequest) + : nodeIds(std::move(nodeIds)), pipelineStateOut(pipelineStateOut), pipelineStateRequest(pipelineStateRequest) {} + PipelineState summary(); + PipelineState detailed(); + std::unordered_map> outputs(); + std::unordered_map> inputs(); + std::unordered_map> otherTimings(); +}; +class NodeStateApi { + Node::Id nodeId; + + std::shared_ptr pipelineStateOut; + std::shared_ptr pipelineStateRequest; + + public: + explicit NodeStateApi(Node::Id nodeId, std::shared_ptr pipelineStateOut, std::shared_ptr pipelineStateRequest) + : nodeId(nodeId), pipelineStateOut(pipelineStateOut), pipelineStateRequest(pipelineStateRequest) {} + NodeState summary() { + return NodesStateApi({nodeId}, pipelineStateOut, pipelineStateRequest).summary().nodeStates[nodeId]; + } + NodeState detailed() { + return NodesStateApi({nodeId}, pipelineStateOut, pipelineStateRequest).detailed().nodeStates[nodeId]; + } + std::unordered_map outputs() { + return NodesStateApi({nodeId}, pipelineStateOut, 
pipelineStateRequest).outputs()[nodeId]; + } + std::unordered_map inputs() { + return NodesStateApi({nodeId}, pipelineStateOut, pipelineStateRequest).inputs()[nodeId]; + } + std::unordered_map otherTimings() { + return NodesStateApi({nodeId}, pipelineStateOut, pipelineStateRequest).otherTimings()[nodeId]; + } + std::unordered_map outputs(const std::vector& outputNames); + NodeState::OutputQueueState outputs(const std::string& outputName); + std::vector events(); + std::unordered_map inputs(const std::vector& inputNames); + NodeState::InputQueueState inputs(const std::string& inputName); + std::unordered_map otherTimings(const std::vector& timingNames); + NodeState::Timing otherTimings(const std::string& timingName); +}; +class PipelineStateApi { + std::shared_ptr pipelineStateOut; + std::shared_ptr pipelineStateRequest; + std::vector nodeIds; // empty means all nodes + + public: + PipelineStateApi(std::shared_ptr pipelineStateOut, + std::shared_ptr pipelineStateRequest, + const std::vector>& allNodes) + : pipelineStateOut(std::move(pipelineStateOut)), pipelineStateRequest(std::move(pipelineStateRequest)) { + for(const auto& n : allNodes) { + nodeIds.push_back(n->id); + } + } + NodesStateApi nodes() { + return NodesStateApi(nodeIds, pipelineStateOut, pipelineStateRequest); + } + NodesStateApi nodes(const std::vector& nodeIds) { + return NodesStateApi(nodeIds, pipelineStateOut, pipelineStateRequest); + } + NodeStateApi nodes(Node::Id nodeId) { + return NodeStateApi(nodeId, pipelineStateOut, pipelineStateRequest); + } + void stateAsync(std::function callback, std::optional config = std::nullopt); +}; + +} // namespace dai diff --git a/include/depthai/pipeline/ThreadedNode.hpp b/include/depthai/pipeline/ThreadedNode.hpp index d9657f6a5f..dbc1c20596 100644 --- a/include/depthai/pipeline/ThreadedNode.hpp +++ b/include/depthai/pipeline/ThreadedNode.hpp @@ -9,12 +9,21 @@ namespace dai { class ThreadedNode : public Node { + friend class PipelineImpl; + friend class 
utility::PipelineImplHelper; + private: JoiningThread thread; AtomicBool running{false}; + protected: + void initPipelineEventDispatcher(int64_t nodeId); + public: + Output pipelineEventOutput{*this, {"pipelineEventOutput", DEFAULT_GROUP, {{{DatatypeEnum::PipelineEvent, false}}}}}; + using Node::Node; + ThreadedNode(); virtual ~ThreadedNode(); @@ -45,6 +54,8 @@ class ThreadedNode : public Node { // check if still running bool isRunning() const; + bool mainLoop(); + /** * @brief Sets the logging severity level for this node. * @@ -59,6 +70,10 @@ class ThreadedNode : public Node { */ virtual dai::LogLevel getLogLevel() const; + utility::PipelineEventDispatcherInterface::BlockPipelineEvent inputBlockEvent(); + utility::PipelineEventDispatcherInterface::BlockPipelineEvent outputBlockEvent(); + utility::PipelineEventDispatcherInterface::BlockPipelineEvent blockEvent(PipelineEvent::Type type, const std::string& source); + class Impl; spimpl::impl_ptr pimpl; }; diff --git a/include/depthai/pipeline/datatype/DatatypeEnum.hpp b/include/depthai/pipeline/datatype/DatatypeEnum.hpp index 949d7bd631..aaaf0fe1ca 100644 --- a/include/depthai/pipeline/datatype/DatatypeEnum.hpp +++ b/include/depthai/pipeline/datatype/DatatypeEnum.hpp @@ -44,6 +44,9 @@ enum class DatatypeEnum : std::int32_t { DynamicCalibrationResult, CalibrationQuality, CoverageData, + PipelineEvent, + PipelineState, + PipelineEventAggregationConfig, }; bool isDatatypeSubclassOf(DatatypeEnum parent, DatatypeEnum children); diff --git a/include/depthai/pipeline/datatype/PipelineEvent.hpp b/include/depthai/pipeline/datatype/PipelineEvent.hpp new file mode 100644 index 0000000000..69168f317a --- /dev/null +++ b/include/depthai/pipeline/datatype/PipelineEvent.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include "depthai/common/optional.hpp" +#include "depthai/pipeline/datatype/Buffer.hpp" + +namespace dai { + +/** + * Pipeline event message. 
+ */ +class PipelineEvent : public Buffer { + public: + enum class Type : std::int32_t { + CUSTOM = 0, + LOOP = 1, + INPUT = 2, + OUTPUT = 3, + INPUT_BLOCK = 4, + OUTPUT_BLOCK = 5, + }; + enum class Interval : std::int32_t { NONE = 0, START = 1, END = 2 }; + + PipelineEvent() = default; + virtual ~PipelineEvent(); + + int64_t nodeId = -1; + int32_t status = 0; + std::optional queueSize; + Interval interval = Interval::NONE; + Type type = Type::CUSTOM; + std::string source; + + void serialize(std::vector& metadata, DatatypeEnum& datatype) const override { + metadata = utility::serialize(*this); + datatype = DatatypeEnum::PipelineEvent; + }; + + DEPTHAI_SERIALIZE(PipelineEvent, Buffer::ts, Buffer::tsDevice, Buffer::sequenceNum, nodeId, status, queueSize, interval, type, source); +}; + +} // namespace dai diff --git a/include/depthai/pipeline/datatype/PipelineEventAggregationConfig.hpp b/include/depthai/pipeline/datatype/PipelineEventAggregationConfig.hpp new file mode 100644 index 0000000000..ede06195e2 --- /dev/null +++ b/include/depthai/pipeline/datatype/PipelineEventAggregationConfig.hpp @@ -0,0 +1,36 @@ +#pragma once +#include +#include + +#include "depthai/common/optional.hpp" +#include "depthai/pipeline/datatype/Buffer.hpp" +#include "depthai/pipeline/datatype/DatatypeEnum.hpp" + +namespace dai { + +class NodeEventAggregationConfig { + public: + int64_t nodeId = -1; + std::optional> inputs; + std::optional> outputs; + std::optional> others; + bool events = false; + + DEPTHAI_SERIALIZE(NodeEventAggregationConfig, nodeId, inputs, outputs, others, events); +}; + +/// PipelineEventAggregationConfig configuration structure +class PipelineEventAggregationConfig : public Buffer { + public: + std::vector nodes; + std::optional repeatIntervalSeconds = std::nullopt; // Keep sending the aggregated state without waiting for new config + + PipelineEventAggregationConfig() = default; + virtual ~PipelineEventAggregationConfig(); + + void serialize(std::vector& metadata, 
DatatypeEnum& datatype) const override; + + DEPTHAI_SERIALIZE(PipelineEventAggregationConfig, Buffer::ts, Buffer::tsDevice, Buffer::sequenceNum, nodes, repeatIntervalSeconds); +}; + +} // namespace dai diff --git a/include/depthai/pipeline/datatype/PipelineState.hpp b/include/depthai/pipeline/datatype/PipelineState.hpp new file mode 100644 index 0000000000..8ffc2af2d2 --- /dev/null +++ b/include/depthai/pipeline/datatype/PipelineState.hpp @@ -0,0 +1,120 @@ +#pragma once + +#include + +#include "depthai/pipeline/datatype/Buffer.hpp" +#include "depthai/pipeline/datatype/PipelineEvent.hpp" + +namespace dai { + +class NodeState { + public: + struct DurationEvent { + PipelineEvent startEvent; + uint64_t durationUs; + DEPTHAI_SERIALIZE(DurationEvent, startEvent, durationUs); + }; + struct TimingStats { + uint64_t minMicros = -1; + uint64_t maxMicros = 0; + uint64_t averageMicrosRecent = 0; + uint64_t stdDevMicrosRecent = 0; + uint64_t minMicrosRecent = -1; + uint64_t maxMicrosRecent = 0; + uint64_t medianMicrosRecent = 0; + DEPTHAI_SERIALIZE(TimingStats, minMicros, maxMicros, averageMicrosRecent, stdDevMicrosRecent, minMicrosRecent, maxMicrosRecent, medianMicrosRecent); + }; + struct Timing { + float fps = 0.0f; + TimingStats durationStats; + + bool isValid() const { + return durationStats.minMicros <= durationStats.maxMicros; + } + + DEPTHAI_SERIALIZE(Timing, fps, durationStats); + }; + struct QueueStats { + uint32_t maxQueued = 0; + uint32_t minQueuedRecent = 0; + uint32_t maxQueuedRecent = 0; + uint32_t medianQueuedRecent = 0; + DEPTHAI_SERIALIZE(QueueStats, maxQueued, minQueuedRecent, maxQueuedRecent, medianQueuedRecent); + }; + struct InputQueueState { + // Current state of the input queue. 
+ enum class State : std::int32_t { + IDLE = 0, + WAITING = 1, // Waiting to receive a message + BLOCKED = 2 // An output attempted to send to this input, but the input queue was full + } state = State::IDLE; + // Number of messages currently queued in the input queue + uint32_t numQueued = 0; + // Timing info about this input + Timing timing; + // Queue usage stats + QueueStats queueStats; + + bool isValid() const { + return timing.isValid(); + } + + DEPTHAI_SERIALIZE(InputQueueState, state, numQueued, timing, queueStats); + }; + struct OutputQueueState { + // Current state of the output queue. Send should ideally be instant. This is not the case when the input queue is full. + // In that case, the state will be SENDING until there is space in the input queue (unless trySend is used). + enum class State : std::int32_t { IDLE = 0, SENDING = 1 } state = State::IDLE; + // Timing info about this output + Timing timing; + + bool isValid() const { + return timing.isValid(); + } + + DEPTHAI_SERIALIZE(OutputQueueState, state, timing); + }; + enum class State : std::int32_t { IDLE = 0, GETTING_INPUTS = 1, PROCESSING = 2, SENDING_OUTPUTS = 3 }; + + // Current state of the node - idle only when not running + State state = State::IDLE; + // Optional list of recent events + std::vector events; + // Info about each output + std::unordered_map outputStates; + // Info about each input + std::unordered_map inputStates; + // Time spent getting inputs in a loop + Timing inputsGetTiming; + // Time spent sending outputs in a loop + Timing outputsSendTiming; + // Main node loop timing (processing time + inputs get + outputs send) + Timing mainLoopTiming; + // Other timings that the developer of the node decided to add + std::unordered_map otherTimings; + + DEPTHAI_SERIALIZE(NodeState, state, events, outputStates, inputStates, inputsGetTiming, outputsSendTiming, mainLoopTiming, otherTimings); +}; + +/** + * Pipeline state message. Aggregated snapshot of per-node states. 
+ */ +class PipelineState : public Buffer { + public: + PipelineState() = default; + virtual ~PipelineState(); + + std::unordered_map nodeStates; + uint32_t configSequenceNum = 0; + + void serialize(std::vector& metadata, DatatypeEnum& datatype) const override { + metadata = utility::serialize(*this); + datatype = DatatypeEnum::PipelineState; + }; + + nlohmann::json toJson() const; + + DEPTHAI_SERIALIZE(PipelineState, Buffer::ts, Buffer::tsDevice, Buffer::sequenceNum, nodeStates, configSequenceNum); +}; + +} // namespace dai diff --git a/include/depthai/pipeline/node/internal/PipelineEventAggregation.hpp b/include/depthai/pipeline/node/internal/PipelineEventAggregation.hpp new file mode 100644 index 0000000000..dada66dcd7 --- /dev/null +++ b/include/depthai/pipeline/node/internal/PipelineEventAggregation.hpp @@ -0,0 +1,61 @@ +#pragma once + +#include + +// shared +#include + +namespace dai { +namespace node { +namespace internal { + +/** + * @brief PipelineEventAggregation node. Aggregates PipelineEvent messages from nodes into PipelineState reports + */ +class PipelineEventAggregation : public DeviceNodeCRTP, public HostRunnable { + private: + bool runOnHostVar = false; + + public: + constexpr static const char* NAME = "PipelineEventAggregation"; + using DeviceNodeCRTP::DeviceNodeCRTP; + + /** + * A map of inputs + */ + InputMap inputs{*this, "inputs", {DEFAULT_NAME, DEFAULT_GROUP, false, 32, {{{DatatypeEnum::PipelineEvent, false}}}, DEFAULT_WAIT_FOR_MESSAGE}}; + + /** + * Input PipelineEventAggregationConfig message with state request parameters + */ + Input request{*this, {"request", DEFAULT_GROUP, DEFAULT_BLOCKING, DEFAULT_QUEUE_SIZE, {{{DatatypeEnum::PipelineEventAggregationConfig, false}}}, false}}; + + /** + * Output message of type PipelineState + */ + Output out{*this, {"out", DEFAULT_GROUP, {{{DatatypeEnum::PipelineState, false}}}}}; + + /** + * Continuous output message of type PipelineState + */ + Output outTrace{*this, {"outTrace", DEFAULT_GROUP, {{{DatatypeEnum::PipelineState, false}}}}}; + + /** + * 
Specify whether to run on host or device + * By default, the node will run on device. + */ + void setRunOnHost(bool runOnHost); + + /** + * Check if the node is set to run on host + */ + bool runOnHost() const override; + + void run() override; + + PipelineEventAggregation& setTraceOutput(bool enable); +}; + +} // namespace internal +} // namespace node +} // namespace dai diff --git a/include/depthai/pipeline/node/internal/PipelineStateMerge.hpp b/include/depthai/pipeline/node/internal/PipelineStateMerge.hpp new file mode 100644 index 0000000000..ac08bf07db --- /dev/null +++ b/include/depthai/pipeline/node/internal/PipelineStateMerge.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "depthai/pipeline/ThreadedHostNode.hpp" + +namespace dai { +namespace node { + +/** + * @brief PipelineStateMerge node. Merges PipelineState messages from device and host into a single output. + */ +class PipelineStateMerge : public CustomThreadedNode { + bool hasDeviceNodes = false; + bool hasHostNodes = false; + + bool allowReconfiguration = true; + + public: + constexpr static const char* NAME = "PipelineStateMerge"; + + Input inputDevice{*this, {"inputDevice", DEFAULT_GROUP, false, 4, {{DatatypeEnum::PipelineState, false}}}}; + Input inputHost{*this, {"inputHost", DEFAULT_GROUP, false, 4, {{DatatypeEnum::PipelineState, false}}}}; + + /** + * Input PipelineEventAggregationConfig message with state request parameters + */ + Input request{*this, {"request", DEFAULT_GROUP, DEFAULT_BLOCKING, DEFAULT_QUEUE_SIZE, {{{DatatypeEnum::PipelineEventAggregationConfig, false}}}, false}}; + + /** + * Output PipelineEventAggregationConfig message with state request parameters + */ + Output outRequest{*this, {"outRequest", DEFAULT_GROUP, {{{DatatypeEnum::PipelineEventAggregationConfig, false}}}}}; + + /** + * Output message of type PipelineState + */ + Output out{*this, {"out", DEFAULT_GROUP, {{{DatatypeEnum::PipelineState, false}}}}}; + + std::shared_ptr build(bool hasDeviceNodes, bool hasHostNodes); + + 
PipelineStateMerge& setAllowConfiguration(bool allow); + + void run() override; +}; + +} // namespace node +} // namespace dai diff --git a/include/depthai/properties/internal/PipelineEventAggregationProperties.hpp b/include/depthai/properties/internal/PipelineEventAggregationProperties.hpp new file mode 100644 index 0000000000..bbca333f79 --- /dev/null +++ b/include/depthai/properties/internal/PipelineEventAggregationProperties.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "depthai/properties/Properties.hpp" + +namespace dai { + +/** + * Specify properties for PipelineEventAggregation. + */ +struct PipelineEventAggregationProperties : PropertiesSerializable { + uint32_t aggregationWindowSize = 100; + uint32_t statsUpdateIntervalMs = 1000; + uint32_t eventWaitWindow = 16; + // Enables traceOut output. Will use the first received repeating configuration. + bool traceOutput = false; + + ~PipelineEventAggregationProperties() override; +}; + +DEPTHAI_SERIALIZE_EXT(PipelineEventAggregationProperties, aggregationWindowSize, statsUpdateIntervalMs, eventWaitWindow, traceOutput); + +} // namespace dai diff --git a/include/depthai/remote_connection/RemoteConnection.hpp b/include/depthai/remote_connection/RemoteConnection.hpp index 636a3cc9b9..45332eb14f 100644 --- a/include/depthai/remote_connection/RemoteConnection.hpp +++ b/include/depthai/remote_connection/RemoteConnection.hpp @@ -77,7 +77,7 @@ class RemoteConnection { * * @param pipeline The pipeline to register. */ - void registerPipeline(const Pipeline& pipeline); + void registerPipeline(Pipeline& pipeline); /** * @brief Waits for a key event. 
diff --git a/include/depthai/utility/CircularBuffer.hpp b/include/depthai/utility/CircularBuffer.hpp new file mode 100644 index 0000000000..580630b886 --- /dev/null +++ b/include/depthai/utility/CircularBuffer.hpp @@ -0,0 +1,168 @@ +#pragma once +#include +#include + +namespace dai { +namespace utility { + +template +class CircularBuffer { + std::vector buffer; + size_t maxSize; + size_t index = 0; + + size_t start() const { + return (buffer.size() < maxSize) ? 0 : index; + } + + public: + CircularBuffer(size_t size) : maxSize(size) { + buffer.reserve(size); + } + T& add(T value) { + if(buffer.size() < maxSize) { + buffer.push_back(value); + return buffer.back(); + } else { + buffer[index] = value; + index = (index + 1) % maxSize; + return buffer[(index + maxSize - 1) % maxSize]; + } + } + std::vector getBuffer() const { + std::vector result; + if(buffer.size() < maxSize) { + result = buffer; + } else { + result.insert(result.end(), buffer.begin() + index, buffer.end()); + result.insert(result.end(), buffer.begin(), buffer.begin() + index); + } + return result; + } + T first() const { + if(buffer.empty()) { + throw std::runtime_error("CircularBuffer is empty"); + } + if(buffer.size() < maxSize) { + return buffer.front(); + } else { + return buffer[index]; + } + } + T last() const { + if(buffer.empty()) { + throw std::runtime_error("CircularBuffer is empty"); + } + if(buffer.size() < maxSize) { + return buffer.back(); + } else { + return buffer[(index + maxSize - 1) % maxSize]; + } + } + size_t size() const { + return buffer.size(); + } + + // =========================================================== + // Forward iterator + // =========================================================== + class iterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = T; + using difference_type = std::ptrdiff_t; + using pointer = T*; + using reference = T&; + + iterator(CircularBuffer* parent, size_t pos) : parent(parent), pos(pos) {} + + 
reference operator*() { + return parent->buffer[(parent->start() + pos) % parent->buffer.size()]; + } + pointer operator->() { + return &(**this); + } + + iterator& operator++() { + ++pos; + return *this; + } + iterator operator++(int) { + iterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator==(const iterator& other) const { + return parent == other.parent && pos == other.pos; + } + bool operator!=(const iterator& other) const { + return !(*this == other); + } + + private: + CircularBuffer* parent; + size_t pos; + }; + + iterator begin() { + return iterator(this, 0); + } + iterator end() { + return iterator(this, buffer.size()); + } + + // =========================================================== + // Reverse iterator + // =========================================================== + class reverse_iterator { + public: + using iterator_category = std::bidirectional_iterator_tag; + using value_type = T; + using difference_type = std::ptrdiff_t; + using pointer = T*; + using reference = T&; + + reverse_iterator(CircularBuffer* parent, size_t pos) : parent(parent), pos(pos) {} + + reference operator*() { + // Map reverse position to logical order + size_t logicalIndex = (parent->start() + parent->buffer.size() - 1 - pos) % parent->buffer.size(); + return parent->buffer[logicalIndex]; + } + pointer operator->() { + return &(**this); + } + + reverse_iterator& operator++() { + ++pos; + return *this; + } + reverse_iterator operator++(int) { + reverse_iterator tmp = *this; + ++(*this); + return tmp; + } + + bool operator==(const reverse_iterator& other) const { + return parent == other.parent && pos == other.pos; + } + bool operator!=(const reverse_iterator& other) const { + return !(*this == other); + } + + private: + CircularBuffer* parent; + size_t pos; + }; + + reverse_iterator rbegin() { + return reverse_iterator(this, 0); + } + reverse_iterator rend() { + return reverse_iterator(this, buffer.size()); + } +}; + +} // namespace utility +} // namespace 
dai diff --git a/include/depthai/utility/ImageManipImpl.hpp b/include/depthai/utility/ImageManipImpl.hpp index 505e0e09c1..c8cd81cbc5 100644 --- a/include/depthai/utility/ImageManipImpl.hpp +++ b/include/depthai/utility/ImageManipImpl.hpp @@ -59,49 +59,53 @@ void loop(N& node, std::shared_ptr inImage; - while(node.isRunning()) { + while(node.mainLoop()) { std::shared_ptr pConfig; bool hasConfig = false; bool needsImage = true; bool skipImage = false; - if(node.inputConfig.getWaitForMessage()) { - pConfig = node.inputConfig.template get(); - hasConfig = true; - if(inImage != nullptr && hasConfig && pConfig->getReusePreviousImage()) { - needsImage = false; - } - skipImage = pConfig->getSkipCurrentImage(); - } else { - pConfig = node.inputConfig.template tryGet(); - if(pConfig != nullptr) { - hasConfig = true; - } - } + { + auto blockEvent = node.inputBlockEvent(); - if(needsImage) { - inImage = node.inputImage.template get(); - if(inImage == nullptr) { - logger->warn("No input image, skipping frame"); - continue; - } - if(!hasConfig) { - auto _pConfig = node.inputConfig.template tryGet(); - if(_pConfig != nullptr) { - pConfig = _pConfig; + if(node.inputConfig.getWaitForMessage()) { + pConfig = node.inputConfig.template get(); + hasConfig = true; + if(inImage != nullptr && hasConfig && pConfig->getReusePreviousImage()) { + needsImage = false; + } + skipImage = pConfig->getSkipCurrentImage(); + } else { + pConfig = node.inputConfig.template tryGet(); + if(pConfig != nullptr) { hasConfig = true; } } - if(skipImage) { - continue; + + if(needsImage) { + inImage = node.inputImage.template get(); + if(inImage == nullptr) { + logger->warn("No input image, skipping frame"); + continue; + } + if(!hasConfig) { + auto _pConfig = node.inputConfig.template tryGet(); + if(_pConfig != nullptr) { + pConfig = _pConfig; + hasConfig = true; + } + } + if(skipImage) { + continue; + } } - } - // if has new config, parse and check if any changes - if(hasConfig) { - config = *pConfig; - } - 
if(!node.inputConfig.getWaitForMessage() && config.getReusePreviousImage()) { - logger->warn("reusePreviousImage is only taken into account when inputConfig is synchronous"); + // if has new config, parse and check if any changes + if(hasConfig) { + config = *pConfig; + } + if(!node.inputConfig.getWaitForMessage() && config.getReusePreviousImage()) { + logger->warn("reusePreviousImage is only taken into account when inputConfig is synchronous"); + } } auto startP = std::chrono::steady_clock::now(); @@ -135,7 +139,11 @@ void loop(N& node, if(!success) { logger->error("Processing failed, potentially unsupported config"); } - node.out.send(outImage); + { + auto blockEvent = node.outputBlockEvent(); + + node.out.send(outImage); + } } else { logger->error( "Output image is bigger ({}B) than maximum frame size specified in properties ({}B) - skipping frame.\nPlease use the setMaxOutputFrameSize " @@ -457,8 +465,7 @@ class ColorChange { template