diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3096bd2..82cb71d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,10 @@ on: branches: - main +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: package_and_upload_all_check: uses: libhal/ci/.github/workflows/package_and_upload_all.yml@5.x.y diff --git a/CMakeLists.txt b/CMakeLists.txt index 8137bea..44d8029 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,7 +109,7 @@ endif() # Benchmarking # ============================================================================== -if(FALSE) +if(TRUE) if(CMAKE_CROSSCOMPILING) message(STATUS "Cross compiling, skipping benchmarks") else() diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 42ae6e2..1b525b0 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -108,7 +108,7 @@ export class operation_cancelled : public std::exception { [[nodiscard]] char const* what() const noexcept override { - return "An async::context ran out of memory!"; + return "This future has been cancelled!"; } }; @@ -217,6 +217,11 @@ public: } } + constexpr bool done() + { + return m_active_handle == std::noop_coroutine(); + } + void resume() { m_active_handle.resume(); @@ -291,7 +296,7 @@ private: #if DEBUGGING std::println("💾 Allocating {} words, current stack {}, new stack {}, " "stack pointer member address: 0x{:x}", - words_needed, + words_to_allocate, static_cast(m_stack_pointer), static_cast(new_stack_index), *m_stack_pointer); @@ -822,15 +827,16 @@ public: return m_operation.m_state.index() >= 1; } + template std::coroutine_handle<> await_suspend( - full_handle_type p_calling_coroutine) noexcept + std::coroutine_handle> p_calling_coroutine) noexcept { // This will not throw because the discriminate check was performed in // `await_ready()` via the done() function. `done()` checks if the state // is `handle_type` and if it is, it returns false, causing the code to // call await_suspend(). auto handle = std::get(m_operation.m_state); - full_handle_type::from_address(handle.address()) + std::coroutine_handle>::from_address(handle.address()) .promise() .continuation(p_calling_coroutine); return handle; diff --git a/test_package/conanfile.py b/test_package/conanfile.py index e323cf4..c76718b 100644 --- a/test_package/conanfile.py +++ b/test_package/conanfile.py @@ -47,6 +47,8 @@ def build(self): cmake.build() def test(self): - if not cross_building(self): + UNSUPPORTED_SYSTEM = (str(self.settings.arch) == "x86_64" and + str(self.settings.os) == "Linux") + if not cross_building(self) and not UNSUPPORTED_SYSTEM: bin_path = Path(self.cpp.build.bindirs[0]) / "test_package" self.run(bin_path.absolute(), env="conanrun") diff --git a/test_package/main.cpp b/test_package/main.cpp index b3267e4..dfff8fe 100644 --- a/test_package/main.cpp +++ b/test_package/main.cpp @@ -11,67 +11,149 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - #include #include #include -#include +#include #include #include +#include + +#if not __ARM_EABI__ +#include +#endif import async_context; +struct round_robin_scheduler +{ + bool run_until_all_done(int p_iterations = 100) + { + for (int i = 0; i < p_iterations; i++) { + bool all_done = true; + for (auto const& ctx : contexts) { + if (not ctx->done()) { + all_done = false; + if (ctx->state() == async::blocked_by::nothing) { + ctx->resume(); + } + } + } + if (all_done) { + return true; + } + } + return false; + } + std::vector contexts{}; +}; + struct test_context : public async::context { std::array m_stack{}; - int sleep_count = 0; - test_context() + // NOTE: the scheduler isn't used in this example, but if it were used, it + // would be called within the do_schedule function. For example, rather than + // performing a sleep that blocks the whole thread, the context and its time + // duration could be passed to an API on the "round_robin_scheduler", that + // makes it aware of the time schedule requirements. + std::shared_ptr scheduler; + + test_context(test_context const&) = delete; + test_context& operator=(test_context const&) = delete; + test_context(test_context&&) = delete; + test_context& operator=(test_context&&) = delete; + + test_context(std::shared_ptr const& p_scheduler) + : scheduler(p_scheduler) { + scheduler->contexts.push_back(this); this->initialize_stack_memory(m_stack); } private: - void do_schedule(async::blocked_by p_blocked_state, - async::block_info) noexcept override + void do_schedule( + async::blocked_by p_blocked_state, + [[maybe_unused]] async::block_info p_block_info) noexcept override { - if (p_blocked_state == async::blocked_by::time) { - sleep_count++; + // Simulate I/O completion - immediately unblock + if (p_blocked_state == async::blocked_by::io) { + this->unblock_without_notification(); + } else if (p_blocked_state == async::blocked_by::time) { +#if not __ARM_EABI__ + if (auto* time = std::get_if(&p_block_info)) { + // Just block this thread vs doing something smart + std::this_thread::sleep_for(*time); + } +#endif + this->unblock_without_notification(); } } }; -async::future coro_double_delay(async::context&) +// Simulates reading sensor data with I/O delay +async::future read_sensor(async::context& ctx, std::string_view p_name) { using namespace std::chrono_literals; - std::println("Delay for 500ms"); - co_await 500ms; - std::println("Delay for another 500ms"); - co_await 500ms; - std::println("Returning!"); - co_return; + std::println(" ['{}': Sensor] Starting read...", p_name); + co_await ctx.block_by_io(); // Simulate I/O operation + std::println(" ['{}': Sensor] Read complete: 42", p_name); + co_return 42; } -int main() +// Processes data with computation delay +async::future process_data(async::context& ctx, + std::string_view p_name, + int value) { - test_context ctx; + using namespace std::chrono_literals; + std::println(" ['{}': Process] Processing {}...", p_name, value); + co_await 10ms; // Simulate processing time + int result = value * 2; + std::println(" ['{}': Process] Result: {}", p_name, result); + co_return result; +} - auto future_delay = coro_double_delay(ctx); +// Writes result with I/O delay +async::future write_actuator(async::context& ctx, + std::string_view p_name, + int value) +{ + std::println(" ['{}': Actuator] Writing {}...", p_name, value); + co_await ctx.block_by_io(); + std::println(" ['{}': Actuator] Write complete!", p_name); +} - assert(not future_delay.done()); +// Coordinates the full pipeline +async::future sensor_pipeline(async::context& ctx, + std::string_view p_name) +{ + std::println("Pipeline '{}' starting...", p_name); - future_delay.resume(); + int sensor_value = co_await read_sensor(ctx, p_name); + int processed = co_await process_data(ctx, p_name, sensor_value); + co_await write_actuator(ctx, p_name, processed); - assert(ctx.sleep_count == 1); + std::println("Pipeline '{}' complete!\n", p_name); +} + +int main() +{ + auto scheduler = std::make_shared(); - future_delay.resume(); + // Create context and add them to the scheduler + test_context ctx1(scheduler); + test_context ctx2(scheduler); - assert(ctx.sleep_count == 2); - assert(not future_delay.done()); + // Run two independent pipelines concurrently + auto pipeline1 = sensor_pipeline(ctx1, "System 1"); + auto pipeline2 = sensor_pipeline(ctx2, "System 2"); - future_delay.resume(); + scheduler->run_until_all_done(); - assert(future_delay.done()); + assert(pipeline1.done()); + assert(pipeline2.done()); + std::println("Both pipelines completed successfully!"); return 0; } diff --git a/tests/async.test.cpp b/tests/async.test.cpp index 6a16a0f..f43f731 100644 --- a/tests/async.test.cpp +++ b/tests/async.test.cpp @@ -197,7 +197,7 @@ void async_context_suite() std::println("Executing 'single_resource' coroutine"); while (io_in_use) { - // For some reason this segfaults on Linux + // TODO(#44): For some reason this segfaults on Linux // std::println("Resource unavailable, blocked by {}", // io_in_use.address()); co_await io_in_use.set_as_block_by_sync(p_context); @@ -572,7 +572,7 @@ void async_context_suite() auto b = [&suspension_count](async::context&) -> future { while (suspension_count < expected_suspensions) { suspension_count++; - // For some reason this segfaults on Linux + // TODO(#44): For some reason this segfaults on Linux // std::println("p_suspend_count = {}!", suspension_count); co_await std::suspend_always{}; } @@ -636,7 +636,7 @@ void async_context_suite() suspension_count = 0; while (suspension_count < expected_suspensions) { suspension_count++; - // For some reason this segfaults on Linux + // TODO(#44): For some reason this segfaults on Linux // std::println("p_suspend_count = {}!", suspension_count); co_await std::suspend_always{}; }