Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ on:
branches:
- main

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
package_and_upload_all_check:
uses: libhal/ci/.github/workflows/package_and_upload_all.yml@5.x.y
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ endif()
# Benchmarking
# ==============================================================================

if(FALSE)
if(TRUE)
if(CMAKE_CROSSCOMPILING)
message(STATUS "Cross compiling, skipping benchmarks")
else()
Expand Down
14 changes: 10 additions & 4 deletions modules/async_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class operation_cancelled : public std::exception
{
[[nodiscard]] char const* what() const noexcept override
{
return "An async::context ran out of memory!";
return "This future has been cancelled!";
}
};

Expand Down Expand Up @@ -217,6 +217,11 @@ public:
}
}

constexpr bool done()
{
return m_active_handle == std::noop_coroutine();
}

void resume()
{
m_active_handle.resume();
Expand Down Expand Up @@ -291,7 +296,7 @@ private:
#if DEBUGGING
std::println("💾 Allocating {} words, current stack {}, new stack {}, "
"stack pointer member address: 0x{:x}",
words_needed,
words_to_allocate,
static_cast<void*>(m_stack_pointer),
static_cast<void*>(new_stack_index),
*m_stack_pointer);
Expand Down Expand Up @@ -822,15 +827,16 @@ public:
return m_operation.m_state.index() >= 1;
}

