Skip to content

Commit

Permalink
added async_pop
Browse files Browse the repository at this point in the history
  • Loading branch information
dietmarkuehl committed Dec 2, 2024
1 parent 7fa110f commit 3bc6553
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 12 deletions.
108 changes: 96 additions & 12 deletions include/beman/execution26/detail/bounded_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
using sender_concept = ::beman::execution26::sender_t;
using completion_signatures =
::beman::execution26::completion_signatures<::beman::execution26::set_value_t(),
::beman::execution26::set_error_t(
::beman::execution26::conqueue_errc),
::beman::execution26::set_error_t(::beman::execution26::conqueue_errc),
::beman::execution26::set_stopped_t()>;

bounded_queue& queue;
Expand All @@ -82,6 +81,50 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
static_assert(::beman::execution26::sender<push_sender>);
static_assert(::beman::execution26::sender_in<push_sender>);

struct pop_sender {
struct state_base {
bounded_queue& queue;
state_base* next{};

state_base(bounded_queue& queue) : queue(queue) {}
virtual auto complete(T) -> void = 0;
virtual auto complete(::beman::execution26::conqueue_errc) -> void = 0;
};
template <typename Receiver>
struct state : state_base {
using operation_state_concept = ::beman::execution26::operation_state_t;

::std::remove_cvref_t<Receiver> receiver;

template <typename R>
state(bounded_queue& queue, R&& receiver)
: state_base(queue), receiver(::std::forward<R>(receiver)) {}

auto start() & noexcept {
this->queue.start_pop(*this);
}
auto complete(T val) -> void override { ::beman::execution26::set_value(::std::move(this->receiver), ::std::move(val)); }
auto complete(::beman::execution26::conqueue_errc error) -> void override {
::beman::execution26::set_error(::std::move(this->receiver), error);
}
};

using sender_concept = ::beman::execution26::sender_t;
using completion_signatures =
::beman::execution26::completion_signatures<::beman::execution26::set_value_t(T),
::beman::execution26::set_error_t(::beman::execution26::conqueue_errc),
::beman::execution26::set_stopped_t()>;

bounded_queue& queue;
template <::beman::execution26::receiver Receiver>
auto connect(Receiver&& receiver) && -> state<Receiver> {
static_assert(::beman::execution26::operation_state<state<Receiver>>);
return state<Receiver>(queue, ::std::forward<Receiver>(receiver));
}
};
static_assert(::beman::execution26::sender<pop_sender>);
static_assert(::beman::execution26::sender_in<pop_sender>);

static_assert(::std::same_as<value_type, typename Allocator::value_type>);

explicit bounded_queue(::std::size_t max, Allocator allocator = {})
Expand All @@ -101,13 +144,20 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
return this->closed;
}
auto close() noexcept -> void {
std::lock_guard cerberus(this->mutex);
std::unique_lock cerberus(this->mutex);
this->closed = true;
while (not this->push_queue.empty()) {
this->push_queue.pop()->complete(::beman::execution26::conqueue_errc::closed);
}
push_sender_queue push_queue(std::move(this->push_queue));
pop_sender_queue pop_queue(std::move(this->pop_queue));
cerberus.unlock();
this->push_condition.notify_all();
this->pop_condition.notify_all();

while (not push_queue.empty()) {
push_queue.pop()->complete(::beman::execution26::conqueue_errc::closed);
}
while (not pop_queue.empty()) {
pop_queue.pop()->complete(::beman::execution26::conqueue_errc::closed);
}
}

auto push(const T& value) -> void { this->internal_push(value); }
Expand Down Expand Up @@ -163,7 +213,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
this->push_notify(cerberus);
return rc;
}
auto async_pop() -> sender auto { return ::beman::execution26::just(T()); }
auto async_pop() -> sender auto { return pop_sender(*this); }

private:
using allocator_traits = ::std::allocator_traits<Allocator>;
Expand All @@ -179,6 +229,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
using array_allocator_type = allocator_traits::template rebind_alloc<element_t>;
using array_allocator_traits = allocator_traits::template rebind_traits<element_t>;
using push_sender_queue = ::beman::execution26::detail::intrusive_queue<&push_sender::state_base::next>;
using pop_sender_queue = ::beman::execution26::detail::intrusive_queue<&pop_sender::state_base::next>;

