From 8ac7ded74c096f1705b16ae2a18621d453899c35 Mon Sep 17 00:00:00 2001 From: Alexander Polyakov Date: Fri, 2 Aug 2024 19:25:47 +0300 Subject: [PATCH] Runtime light: coroutine scheduler & forks (#1050) Main changes: * stream management: now all operations with streams are performed through ComponentState. ComponentState stores all opened streams, releases unneeded streams, etc; * cancellable awaitables: awaitables that can stop waiting for some event; * forks: task_t is now a handle to a fork. Forks can be started, waited on, and cancelled; * coroutine scheduler: a coroutine scheduler concept and its simple implementation are added. --- .clangd | 6 + builtin-functions/kphp-light/functions.txt | 14 +- compiler/code-gen/declarations.cpp | 7 +- compiler/code-gen/files/init-scripts.cpp | 2 +- compiler/code-gen/vertex-compiler.cpp | 14 +- compiler/data/function-data.h | 1 + compiler/pipes/calc-bad-vars.cpp | 22 +- runtime-core/core-types/decl/optional.h | 13 + .../memory-resource/resource_allocator.h | 23 +- runtime-core/utils/small-object-storage.h | 47 ++++ runtime-light/component/component.cpp | 155 ++++++++---- runtime-light/component/component.h | 91 +++---- runtime-light/core/globals/php-init-scripts.h | 2 +- .../core/globals/php-script-globals.cpp | 1 + runtime-light/coroutine/awaitable.h | 205 ++++++++++++---- runtime-light/coroutine/task.h | 17 +- runtime-light/runtime-light.cmake | 86 ++++--- runtime-light/runtime-light.cpp | 26 +- runtime-light/scheduler/scheduler.cmake | 1 + runtime-light/scheduler/scheduler.cpp | 124 ++++++++++ runtime-light/scheduler/scheduler.h | 133 +++++++++++ runtime-light/stdlib/fork/fork-api.cpp | 24 ++ runtime-light/stdlib/fork/fork-api.h | 45 ++++ runtime-light/stdlib/fork/fork-context.cpp | 12 + runtime-light/stdlib/fork/fork-context.h | 47 ++++ runtime-light/stdlib/fork/fork.h | 27 +++ runtime-light/stdlib/misc.cpp | 81 +++---- runtime-light/stdlib/misc.h | 7 +- runtime-light/stdlib/output-control.cpp | 3 +- runtime-light/stdlib/rpc/rpc-context.cpp | 1 + runtime-light/stdlib/stdlib.cmake | 33 +-- runtime-light/stdlib/string-functions.cpp | 1 + runtime-light/stdlib/superglobals.cpp | 1 + runtime-light/stdlib/superglobals.h | 3 - runtime-light/stdlib/timer/timer.h | 29 +++ runtime-light/stdlib/variable-handling.cpp | 4 +- runtime-light/streams/component-stream.cpp | 67 ++++-- runtime-light/streams/component-stream.h | 27 +-- runtime-light/streams/interface.cpp | 136 +++++------ runtime-light/streams/interface.h | 29 ++- runtime-light/streams/streams.cpp | 225 +++++++++--------- runtime-light/streams/streams.h | 23 +- runtime-light/utils/concepts.h | 7 + runtime-light/utils/panic.h | 15 +- runtime-light/utils/timer.cpp | 21 -- runtime-light/utils/timer.h | 20 -- runtime-light/utils/utils.cmake | 6 +- runtime/storage.cpp | 8 +- runtime/storage.h | 58 +---- tests/k2-components/yield_loop.php | 2 +- 50 files changed, 1325 insertions(+), 627 deletions(-) create mode 100644 .clangd create mode 100644 runtime-core/utils/small-object-storage.h create mode 100644 runtime-light/scheduler/scheduler.cmake create mode 100644 runtime-light/scheduler/scheduler.cpp create mode 100644 runtime-light/scheduler/scheduler.h create mode 100644 runtime-light/stdlib/fork/fork-api.cpp create mode 100644 runtime-light/stdlib/fork/fork-api.h create mode 100644 runtime-light/stdlib/fork/fork-context.cpp create mode 100644 runtime-light/stdlib/fork/fork-context.h create mode 100644 runtime-light/stdlib/fork/fork.h create mode 100644 runtime-light/stdlib/timer/timer.h delete mode 100644 runtime-light/utils/timer.cpp delete mode 100644 runtime-light/utils/timer.h diff --git a/.clangd b/.clangd new file mode 100644 index 0000000000..40962097a3 --- /dev/null +++ b/.clangd @@ -0,0 +1,6 @@ +CompileFlags: + CompilationDatabase: build/ # Search build/ directory for compile_commands.json + +Diagnostics: + Suppress: cppcoreguidelines-avoid-do-while + diff --git a/builtin-functions/kphp-light/functions.txt b/builtin-functions/kphp-light/functions.txt index d92b9dad24..c9f5980d70 100644 --- a/builtin-functions/kphp-light/functions.txt +++ b/builtin-functions/kphp-light/functions.txt @@ -76,6 +76,17 @@ function get_hash_of_class (object $klass) ::: int; function strlen ($str ::: string) ::: int; +// === Fork ======================================================================================= + +/** @kphp-extern-func-info interruptible cpp_template_call */ +function wait(future | false $id, float $timeout = -1.0) ::: ^1[*] | null; + +/** @kphp-extern-func-info interruptible */ +function sched_yield() ::: void; + +/** @kphp-extern-func-info interruptible */ +function sched_yield_sleep($timeout_ns ::: int) ::: void; + // === Rpc ======================================================================================== /** @kphp-tl-class */ @@ -198,8 +209,6 @@ function instance_cast(object $instance, $to_type ::: string) ::: instance<^2>; function make_clone ($x ::: any) ::: ^1; -/** @kphp-extern-func-info interruptible */ -function testyield() ::: void; function check_shutdown() ::: void; function warning($message ::: string) ::: void; @@ -211,4 +220,5 @@ function debug_print_string($str ::: string) ::: void; function byte_to_int($str ::: string) ::: ?int; function int_to_byte($v ::: int) ::: ?string; +/** @kphp-extern-func-info interruptible */ function set_timer(int $timeout, callable():void $callback) ::: void; diff --git a/compiler/code-gen/declarations.cpp b/compiler/code-gen/declarations.cpp index 31a3a97b6e..ecf571c3f7 100644 --- a/compiler/code-gen/declarations.cpp +++ b/compiler/code-gen/declarations.cpp @@ -72,7 +72,9 @@ void FunctionDeclaration::compile(CodeGenerator &W) const { switch (style) { case gen_out_style::tagger: case gen_out_style::cpp: { - if (function->is_interruptible) { + if (function->is_k2_fork) { + FunctionSignatureGenerator(W) << "task_t " << FunctionName(function) << "(" << params_gen << ")"; + } else if (function->is_interruptible) { FunctionSignatureGenerator(W) << "task_t<" << ret_type_gen << ">" << " " << FunctionName(function) << "(" << params_gen << ")"; } else { FunctionSignatureGenerator(W) << ret_type_gen << " " << FunctionName(function) << "(" << params_gen << ")"; @@ -115,7 +117,8 @@ void FunctionParams::declare_cpp_param(CodeGenerator &W, VertexAdaptor v auto var_ptr = var->var_id; if (var->ref_flag) { W << "&"; - } else if (var_ptr->marked_as_const || (!function->has_variadic_param && var_ptr->is_read_only)) { + } else if (!function->is_k2_fork && (var_ptr->marked_as_const || (!function->has_variadic_param && var_ptr->is_read_only))) { + // the top of k2 fork must take arguments by value (see C++ avoid reference parameters in coroutines) W << (!type.type->is_primitive_type() ? "const &" : ""); } W << VarName(var_ptr); diff --git a/compiler/code-gen/files/init-scripts.cpp b/compiler/code-gen/files/init-scripts.cpp index 388e2e6748..282b5bd71f 100644 --- a/compiler/code-gen/files/init-scripts.cpp +++ b/compiler/code-gen/files/init-scripts.cpp @@ -226,7 +226,7 @@ void InitScriptsCpp::compile(CodeGenerator &W) const { W << GlobalsResetFunction(main_file_id->main_function) << NL; if (G->is_output_mode_k2_component()) { - FunctionSignatureGenerator(W) << "void init_php_scripts_in_each_worker(" << PhpMutableGlobalsRefArgument() << ", task_t&run" ")" << BEGIN; + FunctionSignatureGenerator(W) << "void init_php_scripts_in_each_worker(" << PhpMutableGlobalsRefArgument() << ", task_t &run" ")" << BEGIN; } else { FunctionSignatureGenerator(W) << "void init_php_scripts_in_each_worker(" << PhpMutableGlobalsRefArgument() << ")" << BEGIN; } diff --git a/compiler/code-gen/vertex-compiler.cpp b/compiler/code-gen/vertex-compiler.cpp index 71ed0feed1..826e2456ab 100644 --- a/compiler/code-gen/vertex-compiler.cpp +++ b/compiler/code-gen/vertex-compiler.cpp @@ -844,7 +844,11 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ if (mode == func_call_mode::fork_call) { - W << FunctionForkName(func); + if (func->is_interruptible) { + W << "(co_await start_fork_and_reschedule_t{" << FunctionName(func); + } else { + W << FunctionForkName(func); + } } else { if (func->is_interruptible) { W << "(" << "co_await "; @@ -874,7 +878,13 @@ void compile_func_call(VertexAdaptor root, CodeGenerator &W, func_ W << JoinValues(args, ", "); W << ")"; if (func->is_interruptible) { - W << ")"; + if (mode == func_call_mode::fork_call) { + W << "})"; + } else if (func->is_k2_fork) { // k2 fork's return type is 'task_t' so we need to unpack actual result from fork_result + W << ").get_result<" << TypeName(tinf::get_type(root)) << ">()"; + } else { + W << ")"; + } } } diff --git a/compiler/data/function-data.h b/compiler/data/function-data.h index 3962e49b62..1a3662a74c 100644 --- a/compiler/data/function-data.h +++ b/compiler/data/function-data.h @@ -118,6 +118,7 @@ class FunctionData { bool cpp_variadic_call = false; bool is_resumable = false; bool is_interruptible = false; + bool is_k2_fork = false; bool can_be_implicitly_interrupted_by_other_resumable = false; bool is_virtual_method = false; bool is_overridden_method = false; diff --git a/compiler/pipes/calc-bad-vars.cpp b/compiler/pipes/calc-bad-vars.cpp index 032dec6f69..cac43595d2 100644 --- a/compiler/pipes/calc-bad-vars.cpp +++ b/compiler/pipes/calc-bad-vars.cpp @@ -2,13 +2,16 @@ // Copyright (c) 2020 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt -#include #include "compiler/pipes/calc-bad-vars.h" +#include +#include + #include "compiler/compiler-core.h" #include "compiler/data/class-data.h" #include "compiler/data/src-file.h" #include "compiler/function-pass.h" +#include "compiler/pipes/calc-func-dep.h" #include "compiler/utils/idmap.h" /*** Common algorithm ***/ @@ -548,6 +551,15 @@ class CalcBadVars { } } + static void calc_k2_fork(const FuncCallGraph &call_graph, const std::vector &dep_data) { + for (int i = 0; i < call_graph.n; ++i) { + for (const auto &fork : dep_data[i].forks) { + fork->is_interruptible = true; + fork->is_k2_fork = true; + } + } + } + static void calc_resumable(const FuncCallGraph &call_graph, const std::vector &dep_data) { for (int i = 0; i < call_graph.n; i++) { for (const auto &fork : dep_data[i].forks) { @@ -684,8 +696,12 @@ class CalcBadVars { { FuncCallGraph call_graph(std::move(functions), dep_datas); - calc_interruptible(call_graph); - calc_resumable(call_graph, dep_datas); + if (G->is_output_mode_k2_component()) { + calc_k2_fork(call_graph, dep_datas); + calc_interruptible(call_graph); + } else { + calc_resumable(call_graph, dep_datas); + } generate_bad_vars(call_graph, dep_datas); check_func_colors(call_graph); save_func_dep(call_graph); diff --git a/runtime-core/core-types/decl/optional.h b/runtime-core/core-types/decl/optional.h index 69dbde2a31..925549c56a 100644 --- a/runtime-core/core-types/decl/optional.h +++ b/runtime-core/core-types/decl/optional.h @@ -203,3 +203,16 @@ using enable_if_t_is_optional_t2 = std::enable_if_t template using enable_if_t_is_optional_string = enable_if_t_is_optional_t2; + +template +struct InternalOptionalType { + using type = T; +}; + +template +struct InternalOptionalType> { + using type = T; +}; + +template +using internal_optional_type_t = typename InternalOptionalType::type; diff --git a/runtime-core/memory-resource/resource_allocator.h b/runtime-core/memory-resource/resource_allocator.h index a4bab87535..eca8f3b1a1 100644 --- a/runtime-core/memory-resource/resource_allocator.h +++ b/runtime-core/memory-resource/resource_allocator.h @@ -3,9 +3,12 @@ // Distributed under the GPL v3 License, see LICENSE.notice.txt #pragma once + +#include #include #include #include +#include #include "common/wrappers/likely.h" @@ -19,19 +22,16 @@ class resource_allocator { using value_type = T; template - friend - class resource_allocator; + friend class resource_allocator; - explicit resource_allocator(MemoryResource &memory_resource) noexcept: - memory_resource_(memory_resource) { - } + explicit resource_allocator(MemoryResource &memory_resource) noexcept + : memory_resource_(memory_resource) {} template - explicit resource_allocator(const resource_allocator &other) noexcept: - memory_resource_(other.memory_resource_) { - } + explicit resource_allocator(const resource_allocator &other) noexcept + : memory_resource_(other.memory_resource_) {} - value_type *allocate(size_t size, void const * = nullptr) { + value_type *allocate(size_t size, [[maybe_unused]] void const *ptr = nullptr) { static_assert(sizeof(value_type) <= max_value_type_size(), "memory limit"); auto result = static_cast(memory_resource_.allocate(sizeof(value_type) * size)); if (unlikely(!result)) { @@ -46,7 +46,7 @@ class resource_allocator { } static constexpr size_t max_value_type_size() { - return 128u; + return 128U; } friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept { @@ -65,6 +65,9 @@ namespace stl { template, class KeyEqual = std::equal_to> using unordered_map = std::unordered_map, Resource>>; +template, class KeyEqual = std::equal_to> +using unordered_set = std::unordered_set>; + template> using map = std::map, Resource>>; diff --git a/runtime-core/utils/small-object-storage.h b/runtime-core/utils/small-object-storage.h new file mode 100644 index 0000000000..2ea08dbfe7 --- /dev/null +++ b/runtime-core/utils/small-object-storage.h @@ -0,0 +1,47 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2020 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include + +#include "runtime-core/runtime-core.h" + +template +union small_object_storage { + std::array storage_; + void *storage_ptr; + + template + std::enable_if_t emplace(Args &&...args) noexcept { + return new (storage_.data()) T(std::forward(args)...); + } + template + std::enable_if_t get() noexcept { + return reinterpret_cast(storage_.data()); + } + template + std::enable_if_t destroy() noexcept { + get()->~T(); + } + + template + std::enable_if_t < limit emplace(Args &&...args) noexcept { + storage_ptr = RuntimeAllocator::current().alloc_script_memory(sizeof(T)); + return new (storage_ptr) T(std::forward(args)...); + } + template + std::enable_if_t < limit get() noexcept { + return static_cast(storage_ptr); + } + template + std::enable_if_t < limit destroy() noexcept { + T *mem = get(); + mem->~T(); + RuntimeAllocator::current().free_script_memory(mem, sizeof(T)); + } +}; diff --git a/runtime-light/component/component.cpp b/runtime-light/component/component.cpp index 72f282f749..e1a81c95e1 100644 --- a/runtime-light/component/component.cpp +++ b/runtime-light/component/component.cpp @@ -3,64 +3,137 @@ // Distributed under the GPL v3 License, see LICENSE.notice.txt #include "runtime-light/component/component.h" + +#include +#include +#include + +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/core/globals/php-init-scripts.h" +#include "runtime-light/header.h" +#include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/utils/context.h" -void ComponentState::resume_if_was_rescheduled() { - if (poll_status == PollStatus::PollReschedule) { - // If component was suspended by please yield and there is no awaitable streams - main_thread(); - } +void ComponentState::init_script_execution() noexcept { + kphp_core_context.init(); + init_php_scripts_in_each_worker(php_script_mutable_globals_singleton, main_task); + scheduler.suspend(std::make_pair(main_task.get_handle(), WaitEvent::Rechedule{})); } -bool ComponentState::is_stream_already_being_processed(uint64_t stream_d) { - return opened_streams.contains(stream_d); -} +void ComponentState::process_platform_updates() noexcept { + const auto &platform_ctx{*get_platform_context()}; + + for (;;) { + // check if platform asked for yield + if (static_cast(platform_ctx.please_yield.load())) { // tell the scheduler that we are about to yield + const auto schedule_status{scheduler.schedule(ScheduleEvent::Yield{})}; + poll_status = schedule_status == ScheduleStatus::Error ? PollStatus::PollFinishedError : PollStatus::PollReschedule; + return; + } -void ComponentState::resume_if_wait_stream(uint64_t stream_d, StreamStatus status) { - if (is_stream_timer(stream_d)) { - process_timer(stream_d); - } else { - process_stream(stream_d, status); + // try taking update from the platform + if (uint64_t stream_d{}; static_cast(platform_ctx.take_update(std::addressof(stream_d)))) { + if (opened_streams_.contains(stream_d)) { // update on opened stream + switch (scheduler.schedule(ScheduleEvent::UpdateOnStream{.stream_d = stream_d})) { + case ScheduleStatus::Resumed: { // scheduler's resumed a coroutine waiting for update + break; + } + case ScheduleStatus::Skipped: { // no one is waiting for the event yet, so just save it + pending_updates_.insert(stream_d); + break; + } + case ScheduleStatus::Error: { // something bad's happened, stop execution + poll_status = PollStatus::PollFinishedError; + return; + } + } + } else { // update on incoming stream + if (standard_stream_ != INVALID_PLATFORM_DESCRIPTOR) { + php_warning("skip new incoming stream since previous one is not closed"); + release_stream(stream_d); + continue; + } // TODO: multiple incoming streams (except for http queries) + standard_stream_ = stream_d; + incoming_streams_.push_back(stream_d); + opened_streams_.insert(stream_d); + if (const auto schedule_status{scheduler.schedule(ScheduleEvent::IncomingStream{.stream_d = stream_d})}; schedule_status == ScheduleStatus::Error) { + poll_status = PollStatus::PollFinishedError; + return; + } + } + } else { // we'are out of updates so let the scheduler do whatever it wants + switch (scheduler.schedule(ScheduleEvent::NoEvent{})) { + case ScheduleStatus::Resumed: { // scheduler's resumed some coroutine, so let's continue scheduling + break; + } + case ScheduleStatus::Skipped: { // scheduler's done nothing, so it's either scheduled all coroutines or is waiting for events + poll_status = scheduler.done() ? PollStatus::PollFinishedOk : PollStatus::PollBlocked; + return; + } + case ScheduleStatus::Error: { // something bad's happened, stop execution + poll_status = PollStatus::PollFinishedError; + return; + } + } + } } + // unreachable code + poll_status = PollStatus::PollFinishedError; } -void ComponentState::process_new_input_stream(uint64_t stream_d) { - bool already_pending = std::find(incoming_pending_queries.begin(), incoming_pending_queries.end(), stream_d) != incoming_pending_queries.end(); - if (!already_pending) { - php_debug("got new pending query %lu", stream_d); - incoming_pending_queries.push_back(stream_d); - } - if (wait_incoming_stream) { - php_debug("start process pending query %lu", stream_d); - main_thread(); +uint64_t ComponentState::take_incoming_stream() noexcept { + if (incoming_streams_.empty()) { + php_warning("can't take incoming stream cause we don't have them"); + return INVALID_PLATFORM_DESCRIPTOR; } + const auto stream_d{incoming_streams_.front()}; + incoming_streams_.pop_front(); + php_debug("take incoming stream %" PRIu64, stream_d); + return stream_d; } -void ComponentState::init_script_execution() { - kphp_core_context.init(); - init_php_scripts_in_each_worker(php_script_mutable_globals_singleton, k_main); - main_thread = k_main.get_handle(); +uint64_t ComponentState::open_stream(const string &component_name) noexcept { + uint64_t stream_d{}; + if (const auto open_stream_res{get_platform_context()->open(component_name.size(), component_name.c_str(), std::addressof(stream_d))}; + open_stream_res != OpenStreamResult::OpenStreamOk) { + php_warning("can't open stream to %s", component_name.c_str()); + return INVALID_PLATFORM_DESCRIPTOR; + } + opened_streams_.insert(stream_d); + php_debug("opened a stream %" PRIu64 " to %s", stream_d, component_name.c_str()); + return stream_d; } -bool ComponentState::is_stream_timer(uint64_t stream_d) { - return timer_callbacks.contains(stream_d); +uint64_t ComponentState::set_timer(std::chrono::nanoseconds duration) noexcept { + uint64_t timer_d{}; + if (const auto set_timer_res{get_platform_context()->set_timer(std::addressof(timer_d), static_cast(duration.count()))}; + set_timer_res != SetTimerResult::SetTimerOk) { + php_warning("can't set timer for %.9f sec", std::chrono::duration(duration).count()); + return INVALID_PLATFORM_DESCRIPTOR; + } + opened_streams_.insert(timer_d); + php_debug("set timer %" PRIu64 " for %.9f sec", timer_d, std::chrono::duration(duration).count()); + return timer_d; } -void ComponentState::process_timer(uint64_t stream_d) { +void ComponentState::release_stream(uint64_t stream_d) noexcept { + if (stream_d == standard_stream_) { + standard_stream_ = INVALID_PLATFORM_DESCRIPTOR; + } + opened_streams_.erase(stream_d); + pending_updates_.erase(stream_d); // also erase pending updates if exists get_platform_context()->free_descriptor(stream_d); - timer_callbacks[stream_d](); - timer_callbacks.erase(stream_d); - opened_streams.erase(stream_d); + php_debug("released a stream %" PRIu64, stream_d); } -void ComponentState::process_stream(uint64_t stream_d, StreamStatus status) { - auto expected_status = opened_streams[stream_d]; - if ((expected_status == StreamRuntimeStatus::WBlocked && status.write_status != IOBlocked) - || (expected_status == StreamRuntimeStatus::RBlocked && status.read_status != IOBlocked)) { - php_debug("resume on waited query %lu", stream_d); - auto suspend_point = awaiting_coroutines[stream_d]; - awaiting_coroutines.erase(stream_d); - php_assert(awaiting_coroutines.empty()); - suspend_point(); +void ComponentState::release_all_streams() noexcept { + const auto &platform_ctx{*get_platform_context()}; + standard_stream_ = INVALID_PLATFORM_DESCRIPTOR; + for (const auto stream_d : opened_streams_) { + platform_ctx.free_descriptor(stream_d); + php_debug("released a stream %" PRIu64, stream_d); } + opened_streams_.clear(); + pending_updates_.clear(); + incoming_streams_.clear(); } diff --git a/runtime-light/component/component.h b/runtime-light/component/component.h index 7057f6dd5a..65a9ca173d 100644 --- a/runtime-light/component/component.h +++ b/runtime-light/component/component.h @@ -4,78 +4,87 @@ #pragma once -#include +#include #include #include -#include -#include +#include #include "runtime-core/memory-resource/resource_allocator.h" #include "runtime-core/memory-resource/unsynchronized_pool_resource.h" #include "runtime-core/runtime-core.h" - #include "runtime-light/core/globals/php-script-globals.h" #include "runtime-light/coroutine/task.h" +#include "runtime-light/header.h" +#include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/stdlib/fork/fork-context.h" #include "runtime-light/stdlib/output-control.h" #include "runtime-light/stdlib/rpc/rpc-context.h" -#include "runtime-light/stdlib/superglobals.h" -#include "runtime-light/streams/streams.h" -#include "runtime-light/utils/context.h" + +constexpr uint64_t INVALID_PLATFORM_DESCRIPTOR = 0; + +// Coroutine scheduler type. Change it here if you want to use another scheduler +using CoroutineScheduler = SimpleCoroutineScheduler; +static_assert(CoroutineSchedulerConcept); struct ComponentState { - template - using unordered_map = memory_resource::stl::unordered_map; + template + using unordered_set = memory_resource::stl::unordered_set; + template using deque = memory_resource::stl::deque; - static constexpr auto INIT_RUNTIME_ALLOCATOR_SIZE = static_cast(512U * 1024U); // 512KB - ComponentState() + ComponentState() noexcept : runtime_allocator(INIT_RUNTIME_ALLOCATOR_SIZE, 0) + , scheduler(runtime_allocator.memory_resource) + , fork_component_context(runtime_allocator.memory_resource) , php_script_mutable_globals_singleton(runtime_allocator.memory_resource) - , opened_streams(unordered_map::allocator_type{runtime_allocator.memory_resource}) - , awaiting_coroutines(unordered_map>::allocator_type{runtime_allocator.memory_resource}) - , timer_callbacks(unordered_map>::allocator_type{runtime_allocator.memory_resource}) - , incoming_pending_queries(deque::allocator_type{runtime_allocator.memory_resource}) - , rpc_component_context(runtime_allocator.memory_resource) {} + , rpc_component_context(runtime_allocator.memory_resource) + , incoming_streams_(deque::allocator_type{runtime_allocator.memory_resource}) + , opened_streams_(unordered_set::allocator_type{runtime_allocator.memory_resource}) + , pending_updates_(unordered_set::allocator_type{runtime_allocator.memory_resource}) {} ~ComponentState() = default; - bool not_finished() const noexcept { - return poll_status != PollStatus::PollFinishedOk && poll_status != PollStatus::PollFinishedError; - } - - void resume_if_was_rescheduled(); - - bool is_stream_already_being_processed(uint64_t stream_d); - - void resume_if_wait_stream(uint64_t stream_d, StreamStatus status); + void init_script_execution() noexcept; + void process_platform_updates() noexcept; - void process_new_input_stream(uint64_t stream_d); - - void init_script_execution(); + bool stream_updated(uint64_t stream_d) const noexcept { + return pending_updates_.contains(stream_d); + } + const unordered_set &opened_streams() const noexcept { + return opened_streams_; + } + const deque &incoming_streams() const noexcept { + return incoming_streams_; + } + uint64_t standard_stream() const noexcept { + return standard_stream_; + } + uint64_t take_incoming_stream() noexcept; + uint64_t open_stream(const string &) noexcept; + uint64_t set_timer(std::chrono::nanoseconds) noexcept; + void release_stream(uint64_t) noexcept; + void release_all_streams() noexcept; RuntimeAllocator runtime_allocator; - task_t k_main; - Response response; - PhpScriptMutableGlobals php_script_mutable_globals_singleton; + CoroutineScheduler scheduler; + ForkComponentContext fork_component_context; PollStatus poll_status = PollStatus::PollReschedule; - uint64_t standard_stream = 0; - std::coroutine_handle<> main_thread; - bool wait_incoming_stream = false; - unordered_map opened_streams; // подумать про необходимость opened_streams. Объединить с awaiting_coroutines - unordered_map> awaiting_coroutines; - unordered_map> timer_callbacks; - deque incoming_pending_queries; + Response response; + PhpScriptMutableGlobals php_script_mutable_globals_singleton; KphpCoreContext kphp_core_context; RpcComponentContext rpc_component_context; private: - bool is_stream_timer(uint64_t stream_d); + task_t main_task; - void process_timer(uint64_t stream_d); + uint64_t standard_stream_{INVALID_PLATFORM_DESCRIPTOR}; + deque incoming_streams_; + unordered_set opened_streams_; + unordered_set pending_updates_; - void process_stream(uint64_t stream_d, StreamStatus status); + static constexpr auto INIT_RUNTIME_ALLOCATOR_SIZE = static_cast(512U * 1024U); // 512KB }; diff --git a/runtime-light/core/globals/php-init-scripts.h b/runtime-light/core/globals/php-init-scripts.h index 6acf8111cc..f9530da589 100644 --- a/runtime-light/core/globals/php-init-scripts.h +++ b/runtime-light/core/globals/php-init-scripts.h @@ -4,7 +4,7 @@ #pragma once -#include +#include "runtime-light/coroutine/task.h" class PhpScriptMutableGlobals; diff --git a/runtime-light/core/globals/php-script-globals.cpp b/runtime-light/core/globals/php-script-globals.cpp index d9f2421d82..afd86c307f 100644 --- a/runtime-light/core/globals/php-script-globals.cpp +++ b/runtime-light/core/globals/php-script-globals.cpp @@ -5,6 +5,7 @@ #include "php-script-globals.h" #include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" PhpScriptMutableGlobals &PhpScriptMutableGlobals::current() noexcept { return get_component_context()->php_script_mutable_globals_singleton; diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index e239cd559d..f468796117 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -4,75 +4,198 @@ #pragma once +#include +#include #include +#include +#include +#include +#include "runtime-core/core-types/decl/optional.h" +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/component/component.h" -#include "runtime-light/utils/logs.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/header.h" +#include "runtime-light/scheduler/scheduler.h" +#include "runtime-light/utils/context.h" -struct blocked_operation_t { - uint64_t awaited_stream; +template +concept Awaitable = requires(T && awaitable, std::coroutine_handle<> coro) { + { awaitable.await_ready() } noexcept -> std::convertible_to; + { awaitable.await_suspend(coro) } noexcept; + { awaitable.await_resume() } noexcept; +}; - blocked_operation_t(uint64_t stream_d) - : awaited_stream(stream_d) {} +template +concept CancellableAwaitable = Awaitable && requires(T && awaitable) { + { awaitable.cancel() } noexcept -> std::same_as; +}; - constexpr bool await_ready() const noexcept { - return false; +// === Awaitables ================================================================================= + +class wait_for_update_t { + uint64_t stream_d; + SuspendToken suspend_token_; + +public: + explicit wait_for_update_t(uint64_t stream_d_) noexcept + : stream_d(stream_d_) + , suspend_token_(std::noop_coroutine(), WaitEvent::UpdateOnStream{.stream_d = stream_d}) {} + + bool await_ready() const noexcept { + return get_component_context()->stream_updated(stream_d); } - void await_resume() const noexcept { - ComponentState &ctx = *get_component_context(); - ctx.opened_streams[awaited_stream] = StreamRuntimeStatus::NotBlocked; + void await_suspend(std::coroutine_handle<> coro) noexcept { + suspend_token_.first = coro; + CoroutineScheduler::get().suspend(suspend_token_); } -}; -struct read_blocked_t : blocked_operation_t { - void await_suspend(std::coroutine_handle<> h) const noexcept { - php_debug("blocked read on stream %lu", awaited_stream); - ComponentState &ctx = *get_component_context(); - ctx.poll_status = PollStatus::PollBlocked; - ctx.opened_streams[awaited_stream] = StreamRuntimeStatus::RBlocked; - ctx.awaiting_coroutines[awaited_stream] = h; + constexpr void await_resume() const noexcept {} + + void cancel() const noexcept { + CoroutineScheduler::get().cancel(suspend_token_); } }; -struct write_blocked_t : blocked_operation_t { - void await_suspend(std::coroutine_handle<> h) const noexcept { - php_debug("blocked write on stream %lu", awaited_stream); - ComponentState &ctx = *get_component_context(); - ctx.poll_status = PollStatus::PollBlocked; - ctx.opened_streams[awaited_stream] = StreamRuntimeStatus::WBlocked; - ctx.awaiting_coroutines[awaited_stream] = h; +// ================================================================================================ + +class wait_for_incoming_stream_t { + SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::IncomingStream{}}; + +public: + bool await_ready() const noexcept { + return !get_component_context()->incoming_streams().empty(); + } + + void await_suspend(std::coroutine_handle<> coro) noexcept { + suspend_token_.first = coro; + CoroutineScheduler::get().suspend(suspend_token_); + } + + uint64_t await_resume() const noexcept { + const auto incoming_stream_d{get_component_context()->take_incoming_stream()}; + php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR); + return incoming_stream_d; + } + + void cancel() const noexcept { + CoroutineScheduler::get().cancel(suspend_token_); } }; -struct test_yield_t { - bool await_ready() const noexcept { - return !get_platform_context()->please_yield.load(); +// ================================================================================================ + +class wait_for_reschedule_t { + SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::Rechedule{}}; + +public: + constexpr bool await_ready() const noexcept { + return false; } - void await_suspend(std::coroutine_handle<> h) const noexcept { - ComponentState &ctx = *get_component_context(); - ctx.poll_status = PollStatus::PollReschedule; - ctx.main_thread = h; + void await_suspend(std::coroutine_handle<> coro) noexcept { + suspend_token_.first = coro; + CoroutineScheduler::get().suspend(suspend_token_); } constexpr void await_resume() const noexcept {} + + void cancel() const noexcept { + CoroutineScheduler::get().cancel(suspend_token_); + } }; -struct wait_incoming_query_t { +// ================================================================================================ + +class wait_for_timer_t { + uint64_t timer_d{}; + SuspendToken suspend_token_; + +public: + explicit wait_for_timer_t(std::chrono::nanoseconds duration) noexcept + : timer_d(get_component_context()->set_timer(duration)) + , suspend_token_(std::noop_coroutine(), WaitEvent::UpdateOnTimer{.timer_d = timer_d}) {} + bool await_ready() const noexcept { - return !get_component_context()->incoming_pending_queries.empty(); + TimePoint tp{}; + return timer_d == INVALID_PLATFORM_DESCRIPTOR || get_platform_context()->get_timer_status(timer_d, std::addressof(tp)) == TimerStatus::TimerStatusElapsed; } - void await_suspend(std::coroutine_handle<> h) const noexcept { - ComponentState &ctx = *get_component_context(); - php_assert(ctx.standard_stream == 0); - ctx.main_thread = h; - ctx.wait_incoming_stream = true; - ctx.poll_status = PollBlocked; + void await_suspend(std::coroutine_handle<> coro) noexcept { + suspend_token_.first = coro; + CoroutineScheduler::get().suspend(suspend_token_); } void await_resume() const noexcept { - get_component_context()->wait_incoming_stream = false; + get_component_context()->release_stream(timer_d); + } + + void cancel() const noexcept { + get_component_context()->release_stream(timer_d); + CoroutineScheduler::get().cancel(suspend_token_); + } +}; + +// ================================================================================================ + +class start_fork_and_reschedule_t { + std::coroutine_handle<> fork_coro; + int64_t fork_id{}; + SuspendToken suspend_token_{std::noop_coroutine(), WaitEvent::Rechedule{}}; + +public: + explicit start_fork_and_reschedule_t(task_t &&task_) noexcept + : fork_coro(task_.get_handle()) + , fork_id(ForkComponentContext::get().push_fork(std::move(task_))) {} + + constexpr bool await_ready() const noexcept { + return false; + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle<> current_coro) noexcept { + suspend_token_.first = current_coro; + CoroutineScheduler::get().suspend(suspend_token_); + return fork_coro; + } + + int64_t await_resume() const noexcept { + return fork_id; + } +}; + +// ================================================================================================ + +template +class wait_fork_t { + task_t task; + wait_for_timer_t timer_awaiter; + task_t::awaiter_t fork_awaiter; + +public: + wait_fork_t(task_t &&task_, std::chrono::nanoseconds timeout_) noexcept + : task(std::move(task_)) + , timer_awaiter(timeout_) + , fork_awaiter(std::addressof(task)) {} + + bool await_ready() const noexcept { + return task.done(); + } + + void await_suspend(std::coroutine_handle<> coro) noexcept { + fork_awaiter.await_suspend(coro); + timer_awaiter.await_suspend(coro); + } + + Optional await_resume() noexcept { + if (task.done()) { + timer_awaiter.cancel(); + return {fork_awaiter.await_resume().get_result()}; + } else { + fork_awaiter.cancel(); + return {}; + } } }; diff --git a/runtime-light/coroutine/task.h b/runtime-light/coroutine/task.h index cc4b990e0c..b9c13b7cf6 100644 --- a/runtime-light/coroutine/task.h +++ b/runtime-light/coroutine/task.h @@ -10,6 +10,7 @@ #include #include "common/containers/final_action.h" +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/utils/context.h" #if __clang_major__ > 7 @@ -108,7 +109,7 @@ struct task_t : public task_base_t { std::exception_ptr exception; static task_t get_return_object_on_allocation_failure() { - throw std::bad_alloc(); + php_critical_error("cannot allocate memory for task_t"); } template @@ -143,14 +144,14 @@ struct task_t : public task_base_t { get_handle().resume(); } - T get_result() { + T get_result() noexcept { if (get_handle().promise().exception) { std::rethrow_exception(std::move(get_handle().promise().exception)); } if constexpr (!std::is_void{}) { T *t = std::launder(reinterpret_cast(get_handle().promise().bytes)); const vk::final_action final_action([t] { t->~T(); }); - return *t; + return std::move(*t); } } @@ -171,7 +172,7 @@ struct task_t : public task_base_t { explicit awaiter_t(task_t *task) : task{task} {} - bool await_ready() const { + constexpr bool await_ready() const noexcept { return false; } @@ -181,7 +182,7 @@ struct task_t : public task_base_t { #else bool #endif - await_suspend(std::coroutine_handle h) { + await_suspend(std::coroutine_handle h) noexcept { #ifdef CPPCORO_COMPILER_SUPPORTS_SYMMETRIC_TRANSFER task->get_handle().promise().next = h.address(); return task->get_handle(); @@ -197,10 +198,14 @@ struct task_t : public task_base_t { #endif } - T await_resume() { + T await_resume() noexcept { return task->get_result(); } + void cancel() const noexcept { + task->get_handle().promise().next = nullptr; + } + task_t *task; }; diff --git a/runtime-light/runtime-light.cmake b/runtime-light/runtime-light.cmake index 6100395c54..7487d40ee7 100644 --- a/runtime-light/runtime-light.cmake +++ b/runtime-light/runtime-light.cmake @@ -1,5 +1,6 @@ include(${RUNTIME_LIGHT_DIR}/allocator/allocator.cmake) include(${RUNTIME_LIGHT_DIR}/core/core.cmake) +include(${RUNTIME_LIGHT_DIR}/scheduler/scheduler.cmake) include(${RUNTIME_LIGHT_DIR}/stdlib/stdlib.cmake) include(${RUNTIME_LIGHT_DIR}/streams/streams.cmake) include(${RUNTIME_LIGHT_DIR}/tl/tl.cmake) @@ -7,41 +8,48 @@ include(${RUNTIME_LIGHT_DIR}/utils/utils.cmake) include(${RUNTIME_LIGHT_DIR}/component/component.cmake) include(${RUNTIME_LIGHT_DIR}/memory-resource-impl/memory-resource-impl.cmake) -set(RUNTIME_LIGHT_SRC ${RUNTIME_CORE_SRC} - ${RUNTIME_STDLIB_SRC} - ${RUNTIME_ALLOCATOR_SRC} - ${RUNTIME_COROUTINE_SRC} - ${RUNTIME_COMPONENT_SRC} - ${RUNTIME_STREAMS_SRC} - ${RUNTIME_TL_SRC} - ${RUNTIME_UTILS_SRC} - ${RUNTIME_LANGUAGE_SRC} - ${RUNTIME_MEMORY_RESOURCE_IMPL_SRC} - runtime-light.cpp) +set(RUNTIME_LIGHT_SRC + ${RUNTIME_CORE_SRC} + ${RUNTIME_STDLIB_SRC} + ${RUNTIME_SCHEDULER_SRC} + ${RUNTIME_ALLOCATOR_SRC} + ${RUNTIME_COROUTINE_SRC} + ${RUNTIME_COMPONENT_SRC} + ${RUNTIME_STREAMS_SRC} + ${RUNTIME_TL_SRC} + ${RUNTIME_UTILS_SRC} + ${RUNTIME_LANGUAGE_SRC} + ${RUNTIME_MEMORY_RESOURCE_IMPL_SRC} + runtime-light.cpp) set(RUNTIME_SOURCES_FOR_COMP "${RUNTIME_LIGHT_SRC}") -configure_file(${BASE_DIR}/compiler/runtime_sources.h.in ${AUTO_DIR}/compiler/runtime_sources.h) +configure_file(${BASE_DIR}/compiler/runtime_sources.h.in + ${AUTO_DIR}/compiler/runtime_sources.h) prepend(RUNTIME_LIGHT_SRC ${RUNTIME_LIGHT_DIR}/ "${RUNTIME_LIGHT_SRC}") vk_add_library(runtime-light OBJECT ${RUNTIME_LIGHT_SRC}) set_property(TARGET runtime-light PROPERTY POSITION_INDEPENDENT_CODE ON) -set_target_properties(runtime-light PROPERTIES - LIBRARY_OUTPUT_DIRECTORY ${BASE_DIR}/objs) +set_target_properties(runtime-light PROPERTIES LIBRARY_OUTPUT_DIRECTORY + ${BASE_DIR}/objs) target_compile_options(runtime-light PUBLIC -stdlib=libc++) target_link_options(runtime-light PUBLIC -stdlib=libc++ -static-libstdc++) vk_add_library(kphp-light-runtime STATIC) -target_link_libraries(kphp-light-runtime PUBLIC vk::light_common vk::runtime-light vk::runtime-core) -set_target_properties(kphp-light-runtime PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${OBJS_DIR}) +target_link_libraries( + kphp-light-runtime PUBLIC vk::light_common vk::runtime-light vk::runtime-core) +set_target_properties(kphp-light-runtime PROPERTIES ARCHIVE_OUTPUT_DIRECTORY + ${OBJS_DIR}) -file(GLOB_RECURSE KPHP_RUNTIME_ALL_HEADERS - RELATIVE ${BASE_DIR} - CONFIGURE_DEPENDS - "${RUNTIME_LIGHT_DIR}/*.h") +file( + GLOB_RECURSE KPHP_RUNTIME_ALL_HEADERS + RELATIVE ${BASE_DIR} + CONFIGURE_DEPENDS "${RUNTIME_LIGHT_DIR}/*.h") list(TRANSFORM KPHP_RUNTIME_ALL_HEADERS REPLACE "^(.+)$" [[#include "\1"]]) list(JOIN KPHP_RUNTIME_ALL_HEADERS "\n" MERGED_RUNTIME_HEADERS) -file(WRITE ${AUTO_DIR}/runtime/runtime-headers.h "\ +file( + WRITE ${AUTO_DIR}/runtime/runtime-headers.h + "\ #ifndef MERGED_RUNTIME_LIGHT_HEADERS_H #define MERGED_RUNTIME_LIGHT_HEADERS_H @@ -50,25 +58,37 @@ ${MERGED_RUNTIME_HEADERS} #endif ") -file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/php_lib_version.cpp - [[ +file( + WRITE ${CMAKE_CURRENT_BINARY_DIR}/php_lib_version.cpp + [[ #include "auto/runtime/runtime-headers.h" ]]) -add_library(php_lib_version_j OBJECT ${CMAKE_CURRENT_BINARY_DIR}/php_lib_version.cpp) +add_library(php_lib_version_j OBJECT + ${CMAKE_CURRENT_BINARY_DIR}/php_lib_version.cpp) target_compile_options(php_lib_version_j PRIVATE -I. -E) add_dependencies(php_lib_version_j kphp-light-runtime) -add_custom_command(OUTPUT ${OBJS_DIR}/php_lib_version.sha256 - COMMAND tail -n +3 $ | sha256sum | awk '{print $$1}' > ${OBJS_DIR}/php_lib_version.sha256 - DEPENDS php_lib_version_j $ - COMMENT "php_lib_version.sha256 generation") +add_custom_command( + OUTPUT ${OBJS_DIR}/php_lib_version.sha256 + COMMAND tail -n +3 $ | sha256sum | awk + '{print $$1}' > ${OBJS_DIR}/php_lib_version.sha256 + DEPENDS php_lib_version_j $ + COMMENT "php_lib_version.sha256 generation") -add_custom_target(php_lib_version_sha_256 DEPENDS ${OBJS_DIR}/php_lib_version.sha256) +add_custom_target(php_lib_version_sha_256 + DEPENDS ${OBJS_DIR}/php_lib_version.sha256) -get_property(RUNTIME_COMPILE_FLAGS TARGET runtime-light PROPERTY COMPILE_OPTIONS) -get_property(RUNTIME_INCLUDE_DIRS TARGET runtime-light PROPERTY INCLUDE_DIRECTORIES) +get_property( + RUNTIME_COMPILE_FLAGS + TARGET runtime-light + PROPERTY COMPILE_OPTIONS) +get_property( + RUNTIME_INCLUDE_DIRS + TARGET runtime-light + PROPERTY INCLUDE_DIRECTORIES) -list (JOIN RUNTIME_COMPILE_FLAGS "\;" RUNTIME_COMPILE_FLAGS) +list(JOIN RUNTIME_COMPILE_FLAGS "\;" RUNTIME_COMPILE_FLAGS) string(REPLACE "\"" "\\\"" RUNTIME_COMPILE_FLAGS ${RUNTIME_COMPILE_FLAGS}) -configure_file(${BASE_DIR}/compiler/runtime_compile_flags.h.in ${AUTO_DIR}/compiler/runtime_compile_flags.h) +configure_file(${BASE_DIR}/compiler/runtime_compile_flags.h.in + ${AUTO_DIR}/compiler/runtime_compile_flags.h) diff --git a/runtime-light/runtime-light.cpp b/runtime-light/runtime-light.cpp index 9725c64520..9ffc1a2bf9 100644 --- a/runtime-light/runtime-light.cpp +++ b/runtime-light/runtime-light.cpp @@ -2,9 +2,11 @@ // Copyright (c) 2024 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/component/component.h" #include "runtime-light/component/image.h" #include "runtime-light/core/globals/php-init-scripts.h" +#include "runtime-light/utils/context.h" ImageState *vk_k2_create_image_state(const struct PlatformCtx *pt_ctx) { // Note that in vk_k2_create_image_state available only allocator and logs from pt_ctx @@ -47,26 +49,10 @@ PollStatus vk_k2_poll(const ImageState *image_state, const PlatformCtx *pt_ctx, platformCtx = pt_ctx; componentState = component_ctx; - componentState->resume_if_was_rescheduled(); - uint64_t stream_d = 0; - while (platformCtx->take_update(&stream_d) && componentState->not_finished()) { - php_debug("take update on stream %lu", stream_d); - StreamStatus status; - GetStatusResult res = platformCtx->get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status %d", res); - } - php_debug("stream status %d, %d, %d", status.read_status, status.write_status, status.please_shutdown_write); - php_debug("opened_streams size %zu", componentState->opened_streams.size()); - if (componentState->is_stream_already_being_processed(stream_d)) { - php_debug("update on processed stream %lu", stream_d); - componentState->resume_if_wait_stream(stream_d, status); - } else { - componentState->process_new_input_stream(stream_d); - } - } - - PollStatus poll_status = componentState->poll_status; + php_debug("vk_k2_poll started..."); + componentState->process_platform_updates(); + const auto poll_status = componentState->poll_status; + php_debug("vk_k2_poll finished with status: %d", poll_status); reset_thread_locals(); return poll_status; } diff --git a/runtime-light/scheduler/scheduler.cmake b/runtime-light/scheduler/scheduler.cmake new file mode 100644 index 0000000000..fab00a2389 --- /dev/null +++ b/runtime-light/scheduler/scheduler.cmake @@ -0,0 +1 @@ +prepend(RUNTIME_SCHEDULER_SRC scheduler/ scheduler.cpp) diff --git a/runtime-light/scheduler/scheduler.cpp b/runtime-light/scheduler/scheduler.cpp new file mode 100644 index 0000000000..c0bbe95a20 --- /dev/null +++ b/runtime-light/scheduler/scheduler.cpp @@ -0,0 +1,124 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/scheduler/scheduler.h" + +#include +#include +#include +#include +#include + +#include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" + +// === SimpleCoroutineScheduler =================================================================== + +SimpleCoroutineScheduler &SimpleCoroutineScheduler::get() noexcept { + return get_component_context()->scheduler; +} + +ScheduleStatus SimpleCoroutineScheduler::scheduleOnNoEvent() noexcept { + if (yield_coros.empty()) { + return ScheduleStatus::Skipped; + } + const auto coro{yield_coros.front()}; + yield_coros.pop_front(); + coro.resume(); + return ScheduleStatus::Resumed; +} + +ScheduleStatus SimpleCoroutineScheduler::scheduleOnIncomingStream() noexcept { + if (awaiting_for_stream_coros.empty()) { + return ScheduleStatus::Skipped; + } + const auto coro{awaiting_for_stream_coros.front()}; + awaiting_for_stream_coros.pop_front(); + coro.resume(); + return ScheduleStatus::Resumed; +} + +ScheduleStatus SimpleCoroutineScheduler::scheduleOnStreamUpdate(uint64_t stream_d) noexcept { + if (stream_d == INVALID_PLATFORM_DESCRIPTOR) { + return ScheduleStatus::Error; + } else if (const auto it_coro{awaiting_for_update_coros.find(stream_d)}; it_coro != awaiting_for_update_coros.cend()) { + const auto coro{it_coro->second}; + awaiting_for_update_coros.erase(it_coro); + coro.resume(); + return ScheduleStatus::Resumed; + } else { + return ScheduleStatus::Skipped; + } +} + +ScheduleStatus SimpleCoroutineScheduler::scheduleOnYield() noexcept { + return ScheduleStatus::Skipped; +} + +ScheduleStatus SimpleCoroutineScheduler::schedule(ScheduleEvent::EventT event) noexcept { + return std::visit( + [this](auto &&event) { + using event_t = std::remove_cvref_t; + if constexpr (std::is_same_v) { + return scheduleOnNoEvent(); + } else if constexpr (std::is_same_v) { + return scheduleOnIncomingStream(); + } else if constexpr (std::is_same_v) { + return scheduleOnStreamUpdate(event.stream_d); + } else if constexpr (std::is_same_v) { + return scheduleOnStreamUpdate(event.timer_d); + } else if constexpr (std::is_same_v) { + return scheduleOnYield(); + } else { + static_assert(false, "non-exhaustive visitor"); + } + }, + event); +} + +void SimpleCoroutineScheduler::suspend(SuspendToken token) noexcept { + const auto [coro, event]{token}; + std::visit( + [this, coro](auto &&event) { + using event_t = std::remove_cvref_t; + if constexpr (std::is_same_v) { + yield_coros.push_back(coro); + } else if constexpr (std::is_same_v) { + awaiting_for_stream_coros.push_back(coro); + } else if constexpr (std::is_same_v) { + if (event.stream_d == INVALID_PLATFORM_DESCRIPTOR) { + return; + } + awaiting_for_update_coros.emplace(event.stream_d, coro); + } else if constexpr (std::is_same_v) { + if (event.timer_d == INVALID_PLATFORM_DESCRIPTOR) { + return; + } + awaiting_for_update_coros.emplace(event.timer_d, coro); + } else { + static_assert(false, "non-exhaustive visitor"); + } + }, + event); +} + +void SimpleCoroutineScheduler::cancel(SuspendToken token) noexcept { + const auto [coro, event]{token}; + std::visit( + [this, coro](auto &&event) { + using event_t = std::remove_cvref_t; + if constexpr (std::is_same_v) { + yield_coros.erase(std::find(yield_coros.cbegin(), yield_coros.cend(), coro)); + } else if constexpr (std::is_same_v) { + awaiting_for_stream_coros.erase(std::find(awaiting_for_stream_coros.cbegin(), awaiting_for_stream_coros.cend(), coro)); + } else if constexpr (std::is_same_v) { + awaiting_for_update_coros.erase(event.stream_d); + } else if constexpr (std::is_same_v) { + awaiting_for_update_coros.erase(event.timer_d); + } else { + static_assert(false, "non-exhaustive visitor"); + } + }, + event); +} diff --git a/runtime-light/scheduler/scheduler.h b/runtime-light/scheduler/scheduler.h new file mode 100644 index 0000000000..c03723bf8f --- /dev/null +++ b/runtime-light/scheduler/scheduler.h @@ -0,0 +1,133 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include +#include + +#include "runtime-core/memory-resource/resource_allocator.h" +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-light/utils/concepts.h" + +/** + * Supported types of updates: + * 1. NoEvent: there was not an update event, so it's up to scheduler what to do; + * 2. IncomingStream(uint64_t): there is a new stream; + * 3. UpdateOnStream(uint64_t): there is an update on some stream; + * 4. UpdateOnTimer(uint64_t): there is an update event on timer; + * 5. Yield: request to yield execution received. + */ +namespace ScheduleEvent { + +struct NoEvent {}; + +struct Yield {}; + +struct IncomingStream { + uint64_t stream_d{}; +}; + +struct UpdateOnStream { + uint64_t stream_d{}; +}; + +struct UpdateOnTimer { + uint64_t timer_d{}; +}; + +using EventT = std::variant; + +} // namespace ScheduleEvent + +enum class ScheduleStatus : uint8_t { Resumed, Skipped, Error }; + +/** + * Supported types of awaitable events: + * 1. Reschedule: yield execution, it's up to scheduler when the coroutine will continue its execution; + * 2. IncomingStream: wait for incoming stream; + * 3. UpdateOnStream(uint64_t): wait for update on specified stream; + * 4. UpdateOnTimer(uint64_t): wait for update on specified timer. + */ +namespace WaitEvent { + +struct Rechedule {}; + +struct IncomingStream {}; + +struct UpdateOnStream { + uint64_t stream_d{}; +}; + +struct UpdateOnTimer { + uint64_t timer_d{}; +}; + +using EventT = std::variant; + +}; // namespace WaitEvent + +/** + * SuspendToken type that binds an event and a coroutine waiting for that event. + */ +using SuspendToken = std::pair, WaitEvent::EventT>; + +/** + * Coroutine scheduler concept. + * + * Any type that is supposed to be used as a coroutine scheduler should conform to following interface: + * 1. be constructible from `memory_resource::unsyncrhonized_pool_resource&`; + * 2. have static `get` function that returns a reference to scheduler instance; + * 3. have `done` method that returns whether scheduler's scheduled all coroutines; + * 4. have `schedule` method that takes an event and schedules coroutines for execution; + * 5. have `suspend` method that suspends specified coroutine; + * 6. have `cancel` method that cancels specified SuspendToken. + */ +template +concept CoroutineSchedulerConcept = std::constructible_from + && requires(scheduler_t && s, ScheduleEvent::EventT schedule_event, SuspendToken token) { + { scheduler_t::get() } noexcept -> std::same_as; + { s.done() } noexcept -> std::convertible_to; + { s.schedule(schedule_event) } noexcept -> std::same_as; + { s.suspend(token) } noexcept -> std::same_as; + { s.cancel(token) } noexcept -> std::same_as; +}; + +// === SimpleCoroutineScheduler =================================================================== + +class SimpleCoroutineScheduler { + template + using unordered_map = memory_resource::stl::unordered_map; + + template + using deque = memory_resource::stl::deque; + + deque> yield_coros; + deque> awaiting_for_stream_coros; + unordered_map> awaiting_for_update_coros; + + ScheduleStatus scheduleOnNoEvent() noexcept; + ScheduleStatus scheduleOnIncomingStream() noexcept; + ScheduleStatus scheduleOnStreamUpdate(uint64_t) noexcept; + ScheduleStatus scheduleOnYield() noexcept; + +public: + explicit SimpleCoroutineScheduler(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept + : yield_coros(deque>::allocator_type{memory_resource}) + , awaiting_for_stream_coros(deque>::allocator_type{memory_resource}) + , awaiting_for_update_coros(unordered_map>::allocator_type{memory_resource}) {} + + static SimpleCoroutineScheduler &get() noexcept; + + bool done() const noexcept { + return yield_coros.empty() && awaiting_for_stream_coros.empty() && awaiting_for_update_coros.empty(); + } + + ScheduleStatus schedule(ScheduleEvent::EventT) noexcept; + void suspend(SuspendToken) noexcept; + void cancel(SuspendToken) noexcept; +}; diff --git a/runtime-light/stdlib/fork/fork-api.cpp b/runtime-light/stdlib/fork/fork-api.cpp new file mode 100644 index 0000000000..b8e68746ce --- /dev/null +++ b/runtime-light/stdlib/fork/fork-api.cpp @@ -0,0 +1,24 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/fork/fork-api.h" + +#include +#include + +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" + +task_t f$sched_yield() noexcept { + co_await wait_for_reschedule_t{}; +} + +task_t f$sched_yield_sleep(int64_t duration_ns) noexcept { + if (duration_ns < 0) { + php_warning("can't sleep for negative duration %" PRId64, duration_ns); + co_return; + } + co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast(duration_ns)}}; +} diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h new file mode 100644 index 0000000000..9ccb268e64 --- /dev/null +++ b/runtime-light/stdlib/fork/fork-api.h @@ -0,0 +1,45 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include + +#include "runtime-core/core-types/decl/optional.h" +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork-context.h" + +namespace fork_api_impl_ { + +constexpr double WAIT_FORK_MAX_TIMEOUT = 86400.0; + +} // namespace fork_api_impl_ + +constexpr int64_t INVALID_FORK_ID = -1; + +template +requires(is_optional::value) task_t f$wait(int64_t fork_id, double timeout = -1.0) noexcept { + if (timeout < 0.0) { + timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; + } + auto task_opt{ForkComponentContext::get().pop_fork(fork_id)}; + if (!task_opt.has_value()) { + php_warning("can't find fork %" PRId64, fork_id); + co_return T{}; + } + const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; + co_return co_await wait_fork_t>{*std::move(task_opt), timeout_ns}; +} + +template +requires(is_optional::value) task_t f$wait(Optional fork_id_opt, double timeout = -1.0) noexcept { + co_return co_await f$wait(fork_id_opt.has_value() ? fork_id_opt.val() : INVALID_FORK_ID, timeout); +} + +task_t f$sched_yield() noexcept; + +task_t f$sched_yield_sleep(int64_t duration_ns) noexcept; diff --git a/runtime-light/stdlib/fork/fork-context.cpp b/runtime-light/stdlib/fork/fork-context.cpp new file mode 100644 index 0000000000..9006243eac --- /dev/null +++ b/runtime-light/stdlib/fork/fork-context.cpp @@ -0,0 +1,12 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/fork/fork-context.h" + +#include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" + +ForkComponentContext &ForkComponentContext::get() noexcept { + return get_component_context()->fork_component_context; +} diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h new file mode 100644 index 0000000000..79bd0f6f24 --- /dev/null +++ b/runtime-light/stdlib/fork/fork-context.h @@ -0,0 +1,47 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include + +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/utils/concepts.h" + +class ForkComponentContext { + template + using unordered_map = memory_resource::stl::unordered_map; + + static constexpr auto FORK_ID_INIT = 1; + + unordered_map> forks_; + int64_t next_fork_id_{FORK_ID_INIT}; + +public: + explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept + : forks_(unordered_map>::allocator_type{memory_resource}) {} + + static ForkComponentContext &get() noexcept; + + int64_t push_fork(task_t &&task) noexcept { + const auto fork_id{next_fork_id_++}; + forks_.emplace(fork_id, std::move(task)); + php_debug("ForkComponentContext: push fork %" PRId64, fork_id); + return fork_id; + } + + std::optional> pop_fork(int64_t fork_id) noexcept { + if (const auto it_fork{forks_.find(fork_id)}; it_fork != forks_.cend()) { + php_debug("ForkComponentContext: pop fork %" PRId64, fork_id); + auto fork{std::move(it_fork->second)}; + forks_.erase(it_fork); + return {std::move(fork)}; + } + return std::nullopt; + } +}; diff --git a/runtime-light/stdlib/fork/fork.h b/runtime-light/stdlib/fork/fork.h new file mode 100644 index 0000000000..0837de8d38 --- /dev/null +++ b/runtime-light/stdlib/fork/fork.h @@ -0,0 +1,27 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-core/utils/small-object-storage.h" + +class fork_result { + small_object_storage storage{}; + +public: + template + requires(!std::same_as) explicit fork_result(T &&t) noexcept { + storage.emplace>(std::forward(t)); + } + + template + T get_result() noexcept { + return *storage.get(); + } +}; diff --git a/runtime-light/stdlib/misc.cpp b/runtime-light/stdlib/misc.cpp index aae4bca717..daca639574 100644 --- a/runtime-light/stdlib/misc.cpp +++ b/runtime-light/stdlib/misc.cpp @@ -4,73 +4,74 @@ #include "runtime-light/stdlib/misc.h" +#include + #include "runtime-light/component/component.h" #include "runtime-light/coroutine/awaitable.h" -#include "runtime-light/utils/json-functions.h" +#include "runtime-light/header.h" +#include "runtime-light/stdlib/superglobals.h" +#include "runtime-light/streams/streams.h" +#include "runtime-light/utils/context.h" #include "runtime-light/utils/panic.h" -static int ob_merge_buffers() { +namespace { + +int32_t ob_merge_buffers() { Response &response = get_component_context()->response; php_assert(response.current_buffer >= 0); - int ob_first_not_empty = 0; + int32_t ob_first_not_empty = 0; while (ob_first_not_empty < response.current_buffer && response.output_buffers[ob_first_not_empty].size() == 0) { ob_first_not_empty++; } - for (int i = ob_first_not_empty + 1; i <= response.current_buffer; i++) { + for (auto i = ob_first_not_empty + 1; i <= response.current_buffer; i++) { response.output_buffers[ob_first_not_empty].append(response.output_buffers[i].c_str(), response.output_buffers[i].size()); } return ob_first_not_empty; } -task_t parse_input_query(QueryType query_type) { - ComponentState &ctx = *get_component_context(); - php_assert(ctx.standard_stream == 0); - co_await wait_incoming_query_t{}; - ctx.standard_stream = ctx.incoming_pending_queries.front(); - ctx.incoming_pending_queries.pop_front(); - ctx.opened_streams[ctx.standard_stream] = StreamRuntimeStatus::NotBlocked; +} // namespace - if (query_type == QueryType::HTTP) { - auto [buffer, size] = co_await read_all_from_stream(ctx.standard_stream); - init_http_superglobals(buffer, size); - get_platform_context()->allocator.free(buffer); - } else if (query_type == QueryType::COMPONENT) { - // Processing takes place in the calling function - } else { - php_critical_error("unexpected query type %d in parse_input_query", static_cast(query_type)); +task_t wait_and_process_incoming_stream(QueryType query_type) { + const auto incoming_stream_d{co_await wait_for_incoming_stream_t{}}; + switch (query_type) { + case QueryType::HTTP: { + const auto [buffer, size] = co_await read_all_from_stream(incoming_stream_d); + init_http_superglobals(buffer, size); + get_platform_context()->allocator.free(buffer); + break; + } + case QueryType::COMPONENT: { // processing takes place in a component + break; + } } - co_return; + co_return incoming_stream_d; } -task_t finish(int64_t exit_code, bool from_exit) { +task_t finish(int64_t exit_code, bool from_exit) { // TODO: use exit code (void)from_exit; (void)exit_code; - // todo:k2 use exit_code - ComponentState &ctx = *get_component_context(); - if (ctx.standard_stream == 0) { + auto &component_ctx{*get_component_context()}; + const auto standard_stream{component_ctx.standard_stream()}; + if (standard_stream == INVALID_PLATFORM_DESCRIPTOR) { + component_ctx.poll_status = PollStatus::PollFinishedError; co_return; } - int ob_total_buffer = ob_merge_buffers(); - Response &response = ctx.response; + + const auto ob_total_buffer = ob_merge_buffers(); + Response &response = component_ctx.response; auto &buffer = response.output_buffers[ob_total_buffer]; - bool ok = co_await write_all_to_stream(ctx.standard_stream, buffer.c_str(), buffer.size()); - if (!ok) { - php_warning("cannot write component result to input stream %lu", ctx.standard_stream); + if (co_await write_all_to_stream(standard_stream, buffer.c_str(), buffer.size())) { + php_warning("can't write component result to stream %" PRIu64, standard_stream); } - free_all_descriptors(); - ctx.poll_status = PollStatus::PollFinishedOk; - co_return; -} - -task_t f$testyield() { - co_await test_yield_t{}; + component_ctx.release_all_streams(); + component_ctx.poll_status = PollStatus::PollFinishedOk; } void f$check_shutdown() { - const PlatformCtx &ptx = *get_platform_context(); - if (get_platform_context()->please_graceful_shutdown.load()) { + const auto &platform_ctx{*get_platform_context()}; + if (static_cast(get_platform_context()->please_graceful_shutdown.load())) { php_notice("script was graceful shutdown"); - ptx.abort(); + platform_ctx.abort(); } } @@ -88,4 +89,4 @@ task_t f$exit(const mixed &v) { void f$die([[maybe_unused]] const mixed &v) { get_component_context()->poll_status = PollStatus::PollFinishedOk; critical_error_handler(); -} \ No newline at end of file +} diff --git a/runtime-light/stdlib/misc.h b/runtime-light/stdlib/misc.h index 6fabb051a5..1194e781ff 100644 --- a/runtime-light/stdlib/misc.h +++ b/runtime-light/stdlib/misc.h @@ -4,12 +4,12 @@ #pragma once +#include + #include "runtime-core/runtime-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/superglobals.h" -task_t f$testyield(); - void f$check_shutdown(); task_t f$exit(const mixed &v = 0); @@ -18,5 +18,6 @@ void f$die(const mixed &v = 0); void reset(); -task_t parse_input_query(QueryType query_type); +task_t wait_and_process_incoming_stream(QueryType query_type); + task_t finish(int64_t exit_code, bool from_exit); diff --git a/runtime-light/stdlib/output-control.cpp b/runtime-light/stdlib/output-control.cpp index 8aa7eeb6aa..e6637044f2 100644 --- a/runtime-light/stdlib/output-control.cpp +++ b/runtime-light/stdlib/output-control.cpp @@ -6,6 +6,7 @@ #include "runtime-light/component/component.h" #include "runtime-light/stdlib/string-functions.h" +#include "runtime-light/utils/context.h" static constexpr int system_level_buffer = 0; @@ -41,7 +42,7 @@ string f$ob_get_content() { void f$ob_start(const string &callback) { Response &httpResponse = get_component_context()->response; - if (httpResponse.current_buffer + 1 == httpResponse.ob_max_buffers) { + if (httpResponse.current_buffer + 1 == Response::ob_max_buffers) { php_warning("Maximum nested level of output buffering reached. Can't do ob_start(%s)", callback.c_str()); return; } diff --git a/runtime-light/stdlib/rpc/rpc-context.cpp b/runtime-light/stdlib/rpc/rpc-context.cpp index 6f3c667329..14b305d2bd 100644 --- a/runtime-light/stdlib/rpc/rpc-context.cpp +++ b/runtime-light/stdlib/rpc/rpc-context.cpp @@ -6,6 +6,7 @@ #include "runtime-light/component/component.h" #include "runtime-light/component/image.h" +#include "runtime-light/utils/context.h" RpcComponentContext::RpcComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) : current_query() diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index 74df2b9223..d7956932a3 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -1,15 +1,18 @@ -prepend(RUNTIME_STDLIB_SRC stdlib/ - interface.cpp - misc.cpp - output-control.cpp - string-functions.cpp - variable-handling.cpp - superglobals.cpp - rpc/rpc-api.cpp - rpc/rpc-context.cpp - rpc/rpc-extra-headers.cpp - rpc/rpc-extra-info.cpp - rpc/rpc-tl-error.cpp - rpc/rpc-tl-query.cpp - rpc/rpc-tl-request.cpp -) +prepend( + RUNTIME_STDLIB_SRC + stdlib/ + interface.cpp + misc.cpp + output-control.cpp + string-functions.cpp + variable-handling.cpp + superglobals.cpp + fork/fork-api.cpp + fork/fork-context.cpp + rpc/rpc-api.cpp + rpc/rpc-context.cpp + rpc/rpc-extra-headers.cpp + rpc/rpc-extra-info.cpp + rpc/rpc-tl-error.cpp + rpc/rpc-tl-query.cpp + rpc/rpc-tl-request.cpp) diff --git a/runtime-light/stdlib/string-functions.cpp b/runtime-light/stdlib/string-functions.cpp index 055919c34f..3a839a66d1 100644 --- a/runtime-light/stdlib/string-functions.cpp +++ b/runtime-light/stdlib/string-functions.cpp @@ -5,6 +5,7 @@ #include "runtime-light/stdlib/string-functions.h" #include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" void print(const char *s, size_t s_len) { Response &response = get_component_context()->response; diff --git a/runtime-light/stdlib/superglobals.cpp b/runtime-light/stdlib/superglobals.cpp index 9ca0be6126..d53a5efe90 100644 --- a/runtime-light/stdlib/superglobals.cpp +++ b/runtime-light/stdlib/superglobals.cpp @@ -5,6 +5,7 @@ #include "runtime-light/stdlib/superglobals.h" #include "runtime-light/component/component.h" +#include "runtime-light/utils/context.h" #include "runtime-light/utils/json-functions.h" void init_http_superglobals(const char *buffer, int size) { diff --git a/runtime-light/stdlib/superglobals.h b/runtime-light/stdlib/superglobals.h index 74ae2e802e..9bace21303 100644 --- a/runtime-light/stdlib/superglobals.h +++ b/runtime-light/stdlib/superglobals.h @@ -4,9 +4,6 @@ #pragma once -#include "runtime-core/runtime-core.h" -#include "runtime-light/coroutine/task.h" - enum class QueryType { HTTP, COMPONENT }; void init_http_superglobals(const char *buffer, int size); diff --git a/runtime-light/stdlib/timer/timer.h b/runtime-light/stdlib/timer/timer.h new file mode 100644 index 0000000000..fb34dd28d3 --- /dev/null +++ b/runtime-light/stdlib/timer/timer.h @@ -0,0 +1,29 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include +#include + +#include "runtime-core/utils/kphp-assert-core.h" +#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/coroutine/task.h" + +template +task_t f$set_timer(int64_t timeout_ms, T &&on_timer_callback) noexcept { + if (timeout_ms < 0) { + php_warning("can't set timer for negative duration %" PRId64 "ms", timeout_ms); + co_return; + } + const auto fork_f{[](std::chrono::nanoseconds duration, T &&on_timer_callback) -> task_t { + co_await wait_for_timer_t{duration}; + on_timer_callback(); + co_return 0; + }}; // TODO: someone should pop that fork from ForkComponentContext since it will stay there unless we perform f$wait on fork + const auto duration_ms{std::chrono::milliseconds{static_cast(timeout_ms)}}; + co_await start_fork_and_reschedule_t(fork_f(std::chrono::duration_cast(duration_ms), std::forward(on_timer_callback))); +} diff --git a/runtime-light/stdlib/variable-handling.cpp b/runtime-light/stdlib/variable-handling.cpp index 7b2ee44ab8..dba94a95b3 100644 --- a/runtime-light/stdlib/variable-handling.cpp +++ b/runtime-light/stdlib/variable-handling.cpp @@ -7,7 +7,7 @@ #include "runtime-core/runtime-core.h" #include "runtime-light/component/component.h" #include "runtime-light/stdlib/output-control.h" -#include "runtime-light/utils/php_assert.h" +#include "runtime-light/utils/context.h" void do_print_r(const mixed &v, int depth) { if (depth == 10) { @@ -208,4 +208,4 @@ string f$print_r(const mixed &v, bool buffered) { void f$var_dump(const mixed &v) { do_var_dump(v, 0); -} \ No newline at end of file +} diff --git a/runtime-light/streams/component-stream.cpp b/runtime-light/streams/component-stream.cpp index b7c9a757e1..2956c949a6 100644 --- a/runtime-light/streams/component-stream.cpp +++ b/runtime-light/streams/component-stream.cpp @@ -2,40 +2,60 @@ // Copyright (c) 2024 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt +#include + +#include "runtime-light/component/component.h" +#include "runtime-light/header.h" #include "runtime-light/streams/component-stream.h" +#include "runtime-light/utils/context.h" + +const char *C$ComponentStream::get_class() const noexcept { + return "ComponentStream"; +} + +int32_t C$ComponentStream::get_hash() const noexcept { + return static_cast(vk::std_hash(vk::string_view(C$ComponentStream::get_class()))); +} + +C$ComponentStream::~C$ComponentStream() { + auto &component_ctx{*get_component_context()}; + if (component_ctx.opened_streams().contains(stream_d)) { + component_ctx.release_stream(stream_d); + } +} bool f$ComponentStream$$is_read_closed(const class_instance &stream) { - StreamStatus status; - GetStatusResult res = get_platform_context()->get_stream_status(stream.get()->stream_d, &status); - if (res != GetStatusOk) { - php_warning("cannot get stream status error %d", res); + StreamStatus status{}; + if (const auto status_res{get_platform_context()->get_stream_status(stream.get()->stream_d, std::addressof(status))}; + status_res != GetStatusResult::GetStatusOk) { + php_warning("stream status error %d", status_res); return true; } - return status.read_status == IOClosed; + return status.read_status == IOStatus::IOClosed; } bool f$ComponentStream$$is_write_closed(const class_instance &stream) { - StreamStatus status; - GetStatusResult res = get_platform_context()->get_stream_status(stream.get()->stream_d, &status); - if (res != GetStatusOk) { - php_warning("cannot get stream status error %d", res); + StreamStatus status{}; + if (const auto status_res{get_platform_context()->get_stream_status(stream.get()->stream_d, std::addressof(status))}; + status_res != GetStatusResult::GetStatusOk) { + php_warning("stream status error %d", status_res); return true; } - return status.write_status == IOClosed; + return status.write_status == IOStatus::IOClosed; } bool f$ComponentStream$$is_please_shutdown_write(const class_instance &stream) { - StreamStatus status; - GetStatusResult res = get_platform_context()->get_stream_status(stream.get()->stream_d, &status); - if (res != GetStatusOk) { - php_warning("cannot get stream status error %d", res); + StreamStatus status{}; + if (const auto status_res{get_platform_context()->get_stream_status(stream.get()->stream_d, std::addressof(status))}; + status_res != GetStatusResult::GetStatusOk) { + php_warning("stream status error %d", status_res); return true; } return status.please_shutdown_write; } void f$ComponentStream$$close(const class_instance &stream) { - free_descriptor(stream->stream_d); + get_component_context()->release_stream(stream->stream_d); } void f$ComponentStream$$shutdown_write(const class_instance &stream) { @@ -45,3 +65,20 @@ void f$ComponentStream$$shutdown_write(const class_instance & void f$ComponentStream$$please_shutdown_write(const class_instance &stream) { get_platform_context()->please_shutdown_write(stream->stream_d); } + +// ================================================================================================ + +const char *C$ComponentQuery::get_class() const noexcept { + return "ComponentQuery"; +} + +int32_t C$ComponentQuery::get_hash() const noexcept { + return static_cast(vk::std_hash(vk::string_view(C$ComponentQuery::get_class()))); +} + +C$ComponentQuery::~C$ComponentQuery() { + auto &component_ctx{*get_component_context()}; + if (component_ctx.opened_streams().contains(stream_d)) { + component_ctx.release_stream(stream_d); + } +} diff --git a/runtime-light/streams/component-stream.h b/runtime-light/streams/component-stream.h index ee0f8b7b0d..f5edb1315c 100644 --- a/runtime-light/streams/component-stream.h +++ b/runtime-light/streams/component-stream.h @@ -4,41 +4,26 @@ #pragma once -#include "common/algorithms/hashes.h" -#include "common/wrappers/string_view.h" #include "runtime-core/class-instance/refcountable-php-classes.h" -#include "runtime-light/streams/streams.h" struct C$ComponentStream final : public refcountable_php_classes { uint64_t stream_d{}; - const char *get_class() const noexcept { - return "ComponentStream"; - } + const char *get_class() const noexcept; - int32_t get_hash() const noexcept { - return static_cast(vk::std_hash(vk::string_view(C$ComponentStream::get_class()))); - } + int32_t get_hash() const noexcept; - ~C$ComponentStream() { - free_descriptor(stream_d); - } + ~C$ComponentStream(); }; struct C$ComponentQuery final : public refcountable_php_classes { uint64_t stream_d{}; - const char *get_class() const noexcept { - return "ComponentQuery"; - } + const char *get_class() const noexcept; - int32_t get_hash() const noexcept { - return static_cast(vk::std_hash(vk::string_view(C$ComponentQuery::get_class()))); - } + int32_t get_hash() const noexcept; - ~C$ComponentQuery() { - free_descriptor(stream_d); - } + ~C$ComponentQuery(); }; bool f$ComponentStream$$is_read_closed(const class_instance &stream); diff --git a/runtime-light/streams/interface.cpp b/runtime-light/streams/interface.cpp index ad9fddf1cc..0bf69dea59 100644 --- a/runtime-light/streams/interface.cpp +++ b/runtime-light/streams/interface.cpp @@ -4,106 +4,91 @@ #include "runtime-light/streams/interface.h" +#include + +#include "runtime-core/runtime-core.h" +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/component/component.h" -#include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/header.h" #include "runtime-light/stdlib/misc.h" +#include "runtime-light/streams/component-stream.h" #include "runtime-light/streams/streams.h" +#include "runtime-light/utils/context.h" task_t f$component_get_http_query() { - ComponentState &ctx = *get_component_context(); - if (ctx.standard_stream != 0) { - php_warning("previous incoming stream does not closed"); - ctx.standard_stream = 0; - } - co_await parse_input_query(QueryType::HTTP); + std::ignore = co_await wait_and_process_incoming_stream(QueryType::HTTP); } -task_t> f$component_client_send_query(const string &name, const string &message) { +task_t> f$component_client_send_query(string name, string message) { class_instance query; - const PlatformCtx &ptx = *get_platform_context(); - uint64_t stream_d{}; - OpenStreamResult res = ptx.open(name.size(), name.c_str(), &stream_d); - if (res != OpenStreamOk) { - php_warning("cannot open stream"); + const auto stream_d{get_component_context()->open_stream(name)}; + if (stream_d == INVALID_PLATFORM_DESCRIPTOR) { + php_warning("can't send client query"); co_return query; } - int writed = co_await write_all_to_stream(stream_d, message.c_str(), message.size()); - ptx.shutdown_write(stream_d); - php_debug("send %d bytes from %d to \"%s\" on stream %lu", writed, message.size(), name.c_str(), stream_d); + + int32_t written{co_await write_all_to_stream(stream_d, message.c_str(), message.size())}; + get_platform_context()->shutdown_write(stream_d); query.alloc(); query.get()->stream_d = stream_d; + php_debug("send %d bytes from %d to \"%s\" on stream %" PRIu64, written, message.size(), name.c_str(), stream_d); co_return query; } task_t f$component_client_get_result(class_instance query) { php_assert(!query.is_null()); - uint64_t stream_d = query.get()->stream_d; - if (stream_d == 0) { - php_warning("cannot get component client result"); - co_return string(); + uint64_t stream_d{query.get()->stream_d}; + if (stream_d == INVALID_PLATFORM_DESCRIPTOR) { + php_warning("can't get component client result"); + co_return string{}; } - auto [buffer, size] = co_await read_all_from_stream(stream_d); - string result; - result.assign(buffer, size); - free_descriptor(stream_d); - query.get()->stream_d = 0; - php_debug("read %d bytes from stream %lu", size, stream_d); + const auto [buffer, size]{co_await read_all_from_stream(stream_d)}; + string result{buffer, static_cast(size)}; + get_platform_context()->allocator.free(buffer); + get_component_context()->release_stream(stream_d); + query.get()->stream_d = INVALID_PLATFORM_DESCRIPTOR; + php_debug("read %d bytes from stream %" PRIu64, size, stream_d); co_return result; } -task_t f$component_server_send_result(const string &message) { - ComponentState &ctx = *get_component_context(); - bool ok = co_await write_all_to_stream(ctx.standard_stream, message.c_str(), message.size()); - if (!ok) { - php_warning("cannot send component result"); +task_t f$component_server_send_result(string message) { + auto &component_ctx{*get_component_context()}; + const auto standard_stream{component_ctx.standard_stream()}; + if (!co_await write_all_to_stream(standard_stream, message.c_str(), message.size())) { + php_warning("can't send component result"); } else { php_debug("send result \"%s\"", message.c_str()); } - free_descriptor(ctx.standard_stream); - ctx.standard_stream = 0; + component_ctx.release_stream(standard_stream); } task_t f$component_server_get_query() { - ComponentState &ctx = *get_component_context(); - if (ctx.standard_stream != 0) { - ctx.standard_stream = 0; - } - co_await parse_input_query(QueryType::COMPONENT); - auto [buffer, size] = co_await read_all_from_stream(ctx.standard_stream); - string query = string(buffer, size); + const auto incoming_stream_d{co_await wait_and_process_incoming_stream(QueryType::COMPONENT)}; + const auto [buffer, size] = co_await read_all_from_stream(incoming_stream_d); + string result{buffer, static_cast(size)}; get_platform_context()->allocator.free(buffer); - co_return query; + co_return result; } task_t> f$component_accept_stream() { - ComponentState &ctx = *get_component_context(); - if (ctx.standard_stream != 0) { - php_warning("previous stream does not closed"); - free_descriptor(ctx.standard_stream); - ctx.standard_stream = 0; - } - co_await parse_input_query(QueryType::COMPONENT); + const auto incoming_stream_d{co_await wait_and_process_incoming_stream(QueryType::COMPONENT)}; class_instance stream; stream.alloc(); - stream.get()->stream_d = ctx.standard_stream; + stream.get()->stream_d = incoming_stream_d; co_return stream; } class_instance f$component_open_stream(const string &name) { + auto &component_ctx = *get_component_context(); + class_instance query; - const PlatformCtx &ptx = *get_platform_context(); - ComponentState &ctx = *get_component_context(); - uint64_t stream_d{}; - OpenStreamResult res = ptx.open(name.size(), name.c_str(), &stream_d); - if (res != OpenStreamOk) { - php_warning("cannot open stream"); + const auto stream_d{component_ctx.open_stream(name)}; + if (stream_d == INVALID_PLATFORM_DESCRIPTOR) { return query; } - ctx.opened_streams[stream_d] = StreamRuntimeStatus::NotBlocked; query.alloc(); query.get()->stream_d = stream_d; - php_debug("open stream %lu to %s", stream_d, name.c_str()); return query; } @@ -112,37 +97,38 @@ int64_t f$component_stream_write_nonblock(const class_instance &stream) { - auto [ptr, size] = read_nonblock_from_stream(stream.get()->stream_d); - string result(ptr, size); - get_platform_context()->allocator.free(ptr); + const auto [buffer, size] = read_nonblock_from_stream(stream.get()->stream_d); + string result{buffer, static_cast(size)}; + get_platform_context()->allocator.free(buffer); // FIXME: do we need platform memory? return result; } -task_t f$component_stream_write_exact(const class_instance &stream, const string &message) { - int write = co_await write_exact_to_stream(stream->stream_d, message.c_str(), message.size()); - php_debug("write exact %d bytes to stream %lu", write, stream->stream_d); - co_return write; +task_t f$component_stream_write_exact(class_instance stream, string message) { + const auto written = co_await write_exact_to_stream(stream->stream_d, message.c_str(), message.size()); + php_debug("wrote exact %d bytes to stream %" PRIu64, written, stream->stream_d); + co_return written; } -task_t f$component_stream_read_exact(const class_instance &stream, int64_t len) { - char *buffer = static_cast(RuntimeAllocator::current().alloc_script_memory(len)); - int read = co_await read_exact_from_stream(stream->stream_d, buffer, len); - string result(buffer, read); +task_t f$component_stream_read_exact(class_instance stream, int64_t len) { + auto *buffer = static_cast(RuntimeAllocator::current().alloc_script_memory(len)); + const auto read = co_await read_exact_from_stream(stream->stream_d, buffer, len); + string result{buffer, static_cast(read)}; RuntimeAllocator::current().free_script_memory(buffer, len); - php_debug("read exact %d bytes from stream %lu", read, stream->stream_d); + php_debug("read exact %d bytes from stream %" PRIu64, read, stream->stream_d); co_return result; } void f$component_close_stream(const class_instance &stream) { - free_descriptor(stream->stream_d); + get_component_context()->release_stream(stream.get()->stream_d); + stream.get()->stream_d = INVALID_PLATFORM_DESCRIPTOR; // TODO: convert stream object to null? } void f$component_finish_stream_processing(const class_instance &stream) { - ComponentState &ctx = *get_component_context(); - if (stream->stream_d != ctx.standard_stream) { - php_warning("call server finish query on non server stream %lu", stream->stream_d); + auto &component_ctx = *get_component_context(); + if (stream.get()->stream_d != component_ctx.standard_stream()) { + php_warning("call server finish query on non server stream %lu", stream.get()->stream_d); return; } - free_descriptor(ctx.standard_stream); - ctx.standard_stream = 0; + component_ctx.release_stream(component_ctx.standard_stream()); + stream.get()->stream_d = INVALID_PLATFORM_DESCRIPTOR; } diff --git a/runtime-light/streams/interface.h b/runtime-light/streams/interface.h index 8e48b67539..3890e6930b 100644 --- a/runtime-light/streams/interface.h +++ b/runtime-light/streams/interface.h @@ -13,29 +13,32 @@ constexpr int64_t v$COMPONENT_ERROR = -1; task_t f$component_get_http_query(); -/** - * component query client blocked interface - * */ -task_t> f$component_client_send_query(const string &name, const string &message); +// === component query client blocked interface =================================================== + +task_t> f$component_client_send_query(string name, string message); + task_t f$component_client_get_result(class_instance query); -/** - * component query server blocked interface - * */ +// === component query server blocked interface =================================================== + task_t f$component_server_get_query(); -task_t f$component_server_send_result(const string &message); -/** - * component query low-level interface - * */ +task_t f$component_server_send_result(string message); + +// === component stream low-level interface ======================================================= + class_instance f$component_open_stream(const string &name); + task_t> f$component_accept_stream(); int64_t f$component_stream_write_nonblock(const class_instance &stream, const string &message); + string f$component_stream_read_nonblock(const class_instance &stream); -task_t f$component_stream_write_exact(const class_instance &stream, const string &message); -task_t f$component_stream_read_exact(const class_instance &stream, int64_t len); +task_t f$component_stream_write_exact(class_instance stream, string message); + +task_t f$component_stream_read_exact(class_instance stream, int64_t len); void f$component_close_stream(const class_instance &stream); + void f$component_finish_stream_processing(const class_instance &stream); diff --git a/runtime-light/streams/streams.cpp b/runtime-light/streams/streams.cpp index 66d24bdb1d..158e9fda00 100644 --- a/runtime-light/streams/streams.cpp +++ b/runtime-light/streams/streams.cpp @@ -4,93 +4,99 @@ #include "runtime-light/streams/streams.h" -#include "runtime-light/component/component.h" +#include +#include +#include +#include + +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" +#include "runtime-light/header.h" #include "runtime-light/utils/context.h" -task_t> read_all_from_stream(uint64_t stream_d) { - co_await test_yield_t{}; +task_t> read_all_from_stream(uint64_t stream_d) { + const auto &platform_ctx = *get_platform_context(); + constexpr int32_t batch_size = 32; - constexpr int batch_size = 32; - const PlatformCtx &ptx = *get_platform_context(); - int buffer_capacity = batch_size; - char *buffer = static_cast(ptx.allocator.alloc(buffer_capacity)); - int buffer_size = 0; - StreamStatus status; + int32_t buffer_capacity = batch_size; + auto *buffer = static_cast(platform_ctx.allocator.alloc(buffer_capacity)); + int32_t buffer_size = 0; + StreamStatus status{}; do { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); co_return std::make_pair(nullptr, 0); } - if (status.read_status == IOAvailable) { + if (status.read_status == IOStatus::IOAvailable) { if (buffer_capacity - buffer_size < batch_size) { - char *new_buffer = static_cast(ptx.allocator.alloc(buffer_capacity * 2)); - memcpy(new_buffer, buffer, buffer_size); - ptx.allocator.free(buffer); + auto *new_buffer = static_cast(platform_ctx.allocator.alloc(static_cast(buffer_capacity) * 2)); + std::memcpy(new_buffer, buffer, buffer_size); + platform_ctx.allocator.free(buffer); buffer_capacity = buffer_capacity * 2; buffer = new_buffer; } - buffer_size += ptx.read(stream_d, batch_size, buffer + buffer_size); - } else if (status.read_status == IOBlocked) { - co_await read_blocked_t{stream_d}; + buffer_size += platform_ctx.read(stream_d, batch_size, buffer + buffer_size); + } else if (status.read_status == IOStatus::IOBlocked) { + co_await wait_for_update_t{stream_d}; } - } while (status.read_status != IOClosed); + } while (status.read_status != IOStatus::IOClosed); co_return std::make_pair(buffer, buffer_size); } -std::pair read_nonblock_from_stream(uint64_t stream_d) { - constexpr int batch_size = 32; - const PlatformCtx &ptx = *get_platform_context(); - int buffer_capacity = batch_size; - char *buffer = static_cast(ptx.allocator.alloc(buffer_capacity)); - int buffer_size = 0; - StreamStatus status; +std::pair read_nonblock_from_stream(uint64_t stream_d) { + const auto &platform_ctx = *get_platform_context(); + constexpr int32_t batch_size = 32; + + int32_t buffer_capacity = batch_size; + auto *buffer = static_cast(platform_ctx.allocator.alloc(buffer_capacity)); + int32_t buffer_size = 0; + + StreamStatus status{}; do { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); return std::make_pair(nullptr, 0); } - if (status.read_status == IOAvailable) { + if (status.read_status == IOStatus::IOAvailable) { if (buffer_capacity - buffer_size < batch_size) { - char *new_buffer = static_cast(ptx.allocator.alloc(buffer_capacity * 2)); - memcpy(new_buffer, buffer, buffer_size); - ptx.allocator.free(buffer); + auto *new_buffer = static_cast(platform_ctx.allocator.alloc(static_cast(buffer_capacity) * 2)); + std::memcpy(new_buffer, buffer, buffer_size); + platform_ctx.allocator.free(buffer); buffer_capacity = buffer_capacity * 2; buffer = new_buffer; } - buffer_size += ptx.read(stream_d, batch_size, buffer + buffer_size); + buffer_size += platform_ctx.read(stream_d, batch_size, buffer + buffer_size); } else { break; } - } while (status.read_status != IOClosed); + } while (status.read_status != IOStatus::IOClosed); return std::make_pair(buffer, buffer_size); } -task_t read_exact_from_stream(uint64_t stream_d, char *buffer, int len) { - co_await test_yield_t{}; +task_t read_exact_from_stream(uint64_t stream_d, char *buffer, int32_t len) { + const PlatformCtx &platform_ctx = *get_platform_context(); - const PlatformCtx &ptx = *get_platform_context(); - int read = 0; - StreamStatus status{IOAvailable, IOAvailable, 0}; + int32_t read = 0; - while (read != len && status.read_status != IOClosed) { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); + StreamStatus status{IOStatus::IOAvailable, IOStatus::IOAvailable, 0}; + while (read != len && status.read_status != IOStatus::IOClosed) { + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); co_return 0; } - if (status.read_status == IOAvailable) { - read += ptx.read(stream_d, len - read, buffer + read); - } else if (status.read_status == IOBlocked) { - co_await read_blocked_t{stream_d}; + if (status.read_status == IOStatus::IOAvailable) { + read += platform_ctx.read(stream_d, len - read, buffer + read); + } else if (status.read_status == IOStatus::IOBlocked) { + co_await wait_for_update_t{stream_d}; } else { co_return read; } @@ -99,103 +105,84 @@ task_t read_exact_from_stream(uint64_t stream_d, char *buffer, int len) { co_return read; } -task_t write_all_to_stream(uint64_t stream_d, const char *buffer, int len) { - co_await test_yield_t{}; +task_t write_all_to_stream(uint64_t stream_d, const char *buffer, int32_t len) { + const auto &platform_ctx = *get_platform_context(); - StreamStatus status; - const PlatformCtx &ptx = *get_platform_context(); - int writed = 0; + int32_t written = 0; + + StreamStatus status{}; do { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); - co_return writed; + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); + co_return written; } if (status.please_shutdown_write) { - php_debug("stream %lu set please_shutdown_write. Stop writing", stream_d); - co_return writed; - } else if (status.write_status == IOAvailable) { - writed += ptx.write(stream_d, len - writed, buffer + writed); - } else if (status.write_status == IOBlocked) { - co_await write_blocked_t{stream_d}; + php_debug("stream %" PRIu64 " set please_shutdown_write. Stop writing", stream_d); + co_return written; + } else if (status.write_status == IOStatus::IOAvailable) { + written += platform_ctx.write(stream_d, len - written, buffer + written); + } else if (status.write_status == IOStatus::IOBlocked) { + co_await wait_for_update_t{stream_d}; } else { - php_warning("stream closed while writing. Writed %d. Size %d. Stream %lu", writed, len, stream_d); - co_return writed; + php_warning("stream closed while writing. Wrote %d. Size %d. Stream %" PRIu64, written, len, stream_d); + co_return written; } - } while (writed != len); + } while (written != len); - php_debug("write %d bytes to stream %lu", len, stream_d); - co_return writed; + php_debug("wrote %d bytes to stream %" PRIu64, len, stream_d); + co_return written; } -int write_nonblock_to_stream(uint64_t stream_d, const char *buffer, int len) { - StreamStatus status; - const PlatformCtx &ptx = *get_platform_context(); - int writed = 0; +int32_t write_nonblock_to_stream(uint64_t stream_d, const char *buffer, int32_t len) { + const auto &platform_ctx = *get_platform_context(); + + int32_t written = 0; + + StreamStatus status{}; do { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); return 0; } - if (status.write_status == IOAvailable) { - writed += ptx.write(stream_d, len - writed, buffer + writed); + if (status.write_status == IOStatus::IOAvailable) { + written += platform_ctx.write(stream_d, len - written, buffer + written); } else { break; } - } while (writed != len); + } while (written != len); - php_debug("write %d bytes from %d to stream %lu", writed, len, stream_d); - return writed; + php_debug("write %d bytes from %d to stream %" PRIu64, written, len, stream_d); + return written; } -task_t write_exact_to_stream(uint64_t stream_d, const char *buffer, int len) { - co_await test_yield_t{}; +task_t write_exact_to_stream(uint64_t stream_d, const char *buffer, int32_t len) { + const auto &platform_ctx = *get_platform_context(); + + int written = 0; StreamStatus status{IOAvailable, IOAvailable, 0}; - const PlatformCtx &ptx = *get_platform_context(); - int writed = 0; - while (writed != len && status.write_status != IOClosed) { - GetStatusResult res = ptx.get_stream_status(stream_d, &status); - if (res != GetStatusOk) { - php_warning("get stream status return status %d", res); - co_return writed; + while (written != len && status.write_status != IOStatus::IOClosed) { + GetStatusResult res = platform_ctx.get_stream_status(stream_d, std::addressof(status)); + if (res != GetStatusResult::GetStatusOk) { + php_warning("get stream status returned status %d", res); + co_return written; } if (status.please_shutdown_write) { - php_debug("stream %lu set please_shutdown_write. Stop writing", stream_d); - co_return writed; - } else if (status.write_status == IOAvailable) { - writed += ptx.write(stream_d, len - writed, buffer + writed); - } else if (status.write_status == IOBlocked) { - co_await write_blocked_t{stream_d}; + php_debug("stream %" PRIu64 " set please_shutdown_write. Stop writing", stream_d); + co_return written; + } else if (status.write_status == IOStatus::IOAvailable) { + written += platform_ctx.write(stream_d, len - written, buffer + written); + } else if (status.write_status == IOStatus::IOBlocked) { + co_await wait_for_update_t{stream_d}; } else { - co_return writed; + co_return written; } } - co_return writed; -} - -void free_all_descriptors() { - php_debug("free all descriptors"); - ComponentState &ctx = *get_component_context(); - const PlatformCtx &ptx = *get_platform_context(); - for (auto &processed_query : ctx.opened_streams) { - ptx.free_descriptor(processed_query.first); - } - ctx.opened_streams.clear(); - ctx.awaiting_coroutines.clear(); - ptx.free_descriptor(ctx.standard_stream); - ctx.standard_stream = 0; -} - -void free_descriptor(uint64_t stream_d) { - php_debug("free descriptor %lu", stream_d); - ComponentState &ctx = *get_component_context(); - get_platform_context()->free_descriptor(stream_d); - ctx.opened_streams.erase(stream_d); - ctx.awaiting_coroutines.erase(stream_d); + co_return written; } diff --git a/runtime-light/streams/streams.h b/runtime-light/streams/streams.h index 692983ef44..5721a85d00 100644 --- a/runtime-light/streams/streams.h +++ b/runtime-light/streams/streams.h @@ -7,19 +7,20 @@ #include #include -#include "runtime-core/runtime-core.h" - #include "runtime-light/coroutine/task.h" -enum class StreamRuntimeStatus { WBlocked, RBlocked, NotBlocked, Timer }; +// === read ======================================================================================= + +task_t> read_all_from_stream(uint64_t stream_d); + +std::pair read_nonblock_from_stream(uint64_t stream_d); + +task_t read_exact_from_stream(uint64_t stream_d, char *buffer, int32_t len); + +// === write ====================================================================================== -task_t> read_all_from_stream(uint64_t stream_d); -std::pair read_nonblock_from_stream(uint64_t stream_d); -task_t read_exact_from_stream(uint64_t stream_d, char *buffer, int len); +task_t write_all_to_stream(uint64_t stream_d, const char *buffer, int32_t len); -task_t write_all_to_stream(uint64_t stream_d, const char *buffer, int len); -int write_nonblock_to_stream(uint64_t stream_d, const char *buffer, int len); -task_t write_exact_to_stream(uint64_t stream_d, const char *buffer, int len); +int32_t write_nonblock_to_stream(uint64_t stream_d, const char *buffer, int32_t len); -void free_all_descriptors(); -void free_descriptor(uint64_t stream_d); +task_t write_exact_to_stream(uint64_t stream_d, const char *buffer, int32_t len); diff --git a/runtime-light/utils/concepts.h b/runtime-light/utils/concepts.h index 963a9c6b95..4969436e7d 100644 --- a/runtime-light/utils/concepts.h +++ b/runtime-light/utils/concepts.h @@ -5,6 +5,13 @@ #pragma once #include +#include +#include template concept standard_layout = std::is_standard_layout_v; + +template +concept hashable = requires(T a) { + { std::hash{}(a) } -> std::convertible_to; +}; diff --git a/runtime-light/utils/panic.h b/runtime-light/utils/panic.h index d11fbed624..88fc2e6b1b 100644 --- a/runtime-light/utils/panic.h +++ b/runtime-light/utils/panic.h @@ -8,17 +8,18 @@ #include "context.h" #include "runtime-light/component/component.h" +#include "runtime-light/header.h" #include "runtime-light/utils/logs.h" inline void critical_error_handler() { - constexpr const char * message = "script panic"; - const PlatformCtx & ptx = *get_platform_context(); - ComponentState & ctx = *get_component_context(); - ptx.log(Debug, strlen(message), message); + constexpr const char *message = "script panic"; + const auto &platform_ctx = *get_platform_context(); + auto &component_ctx = *get_component_context(); + platform_ctx.log(Debug, strlen(message), message); - if (ctx.not_finished()) { - ctx.poll_status = PollStatus::PollFinishedError; + if (component_ctx.poll_status != PollStatus::PollFinishedOk && component_ctx.poll_status != PollStatus::PollFinishedError) { + component_ctx.poll_status = PollStatus::PollFinishedError; } - ptx.abort(); + platform_ctx.abort(); exit(1); } diff --git a/runtime-light/utils/timer.cpp b/runtime-light/utils/timer.cpp deleted file mode 100644 index 3cb712012d..0000000000 --- a/runtime-light/utils/timer.cpp +++ /dev/null @@ -1,21 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2024 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#include "runtime-light/utils/timer.h" - -void set_timer_impl(int64_t timeout_ms, on_timer_callback_t &&callback) { - const PlatformCtx &ptx = *get_platform_context(); - ComponentState &ctx = *get_component_context(); - uint64_t nanoseconds = static_cast(timeout_ms * 1e6); - uint64_t timer_d = 0; - SetTimerResult res = ptx.set_timer(&timer_d, nanoseconds); - if (res != SetTimerOk) { - php_warning("timer limit exceeded"); - return; - } - php_debug("set up timer %lu for %lu ms", timer_d, timeout_ms); - - ctx.opened_streams[timer_d] = StreamRuntimeStatus::Timer; - ctx.timer_callbacks[timer_d] = callback; -} \ No newline at end of file diff --git a/runtime-light/utils/timer.h b/runtime-light/utils/timer.h deleted file mode 100644 index e7ba0b5202..0000000000 --- a/runtime-light/utils/timer.h +++ /dev/null @@ -1,20 +0,0 @@ -// Compiler for PHP (aka KPHP) -// Copyright (c) 2024 LLC «V Kontakte» -// Distributed under the GPL v3 License, see LICENSE.notice.txt - -#pragma once - -#include - -#include "runtime-core/memory-resource/resource_allocator.h" -#include "runtime-light/component/component.h" - -// todo:k2 std::function use heap -using on_timer_callback_t = std::function; - -void set_timer_impl(int64_t timeout_ms, on_timer_callback_t &&callback); - -template -void f$set_timer(int64_t timeout, CallBack &&callback) { - set_timer_impl(timeout, on_timer_callback_t(std::forward(callback))); -} \ No newline at end of file diff --git a/runtime-light/utils/utils.cmake b/runtime-light/utils/utils.cmake index b25f9b6b3d..fee0426cd6 100644 --- a/runtime-light/utils/utils.cmake +++ b/runtime-light/utils/utils.cmake @@ -1,5 +1 @@ -prepend(RUNTIME_UTILS_SRC utils/ - php_assert.cpp - json-functions.cpp - context.cpp - timer.cpp) +prepend(RUNTIME_UTILS_SRC utils/ php_assert.cpp json-functions.cpp context.cpp) diff --git a/runtime/storage.cpp b/runtime/storage.cpp index b2a161ac9c..19f1201aed 100644 --- a/runtime/storage.cpp +++ b/runtime/storage.cpp @@ -4,9 +4,9 @@ #include "runtime/resumable.h" -Storage::Storage() noexcept : - tag(0) { - memset(storage_.storage_, 0, sizeof(mixed)); +Storage::Storage() noexcept + : tag(0) { + std::memset(storage_.storage_.data(), 0, sizeof(mixed)); } void Storage::save_void() noexcept { @@ -18,7 +18,7 @@ void Storage::save_void() noexcept { } void Storage::save_exception() noexcept { - php_assert (!CurException.is_null()); + php_assert(!CurException.is_null()); Throwable exception = std::move(CurException); save(thrown_exception{exception}); } diff --git a/runtime/storage.h b/runtime/storage.h index b778b8b75b..2f905618ff 100644 --- a/runtime/storage.h +++ b/runtime/storage.h @@ -7,6 +7,7 @@ #include #include "runtime-core/runtime-core.h" +#include "runtime-core/utils/small-object-storage.h" #include "runtime/exception.h" extern const char *last_wait_error; @@ -14,47 +15,13 @@ extern const char *last_wait_error; struct thrown_exception { Throwable exception; thrown_exception() = default; - explicit thrown_exception(Throwable exception) noexcept : exception(std::move(exception)) {} -}; - -template -union small_obect_ptr { - char storage_[limit]; - void *storage_ptr; - - template - std::enable_if_t emplace(Args&& ...args) noexcept { - return new (storage_) T(std::forward(args)...); - } - template - std::enable_if_t get() noexcept { - return reinterpret_cast(storage_); - } - template - std::enable_if_t destroy() noexcept { - get()->~T(); - } - - template - std::enable_if_t emplace(Args&& ...args) noexcept { - storage_ptr = dl::allocate(sizeof(T)); - return new (storage_ptr) T(std::forward(args)...); - } - template - std::enable_if_t get() noexcept { - return static_cast(storage_ptr); - } - template - std::enable_if_t destroy() noexcept { - T *mem = get(); - mem->~T(); - dl::deallocate(mem, sizeof(T)); - } + explicit thrown_exception(Throwable exception) noexcept + : exception(std::move(exception)) {} }; class Storage { private: - using storage_ptr = small_obect_ptr; + using storage_ptr = small_object_storage; storage_ptr storage_; template::type> @@ -63,7 +30,6 @@ class Storage { void save_exception() noexcept; public: - // this class specializations are generated by kphp compiler template @@ -75,7 +41,7 @@ class Storage { template struct loader { static_assert(!std::is_same{}, "int is forbidden"); - using loader_fun = T(*)(storage_ptr &); + using loader_fun = T (*)(storage_ptr &); static loader_fun get_function(int tag) noexcept; }; @@ -100,11 +66,10 @@ class Storage { X load_as() noexcept; }; - template struct Storage::load_implementation_helper { static Y load(storage_ptr &) noexcept { - php_assert(0); // should be never called in runtime, used just to prevent compilation errors + php_assert(0); // should be never called in runtime, used just to prevent compilation errors return Y(); } }; @@ -141,7 +106,7 @@ struct Storage::load_implementation_helper static_assert(!std::is_same{}, "int is forbidden"); static Y load(storage_ptr &storage) noexcept { - php_assert (CurException.is_null()); + php_assert(CurException.is_null()); CurException = load_implementation_helper::load(storage).exception; return Y(); } @@ -150,13 +115,11 @@ struct Storage::load_implementation_helper template<> struct Storage::load_implementation_helper { static void load(storage_ptr &storage) noexcept { - php_assert (CurException.is_null()); + php_assert(CurException.is_null()); CurException = load_implementation_helper::load(storage).exception; } }; - - template void Storage::save(std::enable_if_t x) noexcept { static_assert(!std::is_same{}, "int is forbidden"); @@ -173,7 +136,7 @@ template X Storage::load() noexcept { static_assert(!std::is_same{}, "int is forbidden"); - php_assert (tag != 0); + php_assert(tag != 0); if (tag == tagger::get_tag()) { tag = 0; return load_implementation_helper::load(storage_); @@ -184,12 +147,11 @@ X Storage::load() noexcept { return load_implementation_helper::load(storage_); } - template X Storage::load_as() noexcept { static_assert(!std::is_same{}, "int is forbidden"); - php_assert (tag != 0); + php_assert(tag != 0); int tag_save = tag; tag = 0; diff --git a/tests/k2-components/yield_loop.php b/tests/k2-components/yield_loop.php index f58783acfe..397a38eadc 100644 --- a/tests/k2-components/yield_loop.php +++ b/tests/k2-components/yield_loop.php @@ -1,5 +1,5 @@