diff --git a/CMakeLists.txt b/CMakeLists.txt index e3a2957..08e3ef9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ set(taopq_INCLUDE_FILES ${taopq_INCLUDE_DIRS}/tao/pq/parameter_traits_optional.hpp ${taopq_INCLUDE_DIRS}/tao/pq/parameter_traits_pair.hpp ${taopq_INCLUDE_DIRS}/tao/pq/parameter_traits_tuple.hpp + ${taopq_INCLUDE_DIRS}/tao/pq/pipeline.hpp ${taopq_INCLUDE_DIRS}/tao/pq/pipeline_status.hpp ${taopq_INCLUDE_DIRS}/tao/pq/poll.hpp ${taopq_INCLUDE_DIRS}/tao/pq/result.hpp @@ -84,6 +85,7 @@ set(taopq_SOURCE_FILES ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/internal/strtox.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/large_object.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/parameter_traits.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/pipeline.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/result.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/result_traits.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/pq/result_traits_array.cpp diff --git a/include/tao/pq.hpp b/include/tao/pq.hpp index d3ef97b..4172345 100644 --- a/include/tao/pq.hpp +++ b/include/tao/pq.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include diff --git a/include/tao/pq/connection.hpp b/include/tao/pq/connection.hpp index b2e82a6..658e5b4 100644 --- a/include/tao/pq/connection.hpp +++ b/include/tao/pq/connection.hpp @@ -34,6 +34,7 @@ namespace tao::pq { class connection_pool; + class pipeline; class table_reader; class table_writer; @@ -161,6 +162,8 @@ namespace tao::pq [[nodiscard]] auto transaction( const access_mode am, const isolation_level il = isolation_level::default_isolation_level ) -> std::shared_ptr< pq::transaction >; [[nodiscard]] auto transaction( const isolation_level il, const access_mode am = access_mode::default_access_mode ) -> std::shared_ptr< pq::transaction >; + [[nodiscard]] auto pipeline() -> std::shared_ptr< pq::pipeline >; + void prepare( std::string name, const internal::zsv statement ); void deallocate( const std::string_view name ); diff --git a/include/tao/pq/transaction.hpp b/include/tao/pq/transaction.hpp index 4fa4c93..938f98b 100644 --- a/include/tao/pq/transaction.hpp +++ b/include/tao/pq/transaction.hpp @@ -17,12 +17,16 @@ namespace tao::pq { + class pipeline; + class transaction : public transaction_base { protected: using transaction_base::transaction_base; + [[nodiscard]] virtual auto v_is_direct() const noexcept -> bool = 0; + virtual void v_commit() = 0; virtual void v_rollback() = 0; @@ -30,6 +34,7 @@ namespace tao::pq public: [[nodiscard]] auto subtransaction() -> std::shared_ptr< transaction >; + [[nodiscard]] auto pipeline() -> std::shared_ptr< pq::pipeline >; void set_single_row_mode(); #if defined( LIBPQ_HAS_CHUNK_MODE ) diff --git a/include/tao/pq/transaction_base.hpp b/include/tao/pq/transaction_base.hpp index 58300ed..98fa707 100644 --- a/include/tao/pq/transaction_base.hpp +++ b/include/tao/pq/transaction_base.hpp @@ -48,8 +48,6 @@ namespace tao::pq void operator=( transaction_base&& ) = delete; protected: - [[nodiscard]] virtual auto v_is_direct() const noexcept -> bool = 0; - [[nodiscard]] auto current_transaction() const noexcept -> transaction_base*&; void check_current_transaction() const; diff --git a/src/lib/pq/connection.cpp b/src/lib/pq/connection.cpp index 3475b02..98946a0 100644 --- a/src/lib/pq/connection.cpp +++ b/src/lib/pq/connection.cpp @@ -545,6 +545,11 @@ namespace tao::pq return std::make_shared< internal::top_level_transaction >( shared_from_this(), il, am ); } + auto connection::pipeline() -> std::shared_ptr< pq::pipeline > + { + return direct()->pipeline(); + } + void connection::prepare( std::string name, const internal::zsv statement ) { connection::check_prepared_name( name ); diff --git a/src/lib/pq/transaction.cpp b/src/lib/pq/transaction.cpp index 09262a6..b288192 100644 --- a/src/lib/pq/transaction.cpp +++ b/src/lib/pq/transaction.cpp @@ -14,6 +14,7 @@ #include #include +#include namespace tao::pq { @@ -121,6 +122,12 @@ namespace tao::pq return std::make_shared< internal::nested_subtransaction >( m_connection ); } + auto transaction::pipeline() -> std::shared_ptr< pq::pipeline > + { + check_current_transaction(); + return std::make_shared< pq::pipeline >( m_connection ); + } + void transaction::set_single_row_mode() { check_current_transaction(); diff --git a/src/test/pq/pipeline_mode.cpp b/src/test/pq/pipeline_mode.cpp index d329731..92e5d3d 100644 --- a/src/test/pq/pipeline_mode.cpp +++ b/src/test/pq/pipeline_mode.cpp @@ -90,6 +90,48 @@ namespace tr->commit(); } + + { + auto tr = connection->pipeline(); + + tr->send( "SELECT 42" ); + tr->send( "SELECT 1234" ); + tr->sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 42 ); + + tr->send( "SELECT 1701" ); + tr->sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 1234 ); + tr->consume_sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 1701 ); + tr->consume_sync(); + + tr->finish(); + } + + { + auto tr = connection->transaction()->pipeline(); + + tr->send( "SELECT 42" ); + tr->send( "SELECT 1234" ); + tr->sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 42 ); + + tr->send( "SELECT 1701" ); + tr->sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 1234 ); + tr->consume_sync(); + + TEST_ASSERT( tr->get_result().as< int >() == 1701 ); + tr->consume_sync(); + + tr->finish(); + } } } // namespace