template<typename U>
std::coroutine_handle<> await_suspend(
full_handle_type p_calling_coroutine) noexcept
std::coroutine_handle<promise<U>> p_calling_coroutine) noexcept
{
// This will not throw because the discriminate check was performed in
// `await_ready()` via the done() function. `done()` checks if the state
// is `handle_type` and if it is, it returns false, causing the code to
// call await_suspend().
auto handle = std::get<handle_type>(m_operation.m_state);
full_handle_type::from_address(handle.address())
std::coroutine_handle<promise<U>>::from_address(handle.address())
.promise()
.continuation(p_calling_coroutine);
return handle;
Expand Down
4 changes: 3 additions & 1 deletion test_package/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def build(self):
cmake.build()

def test(self):
if not cross_building(self):
UNSUPPORTED_SYSTEM = (str(self.settings.arch) == "x86_64" and
str(self.settings.os) == "Linux")
if not cross_building(self) and not UNSUPPORTED_SYSTEM:
bin_path = Path(self.cpp.build.bindirs[0]) / "test_package"
self.run(bin_path.absolute(), env="conanrun")
134 changes: 108 additions & 26 deletions test_package/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,67 +11,149 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cassert>

#include <chrono>
#include <coroutine>
#include <memory_resource>
#include <memory>
#include <print>
#include <variant>
#include <vector>

#if not __ARM_EABI__
#include <thread>
#endif

import async_context;

struct round_robin_scheduler
{
bool run_until_all_done(int p_iterations = 100)
{
for (int i = 0; i < p_iterations; i++) {
bool all_done = true;
for (auto const& ctx : contexts) {
if (not ctx->done()) {
all_done = false;
if (ctx->state() == async::blocked_by::nothing) {
ctx->resume();
}
}
}
if (all_done) {
return true;
}
}
return false;
}
std::vector<async::context*> contexts{};
};

struct test_context : public async::context
{
std::array<async::uptr, 8192> m_stack{};
int sleep_count = 0;
test_context()
// NOTE: the scheduler isn't used in this example, but if it were used, it
// would be called within the do_schedule function. For example, rather than
// performing a sleep that blocks the whole thread, the context and its time
// duration could be passed to an API on the "round_robin_scheduler", that
// makes it aware of the time schedule requirements.
std::shared_ptr<round_robin_scheduler> scheduler;

test_context(test_context const&) = delete;
test_context& operator=(test_context const&) = delete;
test_context(test_context&&) = delete;
test_context& operator=(test_context&&) = delete;

test_context(std::shared_ptr<round_robin_scheduler> const& p_scheduler)
: scheduler(p_scheduler)
{
scheduler->contexts.push_back(this);
this->initialize_stack_memory(m_stack);
}

private:
void do_schedule(async::blocked_by p_blocked_state,
async::block_info) noexcept override
void do_schedule(
async::blocked_by p_blocked_state,
[[maybe_unused]] async::block_info p_block_info) noexcept override
{
if (p_blocked_state == async::blocked_by::time) {
sleep_count++;
// Simulate I/O completion - immediately unblock
if (p_blocked_state == async::blocked_by::io) {
this->unblock_without_notification();
} else if (p_blocked_state == async::blocked_by::time) {
#if not __ARM_EABI__
if (auto* time = std::get_if<std::chrono::nanoseconds>(&p_block_info)) {
// Just block this thread vs doing something smart
std::this_thread::sleep_for(*time);
}
#endif
this->unblock_without_notification();
}
}
};

async::future<void> coro_double_delay(async::context&)
// Simulates reading sensor data with I/O delay
async::future<int> read_sensor(async::context& ctx, std::string_view p_name)
{
using namespace std::chrono_literals;
std::println("Delay for 500ms");
co_await 500ms;
std::println("Delay for another 500ms");
co_await 500ms;
std::println("Returning!");
co_return;
std::println(" ['{}': Sensor] Starting read...", p_name);
co_await ctx.block_by_io(); // Simulate I/O operation
std::println(" ['{}': Sensor] Read complete: 42", p_name);
co_return 42;
}

int main()
// Processes data with computation delay
async::future<int> process_data(async::context& ctx,
std::string_view p_name,
int value)
{
test_context ctx;
using namespace std::chrono_literals;
std::println(" ['{}': Process] Processing {}...", p_name, value);
co_await 10ms; // Simulate processing time
int result = value * 2;
std::println(" ['{}': Process] Result: {}", p_name, result);
co_return result;
}

auto future_delay = coro_double_delay(ctx);
// Writes result with I/O delay
async::future<void> write_actuator(async::context& ctx,
std::string_view p_name,
int value)
{
std::println(" ['{}': Actuator] Writing {}...", p_name, value);
co_await ctx.block_by_io();
std::println(" ['{}': Actuator] Write complete!", p_name);
}

assert(not future_delay.done());
// Coordinates the full pipeline
async::future<void> sensor_pipeline(async::context& ctx,
std::string_view p_name)
{
std::println("Pipeline '{}' starting...", p_name);

future_delay.resume();
int sensor_value = co_await read_sensor(ctx, p_name);
int processed = co_await process_data(ctx, p_name, sensor_value);
co_await write_actuator(ctx, p_name, processed);

assert(ctx.sleep_count == 1);
std::println("Pipeline '{}' complete!\n", p_name);
}

int main()
{
auto scheduler = std::make_shared<round_robin_scheduler>();

future_delay.resume();
// Create context and add them to the scheduler
test_context ctx1(scheduler);
test_context ctx2(scheduler);

assert(ctx.sleep_count == 2);
assert(not future_delay.done());
// Run two independent pipelines concurrently
auto pipeline1 = sensor_pipeline(ctx1, "System 1");
auto pipeline2 = sensor_pipeline(ctx2, "System 2");

future_delay.resume();
scheduler->run_until_all_done();

assert(future_delay.done());
assert(pipeline1.done());
assert(pipeline2.done());

std::println("Both pipelines completed successfully!");
return 0;
}
6 changes: 3 additions & 3 deletions tests/async.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void async_context_suite()

std::println("Executing 'single_resource' coroutine");
while (io_in_use) {
// For some reason this segfaults on Linux
// TODO(#44): For some reason this segfaults on Linux
// std::println("Resource unavailable, blocked by {}",
// io_in_use.address());
co_await io_in_use.set_as_block_by_sync(p_context);
Expand Down Expand Up @@ -572,7 +572,7 @@ void async_context_suite()
auto b = [&suspension_count](async::context&) -> future<int> {
while (suspension_count < expected_suspensions) {
suspension_count++;
// For some reason this segfaults on Linux
// TODO(#44): For some reason this segfaults on Linux
// std::println("p_suspend_count = {}!", suspension_count);
co_await std::suspend_always{};
}
Expand Down Expand Up @@ -636,7 +636,7 @@ void async_context_suite()
suspension_count = 0;
while (suspension_count < expected_suspensions) {
suspension_count++;
// For some reason this segfaults on Linux
// TODO(#44): For some reason this segfaults on Linux
// std::println("p_suspend_count = {}!", suspension_count);
co_await std::suspend_always{};
}
Expand Down