template <typename... Args>
auto construct(element_t* element, Args&&... args) {
Expand Down Expand Up @@ -237,16 +288,33 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
return true;
}
auto pop_notify(auto& cerberus) {
// if (not this->pop_queue.empty())
// this->pop_queue.pop()->complete();
// else
this->pop_condition.notify_one();
if (not this->pop_queue.empty())
{
element_t* element{this->get(this->tail)};
::std::remove_cvref_t<T> val{std::move(element->value)};
this->destroy(element);
++this->tail;
auto state{this->pop_queue.pop()};
this->push_notify(cerberus);
state->complete(std::move(val));
}
else
{
cerberus.unlock();
this->pop_condition.notify_one();
}
}
auto push_notify(auto& cerberus) {
if (not this->push_queue.empty())
{
cerberus.unlock();
this->push_queue.pop()->complete();
}
else
{
cerberus.unlock();
this->push_condition.notify_one();
}
}
template <typename TT>
auto internal_async_push(TT&& value) -> sender auto {
Expand All @@ -264,17 +332,33 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
return false;
}
}
auto start_pop(pop_sender::state_base& s) -> bool {
std::unique_lock cerberus(this->mutex);
if (this->head != this->tail) {
element_t* element(this->get(tail));
std::remove_cvref_t<T> val(std::move(element->value));
++this->tail;
this->destroy(element);
this->push_notify(cerberus);
s.complete(std::move(val));
return true;
} else {
this->pop_queue.push(&s);
return false;
}
}

Allocator allocator;
array_allocator_type array_allocator;
mutable ::std::mutex mutex;
push_sender_queue push_queue;
::std::condition_variable push_condition;
pop_sender_queue pop_queue;
::std::condition_variable pop_condition;
::std::size_t max;
element_t* elements;
::std::uint64_t head{}; // the next element to push to
::std::uint64_t tail{}; // the next element to push from
push_sender_queue push_queue;
bool closed{};
};

Expand Down
6 changes: 6 additions & 0 deletions include/beman/execution26/detail/intrusive_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ struct intrusive_queue<next> {
T* head{};
T* tail{};

intrusive_queue() = default;
intrusive_queue(intrusive_queue&& other)
: head(::std::exchange(other.head, nullptr))
, tail(::std::exchange(other.tail, nullptr))
{
}
auto push(T* n) -> void {
if (this->head) {
std::exchange(this->tail, n)->*next = n;
Expand Down
76 changes: 76 additions & 0 deletions tests/beman/execution26/bounded-queue.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <concepts>
#include <string>
#include <thread>
#include <iostream> //-dk:TODO remove

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -199,6 +200,79 @@ auto test_async_push(auto one, auto two, auto three, auto four, auto five) -> vo
ASSERT(c5 == 2);
}

template <typename T>
auto test_async_pop(auto one, auto two, auto three, auto four, auto five) -> void {
struct receiver {
using receiver_concept = test_std::receiver_t;
int& complete;
std::vector<T>& vals;
auto set_value(T val) && noexcept -> void { this->complete = 1; vals.push_back(val); }
auto set_error(test_std::conqueue_errc) && noexcept -> void { this->complete = 2; }
auto set_stopped() && noexcept -> void { this->complete = 3; }
};
static_assert(test_std::receiver<receiver>);

test_std::bounded_queue<T> queue(2);
std::vector<T> vals;
queue.push(one);
queue.push(two);

auto s4{queue.async_pop()};
auto s2{queue.async_pop()};
auto s1{queue.async_pop()};
auto s3{queue.async_pop()};
auto s5{queue.async_pop()};

int c1{}, c2{}, c3{}, c4{}, c5{};

auto op4{test_std::connect(std::move(s4), receiver{c4, vals})};
ASSERT(c4 == 0);
auto op2{test_std::connect(std::move(s2), receiver{c2, vals})};
ASSERT(c2 == 0);
auto op1{test_std::connect(std::move(s1), receiver{c1, vals})};
ASSERT(c1 == 0);
auto op3{test_std::connect(std::move(s3), receiver{c3, vals})};
ASSERT(c3 == 0);
auto op5{test_std::connect(std::move(s5), receiver{c5, vals})};
ASSERT(c5 == 0);

test_std::start(op1);
ASSERT(c1 == 1);
ASSERT(vals.size() == 1u);
ASSERT(vals.back() == one);

test_std::start(op2);
ASSERT(c2 == 1);
ASSERT(vals.size() == 2u);
ASSERT(vals.back() == two);

test_std::start(op3);
ASSERT(c3 == 0);
ASSERT(vals.size() == 2u);

test_std::start(op4);
ASSERT(c4 == 0);
ASSERT(vals.size() == 2u);

test_std::start(op5);
ASSERT(c5 == 0);
ASSERT(vals.size() == 2u);

queue.push(three);
ASSERT(c3 == 1);
ASSERT(vals.size() == 3u);
ASSERT(vals.back() == three);

queue.push(four);
ASSERT(c4 == 1);
ASSERT(vals.size() == 4u);
ASSERT(vals.back() == four);

queue.close();
ASSERT(c5 == 2);
ASSERT(vals.size() == 4u);
}

} // namespace

TEST(bounded_queue) {
Expand All @@ -220,4 +294,6 @@ TEST(bounded_queue) {

test_async_push<int>(1, 2, 3, 4, 5);
test_async_push<std::string>("one"s, "two"s, "three"s, "four"s, "five"s);
test_async_pop<int>(1, 2, 3, 4, 5);
test_async_pop<std::string>("one"s, "two"s, "three"s, "four"s, "five"s);
}

0 comments on commit 3bc6553

Please sign in to comment.