diff --git a/include/beman/execution26/detail/bounded_queue.hpp b/include/beman/execution26/detail/bounded_queue.hpp index 92de8653..f23fb158 100644 --- a/include/beman/execution26/detail/bounded_queue.hpp +++ b/include/beman/execution26/detail/bounded_queue.hpp @@ -67,8 +67,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl using sender_concept = ::beman::execution26::sender_t; using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), - ::beman::execution26::set_error_t( - ::beman::execution26::conqueue_errc), + ::beman::execution26::set_error_t(::beman::execution26::conqueue_errc), ::beman::execution26::set_stopped_t()>; bounded_queue& queue; @@ -82,6 +81,50 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl static_assert(::beman::execution26::sender); static_assert(::beman::execution26::sender_in); + struct pop_sender { + struct state_base { + bounded_queue& queue; + state_base* next{}; + + state_base(bounded_queue& queue) : queue(queue) {} + virtual auto complete(T) -> void = 0; + virtual auto complete(::beman::execution26::conqueue_errc) -> void = 0; + }; + template + struct state : state_base { + using operation_state_concept = ::beman::execution26::operation_state_t; + + ::std::remove_cvref_t receiver; + + template + state(bounded_queue& queue, R&& receiver) + : state_base(queue), receiver(::std::forward(receiver)) {} + + auto start() & noexcept { + this->queue.start_pop(*this); + } + auto complete(T val) -> void override { ::beman::execution26::set_value(::std::move(this->receiver), ::std::move(val)); } + auto complete(::beman::execution26::conqueue_errc error) -> void override { + ::beman::execution26::set_error(::std::move(this->receiver), error); + } + }; + + using sender_concept = ::beman::execution26::sender_t; + using completion_signatures = + ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(T), + ::beman::execution26::set_error_t(::beman::execution26::conqueue_errc), + ::beman::execution26::set_stopped_t()>; + + bounded_queue& queue; + template <::beman::execution26::receiver Receiver> + auto connect(Receiver&& receiver) && -> state { + static_assert(::beman::execution26::operation_state>); + return state(queue, ::std::forward(receiver)); + } + }; + static_assert(::beman::execution26::sender); + static_assert(::beman::execution26::sender_in); + static_assert(::std::same_as); explicit bounded_queue(::std::size_t max, Allocator allocator = {}) @@ -101,13 +144,20 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl return this->closed; } auto close() noexcept -> void { - std::lock_guard cerberus(this->mutex); + std::unique_lock cerberus(this->mutex); this->closed = true; - while (not this->push_queue.empty()) { - this->push_queue.pop()->complete(::beman::execution26::conqueue_errc::closed); - } + push_sender_queue push_queue(std::move(this->push_queue)); + pop_sender_queue pop_queue(std::move(this->pop_queue)); + cerberus.unlock(); this->push_condition.notify_all(); this->pop_condition.notify_all(); + + while (not push_queue.empty()) { + push_queue.pop()->complete(::beman::execution26::conqueue_errc::closed); + } + while (not pop_queue.empty()) { + pop_queue.pop()->complete(::beman::execution26::conqueue_errc::closed); + } } auto push(const T& value) -> void { this->internal_push(value); } @@ -163,7 +213,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl this->push_notify(cerberus); return rc; } - auto async_pop() -> sender auto { return ::beman::execution26::just(T()); } + auto async_pop() -> sender auto { return pop_sender(*this); } private: using allocator_traits = ::std::allocator_traits; @@ -179,6 +229,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl using array_allocator_type = allocator_traits::template rebind_alloc; using array_allocator_traits = allocator_traits::template rebind_traits; using push_sender_queue = ::beman::execution26::detail::intrusive_queue<&push_sender::state_base::next>; + using pop_sender_queue = ::beman::execution26::detail::intrusive_queue<&pop_sender::state_base::next>; template auto construct(element_t* element, Args&&... args) { @@ -237,16 +288,33 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl return true; } auto pop_notify(auto& cerberus) { - // if (not this->pop_queue.empty()) - // this->pop_queue.pop()->complete(); - // else - this->pop_condition.notify_one(); + if (not this->pop_queue.empty()) + { + element_t* element{this->get(this->tail)}; + ::std::remove_cvref_t val{std::move(element->value)}; + this->destroy(element); + ++this->tail; + auto state{this->pop_queue.pop()}; + this->push_notify(cerberus); + state->complete(std::move(val)); + } + else + { + cerberus.unlock(); + this->pop_condition.notify_one(); + } } auto push_notify(auto& cerberus) { if (not this->push_queue.empty()) + { + cerberus.unlock(); this->push_queue.pop()->complete(); + } else + { + cerberus.unlock(); this->push_condition.notify_one(); + } } template auto internal_async_push(TT&& value) -> sender auto { @@ -264,17 +332,33 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl return false; } } + auto start_pop(pop_sender::state_base& s) -> bool { + std::unique_lock cerberus(this->mutex); + if (this->head != this->tail) { + element_t* element(this->get(tail)); + std::remove_cvref_t val(std::move(element->value)); + ++this->tail; + this->destroy(element); + this->push_notify(cerberus); + s.complete(std::move(val)); + return true; + } else { + this->pop_queue.push(&s); + return false; + } + } Allocator allocator; array_allocator_type array_allocator; mutable ::std::mutex mutex; + push_sender_queue push_queue; ::std::condition_variable push_condition; + pop_sender_queue pop_queue; ::std::condition_variable pop_condition; ::std::size_t max; element_t* elements; ::std::uint64_t head{}; // the next element to push to ::std::uint64_t tail{}; // the next element to push from - push_sender_queue push_queue; bool closed{}; }; diff --git a/include/beman/execution26/detail/intrusive_queue.hpp b/include/beman/execution26/detail/intrusive_queue.hpp index 11ff226e..b7839840 100644 --- a/include/beman/execution26/detail/intrusive_queue.hpp +++ b/include/beman/execution26/detail/intrusive_queue.hpp @@ -17,6 +17,12 @@ struct intrusive_queue { T* head{}; T* tail{}; + intrusive_queue() = default; + intrusive_queue(intrusive_queue&& other) + : head(::std::exchange(other.head, nullptr)) + , tail(::std::exchange(other.tail, nullptr)) + { + } auto push(T* n) -> void { if (this->head) { std::exchange(this->tail, n)->*next = n; diff --git a/tests/beman/execution26/bounded-queue.test.cpp b/tests/beman/execution26/bounded-queue.test.cpp index a122b193..5cfbc686 100644 --- a/tests/beman/execution26/bounded-queue.test.cpp +++ b/tests/beman/execution26/bounded-queue.test.cpp @@ -6,6 +6,7 @@ #include #include #include +#include //-dk:TODO remove // ---------------------------------------------------------------------------- @@ -199,6 +200,79 @@ auto test_async_push(auto one, auto two, auto three, auto four, auto five) -> vo ASSERT(c5 == 2); } +template +auto test_async_pop(auto one, auto two, auto three, auto four, auto five) -> void { + struct receiver { + using receiver_concept = test_std::receiver_t; + int& complete; + std::vector& vals; + auto set_value(T val) && noexcept -> void { this->complete = 1; vals.push_back(val); } + auto set_error(test_std::conqueue_errc) && noexcept -> void { this->complete = 2; } + auto set_stopped() && noexcept -> void { this->complete = 3; } + }; + static_assert(test_std::receiver); + + test_std::bounded_queue queue(2); + std::vector vals; + queue.push(one); + queue.push(two); + + auto s4{queue.async_pop()}; + auto s2{queue.async_pop()}; + auto s1{queue.async_pop()}; + auto s3{queue.async_pop()}; + auto s5{queue.async_pop()}; + + int c1{}, c2{}, c3{}, c4{}, c5{}; + + auto op4{test_std::connect(std::move(s4), receiver{c4, vals})}; + ASSERT(c4 == 0); + auto op2{test_std::connect(std::move(s2), receiver{c2, vals})}; + ASSERT(c2 == 0); + auto op1{test_std::connect(std::move(s1), receiver{c1, vals})}; + ASSERT(c1 == 0); + auto op3{test_std::connect(std::move(s3), receiver{c3, vals})}; + ASSERT(c3 == 0); + auto op5{test_std::connect(std::move(s5), receiver{c5, vals})}; + ASSERT(c5 == 0); + + test_std::start(op1); + ASSERT(c1 == 1); + ASSERT(vals.size() == 1u); + ASSERT(vals.back() == one); + + test_std::start(op2); + ASSERT(c2 == 1); + ASSERT(vals.size() == 2u); + ASSERT(vals.back() == two); + + test_std::start(op3); + ASSERT(c3 == 0); + ASSERT(vals.size() == 2u); + + test_std::start(op4); + ASSERT(c4 == 0); + ASSERT(vals.size() == 2u); + + test_std::start(op5); + ASSERT(c5 == 0); + ASSERT(vals.size() == 2u); + + queue.push(three); + ASSERT(c3 == 1); + ASSERT(vals.size() == 3u); + ASSERT(vals.back() == three); + + queue.push(four); + ASSERT(c4 == 1); + ASSERT(vals.size() == 4u); + ASSERT(vals.back() == four); + + queue.close(); + ASSERT(c5 == 2); + ASSERT(vals.size() == 4u); +} + } // namespace TEST(bounded_queue) { @@ -220,4 +294,6 @@ TEST(bounded_queue) { test_async_push(1, 2, 3, 4, 5); test_async_push("one"s, "two"s, "three"s, "four"s, "five"s); + test_async_pop(1, 2, 3, 4, 5); + test_async_pop("one"s, "two"s, "three"s, "four"s, "five"s); } \ No newline at end of file