Skip to content

Commit

Permalink
added async_push functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
dietmarkuehl committed Dec 2, 2024
1 parent 42e952a commit 7fa110f
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 41 deletions.
142 changes: 108 additions & 34 deletions include/beman/execution26/detail/bounded_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_BOUNDED_QUEUE

#include <beman/execution26/detail/immovable.hpp>
#include <beman/execution26/detail/intrusive_queue.hpp>
#include <beman/execution26/detail/conqueue_errc.hpp>
#include <beman/execution26/detail/conqueue_error.hpp>
#include <beman/execution26/detail/sender.hpp>
#include <beman/execution26/detail/just.hpp> //-dk:TODO remove
#include <beman/execution26/execution.hpp>
#include <concepts>
#include <memory>
#include <mutex>
#include <optional>
#include <condition_variable>
#include <type_traits>
#include <utility>
#include <cstddef>

Expand All @@ -32,6 +33,55 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
using value_type = T;
using allocator_type = Allocator;

struct push_sender {
struct state_base {
::std::remove_cvref_t<T> value;
bounded_queue& queue;
state_base* next{};

template <typename V>
state_base(V&& value, bounded_queue& queue) : value(::std::forward<V>(value)), queue(queue) {}
virtual auto complete() -> void = 0;
virtual auto complete(::beman::execution26::conqueue_errc) -> void = 0;
};
template <typename Receiver>
struct state : state_base {
using operation_state_concept = ::beman::execution26::operation_state_t;

::std::remove_cvref_t<Receiver> receiver;

template <typename R>
state(bounded_queue& queue, T&& value, R&& receiver)
: state_base(::std::move(value), queue), receiver(::std::forward<R>(receiver)) {}

auto start() & noexcept {
if (this->queue.start_push(*this))
this->complete();
}
auto complete() -> void override { ::beman::execution26::set_value(::std::move(this->receiver)); }
auto complete(::beman::execution26::conqueue_errc error) -> void override {
::beman::execution26::set_error(::std::move(this->receiver), error);
}
};

using sender_concept = ::beman::execution26::sender_t;
using completion_signatures =
::beman::execution26::completion_signatures<::beman::execution26::set_value_t(),
::beman::execution26::set_error_t(
::beman::execution26::conqueue_errc),
::beman::execution26::set_stopped_t()>;

bounded_queue& queue;
::std::remove_cvref_t<T> value;
template <::beman::execution26::receiver Receiver>
auto connect(Receiver&& receiver) && -> state<Receiver> {
static_assert(::beman::execution26::operation_state<state<Receiver>>);
return state<Receiver>(queue, ::std::move(value), ::std::forward<Receiver>(receiver));
}
};
static_assert(::beman::execution26::sender<push_sender>);
static_assert(::beman::execution26::sender_in<push_sender>);

static_assert(::std::same_as<value_type, typename Allocator::value_type>);

explicit bounded_queue(::std::size_t max, Allocator allocator = {})
Expand All @@ -51,8 +101,11 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
return this->closed;
}
auto close() noexcept -> void {
std::lock_guard cerberos(this->mutex);
std::lock_guard cerberus(this->mutex);
this->closed = true;
while (not this->push_queue.empty()) {
this->push_queue.pop()->complete(::beman::execution26::conqueue_errc::closed);
}
this->push_condition.notify_all();
this->pop_condition.notify_all();
}
Expand All @@ -76,8 +129,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
T rc{::std::move(element->value)};
this->destroy(element);
++this->tail;
cerberus.unlock();
this->push_condition.notify_one();
this->push_notify(cerberus);
return rc;
}
auto pop(::std::error_code& ec) -> ::std::optional<T> {
Expand All @@ -91,8 +143,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
::std::optional<T> rc{::std::move(element->value)};
this->destroy(element);
++this->tail;
cerberus.unlock();
this->push_condition.notify_one();
this->push_notify(cerberus);
return rc;
}
auto try_pop(::std::error_code& ec) -> ::std::optional<T> {
Expand All @@ -109,8 +160,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
::std::optional<T> rc{::std::move(element->value)};
this->destroy(element);
++this->tail;
cerberus.unlock();
this->push_condition.notify_one();
this->push_notify(cerberus);
return rc;
}
auto async_pop() -> sender auto { return ::beman::execution26::just(T()); }
Expand All @@ -128,6 +178,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
};
using array_allocator_type = allocator_traits::template rebind_alloc<element_t>;
using array_allocator_traits = allocator_traits::template rebind_traits<element_t>;
using push_sender_queue = ::beman::execution26::detail::intrusive_queue<&push_sender::state_base::next>;

