diff --git a/include/exec/__detail/__basic_sequence.hpp b/include/exec/__detail/__basic_sequence.hpp index f50fbc134..b31ddf615 100644 --- a/include/exec/__detail/__basic_sequence.hpp +++ b/include/exec/__detail/__basic_sequence.hpp @@ -21,6 +21,10 @@ #include "../../stdexec/__detail/__basic_sender.hpp" #include "../sequence_senders.hpp" +#include "../../stdexec/__detail/__completion_signatures.hpp" +#include "../../stdexec/__detail/__concepts.hpp" +#include "../../stdexec/__detail/__debug.hpp" +#include namespace exec { ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -63,7 +67,27 @@ namespace exec { return _Self::__tag().get_env(*this); } + // make sure that get_completion_signatures does not SFINAE out + // when the trailing return-type is invalid but keep the + // trailing return-type when it is valid + struct get_completion_signatures_sfinae { + template _Self, class... _Env> + auto operator()(_Self&& __self, _Env&&... __env) const + -> decltype(__self.__tag().get_completion_signatures( + static_cast<_Self&&>(__self), + static_cast<_Env&&>(__env)...)) { + return {}; + } + }; + template _Self, class... _Env> + requires(stdexec::__is_debug_env<_Env> || ... || false) + || (!stdexec::__callable) + static auto get_completion_signatures(_Self&& __self, _Env&&... __env) { + return __self.__tag() + .get_completion_signatures(static_cast<_Self&&>(__self), static_cast<_Env&&>(__env)...); + } template _Self, class... _Env> + requires(!stdexec::__is_debug_env<_Env> && ... && true) static auto get_completion_signatures(_Self&& __self, _Env&&... __env) -> decltype(__self.__tag().get_completion_signatures( static_cast<_Self&&>(__self), @@ -71,14 +95,54 @@ namespace exec { return {}; } + // make sure that get_item_types does not SFINAE out + // when the trailing return-type is invalid but keep the + // trailing return-type when it is valid + struct get_item_types_sfinae { + template _Self, class... _Env> + auto + operator()(_Self&& __self, _Env&&... __env) const -> decltype(__self.__tag().get_item_types( + static_cast<_Self&&>(__self), + static_cast<_Env&&>(__env)...)) { + return {}; + } + }; + template _Self, class... _Env> + requires(stdexec::__is_debug_env<_Env> || ... || false) + || (!stdexec::__callable) + static auto get_item_types(_Self&& __self, _Env&&... __env) { + return __self.__tag() + .get_item_types(static_cast<_Self&&>(__self), static_cast<_Env&&>(__env)...); + } template _Self, class... _Env> + requires(!stdexec::__is_debug_env<_Env> && ... && true) static auto get_item_types(_Self&& __self, _Env&&... __env) -> decltype(__self.__tag() .get_item_types(static_cast<_Self&&>(__self), static_cast<_Env&&>(__env)...)) { return {}; } + // make sure that subscribe does not SFINAE out + // when the trailing return-type is invalid but keep the + // trailing return-type when it is valid + struct subscribe_sfinae { + template _Self, stdexec::receiver _Receiver> + auto operator()(_Self&& __self, _Receiver&& __rcvr) const noexcept(noexcept( + __self.__tag().subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)))) + -> decltype(__self.__tag() + .subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr))) { + return __tag_t::subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)); + } + }; + template _Self, stdexec::receiver _Receiver> + requires stdexec::__is_debug_env> + || (!stdexec::__callable) + static auto subscribe(_Self&& __self, _Receiver&& __rcvr) noexcept(noexcept( + __self.__tag().subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)))) { + return __tag_t::subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)); + } template _Self, stdexec::receiver _Receiver> + requires(!stdexec::__is_debug_env>) static auto subscribe(_Self&& __self, _Receiver&& __rcvr) noexcept(noexcept( __self.__tag().subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)))) -> decltype(__self.__tag() diff --git a/include/exec/materialize.hpp b/include/exec/materialize.hpp index d59dd1b45..471066973 100644 --- a/include/exec/materialize.hpp +++ b/include/exec/materialize.hpp @@ -127,7 +127,8 @@ namespace exec { }; } // namespace __materialize - inline constexpr __materialize::__materialize_t materialize; + using materialize_t = __materialize::__materialize_t; + inline constexpr materialize_t materialize; namespace __dematerialize { using namespace stdexec; @@ -235,5 +236,6 @@ namespace exec { }; } // namespace __dematerialize - inline constexpr __dematerialize::__dematerialize_t dematerialize; + using dematerialize_t = __dematerialize::__dematerialize_t; + inline constexpr dematerialize_t dematerialize; } // namespace exec diff --git a/include/exec/sequence/iterate.hpp b/include/exec/sequence/iterate.hpp index 1a71dd255..f65cfb2b4 100644 --- a/include/exec/sequence/iterate.hpp +++ b/include/exec/sequence/iterate.hpp @@ -17,6 +17,10 @@ #pragma once #include "../../stdexec/__detail/__config.hpp" +#include "../../stdexec/__detail/__concepts.hpp" +#include "../../stdexec/__detail/__env.hpp" +#include "../../stdexec/__detail/__sender_introspection.hpp" +#include "../../stdexec/__detail/__tuple.hpp" #if STDEXEC_HAS_STD_RANGES() @@ -35,6 +39,13 @@ namespace exec { namespace __iterate { using namespace stdexec; + template + using __range_of_t = + stdexec::__mapply, STDEXEC_REMOVE_REFERENCE(_Data)>; + template + using __scheduler_of_t = + stdexec::__mapply, STDEXEC_REMOVE_REFERENCE(_Data)>; + template struct __operation_base { STDEXEC_ATTRIBUTE(no_unique_address) _Iterator __iterator_; @@ -79,18 +90,18 @@ namespace exec { using __sender_t = stdexec::__t<__sender, std::ranges::sentinel_t<_Range>>>; - template + template struct __operation { struct __t; }; - template + template struct __next_receiver { struct __t { using _Receiver = stdexec::__t<_ReceiverId>; using __id = __next_receiver; using receiver_concept = stdexec::receiver_t; - stdexec::__t<__operation<_Range, _ReceiverId>>* __op_; + stdexec::__t<__operation<_Scheduler, _Range, _ReceiverId>>* __op_; void set_value() noexcept { __op_->__start_next(); @@ -106,23 +117,35 @@ namespace exec { }; }; - template - struct __operation<_Range, _ReceiverId>::__t : __operation_base_t<_Range> { + struct trampoline_t { + operator trampoline_scheduler() const noexcept { + return {}; + } + }; + + template + struct __operation<_Scheduler, _Range, _ReceiverId>::__t : __operation_base_t<_Range> { using _Receiver = stdexec::__t<_ReceiverId>; + using __scheduler_t = _Scheduler; + + using __next_receiver_t = stdexec::__t<__next_receiver<__scheduler_t, _Range, _ReceiverId>>; + + __scheduler_t __scheduler_; + _Receiver __rcvr_; using __item_sender_t = - __result_of, __sender_t<_Range>>; - using __next_receiver_t = stdexec::__t<__next_receiver<_Range, _ReceiverId>>; + __result_of, __sender_t<_Range>>; std::optional< connect_result_t, __next_receiver_t> > __op_{}; - trampoline_scheduler __scheduler_{}; void __start_next() noexcept { - if (this->__iterator_ == this->__sentinel_) { + if ( + stdexec::get_stop_token(this->__rcvr_).stop_requested() + || this->__iterator_ == this->__sentinel_) { stdexec::set_value(static_cast<_Receiver&&>(__rcvr_)); } else { @@ -151,14 +174,24 @@ namespace exec { using _ReceiverId = __id<_Receiver>; _Receiver __rcvr_; - template - using __operation_t = __t<__operation<__decay_t<_Range>, _ReceiverId>>; + template + using __scheduler_t = stdexec::__if_c< + stdexec::__decays_to>, + trampoline_scheduler, + __scheduler_of_t<_Data> + >; + + template + using __operation_t = + __t<__operation<__scheduler_t<_Data>, __decay_t<__range_of_t<_Data>>, _ReceiverId>>; - template - auto operator()(__ignore, _Range&& __range) noexcept(__nothrow_move_constructible<_Receiver>) - -> __operation_t<_Range> { + template + auto operator()(__ignore, _Data&& __data) noexcept(__nothrow_move_constructible<_Receiver>) + -> __operation_t<_Data> { + auto [__scheduler, __range] = static_cast<_Data&&>(__data); return { {std::ranges::begin(__range), std::ranges::end(__range)}, + __scheduler, static_cast<_Receiver&&>(__rcvr_) }; } @@ -168,22 +201,41 @@ namespace exec { template requires __decay_copyable<_Range> auto operator()(_Range&& __range) const -> __well_formed_sequence_sender auto { - return make_sequence_expr(__decay_t<_Range>{static_cast<_Range&&>(__range)}); + return make_sequence_expr( + __decayed_tuple{trampoline_t{}, static_cast<_Range&&>(__range)}); + } + template + requires __decay_copyable<_Range> && __decay_copyable<_Scheduler> + auto operator()(_Scheduler&& __scheduler, _Range&& __range) const + -> __well_formed_sequence_sender auto { + return make_sequence_expr(__decayed_tuple<_Scheduler, _Range>{ + static_cast<_Scheduler&&>(__scheduler), static_cast<_Range&&>(__range)}); } using __completion_sigs = completion_signatures; + template + using __scheduler_t = stdexec::__if_c< + stdexec::__decays_to>, + trampoline_scheduler, + __scheduler_of_t<_Data> + >; + + template + using _NextReceiver = stdexec::__t<__next_receiver< + __scheduler_t<__data_of<_Sequence>>, + __range_of_t<__data_of<_Sequence>>, + __id<_Receiver> + >>; + template using __item_sender_t = __result_of< exec::sequence, - schedule_result_t, - __sender_t<__data_of<_Sequence>> + schedule_result_t<__scheduler_t<__data_of<_Sequence>>&>, + __sender_t<__range_of_t<__data_of<_Sequence>>> >; - template - using _NextReceiver = stdexec::__t<__next_receiver<__data_of<_Sequence>, __id<_Receiver>>>; - template using _NextSender = next_sender_of_t<_Receiver, __item_sender_t<_Sequence>>; @@ -198,14 +250,15 @@ namespace exec { return __sexpr_apply(static_cast<_SeqExpr&&>(__seq), __subscribe_fn<_Receiver>{__rcvr}); } - static auto get_completion_signatures(__ignore, __ignore = {}) noexcept - -> completion_signatures { + template _Sequence> + static auto + get_completion_signatures(_Sequence&&, __ignore = {}) noexcept -> __completion_sigs { return {}; } template _Sequence> - static auto get_item_types(_Sequence&&, __ignore) noexcept // - -> item_types<__item_sender_t<_Sequence>> { + static auto + get_item_types(_Sequence&&, __ignore) noexcept -> item_types<__item_sender_t<_Sequence>> { return {}; } diff --git a/include/exec/sequence/marbles.hpp b/include/exec/sequence/marbles.hpp new file mode 100644 index 000000000..374f2c97d --- /dev/null +++ b/include/exec/sequence/marbles.hpp @@ -0,0 +1,864 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "../../stdexec/concepts.hpp" +#include "../../stdexec/execution.hpp" +#include "../sequence_senders.hpp" + +#include "./notification.hpp" +#include "../timed_scheduler.hpp" +#include "../../stdexec/__detail/__completion_signatures.hpp" +#include "../../stdexec/__detail/__config.hpp" +#include "../../stdexec/__detail/__execution_fwd.hpp" +#include "../../stdexec/__detail/__operation_states.hpp" +#include "../../stdexec/__detail/__sender_introspection.hpp" +#include "../../stdexec/__detail/__tuple.hpp" + +#include +#include +#include +#include + +namespace exec { + namespace __marbles { + using namespace stdexec; + using namespace std::chrono_literals; + + // + // marble_t<_Clock> represents a signal for a sequence + // sender and the frame at which the signal occurs/occured + // + // a marble diagram is a string that is parsed into a + // vector> + // + // Example: + // this marble diagram + // "--a--b--c|" + // is equivalent to this set of marble_t + // marble_t{2ms, ex::set_value, 'a'}, + // marble_t{5ms, ex::set_value, 'b'}, + // marble_t{8ms, ex::set_value, 'c'}, + // marble_t{8ms, sequence_end} + // which is displayed as + // set_value('a')@2ms, set_value('b')@5ms, + // set_value('c')@8ms, sequence_end()@8ms + // + // Diagram reference: + // Time: + // ' ' indicates that 0ms has elapsed - used to line up diagrams in a visually pleasing manner + // '-' indicates that 1ms has elapsed + // ' 0-9+(ms|s|m) ' + // indicates elapsed time at that point in the diagram that is equal to the specified number + // of ms - milliseconds, s - seconds, m - minutes + // NOTE: must have a preceding and following space to disambiguate from values + // '(' begins a group of signals that all occur on the frame that the group begins on + // ')' ends a group of signals that all occur on the frame that the group begins on + // Value: + // '0'-'9' 'a'-'z' 'A'-'Z' + // indicates that a value in the sequence completes with set_value( char ) + // '#' indicates that a value in the sequence completes with set_error(error_code(interrupted)) + // '.' indicates that a value in the sequence completes with set_stopped() + // Sequence: + // '=' indicates connect() on the sequence sender + // '^' indicates start() on the sequence operation + // '|' indicates that the sequence completes with set_value() + // '$' indicates that the sequence completes with set_stopped() + // '?' indicates that request_stop() was sent to the sequence from an external source + // + // record_marbles() will record a set of marbles from the signals of the specified sequence sender + // + + + // __value_t is a hammer to stop char from being treated like an integer + + struct __value_t { + char __c_; + + operator char() noexcept { + return __c_; + } + + friend auto operator==(const __value_t& __lhs, const __value_t& __rhs) noexcept -> bool { + return __lhs.__c_ == __rhs.__c_; + } + + friend inline std::string to_string(const __value_t& __self) noexcept { + const char __result[4] = {'\'', __self.__c_, '\'', '\0'}; + return __result; + } + }; + + enum class marble_selector_t { + uninitialized, + frame_only, + notification, + sequence_start, + sequence_connect, + sequence_value, + sequence_error, + sequence_stopped, + request_stop + }; + + struct sequence_start_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::sequence_start; + } + friend inline std::string to_string(sequence_start_t) noexcept { + return {"sequence_start"}; + } + }; + static constexpr inline sequence_start_t sequence_start; + + struct sequence_connect_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::sequence_connect; + } + friend inline std::string to_string(sequence_connect_t) noexcept { + return {"sequence_connect"}; + } + }; + static constexpr inline sequence_connect_t sequence_connect; + + struct sequence_end_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::sequence_value; + } + friend inline std::string to_string(sequence_end_t) noexcept { + return {"sequence_end"}; + } + }; + static constexpr inline sequence_end_t sequence_end; + + struct sequence_error_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::sequence_error; + } + friend inline std::string to_string(sequence_error_t) noexcept { + return {"sequence_error"}; + } + }; + static constexpr inline sequence_error_t sequence_error; + + struct sequence_stopped_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::sequence_stopped; + } + friend inline std::string to_string(sequence_stopped_t) noexcept { + return {"sequence_stopped"}; + } + }; + static constexpr inline sequence_stopped_t sequence_stopped; + + struct request_stop_t { + operator marble_selector_t() const noexcept { + return marble_selector_t::request_stop; + } + friend inline std::string to_string(request_stop_t) noexcept { + return {"request_stop"}; + } + }; + static constexpr inline request_stop_t request_stop; + + using __completion_signatures_t = completion_signatures< + set_value_t(__value_t), + set_error_t(std::error_code), + set_error_t(std::exception_ptr), + set_stopped_t() + >; + + template + struct marble_t { + using __frame_t = typename _Clock::time_point; + using __duration_t = typename _Clock::duration; + using __notification_t = notification_t<__completion_signatures_t>; + using __marble_sender_t = typename __notification_t::__notification_sender_t; + + using selector = marble_selector_t; + + __frame_t __at_; + selector __selector_; + std::optional<__notification_t> __notification_; + + marble_t(__frame_t __at, set_error_t __tag, std::error_code __error) noexcept + : __at_{__at} + , __selector_{selector::notification} + , __notification_{} { + __notification_.emplace(__tag, __error); + } + marble_t(__frame_t __at, set_error_t __tag, std::exception_ptr __ex) noexcept + : __at_{__at} + , __selector_{selector::notification} + , __notification_{} { + __notification_.emplace(__tag, __ex); + } + marble_t(__frame_t __at, set_value_t __tag, char __c) noexcept + : __at_{__at} + , __selector_{selector::notification} + , __notification_{} { + __notification_.emplace(__tag, __value_t{__c}); + } + marble_t(__frame_t __at, set_value_t __tag, __value_t __v) noexcept + : __at_{__at} + , __selector_{selector::notification} + , __notification_{} { + __notification_.emplace(__tag, __v); + } + marble_t(__frame_t __at, set_stopped_t __tag) noexcept + : __at_{__at} + , __selector_{selector::notification} + , __notification_{} { + __notification_.emplace(__tag); + } + marble_t(__frame_t __at, sequence_start_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at, sequence_connect_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at, sequence_end_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at, sequence_error_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at, sequence_stopped_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at, request_stop_t __tag) noexcept + : __at_{__at} + , __selector_{__tag} + , __notification_{} { + } + marble_t(__frame_t __at) noexcept + : __at_{__at} + , __selector_{selector::frame_only} + , __notification_{} { + } + + template + void visit(_Fn&& __fn) noexcept { + if (__notification_.has_value()) { + __notification_->visit(static_cast<_Fn&&>(__fn)); + } + } + + template + void visit_receiver(_Receiver&& __receiver) noexcept { + if (__selector_ == selector::notification) { + __notification_->visit_receiver(static_cast<_Receiver&&>(__receiver)); + } else { + stdexec::set_stopped(static_cast<_Receiver&&>(__receiver)); + } + } + + template + void visit_sequence_receiver(_Receiver&& __receiver) noexcept { + switch (__selector_) { + case selector::sequence_value: { + stdexec::set_value(static_cast<_Receiver&&>(__receiver)); + break; + } + case selector::sequence_error: { + stdexec::set_error(static_cast<_Receiver&&>(__receiver), std::exception_ptr{}); + break; + } + case selector::notification: { + if (value_notification()) { + stdexec::set_value(static_cast<_Receiver&&>(__receiver)); + break; + } + } + [[fallthrough]]; + case selector::request_stop: + [[fallthrough]]; + case selector::sequence_stopped: + [[fallthrough]]; + case selector::sequence_connect: + [[fallthrough]]; + case selector::sequence_start: + [[fallthrough]]; + case selector::frame_only: + [[fallthrough]]; + default: { + stdexec::set_stopped(static_cast<_Receiver&&>(__receiver)); + break; + } + }; + } + + [[nodiscard]] + auto visit_sender() noexcept -> __marble_sender_t { + return __notification_->visit_sender(); + } + + [[nodiscard]] + bool sequence_end() const noexcept { + return __selector_ == selector::sequence_value; + } + [[nodiscard]] + bool sequence_error() const noexcept { + return __selector_ == selector::sequence_error; + } + [[nodiscard]] + bool sequence_stopped() const noexcept { + return __selector_ == selector::sequence_stopped; + } + [[nodiscard]] + bool request_stop() const noexcept { + return __selector_ == selector::request_stop; + } + [[nodiscard]] + bool value_notification() const noexcept { + return __notification_.has_value() && __notification_->value(); + } + [[nodiscard]] + bool error_notification() const noexcept { + return __notification_.has_value() && __notification_->error(); + } + [[nodiscard]] + bool stopped_notification() const noexcept { + return __notification_.has_value() && __notification_->stopped(); + } + [[nodiscard]] + __frame_t frame() const noexcept { + return __at_; + } + __frame_t shift_frame_by(__duration_t __by) noexcept { + __frame_t __old_frame = __at_; + __at_ += __by; + return __old_frame; + } + __frame_t set_origin_frame(__frame_t __origin) noexcept { + __frame_t __old_frame = __at_; + __at_ += __origin.time_since_epoch(); + return __old_frame; + } + + friend auto operator==(const marble_t& __lhs, const marble_t& __rhs) noexcept -> bool { + return std::chrono::duration_cast(__lhs.__at_.time_since_epoch()) + == std::chrono::duration_cast(__rhs.__at_ + .time_since_epoch()) + && __lhs.__selector_ == __rhs.__selector_ + && __lhs.__notification_.has_value() == __rhs.__notification_.has_value() + && (__lhs.__notification_.has_value() && __rhs.__notification_.has_value() + ? __lhs.__notification_.value() == __rhs.__notification_.value() + : true); + } + + friend std::string to_string(const marble_t& __self) noexcept { + using std::to_string; + std::string __result; + switch (__self.__selector_) { + case selector::frame_only: { + __result = "frame"; + break; + } + case selector::notification: { + __result = to_string(__self.__notification_.value()); + break; + } + case selector::request_stop: { + __result = to_string(__marbles::request_stop) + "()"; + break; + } + case selector::sequence_start: { + __result = to_string(__marbles::sequence_start) + "()"; + break; + } + case selector::sequence_connect: { + __result = to_string(__marbles::sequence_connect) + "()"; + break; + } + case selector::sequence_value: { + __result = to_string(__marbles::sequence_end) + "()"; + break; + } + case selector::sequence_error: { + __result = to_string(__marbles::sequence_error) + "()"; + break; + } + case selector::sequence_stopped: { + __result = to_string(__marbles::sequence_stopped) + "()"; + break; + } + default: { + return {"uninitialized-marble"}; + } + }; + return __result + "@" + + to_string( + std::chrono::duration_cast(__self.__at_ + .time_since_epoch()) + .count()) + + "ms"; + } + }; + + struct get_marbles_from_t { + + template + constexpr auto operator()(_Clock __clock, __mstring<_Len> __diagram) const noexcept + -> std::vector> { + using __frame_t = typename _Clock::time_point; + using __duration_t = typename _Clock::duration; + + constexpr auto __make_span = + [](const __mstring<_LenB>& __string) noexcept { + return std::span{__string.__what_, _LenB - 1}; + }; + + std::vector> __marbles; + __frame_t __group_start_frame{-1ms}; + __frame_t __frame = __clock.now(); + auto __whole = __make_span(__diagram); + auto __remaining = __whole; + auto __consume_first = [&__remaining](std::size_t __skip) noexcept { + __remaining = __remaining.subspan(__skip); + }; + auto __push = [&](auto __tag, auto... __args) noexcept { + __marbles.emplace_back( + __group_start_frame == __frame_t{-1ms} ? __frame : __group_start_frame, + __tag, + __args...); + }; + while (!__remaining.empty()) { + __frame_t __next_frame{__frame}; + auto __advance_frame_by = [&__next_frame, + &__group_start_frame](__duration_t __by) noexcept { + __next_frame += __group_start_frame == __frame_t{-1ms} ? __by : 0ms; + }; + switch (__remaining.front()) { + case '-': { + __advance_frame_by(1ms); + __consume_first(1); + break; + } + case '(': { + __group_start_frame = __frame; + __consume_first(1); + break; + } + case ')': { + __group_start_frame = __frame_t{-1ms}; + __advance_frame_by(1ms); + __consume_first(1); + break; + } + case '|': { + __push(sequence_end); + __consume_first(1); + break; + } + case '=': { + __push(sequence_connect); + __consume_first(1); + break; + } + case '^': { + __push(sequence_start); + __consume_first(1); + break; + } + case '$': { + __push(sequence_stopped); + __consume_first(1); + break; + } + case '?': { + __push(request_stop); + __consume_first(1); + break; + } + case '#': { + __push(set_error, std::make_error_code(std::errc::interrupted)); + __consume_first(1); + break; + } + case '.': { + __push(set_stopped); + __consume_first(1); + break; + } + default: { + // use auto and math to derive the difference type + auto __consumed_in_default = __remaining.begin() - __remaining.begin(); + if ( + std::addressof(*__whole.begin()) == std::addressof(*__remaining.begin()) + || !!std::isspace(__remaining.front())) { + if (!!std::isspace(__remaining.front())) { + __consume_first(1); + ++__consumed_in_default; + } + // try to consume a duration at first char or after ' ' char. + if (!!std::isdigit(__remaining.front())) { + auto __valid_duration_suffix = [](auto c) noexcept { + return c == 'm' || c == 's'; + }; + auto __suffix_begin = std::ranges::find_if(__remaining, __valid_duration_suffix); + bool __all_digits = std::all_of(__remaining.begin(), __suffix_begin, [](auto c) { + return std::isdigit(c); + }); + if ( + __suffix_begin != __remaining.end() && __suffix_begin - __remaining.begin() > 0 + && __all_digits) { + auto __to_consume = __suffix_begin - __remaining.begin(); + long __duration = std::atol(__remaining.data()); + const auto __ms_str = "ms "_mstr; + const auto __ms = __make_span(__ms_str); + const auto __s_str = "s "_mstr; + const auto __s = __make_span(__s_str); + const auto __m_str = "m "_mstr; + const auto __m = __make_span(__m_str); + if (std::ranges::equal(__remaining.subspan(__to_consume, 3), __ms)) { + __to_consume += 2; + } else if (std::ranges::equal(__remaining.subspan(__to_consume, 2), __s)) { + __duration *= 1000; + __to_consume += 1; + } else if (std::ranges::equal(__remaining.subspan(__to_consume, 2), __m)) { + __duration = __duration * 1000 * 60; + __to_consume += 1; + } else { + __duration = -1; + __to_consume = 0; + //fallthrough + } + if (__duration >= 0 && __to_consume > 0) { + __advance_frame_by(std::chrono::milliseconds(__duration)); + __consume_first(__to_consume); + __consumed_in_default += __to_consume; + break; + } + } + } + } + if (!!std::isalnum(__remaining.front())) { + __advance_frame_by(1ms); + __push(set_value, __remaining.front()); + __consume_first(1); + ++__consumed_in_default; + break; + } + if (__consumed_in_default == 0) { + // parsing error + return __marbles; + } + break; + } + }; + __frame = __next_frame; + } + return __marbles; + } + }; + + template + struct __value_receiver { + using __t = __value_receiver; + using __id = __value_receiver; + using receiver_concept = stdexec::receiver_t; + + _Clock __clock_; + std::vector>* __recording_; + _Receiver* __receiver_; + + template + void set_value(_Args&&... __args) noexcept { + __recording_ + ->emplace_back(__clock_.now(), stdexec::set_value, static_cast<_Args&&>(__args)...); + stdexec::set_value(static_cast<_Receiver>(*__receiver_)); + } + + template + void set_error(_Error&& __error) noexcept { + __recording_ + ->emplace_back(__clock_.now(), stdexec::set_error, static_cast<_Error&&>(__error)); + stdexec::set_stopped(static_cast<_Receiver>(*__receiver_)); + } + + void set_stopped() noexcept { + __recording_->emplace_back(__clock_.now(), stdexec::set_stopped); + stdexec::set_stopped(static_cast<_Receiver>(*__receiver_)); + } + + auto get_env() const noexcept -> env_of_t<_Receiver> { + return stdexec::get_env(*__receiver_); + } + }; + + template + struct __value_operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __value_operation; + + using __receiver_t = __value_receiver<_Receiver, _Clock>; + + _Receiver __receiver_; + _Clock __clock_; + std::vector>* __recording_; + stdexec::connect_result_t<_Value, __receiver_t> __op_; + + __t( + _Value&& __value, + _Receiver&& __receiver, + _Clock __clock, + std::vector>* __recording) noexcept + : __receiver_{static_cast<_Receiver&&>(__receiver)} + , __clock_{__clock} + , __recording_(__recording) + , __op_{stdexec::connect( + static_cast<_Value&&>(__value), + __receiver_t{__clock_, __recording_, &__receiver_})} { + } + + void start() & noexcept { + stdexec::start(__op_); + } + }; + }; + + template + struct __value_sender { + struct __t { + using __id = __value_sender; + using sender_concept = stdexec::sender_t; + + template + using __value_operation_t = + stdexec::__t<__value_operation<_Value, stdexec::__id<_Receiver>, _Clock>>; + + _Clock __clock_; + std::vector>* __recording_; + _Value __value_; + + template _Self, class... _Env> + static auto get_completion_signatures(_Self&&, _Env&&...) noexcept + -> stdexec::completion_signatures { + return {}; + } + + template _Self, receiver _Receiver> + static auto connect(_Self&& __self, _Receiver&& __rcvr) + noexcept(__nothrow_move_constructible<_Receiver>) { + return __value_operation_t<_Receiver>{ + static_cast<_Value&&>(__self.__value_), + static_cast<_Receiver&&>(__rcvr), + __self.__clock_, + __self.__recording_}; + } + }; + }; + + template + struct __receiver { + using __t = __receiver; + using __id = __receiver; + using receiver_concept = stdexec::receiver_t; + + template + using __value_sender_t = stdexec::__t<__value_sender<_Value, _Clock>>; + + _Clock __clock_; + std::vector>* __recording_; + _Receiver* __receiver_; + + using __receiver_t = __value_receiver<_Receiver, _Clock>; + + template + auto set_next(_Value&& __value) noexcept -> next_sender auto { + return __value_sender_t<_Value>{__clock_, __recording_, static_cast<_Value&&>(__value)}; + } + + void set_value() noexcept { + __recording_->emplace_back(__clock_.now(), sequence_end); + stdexec::set_value(static_cast<_Receiver&&>(*__receiver_)); + } + + template + void set_error(_Error&&) noexcept { + __recording_->emplace_back(__clock_.now(), sequence_error); + stdexec::set_value(static_cast<_Receiver&&>(*__receiver_)); + } + + void set_stopped() noexcept { + __recording_->emplace_back(__clock_.now(), sequence_stopped); + stdexec::set_value(static_cast<_Receiver&&>(*__receiver_)); + } + + auto get_env() const noexcept -> env_of_t<_Receiver> { + return stdexec::get_env(*__receiver_); + } + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __operation; + + using __receiver_t = __receiver<_Receiver, _Clock>; + + _Receiver __receiver_; + _Clock __clock_; + std::vector>* __recording_; + exec::subscribe_result_t<_Sequence, __receiver_t> __op_; + + __t( + _Sequence&& __sequence, + _Receiver&& __receiver, + _Clock __clock, + std::vector>* __recording) noexcept + : __receiver_{static_cast<_Receiver&&>(__receiver)} + , __clock_{__clock} + , __recording_(__recording) + , __op_{exec::subscribe( + static_cast<_Sequence&&>(__sequence), + __receiver_t{__clock_, __recording_, &__receiver_})} { + } + + void start() & noexcept { + __recording_->emplace_back(__clock_.now(), sequence_start); + stdexec::start(__op_); + } + }; + }; + + template + struct __connect_fn { + _Receiver __rcvr_; + + using __receiver_id_t = __id<_Receiver>; + + template + using __operation_t = __t<__operation<_Child, __receiver_id_t, _Clock>>; + + template + auto operator()(__ignore, _Data&& __data, _Child&& __child) + noexcept(__nothrow_constructible_from< + __operation_t<_Child>, + _Child, + _Receiver, + _Clock, + std::vector>* + >) -> __operation_t<_Child> { + auto [__recording, __clock] = static_cast<_Data&&>(__data); + __recording->emplace_back(__clock.now(), sequence_connect); + return { + static_cast<_Child&&>(__child), static_cast<_Receiver&&>(__rcvr_), __clock, __recording}; + } + }; + + struct record_marbles_t { + template + auto operator()( + std::vector>* __recording, + _Clock __clock, + _Sequence&& __sequence) const { //-> __well_formed_sender auto { + auto __domain = __get_early_domain(static_cast<_Sequence&&>(__sequence)); + return transform_sender( + __domain, + __make_sexpr( + __decayed_tuple>*, _Clock>{__recording, __clock}, + static_cast<_Sequence&&>(__sequence))); + } + template + std::vector> + operator()(_Clock __clock, _Sequence&& __sequence) const noexcept { + std::vector> __recording; + auto __recorder = (*this)(&__recording, __clock, static_cast<_Sequence&&>(__sequence)); + stdexec::sync_wait(__recorder); + return __recording; + } + }; + + struct __record_marbles_impl : __sexpr_defaults { + + template + using __clock_of_t = __mapply<__q<__mback>, __data_of>; + + static constexpr auto get_completion_signatures = + [] _Sender, class... _Env>(_Sender&&, _Env&&...) noexcept + -> completion_signatures { + return {}; + }; + + static constexpr auto connect = + [] _Self, receiver _Receiver>(_Self&& __self, _Receiver&& __rcvr) noexcept( + __nothrow_callable<__sexpr_apply_t, _Self, __connect_fn<__clock_of_t<_Self>, _Receiver>>) + -> __call_result_t<__sexpr_apply_t, _Self, __connect_fn<__clock_of_t<_Self>, _Receiver>> { + return __sexpr_apply( + static_cast<_Self&&>(__self), + __connect_fn<__clock_of_t<_Self>, _Receiver>{static_cast<_Receiver&&>(__rcvr)}); + }; + }; + + } // namespace __marbles + + using __value_t = __marbles::__value_t; + + using sequence_start_t = __marbles::sequence_start_t; + static constexpr inline auto sequence_start = sequence_start_t{}; + + using sequence_connect_t = __marbles::sequence_connect_t; + static constexpr inline auto sequence_connect = sequence_connect_t{}; + + using sequence_end_t = __marbles::sequence_end_t; + static constexpr inline auto sequence_end = sequence_end_t{}; + + using sequence_error_t = __marbles::sequence_error_t; + static constexpr inline auto sequence_error = sequence_error_t{}; + + using sequence_stopped_t = __marbles::sequence_stopped_t; + static constexpr inline auto sequence_stopped = sequence_stopped_t{}; + + using request_stop_t = __marbles::request_stop_t; + static constexpr inline auto request_stop = request_stop_t{}; + + template + using marble_t = __marbles::marble_t<_Clock>; + + using get_marbles_from_t = __marbles::get_marbles_from_t; + + static constexpr inline auto get_marbles_from = get_marbles_from_t{}; + + using record_marbles_t = __marbles::record_marbles_t; + + static constexpr inline auto record_marbles = record_marbles_t{}; + +} // namespace exec + +namespace stdexec { + template <> + struct __sexpr_impl : exec::__marbles::__record_marbles_impl { }; +} // namespace stdexec diff --git a/include/exec/sequence/merge.hpp b/include/exec/sequence/merge.hpp index d028d7948..8cc1788d9 100644 --- a/include/exec/sequence/merge.hpp +++ b/include/exec/sequence/merge.hpp @@ -156,11 +156,11 @@ namespace exec { struct _INVALID_ARGUMENTS_TO_MERGE_ { }; - template + template using __error_t = __mexception< _INVALID_ARGUMENTS_TO_MERGE_, __children_of<_Self, __q<_WITH_SEQUENCES_>>, - _WITH_ENVIRONMENT_<_Env> + _WITH_ENVIRONMENT_<_Env>... >; template diff --git a/include/exec/sequence/merge_each.hpp b/include/exec/sequence/merge_each.hpp index 7e8b076de..4faaaf2d0 100644 --- a/include/exec/sequence/merge_each.hpp +++ b/include/exec/sequence/merge_each.hpp @@ -102,15 +102,17 @@ namespace exec { bool stop_requested() const noexcept { if constexpr (!unstoppable_token<__stop_token_t>) { return __stop_source_.stop_requested(); + } else { + return false; } - return false; } bool request_stop() noexcept { if constexpr (!unstoppable_token<__stop_token_t>) { return __stop_source_.request_stop(); + } else { + return false; } - return false; } inplace_stop_token get_token() const & noexcept { @@ -153,7 +155,7 @@ namespace exec { } virtual void nested_value_started() noexcept = 0; virtual void nested_value_complete() noexcept = 0; - virtual bool nested_value_fail() noexcept = 0; + virtual bool nested_sequence_fail() noexcept = 0; virtual void nested_value_break() noexcept = 0; virtual void error_complete() noexcept = 0; @@ -178,7 +180,7 @@ namespace exec { _ErrorStorage&, _Error >) { - if (this->nested_value_fail()) { + if (this->nested_sequence_fail()) { // We are the first child to complete with an error, so we must save the error. (Any // subsequent errors are ignored.) if constexpr (noexcept(__error_storage_ @@ -394,7 +396,7 @@ namespace exec { void nested_value_complete() noexcept override { complete_if_none_active(); } - bool nested_value_fail() noexcept override { + bool nested_sequence_fail() noexcept override { switch (__completion_.exchange(__completion_t::__error)) { case __completion_t::__started: // We must request stop. When the previous state is __error or __stopped, then stop has @@ -513,15 +515,16 @@ namespace exec { template void set_error(_Error&& __error) noexcept { auto __op = __op_; - stdexec::set_stopped(static_cast<_NestedValueReceiver&&>(__nested_value_op_->__receiver_)); - __op->store_error(static_cast<_Error&&>(__error)); + stdexec::set_error( + static_cast<_NestedValueReceiver&&>(__nested_value_op_->__receiver_), + static_cast<_Error&&>(__error)); __op->nested_value_break(); } void set_stopped() noexcept { auto __op = __op_; stdexec::set_stopped(static_cast<_NestedValueReceiver&&>(__nested_value_op_->__receiver_)); - __op->nested_value_complete(); + __op->nested_value_break(); } using __env_t = decltype(__op_->env_from(__declval>())); @@ -608,9 +611,7 @@ namespace exec { static auto get_completion_signatures(_Self&&, _Env&&...) noexcept -> stdexec::transform_completion_signatures< stdexec::completion_signatures_of_t<_NestedValueSender, _Env...>, - stdexec::completion_signatures, - stdexec::__sigs::__default_set_value, - drop + stdexec::completion_signatures > { return {}; } @@ -674,14 +675,14 @@ namespace exec { } void nested_sequence_complete() noexcept override { - auto __op = __op_; + auto& __op = *__op_; stdexec::set_value(static_cast<_NextReceiver&&>(this->__receiver_)); - __op->nested_sequence_complete(); + __op.nested_sequence_complete(); } void nested_sequence_break() noexcept override { - auto __op = __op_; + auto& __op = *__op_; stdexec::set_stopped(static_cast<_NextReceiver&&>(this->__receiver_)); - __op->nested_sequence_break(); + __op.nested_sequence_break(); } }; @@ -881,9 +882,7 @@ namespace exec { // include errors from senders of the nested sequences __error_types<__item_types_of_t<_Sequence, _Env...>, _Env...>, // include errors from the nested sequences - __error_types<__merge_each::__compute::__nested_sequences<_Sequence, _Env...>, _Env...>, - // include errors from all the item type senders of all the nested sequences - __error_types<__merge_each::__compute::__all_nested_values<_Sequence, _Env...>, _Env...> + __error_types<__merge_each::__compute::__nested_sequences<_Sequence, _Env...>, _Env...> >; // diff --git a/include/exec/sequence/notification.hpp b/include/exec/sequence/notification.hpp new file mode 100644 index 000000000..061071e90 --- /dev/null +++ b/include/exec/sequence/notification.hpp @@ -0,0 +1,295 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../../stdexec/concepts.hpp" +#include "../../stdexec/execution.hpp" +#include "../sequence_senders.hpp" +#include "../../stdexec/__detail/__concepts.hpp" +#include "../../stdexec/__detail/__config.hpp" +#include "../../stdexec/__detail/__tuple.hpp" +#include +#include +#include +#include + +namespace std { + inline std::string to_string(const std::error_code __error) noexcept { + return __error.message(); + } + inline std::string to_string(const std::exception_ptr __ex) noexcept { + try { + std::rethrow_exception(__ex); + } catch (const std::exception& __ex) { + return __ex.what(); + } + } +} // namespace std + +namespace stdexec::__rcvrs { + inline std::string to_string(set_value_t) noexcept { + return {"set_value"}; + } + inline std::string to_string(set_error_t) noexcept { + return {"set_error"}; + } + inline std::string to_string(set_stopped_t) noexcept { + return {"set_stopped"}; + } +} // namespace stdexec::__rcvrs + +namespace exec::__sequence_sender { + inline std::string to_string(set_next_t) noexcept { + return {"set_next"}; + } +} // namespace exec::__sequence_sender + +namespace exec { + namespace __notification { + using namespace stdexec; + + // + // notification_t provides storage for any one of the + // completions in the specified completion_signatures<> + // + // notification_t can be compared, visited by a receiver + // to emit the stored signal, and provide a sender that will + // complete with the stored signal + // + + template + using __nothrow_decay_copyable_and_move_constructible_t = __mbool<( + (__nothrow_decay_copyable<_Ts> && __nothrow_move_constructible<__decay_t<_Ts>>) && ...)>; + + template + using __as_rvalues = set_value_t (*)(__decay_t...); + + template + using __as_error = set_error_t (*)(E...); + + // Here we convert all set_value(Args...) to set_value(__decay_t...). Note, we keep all + // error types as they are and unconditionally add set_stopped(). The indirection through the + // __completions_fn is to avoid a pack expansion bug in nvc++. + struct __completions_fn { + template + using __all_value_args_nothrow_decay_copyable = __value_types_t< + _CompletionSignatures, + __qq<__nothrow_decay_copyable_and_move_constructible_t>, + __qq<__mand_t> + >; + + template + using __f = __mtry_q<__concat_completion_signatures>::__f< + __eptr_completion_if_t<__all_value_args_nothrow_decay_copyable<_CompletionSignatures>>, + completion_signatures, + __transform_completion_signatures< + _CompletionSignatures, + __as_rvalues, + __as_error, + set_stopped_t (*)(), + __completion_signature_ptrs + > + >; + }; + + template + using __notification_storage_t = __for_each_completion_signature< + __minvoke<__completions_fn, _CompletionSignatures>, + __decayed_tuple, + std::variant + >; + + template + struct __notification_sender; + + template + struct notification_t { + using __notification_t = __notification_storage_t<_CompletionSignatures>; + using __notification_sender_t = stdexec::__t<__notification_sender<_CompletionSignatures>>; + + __notification_t __notification_{}; + + template + using __tag_of_t = + stdexec::__mapply, __decay_t<_Notification>>; + + template + notification_t(_Tag __tag, _Args&&... __args) + noexcept(noexcept(__notification_.template emplace<__decayed_tuple<_Tag, _Args...>>( + __decayed_tuple<_Tag, _Args...>{__tag, static_cast<_Args&&>(__args)...}))) { + __notification_.template emplace<__decayed_tuple<_Tag, _Args...>>( + __decayed_tuple<_Tag, _Args...>{__tag, static_cast<_Args&&>(__args)...}); + } + + template + auto visit(_Fn&& __fn) const noexcept { + return std::visit( + [&__fn](auto&& __tuple) noexcept { + return __tuple.apply( + [&__fn](auto __tag, auto&&... __args) noexcept { + return static_cast<_Fn&&>(__fn)(__tag, __args...); + }, + __tuple); + }, + __notification_); + } + template + void visit_receiver(_Receiver&& __receiver) noexcept { + std::visit( + [&__receiver](_Tuple&& __tuple) noexcept { + __tuple.apply( + [&__receiver](_Tag __tag, _Args&&... __args) noexcept { + __tag( + static_cast<_Receiver&&>(__receiver), static_cast<_Args&&>(__args)...); + }, + static_cast<_Tuple&&>(__tuple)); + }, + static_cast<__notification_t&&>(__notification_)); + } + auto visit_sender() noexcept -> __notification_sender_t; + + [[nodiscard]] + bool value() const noexcept { + return std::visit( + [](const _Tuple&) noexcept { + return stdexec::__decays_to>; + }, + __notification_); + } + [[nodiscard]] + bool error() const noexcept { + return std::visit( + [](const _Tuple&) noexcept { + return stdexec::__decays_to>; + }, + __notification_); + } + [[nodiscard]] + bool stopped() const noexcept { + return std::visit( + [](const _Tuple&) noexcept { + return stdexec::__decays_to>; + }, + __notification_); + } + + friend auto + operator==(const notification_t& __lhs, const notification_t& __rhs) noexcept -> bool { + using __self_t = notification_t; + return std::visit( + [](const _Lhs& __lhs, const _Rhs& __rhs) noexcept -> bool { + using __lhs_tag_t = __self_t::__tag_of_t<_Lhs>; + using __rhs_tag_t = __self_t::__tag_of_t<_Rhs>; + if constexpr ( + !std::same_as<__lhs_tag_t, __rhs_tag_t> + || stdexec::__v> + != stdexec::__v>) { + return false; + } else { + return __lhs.apply( + [&__lhs, + &__rhs](_LTag, const _LArgs&...) noexcept -> bool { + return [&__lhs, &__rhs](__indices<_Is...>) { + if constexpr ((std::equality_comparable_with< + const decltype(_Lhs::template __get<_Is+1>(__lhs))&, + const decltype(_Rhs::template __get<_Is+1>(__rhs))& + > + && ... && true)) { + return ( + ((_Lhs::template __get<_Is+1>(__lhs)) == (_Rhs::template __get<_Is+1>(__rhs))) + && ... && true); + } else { + return false; + } + }(__indices_for<_LArgs...>{}); + }, + __lhs); + } + }, + __lhs.__notification_, + __rhs.__notification_); + } + + friend std::string to_string(const notification_t& __self) noexcept { + using std::to_string; + return __self.visit([](auto __tag, const auto&... __args) { + int count = 0; + return to_string(__tag) + "(" + + (((count++ > 0 ? ", " : "") + to_string(__args)) + ... + std::string{}) + ")"; + }); + } + }; + + template + struct __notification_op { + using _Receiver = stdexec::__t<_ReceiverId>; + using __notification_t = notification_t<_CompletionSignatures>; + + struct __t { + using __id = __notification_op; + + _Receiver __receiver_; + __notification_t* __notification_; + + void start() & noexcept { + __notification_->visit_receiver(static_cast<_Receiver&&>(__receiver_)); + } + }; + }; + + template + struct __notification_sender { + using __notification_t = notification_t<_CompletionSignatures>; + struct __t { + using __id = __notification_sender; + using sender_concept = stdexec::sender_t; + + template + using __notification_op_t = + stdexec::__t<__notification_op<_ReceiverId, _CompletionSignatures>>; + + __notification_t* __notification_; + + template _Self, class... _Env> + static auto + get_completion_signatures(_Self&&, _Env&&...) noexcept -> _CompletionSignatures { + return {}; + } + + template _Self, receiver _Receiver> + static auto connect(_Self&& __self, _Receiver&& __rcvr) + noexcept(__nothrow_move_constructible<_Receiver>) + -> __notification_op_t> { + return {static_cast<_Receiver&&>(__rcvr), __self.__notification_}; + } + }; + }; + + template + auto notification_t<_CompletionSignatures>::visit_sender() noexcept + -> notification_t<_CompletionSignatures>::__notification_sender_t { + return {this}; + } + + } // namespace __notification + template + using notification_t = __notification::notification_t<_CompletionSignatures>; + + namespace __notification { + + } // namespace __notification +} // namespace exec diff --git a/include/exec/sequence/test_scheduler.hpp b/include/exec/sequence/test_scheduler.hpp new file mode 100644 index 000000000..fe460a759 --- /dev/null +++ b/include/exec/sequence/test_scheduler.hpp @@ -0,0 +1,1001 @@ +/* + * Copyright (c) 2024 Maikel Nadolski + * Copyright (c) 2024 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../../stdexec/__detail/__completion_signatures.hpp" +#include "../../stdexec/__detail/__concepts.hpp" +#include "../../stdexec/__detail/__execution_fwd.hpp" +#include "../../stdexec/__detail/__intrusive_mpsc_queue.hpp" +#include "../../stdexec/__detail/__meta.hpp" +#include "../../stdexec/__detail/__receivers.hpp" +#include "../../stdexec/__detail/__senders.hpp" +#include "../../stdexec/__detail/__senders_core.hpp" +#include "../../stdexec/__detail/__stop_token.hpp" +#include "../../stdexec/__detail/__transform_completion_signatures.hpp" +#include "../../stdexec/__detail/__type_traits.hpp" + +#include "./marbles.hpp" +#include "../__detail/intrusive_heap.hpp" +#include "../sequence.hpp" +#include "../sequence_senders.hpp" +#include "../timed_scheduler.hpp" +#include "../variant_sender.hpp" + +#include +#include +#include + +namespace exec { + class test_scheduler; + + // + // test_context implements a time-scheduler with additional test features. + // + // test_context is used to build tests using marble sequence senders + // and marble recorders + // + // test_clock_context & test_clock are used to represent time for the tests + // + // test_scheduler is used to queue tasks on the test_context at virtual-time points + // + // One or more __test_sequence(s) are constructed from marble + // diagrams and each schedules marbles on the test_scheduler + // when connected and started + // + // Once one or more __test_sequence(s) have been composed into an expression, + // the test_context is used to record the expression results as a + // set of marbles. To support the testing of infinite sequences + // the recording will be requested to stop at a default of 1000ms from + // start() if the expression has not completed. + // + // The expression result marbles are then compared to an expected + // set of marbles generated from a separate marble diagram + // + // An example of usage: + // + // TEST_CASE( + // "test_scheduler - test_context marble-sequence never", + // "[sequence_senders][test_scheduler]") { + // test_context __test{}; + // auto __clock = __test.get_clock(); + // CHECK(test_clock::time_point{0ms} == __clock.now()); + // + // // a sequence that will produce '0' at 1ms from start() + // // and then never complete + // auto __sequence = __test.get_marble_sequence_from( + // " -0-"_mstr); + // + // // the set of marbles for a sequence that contains '5' + // // at 1ms from start() and then is externally stopped + // // after 1000ms have elapsed since start() + // auto expected = get_marbles_from(__clock, + // "=^-5 998ms $"_mstr); + // + // // record an expression that is expected to turn '0' to '5' + // auto actual = __test.get_marbles_from( + // __sequence + // | then_each([](char c){ return c+5; })); + // + // CHECK(test_clock::time_point{1000ms} == __clock.now()); + // CAPTURE(__sequence.__marbles_); + // CHECK(expected == actual); + // } + // + + struct test_clock_context; + + // + // test_clock and test_clock_context implement a + // manually advanced virtual-time clock + // + // this is used to run tests that depend on time + // without consuming actual real-time + // + // So tests using virtual-time will complete + // fast in real-time + // + + struct test_clock { + using duration = std::chrono::milliseconds; + using rep = duration::rep; + using period = duration::period; + using time_point = std::chrono::time_point; + [[maybe_unused]] + static const bool is_steady = false; + + const test_clock_context* __context_; + + [[maybe_unused, nodiscard]] + time_point now() const noexcept; + }; + + struct test_clock_context { + using duration = typename test_clock::duration; + using rep = duration::rep; + using period = duration::period; + using time_point = typename test_clock::time_point; + [[maybe_unused]] + static const bool is_steady = test_clock::is_steady; + + time_point __now_{}; + + [[maybe_unused, nodiscard]] + time_point now() const noexcept { + return __now_; + } + + auto advance_now_to(time_point __new_now) noexcept -> time_point { + time_point __old_now = __now_; + __now_ = __new_now; + return __old_now; + } + auto advance_now_by(duration __by) noexcept -> time_point { + time_point __old_now = __now_; + __now_ += __by; + return __old_now; + } + }; + + inline typename test_clock::time_point test_clock::now() const noexcept { + return __context_->now(); + } + + + namespace _tst_sched { + using namespace stdexec::tags; + + struct test_operation_base { + enum class command_type { + uninitialized, + schedule, + stop + }; + enum class location { + uninitialized, + inert, + in_command_queue, + in_heap + }; + + ~test_operation_base() noexcept { + STDEXEC_ASSERT(location_ == location::inert); + } + + test_operation_base( + void (*set_value)(test_operation_base*) noexcept, + command_type command = command_type::schedule) noexcept + : command_{command} + , set_value_{set_value} { + } + + std::atomic next_{nullptr}; + location location_{location::inert}; + command_type command_; + void (*set_value_)(test_operation_base*) noexcept; + }; + + template + struct when_type { + when_type() = default; + + explicit when_type(Tp tp, std::size_t n = 0) noexcept + : time_point{std::move(tp)} + , counter{n} { + } + + Tp time_point{}; + std::size_t counter{}; + + friend auto operator<(const when_type& lhs, const when_type& rhs) noexcept -> bool { + return lhs.time_point < rhs.time_point + || (!(rhs.time_point < lhs.time_point) && lhs.counter < rhs.counter); + } + }; + + struct alignas(64) test_schedule_operation_base : test_operation_base { + using time_point = test_clock::time_point; + + test_schedule_operation_base( + time_point tp, + void (*set_stopped)(test_operation_base*) noexcept, + void (*set_value)(test_operation_base*) noexcept) noexcept + : test_operation_base{set_value, command_type::schedule} + , time_point_{tp} + , set_stopped_{set_stopped} { + } + + time_point time_point_; + // we increase the when counter to ensure that the heap is stable + // when two operations have the same time_point + // We do so only when the operation is started, not when it is constructed + when_type when_{}; + test_schedule_operation_base* prev_ = nullptr; + test_schedule_operation_base* left_ = nullptr; + test_schedule_operation_base* right_ = nullptr; + void (*set_stopped_)(test_operation_base*) noexcept; + }; + + struct alignas(64) test_stop_operation : test_operation_base { + test_stop_operation( + void (*set_value)(test_operation_base*) noexcept, + test_schedule_operation_base* target) noexcept + : test_operation_base{set_value, command_type::stop} + , target_{target} { + } + + test_schedule_operation_base* target_; + }; + + template + struct test_schedule_at_op { + class __t; + }; + + struct __recording_receiver; + struct __test_sequence; + struct __test_sequence_operation_base; + } // namespace _tst_sched + + class test_context { + private: + static constexpr std::ptrdiff_t context_closed = std::numeric_limits::min() / 2; + public: + using duration = test_clock::duration; + using time_point = test_clock::time_point; + + auto get_scheduler() noexcept -> test_scheduler; + auto get_clock() const noexcept -> test_clock; + auto now() const noexcept -> time_point; + + // parse a marble diagram into a set of marbles + template + auto get_marbles_from(stdexec::__mstring<_Len> __diagram) noexcept + -> std::vector> { + return exec::get_marbles_from(get_clock(), __diagram); + } + + // record the results of a sequence-sender as a set of marbles + template + auto get_marbles_from( + _Sequence&& __sequence, + typename test_clock::duration __stop_after = std::chrono::milliseconds(1000)) noexcept + -> std::vector>; + + + // return a sequence sender that will emit signals specified by the + // set of marbles provided + inline auto get_marble_sequence_from(std::vector> __marbles) noexcept + -> _tst_sched::__test_sequence; + + // parse a marble diagram into a set of marbles and return a sequence + // sender that will emit those marbles + template + auto get_marble_sequence_from(stdexec::__mstring<_Len> __diagram) noexcept + -> _tst_sched::__test_sequence; + + private: + template + friend struct _tst_sched::test_schedule_at_op; + + using command_type = _tst_sched::test_operation_base; + using task_type = _tst_sched::test_schedule_operation_base; + using stop_type = _tst_sched::test_stop_operation; + + void process_command_queue() { + while (command_type* op = command_queue_.pop_front()) { + STDEXEC_ASSERT(op->location_ == command_type::location::in_command_queue); + std::exchange(op->location_, command_type::location::inert); + if (op->command_ == command_type::command_type::schedule) { + auto* task = static_cast(op); + task->when_ = _tst_sched::when_type{task->time_point_, submission_counter_++}; + STDEXEC_ASSERT(task->location_ == command_type::location::inert); + std::exchange(task->location_, command_type::location::in_heap); + heap_.insert(task); + } else { + STDEXEC_ASSERT(op->command_ == command_type::command_type::stop); + auto* stop_op = static_cast(op); + STDEXEC_ASSERT(stop_op->target_->location_ == command_type::location::in_heap); + bool __erased = heap_.erase(stop_op->target_); + std::exchange(stop_op->target_->location_, command_type::location::inert); + if (__erased) { + stop_op->target_->set_stopped_(stop_op->target_); + } + stop_op->set_value_(stop_op); + } + } + } + + void clear_pending() { + STDEXEC_ASSERT(stop_requested_); + std::ptrdiff_t expected = 0; + while (!n_submissions_in_flight_ + .compare_exchange_weak(expected, context_closed, std::memory_order_relaxed) + && expected > 0) { + expected = 0; + } + task_type* op = heap_.front(); + while (op) { + STDEXEC_ASSERT(op->location_ == command_type::location::in_heap); + heap_.pop_front(); + std::exchange(op->location_, command_type::location::inert); + op->set_stopped_(op); + op = heap_.front(); + } + } + + void run() { + while (true) { + process_command_queue(); + task_type* op = heap_.front(); + if (!!op) { + STDEXEC_ASSERT(op->location_ == command_type::location::in_heap); + heap_.pop_front(); + std::exchange(op->location_, command_type::location::inert); + if (__clock_.now() < op->time_point_) { + __clock_.advance_now_to(op->time_point_); + } + op->set_value_(op); + std::exchange(op, nullptr); + } + bool stop_requested = stop_requested_; + ready_ = false; + if (stop_requested) { + clear_pending(); + break; + } + } + } + + void schedule(command_type* op) { + STDEXEC_ASSERT(op->location_ == command_type::location::inert); + std::ptrdiff_t n = n_submissions_in_flight_.fetch_add(1, std::memory_order_relaxed); + if (n < 0) { + if (op->command_ == command_type::command_type::schedule) { + static_cast(op)->set_stopped_(op); + } else { + STDEXEC_ASSERT(op->command_ == command_type::command_type::stop); + static_cast(op)->set_value_(op); + } + n_submissions_in_flight_ + .compare_exchange_strong(n, context_closed, std::memory_order_relaxed); + return; + } + std::exchange(op->location_, command_type::location::in_command_queue); + if (command_queue_.push_back(op)) { + ready_ = true; + } + n_submissions_in_flight_.fetch_sub(1, std::memory_order_relaxed); + } + + void request_stop() { + stop_requested_ = true; + process_command_queue(); + clear_pending(); + } + + friend struct _tst_sched::__recording_receiver; + friend struct _tst_sched::__test_sequence_operation_base; + + stdexec::__intrusive_mpsc_queue<&command_type::next_> command_queue_; + intrusive_heap< + task_type, + _tst_sched::when_type, + &task_type::when_, + &task_type::prev_, + &task_type::left_, + &task_type::right_ + > + heap_; + std::atomic n_submissions_in_flight_{0}; + bool ready_{false}; + bool stop_requested_{false}; + std::size_t submission_counter_{1}; + test_clock_context __clock_; + }; + + namespace _tst_sched { + template + class test_schedule_at_op::__t : _tst_sched::test_schedule_operation_base { + public: + using __id = test_schedule_at_op; + + __t(test_context& context, test_clock::time_point time_point, Receiver receiver) noexcept + : _tst_sched::test_schedule_operation_base{ + time_point, + [](_tst_sched::test_operation_base* op) noexcept { + auto* self = static_cast<__t*>(op); + int counter = self->ref_count_.fetch_sub(1, std::memory_order_relaxed); + if (counter == 1) { + self->stop_callback_.reset(); + stdexec::set_stopped(std::move(self->receiver_)); + } + }, + [](_tst_sched::test_operation_base* op) noexcept { + auto* self = static_cast<__t*>(op); + int counter = self->ref_count_.fetch_sub(1, std::memory_order_relaxed); + if (counter == 1) { + self->stop_callback_.reset(); + stdexec::set_value(std::move(self->receiver_)); + } + }} + , context_{context} + , receiver_{std::move(receiver)} + , stop_op_{ + [](_tst_sched::test_operation_base* op) noexcept { + auto* stop = static_cast<_tst_sched::test_stop_operation*>(op); + auto* self = static_cast<__t*>(stop->target_); + int counter = self->ref_count_.fetch_sub(1, std::memory_order_relaxed); + if (counter == 1) { + self->stop_callback_.reset(); + stdexec::set_stopped(std::move(self->receiver_)); + } + }, + this} { + } + + void start() & noexcept { + stop_callback_ + .emplace(stdexec::get_stop_token(stdexec::get_env(receiver_)), on_stopped_t{*this}); + int expected = 0; + if (ref_count_.compare_exchange_strong(expected, 1, std::memory_order_relaxed)) { + schedule_this(); + } else { + stop_callback_.reset(); + stdexec::set_stopped(std::move(receiver_)); + } + } + + private: + void schedule_this() noexcept { + context_.schedule(this); + } + + struct on_stopped_t { + __t& self_; + + void operator()() const noexcept { + self_.request_stop(); + } + }; + + using callback_type = typename stdexec::stop_token_of_t< + stdexec::env_of_t + >::template callback_type; + + void request_stop() noexcept { + if (ref_count_.fetch_add(1, std::memory_order_relaxed) == 1) { + context_.schedule(&stop_op_); + } + } + + test_context& context_; + Receiver receiver_; + _tst_sched::test_stop_operation stop_op_; + std::optional stop_callback_; + std::atomic ref_count_{0}; + }; + + } // namespace _tst_sched + + class test_scheduler { + public: + using time_point = test_clock::time_point; + using duration = test_clock::duration; + + class schedule_at_sender { + public: + using sender_concept = stdexec::sender_t; + using completion_signatures = + stdexec::completion_signatures; + + schedule_at_sender(test_context& context, test_clock::time_point time_point) noexcept + : context_{&context} + , time_point_{time_point} { + } + + [[nodiscard]] + auto get_env() const noexcept { + return stdexec::prop{ + stdexec::get_completion_scheduler, test_scheduler{*context_}}; + } + + template + auto connect(Receiver receiver) const & noexcept -> + typename _tst_sched::test_schedule_at_op::__t { + return {*context_, time_point_, std::move(receiver)}; + } + + private: + [[nodiscard]] + auto get_scheduler() const noexcept -> test_scheduler; + + test_context* context_; + test_clock::time_point time_point_; + }; + + explicit test_scheduler(test_context& context) noexcept + : context_{&context} { + } + + [[nodiscard]] + auto now() const noexcept -> time_point { + return context_->now(); + } + + [[nodiscard]] + auto schedule_at(time_point tp) const noexcept -> schedule_at_sender { + return schedule_at_sender{*context_, tp}; + } + + [[nodiscard]] + auto schedule() const noexcept -> schedule_at_sender { + return schedule_at(time_point()); + } + + auto operator==(const test_scheduler&) const noexcept -> bool = default; + + private: + test_context* context_; + }; + + inline auto test_context::get_scheduler() noexcept -> test_scheduler { + return test_scheduler{*this}; + } + + inline auto test_context::get_clock() const noexcept -> test_clock { + return test_clock{&this->__clock_}; + } + + inline auto test_context::now() const noexcept -> test_context::time_point { + return __clock_.now(); + } + + namespace _tst_sched { + struct __recording_receiver { + using __t = __recording_receiver; + using __id = __recording_receiver; + using receiver_concept = stdexec::receiver_t; + + test_context* __context_; + stdexec::inplace_stop_source* __stop_source_; + + void set_value() noexcept { + __stop_source_->request_stop(); + __context_->request_stop(); + } + void set_error(std::exception_ptr) noexcept { + __stop_source_->request_stop(); + __context_->request_stop(); + } + void set_stopped() noexcept { + __stop_source_->request_stop(); + __context_->request_stop(); + } + + using env_t = decltype(stdexec::__env::__join( + stdexec::prop{stdexec::get_scheduler, stdexec::__declval()}, + stdexec::prop{ + stdexec::get_stop_token, + stdexec::__declval()})); + [[nodiscard]] + auto get_env() const noexcept -> env_t { + return stdexec::__env::__join( + stdexec::prop{stdexec::get_scheduler, __context_->get_scheduler()}, + stdexec::prop{stdexec::get_stop_token, __stop_source_->get_token()}); + } + }; + + template + struct __next_receiver { + using __t = __next_receiver; + using __id = __next_receiver; + using receiver_concept = stdexec::receiver_t; + + using _Receiver = stdexec::__t<_ReceiverId>; + + _Receiver* __receiver_; + + void set_value() noexcept { + } + template + void set_error(_Error&&) noexcept { + } + void set_stopped() noexcept { + } + + [[nodiscard]] + auto get_env() const noexcept -> stdexec::env_of_t<_Receiver> { + return stdexec::get_env(*__receiver_); + } + }; + + // + // __proxy.. are a hammer to workaround a type handling bug in clang 19 + // + + template + struct __proxy_fn { + template + requires stdexec::__decays_to_derived_from<_Base, _Derived> + auto operator()(_Derived&& __derived, _Args&&... __args) const + noexcept(stdexec::__nothrow_callable< + decltype(_Fn), + stdexec::__copy_cvref_t<_Derived, _Base>, + _Args... + >) + -> stdexec::__call_result_t< + decltype(_Fn), + stdexec::__copy_cvref_t<_Derived, _Base>, + _Args... + > { + return _Fn( + static_cast&&>(__derived), + static_cast<_Args&&>(__args)...); + }; + }; + + template + struct __proxy_operation { + using _Sender = stdexec::__t<_SenderId>; + static_assert(stdexec::__callable); + using __operation_t = stdexec::connect_result_t<_Sender, _Receiver>; + using __t [[maybe_unused]] = __proxy_operation; + using __id [[maybe_unused]] = __proxy_operation; + + __operation_t __op_; + + __proxy_operation( + [[maybe_unused]] _Sender&& __sender, + [[maybe_unused]] + _Receiver&& __receiver) noexcept(stdexec::__nothrow_connectable<_Sender, _Receiver>) + : __op_{stdexec::connect( + static_cast<_Sender&&>(__sender), + static_cast<_Receiver&&>(__receiver))} { + } + + void start() noexcept { + stdexec::start(__op_); + } + }; + + template + struct __proxy_sender { + using _Sender = stdexec::__t<_SenderId>; + using __t [[maybe_unused]] = __proxy_sender; + using __id [[maybe_unused]] = __proxy_sender; + using sender_concept = typename _Sender::sender_concept; + + _Sender __sender_; + + explicit __proxy_sender(_Sender __sender) + : __sender_{__sender} { + } + + static constexpr auto get_completion_signatures = + [] _Self, class... _Env>(_Self&&, _Env&&...) noexcept( + stdexec::__nothrow_callable< + stdexec::get_completion_signatures_t, + stdexec::__copy_cvref_t<_Self, _Sender>, + _Env... + >) + -> stdexec::completion_signatures_of_t, _Env...> { + return {}; + }; + + template + using __operation_t = __proxy_operation<_SenderId, _Receiver>; + + static constexpr auto connect = + [] _Self, class _Receiver>( + _Self&& __self, + _Receiver&& __receiver) noexcept(stdexec::__nothrow_connectable<_Sender, _Receiver>) + -> __operation_t<_Receiver> { + return __operation_t<_Receiver>{ + static_cast&&>(__self.__sender_), + static_cast<_Receiver&&>(__receiver)}; + }; + }; + + // + // __test_sequence.. is a sequence-sender that produces a set of marbles as signals + // + + struct __test_sequence_operation_base { + test_context* __context_; + stdexec::inplace_stop_source __stop_source_{}; + + void request_stop() { + __stop_source_.request_stop(); + } + void stop_context() { + __context_->request_stop(); + } + }; + + template + struct __test_sequence_operation_part : __test_sequence_operation_base { + using _Receiver = stdexec::__t<_ReceiverId>; + + using __marble_t = marble_t; + using __marble_sender_t = __marble_t::__marble_sender_t; + using __time_point_t = typename test_scheduler::time_point; + using __receiver_t = __next_receiver<_ReceiverId>; + + std::vector<__marble_t> __marbles_; + _Receiver __receiver_; + __marble_t* __end_marble_{nullptr}; + std::size_t __active_ops_ = 0; + + __marble_t __requested_stop_marble_{__time_point_t{}, sequence_stopped}; + + struct __stop_callback_fn_t { + __test_sequence_operation_part* __self_; + void operator()() const noexcept { + auto& __self = *__self_; + __self.__requested_stop_marble_.set_origin_frame(__self.__context_->now()); + // cancel all pending ops + __self_->request_stop(); + if (__self.__active_ops_ == 0) { + if (!__self.__end_marble_) { + __self.__requested_stop_marble_ + .visit_sequence_receiver(static_cast<_Receiver&&>(__self.__receiver_)); + } + } + } + }; + + using __stop_callback_t = stdexec::stop_callback_for_t< + stdexec::stop_token_of_t>, + __stop_callback_fn_t + >; + + std::optional<__stop_callback_t> __on_stop_; + + __test_sequence_operation_part( + std::vector<__marble_t> __marbles, + test_context* __context, + _Receiver&& __receiver) noexcept + : __test_sequence_operation_base{__context} + , __marbles_{static_cast&&>(__marbles)} + , __receiver_{static_cast<_Receiver&&>(__receiver)} { + } + + template + static auto __schedule_at( + __test_sequence_operation_part& __self, + __marble_t& __marble, + _Completion&& __completion) noexcept; + static auto + __schedule_marble(__test_sequence_operation_part& __self, __marble_t& __marble) noexcept; + }; + + template + struct __test_sequence_operation : __test_sequence_operation_part<_ReceiverId> { + using __part_t = __test_sequence_operation_part<_ReceiverId>; + + using _Receiver = stdexec::__t<_ReceiverId>; + + using __marble_t = typename __part_t::__marble_t; + using __marble_sender_t = typename __part_t::__marble_sender_t; + using __time_point_t = typename __part_t::__time_point_t; + using __receiver_t = typename __part_t::__receiver_t; + + using __scheduled_marble_t = + stdexec::__call_result_t; + using __marble_op_t = stdexec::connect_result_t<__scheduled_marble_t, __receiver_t>; + + std::deque> __marble_ops_{}; + + __test_sequence_operation( + std::vector<__marble_t> __marbles, + test_context* __context, + _Receiver&& __receiver) noexcept + : __part_t{ + static_cast&&>(__marbles), + __context, + static_cast<_Receiver&&>(__receiver)} { + } + + void start() noexcept; + }; + + template + template + auto __test_sequence_operation_part<_ReceiverId>::__schedule_at( + __test_sequence_operation_part<_ReceiverId>& __self, + marble_t& __marble, + _Completion&& __completion) noexcept { + return stdexec::write_env( + // schedule the marble completion at the specified frame + exec::sequence( + exec::schedule_at(__self.__context_->get_scheduler(), __marble.frame()), + static_cast<_Completion&&>(__completion)), + stdexec::prop{stdexec::get_stop_token, __self.__stop_source_.get_token()}) + | stdexec::upon_error([](auto&&) noexcept { }) | stdexec::upon_stopped([]() noexcept { }) + | stdexec::then([&__self, &__marble]() noexcept { + // after each completion, update the __test_sequence_operation_part state + STDEXEC_ASSERT(__self.__active_ops_ > 0); + if ( + __marble.error_notification() || __marble.stopped_notification() + || __marble.sequence_error() || __marble.sequence_stopped() + || __marble.sequence_end()) { + // these marbles trigger the whole sequence + // to complete with no more items + if (!__self.__end_marble_) { + // set as the end marble + // this determines the signal that will be used to + // complete the sequence after all remaining active + // operations have completed + __self.__end_marble_ = &__marble; + } + // cancel all pending ops + __self.request_stop(); + } + if (--__self.__active_ops_ == 0) { + // all ops are complete, + if (!!__self.__end_marble_) { + __self.__on_stop_.reset(); + __self.__end_marble_ + ->visit_sequence_receiver(static_cast<_Receiver&&>(__self.__receiver_)); + } + // else this sequence never completes - + // this sequence must be stopped externally + } + }); + } + + template + auto __test_sequence_operation_part<_ReceiverId>::__schedule_marble( + __test_sequence_operation_part<_ReceiverId>& __self, + marble_t& __marble) noexcept { + + using __next_t = decltype(exec::set_next(__self.__receiver_, __marble.visit_sender())); + using __next_sender_t = + decltype(__schedule_at(__self, __marble, stdexec::__declval<__next_t>())); + using __end_sender_t = decltype(__schedule_at(__self, __marble, stdexec::just())); + struct __next_sender_id { + using __t [[maybe_unused]] = __next_sender_t; + }; + struct __end_sender_id { + using __t [[maybe_unused]] = __end_sender_t; + }; + + // WORKAROUND clang 19 would fail to compile the construction of the variant_sender. + // It was unable to find the matching value in the variant that would be constructed. + // __proxy_sender is a hammer to force the types to look different enough to + // distinguish which variant value to construct + using __next_sender_proxy_t = __proxy_sender<__next_sender_id>; + using __end_sender_proxy_t = __proxy_sender<__end_sender_id>; + + using __result_t = variant_sender<__next_sender_proxy_t, __end_sender_proxy_t>; + if (__marble.__notification_.has_value()) { + + auto __next = exec::set_next(__self.__receiver_, __marble.visit_sender()); + __next_sender_proxy_t __scheduled( + __schedule_at(__self, __marble, static_cast<__next_t&&>(__next))); + return __result_t{__scheduled}; + } else { + return __result_t{__end_sender_proxy_t{{__schedule_at(__self, __marble, stdexec::just())}}}; + } + } + + template + void __test_sequence_operation<_ReceiverId>::start() noexcept { + this->__on_stop_.emplace( + stdexec::get_stop_token(stdexec::get_env(this->__receiver_)), + typename __part_t::__stop_callback_fn_t(this)); + for (auto& __marble: this->__marbles_) { + __marble.set_origin_frame(this->__context_->now()); + auto& __op = __marble_ops_.emplace_back(); + __op.__emplace_from([this, &__marble]() { + return stdexec::connect( + __part_t::__schedule_marble(*this, __marble), __receiver_t{&this->__receiver_}); + }); + } + + this->__active_ops_ = this->__marble_ops_.size(); + for (auto& __op: this->__marble_ops_) { + stdexec::start(__op.value()); + } + } + + struct __test_sequence { + using __t = __test_sequence; + using __id = __test_sequence; + using sender_concept = exec::sequence_sender_t; + + using __marble_t = marble_t; + using __marble_sender_t = __marble_t::__marble_sender_t; + + test_context* __context_; + std::vector<__marble_t> __marbles_; + + template _Self, class... _Env> + static auto get_item_types(_Self&&, _Env&&...) noexcept -> item_types<__marble_sender_t> { + return {}; + } + + template _Self, class... _Env> + static auto + get_completion_signatures(_Self&&, _Env&&...) noexcept -> stdexec::completion_signatures< + stdexec::set_value_t(), + stdexec::set_error_t(std::error_code), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t() + > { + return {}; + } + + static constexpr auto subscribe = + [] _Sequence, stdexec::receiver _Receiver>( + _Sequence&& __sequence, + _Receiver __receiver) noexcept -> __test_sequence_operation> { + return { + static_cast<_Sequence&&>(__sequence).__marbles_, + static_cast<_Sequence&&>(__sequence).__context_, + static_cast<_Receiver&&>(__receiver)}; + }; + }; + } // namespace _tst_sched + + template + inline auto test_context::get_marbles_from( + _Sequence&& __sequence, + typename test_clock::duration __stop_after) noexcept -> std::vector> { + + std::vector> __recording; + stdexec::inplace_stop_source __source; + auto __clock = get_clock(); + + auto __op = stdexec::connect( + stdexec::when_all( + // record the sequence + exec::sequence( + // schedule connect and start of the sequence being recorded + // on the test scheduler + exec::schedule_at(get_scheduler(), __clock.now()), + record_marbles(&__recording, __clock, static_cast<_Sequence&&>(__sequence)) + // always complete with set_stopped to prevent the following + // scheduled request_stop from affecting the clock + , + stdexec::just_stopped()) + // this is used to stop a 'never' sequence + , + exec::schedule_at(get_scheduler(), __clock.now() + __stop_after) + | stdexec::then([&__source]() noexcept { __source.request_stop(); })), + _tst_sched::__recording_receiver{this, &__source}); + stdexec::start(__op); + + // dispatch the test context queues + run(); + + return __recording; + } + + inline auto + test_context::get_marble_sequence_from(std::vector> __marbles) noexcept + -> _tst_sched::__test_sequence { + return {this, static_cast>&&>(__marbles)}; + } + + template + inline auto test_context::get_marble_sequence_from(stdexec::__mstring<_Len> __diagram) noexcept + -> _tst_sched::__test_sequence { + return get_marble_sequence_from(get_marbles_from(__diagram)); + } +} // namespace exec diff --git a/include/exec/sequence_senders.hpp b/include/exec/sequence_senders.hpp index eec17a8af..1c0bf1350 100644 --- a/include/exec/sequence_senders.hpp +++ b/include/exec/sequence_senders.hpp @@ -150,10 +150,10 @@ namespace exec { using __sequence_sndr::set_next_t; inline constexpr set_next_t set_next; - template + template using next_sender_of_t = decltype(exec::set_next( stdexec::__declval&>(), - stdexec::__declval<_Sender>())); + stdexec::__declval<_Sequence>())); namespace __sequence_sndr { @@ -259,7 +259,7 @@ namespace exec { tag_invocable, _Env>; template - using __member_alias_t = typename __decay_t<__tfx_sequence_t<_Sequence, _Env>>::item_types; + using __member_alias_t = __decay_t<__tfx_sequence_t<_Sequence, _Env>>::item_types; template concept __with_member_alias = __mvalid<__member_alias_t, _Sequence, _Env>; @@ -494,6 +494,10 @@ namespace exec { stdexec::__mconst>::__f >; + // __sequence_completion_signatures_of_t + // makes a sender look like a sequence with itself as the only item + // + template using __sequence_completion_signatures_of_t = stdexec::__mapply< stdexec::__mtransform< @@ -548,14 +552,14 @@ namespace exec { using __subscribe_member_result_t = decltype(__declval<_Sequence>() .subscribe(__declval<_Receiver>())); - template - concept __subscribable_with_member = - __mvalid<__subscribe_member_result_t, _Sequence, _Receiver>; - template using __subscribe_static_member_result_t = decltype(STDEXEC_REMOVE_REFERENCE( _Sequence)::subscribe(__declval<_Sequence>(), __declval<_Receiver>())); + template + concept __subscribable_with_member = + __mvalid<__subscribe_member_result_t, _Sequence, _Receiver>; + template concept __subscribable_with_static_member = __mvalid<__subscribe_static_member_result_t, _Sequence, _Receiver>; @@ -571,8 +575,8 @@ namespace exec { // Instantiate __debug_sender via completion_signatures_of_t and // item_types_of_t to check that the actual completions and item_types // match the expected completions and values. - using __checked_signatures - [[maybe_unused]] = completion_signatures_of_t<_Sequence, env_of_t<_Receiver>>; + using __checked_signatures [[maybe_unused]] = + __sequence_completion_signatures_of_t<_Sequence, env_of_t<_Receiver>>; using __checked_item_types [[maybe_unused]] = item_types_of_t<_Sequence, env_of_t<_Receiver>>; } else { @@ -759,8 +763,31 @@ namespace exec { #define STDEXEC_ERROR_ENABLE_SEQUENCE_SENDER_IS_FALSE \ "\n" \ "\n" \ - "Trying to compute the sequences's item types resulted in an error. See\n" \ - "the rest of the compiler diagnostic for clues. Look for the string \"_ERROR_\".\n" + "The given type is not a sequence sender because stdexec::enable_sequence_sender\n" \ + "is false. Either:\n" \ + "\n" \ + "1. Give the type a nested `::sender_concept` type that is an alias for `stdexec::sender_t`,\n" \ + " as in:\n" \ + "\n" \ + " class MySequence\n" \ + " {\n" \ + " public:\n" \ + " using sender_concept = exec::sequence_sender_t;\n" \ + " ...\n" \ + " };\n" \ + "\n" \ + " or,\n" \ + "\n" \ + "2. Specialize the `stdexec::enable_sequence_sender` boolean trait for this type to true,\n" \ + "as follows:\n" \ + "\n" \ + " class MySequence\n" \ + " {\n" \ + " ...\n" \ + " };\n" \ + "\n" \ + " template <>\n" \ + " inline constexpr bool stdexec::enable_sequence_sender = true;\n" //////////////////////////////////////////////////////////////////////////////// #define STDEXEC_ERROR_GET_ITEM_TYPES_RETURNED_AN_ERROR \ @@ -823,8 +850,7 @@ namespace exec { __items_t, __sequence_sndr::__unrecognized_sequence_error_t<_Sequence, _Env...> >) { - static_assert( - stdexec::__mnever<__items_t>, STDEXEC_ERROR_CANNOT_COMPUTE_COMPLETION_SIGNATURES); + static_assert(stdexec::__mnever<__items_t>, STDEXEC_ERROR_GET_ITEM_TYPES_RETURNED_AN_ERROR); } else if constexpr (stdexec::__merror<__items_t>) { static_assert( !stdexec::__merror<__items_t>, STDEXEC_ERROR_GET_ITEM_TYPES_RETURNED_AN_ERROR); @@ -835,6 +861,11 @@ namespace exec { //} else { // stdexec::__diagnose_sender_concept_failure<_Sequence, _Env...>(); } +#if STDEXEC_MSVC() || STDEXEC_NVHPC() + // MSVC and NVHPC need more encouragement to print the type of the + // error. +// _Completions __what = 0; +#endif } } @@ -880,7 +911,7 @@ namespace exec { void __debug_sequence_sender(_Sequence&& __sequence, const _Env&) { if constexpr (!__is_debug_env<_Env>) { if constexpr (sequence_sender_in<_Sequence, _Env>) { - using __sigs_t = stdexec::__completion_signatures_of_t<_Sequence, __debug_env_t<_Env>>; + using __sigs_t = __sequence_completion_signatures_of_t<_Sequence, __debug_env_t<_Env>>; using __item_types_t = __sequence_sndr::__item_types_of_t<_Sequence, __debug_env_t<_Env>>; using __receiver_t = __debug_sequence_sender_receiver< stdexec::__cvref_id<_Sequence>, diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index b5a3872aa..667a0e7a4 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -278,9 +278,16 @@ namespace exec { #if STDEXEC_HAS_STD_RANGES() struct transform_iterate { - template - auto operator()(exec::iterate_t, Range&& range) -> __t> { - return {static_cast(range), pool_}; + template + using __range_of_t = + stdexec::__mapply, STDEXEC_REMOVE_REFERENCE(_Data)>; + template + auto operator()(exec::iterate_t, _Data&& data) + -> __t>> { + return { + static_cast<__range_of_t<_Data>&&>( + STDEXEC_REMOVE_REFERENCE(_Data)::template __get<1>(data)), + pool_}; } static_thread_pool_& pool_; diff --git a/test/exec/CMakeLists.txt b/test/exec/CMakeLists.txt index d5be71685..9464e71cd 100644 --- a/test/exec/CMakeLists.txt +++ b/test/exec/CMakeLists.txt @@ -49,7 +49,9 @@ set(exec_test_sources sequence/test_empty_sequence.cpp sequence/test_ignore_all_values.cpp sequence/test_iterate.cpp + sequence/test_test_scheduler.cpp sequence/test_transform_each.cpp + sequence/test_marbles.cpp sequence/test_merge.cpp sequence/test_merge_each.cpp sequence/test_merge_each_threaded.cpp diff --git a/test/exec/sequence/test_iterate.cpp b/test/exec/sequence/test_iterate.cpp index 5482b561f..103ede111 100644 --- a/test/exec/sequence/test_iterate.cpp +++ b/test/exec/sequence/test_iterate.cpp @@ -16,6 +16,7 @@ */ #include "exec/sequence/iterate.hpp" +#include "exec/sequence_senders.hpp" #include "stdexec/execution.hpp" #if STDEXEC_HAS_STD_RANGES() @@ -114,7 +115,7 @@ namespace { struct my_domain { template Sender, class _Env> auto transform_sender(Sender&& sender, _Env&&) const noexcept { - auto range = + auto [scheduler, range] = stdexec::__sexpr_apply(std::forward(sender), stdexec::__detail::__get_data{}); auto sum = std::accumulate(std::ranges::begin(range), std::ranges::end(range), 0); return stdexec::just(sum + 1); diff --git a/test/exec/sequence/test_marbles.cpp b/test/exec/sequence/test_marbles.cpp new file mode 100644 index 000000000..4fb59e025 --- /dev/null +++ b/test/exec/sequence/test_marbles.cpp @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "exec/sequence/marbles.hpp" + +#include "exec/sequence/empty_sequence.hpp" +#include "exec/sequence/merge.hpp" +#include "stdexec/__detail/__meta.hpp" +#include + +#include +#include +#include +#include +#include + +namespace { + + struct __clock_t { + using duration = std::chrono::milliseconds; + using rep = duration::rep; + using period = duration::period; + using time_point = std::chrono::time_point<__clock_t>; + [[maybe_unused]] + static const bool is_steady = true; + + time_point __now_{}; + + [[maybe_unused]] + time_point now() noexcept { + return __now_; + } + }; + + using __marble_t = exec::marble_t<__clock_t>; + using __marbles_t = std::vector<__marble_t>; + +#if STDEXEC_HAS_STD_RANGES() + + TEST_CASE("marbles - parse empty diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, ""_mstr); + auto expected = __marbles_t{}; + CHECK(0 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse never diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "--"_mstr); + auto expected = __marbles_t{}; + CHECK(0 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse never with values diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "-a-b-"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 1ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 3ms, ex::set_value, 'b'} + }; + CHECK(2 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse values diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "-a-b-|"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 1ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 3ms, ex::set_value, 'b'}, + __marble_t{__clock.now() + 5ms, sequence_end} + }; + CHECK(3 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse values with skip ms diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "-a- 20ms b-|"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 1ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 23ms, ex::set_value, 'b'}, + __marble_t{__clock.now() + 25ms, sequence_end} + }; + CHECK(3 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse values with skip s diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "-a- 2s b-|"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 1ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 2003ms, ex::set_value, 'b'}, + __marble_t{__clock.now() + 2005ms, sequence_end} + }; + CHECK(3 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse values with skip m diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "-a- 2m b-|"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 1ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 120003ms, ex::set_value, 'b'}, + __marble_t{__clock.now() + 120005ms, sequence_end} + }; + CHECK(3 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - parse values with skip first diagram", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto marbles = get_marbles_from(__clock, "20ms -a-b-|"_mstr); + auto expected = __marbles_t{ + __marble_t{__clock.now() + 21ms, ex::set_value, 'a'}, + __marble_t{__clock.now() + 23ms, ex::set_value, 'b'}, + __marble_t{__clock.now() + 25ms, sequence_end} + }; + CHECK(3 == marbles.size()); + CHECK(expected == marbles); + } + + TEST_CASE("marbles - record marbles of empty_sequence", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto actual = record_marbles(__clock, empty_sequence()); + auto expected = get_marbles_from(__clock, "=^|"_mstr); + CHECK(expected == actual); + } + + TEST_CASE("marbles - record marbles of range", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto actual = record_marbles(__clock, range('0', '3')); + auto expected = get_marbles_from(__clock, "=^(012|)"_mstr); + CHECK(expected == actual); + } + + TEST_CASE("marbles - record marbles of merged ranges", "[sequence_senders][marbles]") { + __clock_t __clock{}; + auto actual = record_marbles(__clock, merge(range('0', '2'), range('2', '4'))); + auto expected = get_marbles_from(__clock, "=^(0123|)"_mstr); + CHECK(expected == actual); + } +#endif // STDEXEC_HAS_STD_RANGES() +} // namespace diff --git a/test/exec/sequence/test_merge_each.cpp b/test/exec/sequence/test_merge_each.cpp index cffbb1eec..c19769342 100644 --- a/test/exec/sequence/test_merge_each.cpp +++ b/test/exec/sequence/test_merge_each.cpp @@ -20,27 +20,21 @@ #include "exec/sequence/merge.hpp" #include "exec/sequence/empty_sequence.hpp" #include "exec/sequence/iterate.hpp" +#include "exec/sequence/test_scheduler.hpp" #include "exec/sequence_senders.hpp" -#include "exec/variant_sender.hpp" -#include "exec/static_thread_pool.hpp" -#include "exec/timed_thread_scheduler.hpp" +#include "exec/timed_scheduler.hpp" #include "stdexec/__detail/__meta.hpp" -#include "stdexec/__detail/__read_env.hpp" +#include "stdexec/__detail/__senders_core.hpp" -#include #include #include #include +#include #include #include -#include -#include namespace { - using namespace std::chrono_literals; - using namespace exec; - namespace ex = stdexec; template concept __equivalent = __sequence_sndr::__all_contained_in<_A, _B> @@ -84,11 +78,6 @@ namespace { } }; - // a sequence adaptor that applies a function to each item - [[maybe_unused]] - static constexpr auto then_each = [](auto f) { - return exec::transform_each(ex::then(f)); - }; // a sequence adaptor that schedules each item to complete // on the specified scheduler [[maybe_unused]] @@ -111,87 +100,9 @@ namespace { }; return exec::transform_each(delay_adaptor); }; - // a sequence adaptor that applies a function to each item - // the function must produce a sequence - // all the sequences returned from the function are merged - [[maybe_unused]] - static constexpr auto flat_map = [](auto&& f) { - auto map_merge = [](auto&& sequence, auto&& f) noexcept { - return merge_each( - exec::transform_each( - static_cast(sequence), ex::then(static_cast(f)))); - }; - return stdexec::__binder_back{ - {static_cast(f)}, {}, {}}; - }; - // when_all requires a successful completion - // however stop_after_on has no successful completion - // this uses variant_sender to add a successful completion - // (the successful completion will never occur) - [[maybe_unused]] - static constexpr auto with_void = [](auto&& sender) noexcept - -> variant_sender, decltype(sender)> { - return {static_cast(sender)}; - }; - // with_stop_token_from adds get_stop_token query, that returns the - // token for the provided stop_source, to the receiver env - [[maybe_unused]] - static constexpr auto with_stop_token_from = [](auto& stop_source) noexcept { - return ex::write_env(ex::prop{ex::get_stop_token, stop_source.get_token()}); - }; - // log_start completes with the provided sequence after printing provided string - [[maybe_unused]] - auto log_start = [](auto sequence, auto message) { - return exec::sequence( - ex::read_env(ex::get_stop_token) | stdexec::then([message](auto&& token) noexcept { - UNSCOPED_INFO( - message << (token.stop_requested() ? ", stop was requested" : ", stop not requested") - << ", on thread id: " << std::this_thread::get_id()); - }), - ex::just(sequence)); - }; - // log_sequence prints the message when each value in the sequence is emitted - [[maybe_unused]] - auto log_sequence = [](auto sequence, auto message) { - return sequence | then_each([message](auto&& value) mutable noexcept { - UNSCOPED_INFO(message << ", on thread id: " << std::this_thread::get_id()); - return value; - }); - }; - // emits_stopped completes with set_stopped after printing info - [[maybe_unused]] - auto emits_stopped = []() { - return ex::just() | stdexec::let_value([]() noexcept { - UNSCOPED_INFO("emitting stopped, on thread id: " << std::this_thread::get_id()); - return ex::just_stopped(); - }); - }; - // emits_error completes with set_error(error) after printing info - [[maybe_unused]] - auto emits_error = [](auto error) { - return ex::just() | stdexec::let_value([error]() noexcept { - UNSCOPED_INFO(error.what() << ", on thread id: " << std::this_thread::get_id()); - return ex::just_error(error); - }); - }; - + #if STDEXEC_HAS_STD_RANGES() - // a sequence of numbers from itoa() - [[maybe_unused]] - static constexpr auto range = [](auto from, auto to) { - return exec::iterate(std::views::iota(from, to)); - }; - - template - struct as_sequence_t : Sender { - using sender_concept = sequence_sender_t; - using item_types = exec::item_types; - auto subscribe(auto receiver) { - return connect(set_next(receiver, *static_cast(this)), receiver); - } - }; - TEST_CASE( "merge_each - merge two sequence senders of no elements", "[sequence_senders][merge_each][empty_sequence]") { @@ -349,11 +260,136 @@ namespace { CHECK(v.has_value() == true); } + + TEST_CASE( + "merge_each - merge_each of marble sequences", + "[sequence_senders][merge_each][merge]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3 -4|"_mstr); + auto expected = get_marbles_from(__clock, "=^01-(23)-4|"_mstr); + auto actual = __test.get_marbles_from( + merge_each(merge(stdexec::just(__sequence0), stdexec::just(__sequence1)))); + CHECK(test_clock::time_point{6ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "merge_each - merge_each of marble sequences - concat", + "[sequence_senders][merge_each][iterate]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3-4|"_mstr); + auto expected = get_marbles_from(__clock, "=^0--2-1-3-4|"_mstr); + std::array<_tst_sched::__test_sequence, 2> __sequences{__sequence0, __sequence1}; + auto actual = __test.get_marbles_from( + merge_each(iterate(__test.get_scheduler(), std::views::all(__sequences)))); + CHECK(test_clock::time_point{10ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "merge_each - merge_each of marble sequences with error", + "[sequence_senders][merge_each][merge]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3#-4|"_mstr); + auto expected = get_marbles_from( + __clock, + // TODO FIX set_stopped issued instead of set_error + "=^01-(23)#$"_mstr); + auto actual = __test.get_marbles_from( + merge_each(merge(stdexec::just(__sequence0), stdexec::just(__sequence1)))); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + + + TEST_CASE( + "merge_each - merge_each of marble sequences with error - concat", + "[sequence_senders][merge_each][iterate]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3#-4|"_mstr); + auto expected = get_marbles_from( + __clock, + // TODO FIX set_stopped issued instead of set_error + "=^0--2-1-3#$"_mstr); + std::array<_tst_sched::__test_sequence, 2> __sequences{__sequence0, __sequence1}; + auto actual = __test.get_marbles_from( + merge_each(iterate(__test.get_scheduler(), std::views::all(__sequences)))); + CHECK(test_clock::time_point{8ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "merge_each - merge_each of marble sequences with a value stopped", + "[sequence_senders][merge_each][merge]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3.-4|"_mstr); + auto expected = get_marbles_from( + __clock, + // TODO FIX set_stopped issued instead of set_error + "=^01-(23).$"_mstr); + auto actual = __test.get_marbles_from( + merge_each(merge(stdexec::just(__sequence0), stdexec::just(__sequence1)))); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "merge_each - merge_each of marble sequences with a value stopped - concat", + "[sequence_senders][merge_each][iterate]") { + + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3.-4|"_mstr); + auto expected = get_marbles_from( + __clock, + // TODO FIX set_stopped issued instead of set_error + "=^0--2-1-3.$"_mstr); + std::array<_tst_sched::__test_sequence, 2> __sequences{__sequence0, __sequence1}; + auto actual = __test.get_marbles_from( + merge_each(iterate(__test.get_scheduler(), std::views::all(__sequences)))); + CHECK(test_clock::time_point{8ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + // TODO - fix problem with stopping # if 0 TEST_CASE( "merge_each - merge_each sender stops when a nested sequence fails", - "[sequence_senders][static_thread_pool][merge_each][merge][iterate]") { + "[sequence_senders][merge_each][merge][iterate]") { auto sequences = merge( log_start(range(100, 120), "range 100-120"), diff --git a/test/exec/sequence/test_merge_each_threaded.cpp b/test/exec/sequence/test_merge_each_threaded.cpp index 5624638cd..89b8dea80 100644 --- a/test/exec/sequence/test_merge_each_threaded.cpp +++ b/test/exec/sequence/test_merge_each_threaded.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -84,11 +85,6 @@ namespace { } }; - // a sequence adaptor that applies a function to each item - [[maybe_unused]] - static constexpr auto then_each = [](auto f) { - return exec::transform_each(ex::then(f)); - }; // a sequence adaptor that schedules each item to complete // on the specified scheduler [[maybe_unused]] @@ -109,87 +105,9 @@ namespace { }; return exec::transform_each(delay_adaptor); }; - // a sequence adaptor that applies a function to each item - // the function must produce a sequence - // all the sequences returned from the function are merged - [[maybe_unused]] - static constexpr auto flat_map = [](auto&& f) { - auto map_merge = [](auto&& sequence, auto&& f) noexcept { - return merge_each( - exec::transform_each( - static_cast(sequence), ex::then(static_cast(f)))); - }; - return stdexec::__binder_back{ - {static_cast(f)}, {}, {}}; - }; - // when_all requires a successful completion - // however stop_after_on has no successful completion - // this uses variant_sender to add a successful completion - // (the successful completion will never occur) - [[maybe_unused]] - static constexpr auto with_void = [](auto&& sender) noexcept - -> variant_sender, decltype(sender)> { - return {static_cast(sender)}; - }; - // with_stop_token_from adds get_stop_token query, that returns the - // token for the provided stop_source, to the receiver env - [[maybe_unused]] - static constexpr auto with_stop_token_from = [](auto& stop_source) noexcept { - return ex::write_env(ex::prop{ex::get_stop_token, stop_source.get_token()}); - }; - // log_start completes with the provided sequence after printing provided string - [[maybe_unused]] - auto log_start = [](auto sequence, auto message) { - return exec::sequence( - ex::read_env(ex::get_stop_token) | stdexec::then([message](auto&& token) noexcept { - UNSCOPED_INFO( - message << (token.stop_requested() ? ", stop was requested" : ", stop not requested") - << ", on thread id: " << std::this_thread::get_id()); - }), - ex::just(sequence)); - }; - // log_sequence prints the message when each value in the sequence is emitted - [[maybe_unused]] - auto log_sequence = [](auto sequence, auto message) { - return sequence | then_each([message](auto&& value) mutable noexcept { - UNSCOPED_INFO(message << ", on thread id: " << std::this_thread::get_id()); - return value; - }); - }; - // emits_stopped completes with set_stopped after printing info - [[maybe_unused]] - auto emits_stopped = []() { - return ex::just() | stdexec::let_value([]() noexcept { - UNSCOPED_INFO("emitting stopped, on thread id: " << std::this_thread::get_id()); - return ex::just_stopped(); - }); - }; - // emits_error completes with set_error(error) after printing info - [[maybe_unused]] - auto emits_error = [](auto error) { - return ex::just() | stdexec::let_value([error]() noexcept { - UNSCOPED_INFO(error.what() << ", on thread id: " << std::this_thread::get_id()); - return ex::just_error(error); - }); - }; #if STDEXEC_HAS_STD_RANGES() - // a sequence of numbers from itoa() - [[maybe_unused]] - static constexpr auto range = [](auto from, auto to) { - return exec::iterate(std::views::iota(from, to)); - }; - - template - struct as_sequence_t : Sender { - using sender_concept = sequence_sender_t; - using item_types = exec::item_types; - auto subscribe(auto receiver) { - return connect(set_next(receiver, *static_cast(this)), receiver); - } - }; - TEST_CASE( "merge_each - merge_each sender merges all items from multiple threads", "[sequence_senders][single_thread_context][merge_each][merge][iterate]") { @@ -260,7 +178,6 @@ namespace { // a sequence whose items are sequences auto sequences = merge( - ex::just(stop_after_on(sched1, 10ms)), // no items ex::just(range(100, 120)), // int items ex::just(empty_sequence()), // no items ex::just(range(200, 220)), // int items @@ -331,7 +248,6 @@ namespace { // a sequence whose items are sequences auto sequences = merge( - ex::just(stop_after_on(sched1, 10ms)), // no items ex::just(range(100, 120)), // int items ex::just(empty_sequence()), // no items ex::just(range(200, 220)), // int items diff --git a/test/exec/sequence/test_test_scheduler.cpp b/test/exec/sequence/test_test_scheduler.cpp new file mode 100644 index 000000000..3f28d0b66 --- /dev/null +++ b/test/exec/sequence/test_test_scheduler.cpp @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "exec/sequence/test_scheduler.hpp" + +#include "exec/sequence/marbles.hpp" +#include "exec/sequence/merge.hpp" +#include "exec/sequence/transform_each.hpp" +#include "exec/sequence.hpp" +#include "stdexec/__detail/__just.hpp" +#include "stdexec/__detail/__meta.hpp" +#include + +#include +#include +#include +#include +#include + +namespace { + + // a sequence adaptor that schedules each item to complete + // on the specified scheduler + [[maybe_unused]] + static constexpr auto continues_each_on = [](auto sched) { + return exec::transform_each(ex::continues_on(sched)); + }; + // a sequence adaptor that schedules each item to complete + // on the specified scheduler after the specified duration + [[maybe_unused]] + static constexpr auto delays_each_on = + [](Sched sched, duration_of_t after) noexcept { + auto delay_value = [](Value&& value, Sched sched, duration_of_t after) { + return sequence(schedule_after(sched, after), static_cast(value)); + }; + auto delay_adaptor = + stdexec::__binder_back>{ + {sched, after}, + {}, + {} + }; + return exec::transform_each(delay_adaptor); + }; + + using __marble_t = exec::marble_t; + using __marbles_t = std::vector<__marble_t>; + +#if STDEXEC_HAS_STD_RANGES() + + TEST_CASE("test_scheduler - parse empty diagram", "[sequence_senders][test_scheduler][marbles]") { + test_context __test{}; + auto __clock = __test.get_clock(); + auto marbles = get_marbles_from(__clock, ""_mstr); + auto expected = __marbles_t{}; + CHECK(marbles.size() == 0); + CHECK(marbles == expected); + } + + TEST_CASE( + "test_scheduler - record marbles via test_context", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(__clock.now() == test_clock::time_point{0ms}); + auto __scheduler = __test.get_scheduler(); + auto __sequence = __scheduler.schedule() | stdexec::then([]() noexcept { return '0'; }); + auto actual = __test.get_marbles_from(__sequence); + CHECK(__clock.now() == test_clock::time_point{0ms}); + auto expected = get_marbles_from(__clock, "=^(0|)"_mstr); + CHECK(actual == expected); + } + + TEST_CASE( + "test_scheduler - test_context schedule_after advances test_clock", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(__clock.now() == test_clock::time_point{0ms}); + auto __scheduler = __test.get_scheduler(); + auto __sequence = schedule_after(__scheduler, 2ms) + | stdexec::then([]() noexcept { return '0'; }); + auto expected = get_marbles_from(__clock, "=^--(0|)"_mstr); + auto actual = __test.get_marbles_from(__sequence); + CHECK(test_clock::time_point{2ms} == __clock.now()); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence advances test_clock", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -a--b---c|"_mstr); + auto expected = get_marbles_from(__clock, "=^-a--b---c|"_mstr); + auto actual = __test.get_marbles_from(__sequence); + CHECK(test_clock::time_point{9ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence never", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0-"_mstr); + auto expected = get_marbles_from(__clock, "=^-5 998ms $"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{1000ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence error", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0--#"_mstr); + auto expected = get_marbles_from(__clock, "=^-5--#$"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence error in middle", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0--#--1|"_mstr); + auto expected = get_marbles_from(__clock, "=^-5--#$"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence stopped", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0--."_mstr); + auto expected = get_marbles_from(__clock, "=^-5--.$"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence stopped in middle", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0--.--1|"_mstr); + auto expected = get_marbles_from(__clock, "=^-5--.$"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{4ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence transform", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" -0--1---2|"_mstr); + auto expected = get_marbles_from(__clock, "=^-5--6---7|"_mstr); + auto actual = __test.get_marbles_from(__sequence | then_each([](char c) noexcept -> char { return c + 5; })); + CHECK(test_clock::time_point{9ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence simple shift", + "[sequence_senders][test_scheduler]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence = __test.get_marble_sequence_from(" 012--|"_mstr); + auto expected = get_marbles_from(__clock, "=^--012|"_mstr); + auto actual = __test.get_marbles_from(__sequence | delays_each_on(__test.get_scheduler(), 2ms)); + CHECK(test_clock::time_point{5ms} == __clock.now()); + CAPTURE(__sequence.__marbles_); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context multi-second marble-sequence shift", + "[sequence_senders][test_scheduler]") { + auto __real_time_now = std::chrono::steady_clock::now(); + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + + auto __sequence = __test.get_marble_sequence_from(" 5s 0 5s 1 5s 2 100ms |"_mstr); + auto expected = get_marbles_from(__clock, "=^ 5s 100ms 0 5s 1 5s 2 |"_mstr); + + auto actual = + __test.get_marbles_from(__sequence | delays_each_on(__test.get_scheduler(), 100ms), 16s); + + CHECK(test_clock::time_point{5s + 5s + 5s + 100ms + 3ms} == __clock.now()); + + auto __real_time_elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - __real_time_now); + CAPTURE(__sequence.__marbles_); + CAPTURE(__real_time_elapsed); + CHECK(expected == actual); + } + + TEST_CASE( + "test_scheduler - test_context marble-sequence merge", + "[sequence_senders][test_scheduler][merge]") { + test_context __test{}; + auto __clock = __test.get_clock(); + CHECK(test_clock::time_point{0ms} == __clock.now()); + auto __sequence0 = __test.get_marble_sequence_from(" 0--2|"_mstr); + auto __sequence1 = __test.get_marble_sequence_from(" -1-3 -4|"_mstr); + auto expected = get_marbles_from(__clock, "=^01-(23)-4|"_mstr); + auto actual = __test.get_marbles_from(merge(__sequence0, __sequence1)); + CHECK(test_clock::time_point{6ms} == __clock.now()); + CAPTURE(__sequence0.__marbles_); + CAPTURE(__sequence1.__marbles_); + CHECK(expected == actual); + } + +#endif // STDEXEC_HAS_STD_RANGES() +} // namespace diff --git a/test/test_common/sequences.hpp b/test/test_common/sequences.hpp new file mode 100644 index 000000000..6099c3649 --- /dev/null +++ b/test/test_common/sequences.hpp @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "exec/sequence/iterate.hpp" +#include "exec/sequence/transform_each.hpp" +#include "exec/variant_sender.hpp" +#include "stdexec/__detail/__then.hpp" + +#include + +namespace Catch { + template + struct StringMaker> { + static std::string convert(const std::chrono::time_point<_Clock, _Duration>& __at) { + return std::to_string( + std::chrono::duration_cast(__at.time_since_epoch()) + .count()) + + "ms"; + } + }; + + template + struct StringMaker> { + static std::string convert(const std::chrono::duration<_Rep, _Period>& __duration) { + return std::to_string( + std::chrono::duration_cast(__duration).count()) + + "ms"; + } + }; + + template + struct StringMaker> { + static std::string convert(const exec::marble_t<_Clock>& __value) { + return to_string(__value); + } + }; + + template + struct StringMaker> { + static std::string convert(const exec::notification_t<_CompletionSignatures>& __value) { + return to_string(__value); + } + }; + + template _Tag> + struct StringMaker<_Tag> { + static std::string convert(const _Tag& __tag) { + return to_string(__tag); + } + }; +} // namespace Catch + +namespace { + using namespace exec; + namespace ex = stdexec; + using ex::operator""_mstr; + using namespace std::chrono_literals; + + template + struct as_sequence_t : Sender { + using sender_concept = sequence_sender_t; + template _Self, class... _Env> + static auto get_item_types(_Self&&, _Env&&...) noexcept -> exec::__item_types_of_t { + return {}; + } + auto subscribe(auto receiver) { + return connect(set_next(receiver, *static_cast(this)), receiver); + } + }; + + // a sequence adaptor that applies a function to each item + [[maybe_unused]] + static constexpr auto then_each = [](auto f) { + return exec::transform_each(stdexec::then(f)); + }; + // a sequence adaptor that applies a function to each item + // the function must produce a sequence + // all the sequences returned from the function are merged + [[maybe_unused]] + static constexpr auto flat_map = [](auto&& f) { + auto map_merge = [](auto&& sequence, auto&& f) noexcept { + return merge_each( + exec::transform_each( + static_cast(sequence), ex::then(static_cast(f)))); + }; + return stdexec::__binder_back{ + {static_cast(f)}, {}, {}}; + }; + // when_all requires a successful completion + // however stop_after_on has no successful completion + // this uses variant_sender to add a successful completion + // (the successful completion will never occur) + [[maybe_unused]] + static constexpr auto with_void = [](auto&& sender) noexcept + -> variant_sender, decltype(sender)> { + return {static_cast(sender)}; + }; + // with_stop_token_from adds get_stop_token query, that returns the + // token for the provided stop_source, to the receiver env + [[maybe_unused]] + static constexpr auto with_stop_token_from = [](auto& stop_source) noexcept { + return ex::write_env(ex::prop{ex::get_stop_token, stop_source.get_token()}); + }; + // log_start completes with the provided sequence after printing provided string + [[maybe_unused]] + static constexpr auto log_start = [](auto sequence, auto message) { + return exec::sequence( + ex::read_env(ex::get_stop_token) | stdexec::then([message](auto&& token) noexcept { + UNSCOPED_INFO( + message << (token.stop_requested() ? ", stop was requested" : ", stop not requested") + << ", on thread id: " << std::this_thread::get_id()); + }), + ex::just(sequence)); + }; + // log_sequence prints the message when each value in the sequence is emitted + [[maybe_unused]] + static constexpr auto log_sequence = [](auto sequence, auto message) { + return sequence | then_each([message](auto&& value) mutable noexcept { + UNSCOPED_INFO(message << ", on thread id: " << std::this_thread::get_id()); + return value; + }); + }; + // emits_stopped completes with set_stopped after printing info + [[maybe_unused]] + static constexpr auto emits_stopped = []() { + return ex::just() | stdexec::let_value([]() noexcept { + UNSCOPED_INFO("emitting stopped, on thread id: " << std::this_thread::get_id()); + return ex::just_stopped(); + }); + }; + // emits_error completes with set_error(error) after printing info + [[maybe_unused]] + static constexpr auto emits_error = [](auto error) { + return ex::just() | stdexec::let_value([error]() noexcept { + UNSCOPED_INFO(error.what() << ", on thread id: " << std::this_thread::get_id()); + return ex::just_error(error); + }); + }; + +#if STDEXEC_HAS_STD_RANGES() + + // a sequence of numbers from itoa() + [[maybe_unused]] + static constexpr auto range = [](auto from, auto to) { + return exec::iterate(std::views::iota(from, to)); + }; + +#endif // STDEXEC_HAS_STD_RANGES() + +} // namespace