Skip to content

Commit

Permalink
WIP: pipeline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
d-frey committed Dec 18, 2024
1 parent 021155e commit 7dd54c7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 217 deletions.
1 change: 1 addition & 0 deletions include/tao/pq/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <tao/pq/pipeline_status.hpp>
#include <tao/pq/poll.hpp>
#include <tao/pq/transaction.hpp>
#include <tao/pq/transaction_base.hpp>
#include <tao/pq/transaction_status.hpp>

namespace tao::pq
Expand Down
134 changes: 3 additions & 131 deletions include/tao/pq/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,145 +6,17 @@
#define TAO_PQ_TRANSACTION_HPP

#include <chrono>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <string_view>
#if !defined( __cpp_pack_indexing ) && ( __cplusplus >= 202302L )
#include <tuple>
#endif
#include <type_traits>
#include <utility>

#include <libpq-fe.h>

#include <tao/pq/internal/gen.hpp>
#include <tao/pq/internal/zsv.hpp>
#include <tao/pq/parameter.hpp>
#include <tao/pq/parameter_traits.hpp>
#include <tao/pq/result.hpp>
#include <tao/pq/transaction_base.hpp>

namespace tao::pq
{
class connection;
class table_reader;
class table_writer;

class transaction_base
: public std::enable_shared_from_this< transaction_base >
{
protected:
std::shared_ptr< pq::connection > m_connection;

friend class table_reader;
friend class table_writer;

explicit transaction_base( const std::shared_ptr< pq::connection >& connection ) noexcept;

public:
virtual ~transaction_base() = default;

transaction_base( const transaction_base& ) = delete;
transaction_base( transaction_base&& ) = delete;
void operator=( const transaction_base& ) = delete;
void operator=( transaction_base&& ) = delete;

protected:
[[nodiscard]] virtual auto v_is_direct() const noexcept -> bool = 0;

[[nodiscard]] auto current_transaction() const noexcept -> transaction_base*&;
void check_current_transaction() const;

void send_params( const char* statement,
const int n_params,
const Oid types[],
const char* const values[],
const int lengths[],
const int formats[] );

#if defined( __cpp_pack_indexing ) && ( __cplusplus >= 202302L )

template< std::size_t... Os, std::size_t... Is >
void send_indexed( const char* statement,
std::index_sequence< Os... > /*unused*/,
std::index_sequence< Is... > /*unused*/,
const auto&... ts )
{
const Oid types[] = { static_cast< Oid >( ts...[ Os ].template type< Is >() )... };
const char* const values[] = { ts...[ Os ].template value< Is >()... };
const int lengths[] = { ts...[ Os ].template length< Is >()... };
const int formats[] = { ts...[ Os ].template format< Is >()... };
send_params( statement, sizeof...( Os ), types, values, lengths, formats );
}

template< typename... Ts >
void send_traits( const char* statement, const Ts&... ts )
{
using gen = internal::gen< Ts::columns... >;
transaction_base::send_indexed( statement, typename gen::outer_sequence(), typename gen::inner_sequence(), ts... );
}

#else

template< std::size_t... Os, std::size_t... Is, typename... Ts >
void send_indexed( const char* statement,
std::index_sequence< Os... > /*unused*/,
std::index_sequence< Is... > /*unused*/,
const std::tuple< Ts... >& tuple )
{
const Oid types[] = { static_cast< Oid >( std::get< Os >( tuple ).template type< Is >() )... };
const char* const values[] = { std::get< Os >( tuple ).template value< Is >()... };
const int lengths[] = { std::get< Os >( tuple ).template length< Is >()... };
const int formats[] = { std::get< Os >( tuple ).template format< Is >()... };
send_params( statement, sizeof...( Os ), types, values, lengths, formats );
}

template< typename... Ts >
void send_traits( const char* statement, const Ts&... ts )
{
using gen = internal::gen< Ts::columns... >;
transaction_base::send_indexed( statement, typename gen::outer_sequence(), typename gen::inner_sequence(), std::tie( ts... ) );
}

#endif

public:
[[nodiscard]] auto connection() const noexcept -> const std::shared_ptr< pq::connection >&
{
return m_connection;
}

void send( const internal::zsv statement )
{
send_params( statement, 0, nullptr, nullptr, nullptr, nullptr );
}

template< parameter_type_direct... As >
void send( const internal::zsv statement, As&&... as )
{
send_traits( statement, parameter_traits< std::decay_t< As > >( std::forward< As >( as ) )... );
}

template< parameter_type... As >
requires( parameter_type_dynamic< As > || ... )
void send( const internal::zsv statement, As&&... as )
{
const parameter< internal::parameter_size< As... > > p( std::forward< As >( as )... );
send_params( statement, p.m_size, p.m_types, p.m_values, p.m_lengths, p.m_formats );
}

template< parameter_type_dynamic A >
void send( const internal::zsv statement, A&& p )
{
send_params( statement, p.m_size, p.m_types, p.m_values, p.m_lengths, p.m_formats );
}

[[nodiscard]] auto get_result( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() ) -> result;

// TODO: move this to the pipeline_transaction class
void consume_pipeline_sync( const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now() );
};

class transaction
: public transaction_base
{
Expand All @@ -168,8 +40,8 @@ namespace tao::pq
auto execute( const internal::zsv statement, As&&... as )
{
const auto start = std::chrono::steady_clock::now();
transaction::send( statement, std::forward< As >( as )... );
return transaction::get_result( start );
transaction_base::send( statement, std::forward< As >( as )... );
return transaction_base::get_result( start );
}

void commit();
Expand Down
90 changes: 4 additions & 86 deletions src/lib/pq/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@
#include <chrono>
#include <cstdio>
#include <exception>
#include <format>
#include <memory>
#include <stdexcept>
#include <utility>

#include <libpq-fe.h>

#include <tao/pq/connection.hpp>
#include <tao/pq/oid.hpp>
#include <tao/pq/result.hpp>

namespace tao::pq
{
Expand Down Expand Up @@ -115,83 +112,13 @@ namespace tao::pq

} // namespace internal

transaction_base::transaction_base( const std::shared_ptr< pq::connection >& connection ) noexcept // NOLINT(modernize-pass-by-value)
: m_connection( connection )
{}

auto transaction_base::current_transaction() const noexcept -> transaction_base*&
{
return m_connection->m_current_transaction;
}

void transaction_base::check_current_transaction() const
{
if( !m_connection || this != current_transaction() ) {
throw std::logic_error( "invalid transaction order" );
}
}

void transaction_base::send_params( const char* statement,
const int n_params,
const Oid types[],
const char* const values[],
const int lengths[],
const int formats[] )
{
check_current_transaction();
m_connection->send_params( statement, n_params, types, values, lengths, formats );
}

auto transaction_base::get_result( const std::chrono::steady_clock::time_point start ) -> result
{
check_current_transaction();
const auto end = m_connection->timeout_end( start );

auto result = m_connection->get_result( end );
if( !result ) {
throw std::runtime_error( "unable to obtain result" );
}

switch( PQresultStatus( result.get() ) ) {
case PGRES_COPY_IN:
m_connection->put_copy_end( "unexpected COPY FROM statement" );
result = m_connection->get_fatal_error( end );
break;

case PGRES_COPY_OUT:
m_connection->cancel();
m_connection->clear_copy_data( end );
std::ignore = m_connection->get_fatal_error( end );
m_connection->consume_empty_result( end );
throw std::runtime_error( "unexpected COPY TO statement" );

case PGRES_SINGLE_TUPLE:
#if defined( LIBPQ_HAS_CHUNK_MODE )
case PGRES_TUPLES_CHUNK:
#endif
return pq::result( result.release() );

default:;
}

m_connection->consume_empty_result( end );
return pq::result( result.release() );
}

void transaction_base::consume_pipeline_sync( const std::chrono::steady_clock::time_point start )
auto transaction::subtransaction() -> std::shared_ptr< transaction >
{
check_current_transaction();
const auto end = m_connection->timeout_end( start );

const auto result = m_connection->get_result( end );
if( !result ) {
throw std::runtime_error( "unable to obtain result" );
}

const auto status = PQresultStatus( result.get() );
if( status != PGRES_PIPELINE_SYNC ) {
throw std::runtime_error( std::format( "unexpected result status: {}", PQresStatus( status ) ) );
if( v_is_direct() ) {
return std::make_shared< internal::top_level_subtransaction >( m_connection );
}
return std::make_shared< internal::nested_subtransaction >( m_connection );
}

void transaction::set_single_row_mode()
Expand All @@ -212,15 +139,6 @@ namespace tao::pq
}
#endif

auto transaction::subtransaction() -> std::shared_ptr< transaction >
{
check_current_transaction();
if( v_is_direct() ) {
return std::make_shared< internal::top_level_subtransaction >( m_connection );
}
return std::make_shared< internal::nested_subtransaction >( m_connection );
}

void transaction::commit()
{
check_current_transaction();
Expand Down

0 comments on commit 7dd54c7

Please sign in to comment.