template <typename... Args>
auto construct(element_t* element, Args&&... args) {
Expand Down Expand Up @@ -158,39 +209,61 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
}
template <typename Arg>
auto internal_push(Arg&& arg, ::beman::execution26::conqueue_errc& error) -> bool {
{
::std::unique_lock cerberus(this->mutex);
this->push_condition.wait(cerberus,
[this] { return this->closed || this->head - this->tail < this->max; });
if (this->closed) {
error = ::beman::execution26::conqueue_errc::closed;
return false;
}
this->construct(this->get(this->head), ::std::forward<Arg>(arg));
++this->head;
::std::unique_lock cerberus(this->mutex);
this->push_condition.wait(cerberus, [this] { return this->closed || this->head - this->tail < this->max; });
if (this->closed) {
error = ::beman::execution26::conqueue_errc::closed;
return false;
}
this->pop_condition.notify_one();
this->construct(this->get(this->head), ::std::forward<Arg>(arg));
++this->head;
this->pop_notify(cerberus);
return true;
}
template <typename Arg>
auto internal_try_push(Arg&& arg, ::std::error_code& ec) -> bool {
{
::std::unique_lock cerberus(this->mutex);
if (this->closed) {
ec = make_error_code(::beman::execution26::conqueue_errc::closed);
return false;
}
if (this->head - this->tail == this->max) {
ec = make_error_code(::beman::execution26::conqueue_errc::full);
return false;
}
this->construct(this->get(this->head), ::std::forward<Arg>(arg));
++this->head;
::std::unique_lock cerberus(this->mutex);
if (this->closed) {
ec = make_error_code(::beman::execution26::conqueue_errc::closed);
return false;
}
this->pop_condition.notify_one();
if (this->head - this->tail == this->max) {
ec = make_error_code(::beman::execution26::conqueue_errc::full);
return false;
}
this->construct(this->get(this->head), ::std::forward<Arg>(arg));
++this->head;
this->pop_notify(cerberus);
return true;
}
auto internal_async_push(T&& value) -> sender auto { return ::beman::execution26::just(); }
auto pop_notify(auto& cerberus) {
// if (not this->pop_queue.empty())
// this->pop_queue.pop()->complete();
// else
this->pop_condition.notify_one();
}
auto push_notify(auto& cerberus) {
if (not this->push_queue.empty())
this->push_queue.pop()->complete();
else
this->push_condition.notify_one();
}
template <typename TT>
auto internal_async_push(TT&& value) -> sender auto {
return push_sender{*this, ::std::forward<TT>(value)};
}
auto start_push(push_sender::state_base& s) -> bool {
std::unique_lock cerberus(this->mutex);
if (this->head - this->tail < this->max) {
this->construct(this->get(this->head), ::std::move(s.value));
++this->head;
this->pop_notify(cerberus);
return true;
} else {
this->push_queue.push(&s);
return false;
}
}

Allocator allocator;
array_allocator_type array_allocator;
Expand All @@ -201,6 +274,7 @@ class beman::execution26::bounded_queue : ::beman::execution26::detail::immovabl
element_t* elements;
::std::uint64_t head{}; // the next element to push to
::std::uint64_t tail{}; // the next element to push from
push_sender_queue push_queue;
bool closed{};
};

Expand Down
34 changes: 34 additions & 0 deletions include/beman/execution26/detail/intrusive_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// include/beman/execution26/detail/intrusive_queue.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE

#include <utility>

// ----------------------------------------------------------------------------

namespace beman::execution26::detail {
template <auto>
struct intrusive_queue;

template <typename T, T* T::* next>
struct intrusive_queue<next> {
T* head{};
T* tail{};

auto push(T* n) -> void {
if (this->head) {
std::exchange(this->tail, n)->*next = n;
} else {
this->head = this->tail = n;
}
}
auto pop() -> T* { return std::exchange(this->head, this->head->*next); }
auto empty() const -> bool { return this->head == nullptr; }
};
} // namespace beman::execution26::detail

// ----------------------------------------------------------------------------

#endif
14 changes: 7 additions & 7 deletions include/beman/execution26/detail/intrusive_stack.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// include/beman/execution26/detail/intrusive_queue.hpp -*-C++-*-
// include/beman/execution26/detail/intrusive_stack.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE
Expand All @@ -15,16 +15,16 @@ class atomic_intrusive_stack;
template <auto Next>
class intrusive_stack;

//! @brief This data structure is an intrusive queue that is not thread-safe.
//! @brief This data structure is an intrusive stack that is not thread-safe.
template <class Item, Item* Item::*Next>
class intrusive_stack<Next> {
public:
//! @brief Pushes an item to the queue.
//! @brief Pushes an item to the stack.
auto push(Item* item) noexcept -> void { item->*Next = std::exchange(head_, item); }

//! @brief Pops one item from the queue.
//! @brief Pops one item from the stack.
//!
//! @return The item that was popped from the queue, or nullptr if the queue is empty.
//! @return The item that was popped from the stack, or nullptr if the stack is empty.
auto pop() noexcept -> Item* {
if (head_) {
auto item = head_;
Expand All @@ -34,7 +34,7 @@ class intrusive_stack<Next> {
return nullptr;
}

//! @brief Tests if the queue is empty.
//! @brief Tests if the stack is empty.
auto empty() const noexcept -> bool { return !head_; }

private:
Expand All @@ -44,4 +44,4 @@ class intrusive_stack<Next> {

} // namespace beman::execution26::detail

#endif
#endif
1 change: 1 addition & 0 deletions src/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/indirect_meta_apply.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/inplace_stop_source.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/into_variant.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/intrusive_queue.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/intrusive_stack.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/is_awaitable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/is_awaiter.hpp
Expand Down
1 change: 1 addition & 0 deletions tests/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ endif()
list(
APPEND
execution_tests
intrusive-queue.test
bounded-queue.test
conqueue-error.test
conqueue-errc.test
Expand Down
54 changes: 54 additions & 0 deletions tests/beman/execution26/bounded-queue.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,57 @@ auto test_pop(const auto one, auto two, const auto three, auto four, auto five)
t.join();
}
}

template <typename T>
auto test_async_push(auto one, auto two, auto three, auto four, auto five) -> void {
struct receiver {
using receiver_concept = test_std::receiver_t;
int& complete;
auto set_value() && noexcept -> void { this->complete = 1; }
auto set_error(test_std::conqueue_errc) && noexcept -> void { this->complete = 2; }
auto set_stopped() && noexcept -> void { this->complete = 3; }
};
static_assert(test_std::receiver<receiver>);

test_std::bounded_queue<T> queue(2);
auto s4{queue.async_push(four)}; // verify that the order isn't call but start()
auto s1{queue.async_push(one)};
auto s2{queue.async_push(two)};
auto s3{queue.async_push(three)};
auto s5{queue.async_push(five)};

int c1{}, c2{}, c3{}, c4{}, c5{};

auto op2{test_std::connect(std::move(s2), receiver{c2})}; // connect order also doesn't matter
ASSERT(c2 == 0);
auto op1{test_std::connect(std::move(s1), receiver{c1})};
ASSERT(c1 == 0);
auto op3{test_std::connect(std::move(s3), receiver{c3})};
ASSERT(c3 == 0);
auto op4{test_std::connect(std::move(s4), receiver{c4})};
ASSERT(c4 == 0);
auto op5{test_std::connect(std::move(s5), receiver{c5})};
ASSERT(c5 == 0);

test_std::start(op1);
ASSERT(c1 == 1);
test_std::start(op2);
ASSERT(c2 == 1);
test_std::start(op3);
ASSERT(c3 == 0);
test_std::start(op4);
ASSERT(c4 == 0);
test_std::start(op5);
ASSERT(c5 == 0);

ASSERT(queue.pop() == one);
ASSERT(c3 == 1);
ASSERT(queue.pop() == two);
ASSERT(c4 == 1);
queue.close();
ASSERT(c5 == 2);
}

} // namespace

TEST(bounded_queue) {
Expand All @@ -166,4 +217,7 @@ TEST(bounded_queue) {

test_pop<int>(1, 2, 3, 4, 5);
test_pop<std::string>("one"s, "two"s, "three"s, "four"s, "five"s);

test_async_push<int>(1, 2, 3, 4, 5);
test_async_push<std::string>("one"s, "two"s, "three"s, "four"s, "five"s);
}
Loading

0 comments on commit 7fa110f

Please sign in to comment.