Skip to content

Commit

Permalink
Comms: simplify timeout handling (#9)
Browse files Browse the repository at this point in the history
The previous solution for handling timeouts was more complex than it
needed to be, relying on a `boost::asio::deadline_timer` and
`async_wait()`. This can be simplified quite a bit to just calling
`run_for()` on the IO context, which will also detect whether the
operation timed out (i.e., didn't complete before the time limit was
reached).
  • Loading branch information
willeccles authored Jan 24, 2025
2 parents 78dc3c9 + e810438 commit 4dd8fdd
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 172 deletions.
56 changes: 23 additions & 33 deletions src/SerialDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@
#include <fmt/core.h>

#include <algorithm>
#include <atomic>
#include <array>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/serial_port.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <string_view>

#include "Util.h"

using boost::asio::deadline_timer;

namespace bci::abs::drivers {

using util::Err;
Expand All @@ -50,12 +49,10 @@ struct SerialDriver::Impl {
private:
boost::asio::io_service io_service_;
boost::asio::serial_port port_;
boost::asio::deadline_timer deadline_;
boost::asio::streambuf input_buffer_;
unsigned int dev_id_;
std::atomic<bool> timeout_;

void CheckDeadline();
bool Run(unsigned int timeout_ms);
};

SerialDriver::SerialDriver() : impl_(std::make_shared<Impl>()) {}
Expand Down Expand Up @@ -86,13 +83,8 @@ bool SerialDriver::IsSendOnly() const { return impl_->IsBroadcast(); }
SerialDriver::Impl::Impl()
: io_service_(),
port_(io_service_),
deadline_(io_service_),
input_buffer_(),
dev_id_{},
timeout_{} {
deadline_.expires_at(boost::posix_time::pos_infin);
CheckDeadline();
}
dev_id_{} { }

SerialDriver::Impl::~Impl() { Close(); }

Expand Down Expand Up @@ -157,14 +149,14 @@ ErrorCode SerialDriver::Impl::Write(std::string_view data,
fmt::format_to_n(dev_id_buf, sizeof(dev_id_buf) - 1, "@{} ", dev_id_);
std::string_view dev_id_str{dev_id_buf};

boost::system::error_code ec{};
std::array<boost::asio::const_buffer, 2> bufs = {
boost::asio::buffer(dev_id_str),
boost::asio::buffer(data),
};

boost::asio::write(port_, boost::asio::buffer(dev_id_str), ec);
if (ec) {
return ErrorCode::kSendFailed;
}
boost::system::error_code ec{};

boost::asio::write(port_, boost::asio::buffer(data), ec);
boost::asio::write(port_, bufs, ec);
if (ec) {
return ErrorCode::kSendFailed;
}
Expand All @@ -177,19 +169,12 @@ Result<std::string> SerialDriver::Impl::ReadLine(unsigned int timeout_ms) {
return Err(ErrorCode::kNotConnected);
}

timeout_ = false;
deadline_.expires_from_now(boost::posix_time::milliseconds(timeout_ms));

boost::system::error_code ec = boost::asio::error::would_block;
boost::system::error_code ec;

const auto read_handler = [&](auto&& e, auto&&) { ec = e; };
boost::asio::async_read_until(port_, input_buffer_, '\n', read_handler);

do {
io_service_.run_one();
} while (ec == boost::asio::error::would_block);

if (timeout_) {
if (!Run(timeout_ms)) {
return Err(ErrorCode::kReadTimedOut);
}

Expand All @@ -212,14 +197,19 @@ unsigned int SerialDriver::Impl::GetDeviceID() const { return dev_id_; }

bool SerialDriver::Impl::IsBroadcast() const { return dev_id_ > 31; }

void SerialDriver::Impl::CheckDeadline() {
if (deadline_.expires_at() <= deadline_timer::traits_type::now()) {
timeout_ = true;
port_.cancel();
deadline_.expires_at(boost::posix_time::pos_infin);
bool SerialDriver::Impl::Run(unsigned int timeout_ms) {
io_service_.restart();

io_service_.run_for(std::chrono::milliseconds(timeout_ms));

if (!io_service_.stopped()) {
boost::system::error_code ignored;
port_.cancel(ignored);
io_service_.run();
return false;
}

deadline_.async_wait([&](auto&&) { this->CheckDeadline(); });
return true;
}

} // namespace bci::abs::drivers
88 changes: 28 additions & 60 deletions src/TcpDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <string_view>

#include "Util.h"

using boost::asio::deadline_timer;
using boost::asio::ip::tcp;

namespace bci::abs::drivers {
Expand All @@ -42,11 +42,9 @@ struct TcpDriver::Impl {
private:
boost::asio::io_service io_service_;
boost::asio::ip::tcp::socket socket_;
boost::asio::deadline_timer deadline_;
boost::asio::streambuf input_buffer_;
bool did_timeout_;

void StartDeadline(unsigned int timeout_ms);
bool Run(unsigned int timeout_ms);
};

TcpDriver::TcpDriver() : impl_(std::make_shared<Impl>()) {}
Expand All @@ -71,9 +69,7 @@ Result<std::string> TcpDriver::ReadLine(unsigned int timeout_ms) const {
TcpDriver::Impl::Impl()
: io_service_(),
socket_(io_service_),
deadline_(io_service_),
input_buffer_(),
did_timeout_(false) {
input_buffer_() {
boost::system::error_code ignored;
socket_.set_option(boost::asio::socket_base::linger(false, 0), ignored);
socket_.set_option(boost::asio::socket_base::keep_alive(true), ignored);
Expand All @@ -92,36 +88,23 @@ ErrorCode TcpDriver::Impl::Connect(std::string_view ip,

tcp::endpoint endpoint(addr, 5025);

StartDeadline(timeout_ms);

// would_block is never set from async_functions, so it's a safe way to signal
// an incomplete async operation
ec = boost::asio::error::would_block;

ec = {};
const auto connect_handler = [&](auto&& e) { ec = e; };
socket_.async_connect(endpoint, connect_handler);

do {
io_service_.run_one();
} while (ec == boost::asio::error::would_block);
if (!Run(timeout_ms)) {
return ErrorCode::kConnectionTimedOut;
}

if (ec) {
if (did_timeout_) {
return ErrorCode::kConnectionTimedOut;
}
return ErrorCode::kConnectionFailed;
}

if (!socket_.is_open()) {
return ErrorCode::kConnectionTimedOut;
}

return ErrorCode::kSuccess;
}

void TcpDriver::Impl::Close() noexcept {
boost::system::error_code ignored;
deadline_.cancel(ignored);
socket_.cancel(ignored);
socket_.shutdown(tcp::socket::shutdown_send, ignored);
socket_.shutdown(tcp::socket::shutdown_receive, ignored);
Expand All @@ -134,28 +117,19 @@ ErrorCode TcpDriver::Impl::Write(std::string_view data,
return ErrorCode::kNotConnected;
}

StartDeadline(timeout_ms);

boost::system::error_code ec = boost::asio::error::would_block;
boost::system::error_code ec{};

const auto write_handler = [&](auto&& e, auto&&) { ec = e; };
boost::asio::async_write(socket_, boost::asio::buffer(data), write_handler);

do {
io_service_.run_one();
} while (ec == boost::asio::error::would_block);
if (!Run(timeout_ms)) {
return ErrorCode::kSendTimedOut;
}

if (ec) {
if (did_timeout_) {
return ErrorCode::kSendTimedOut;
}
return ErrorCode::kSendFailed;
}

if (!socket_.is_open()) {
return ErrorCode::kSendTimedOut;
}

return ErrorCode::kSuccess;
}

Expand All @@ -164,45 +138,39 @@ Result<std::string> TcpDriver::Impl::ReadLine(unsigned int timeout_ms) {
return Err(ErrorCode::kNotConnected);
}

StartDeadline(timeout_ms);

boost::system::error_code ec = boost::asio::error::would_block;
boost::system::error_code ec{};

const auto read_handler = [&](auto&& e, auto&&) { ec = e; };
boost::asio::async_read_until(socket_, input_buffer_, '\n', read_handler);

do {
io_service_.run_one();
} while (ec == boost::asio::error::would_block);
if (!Run(timeout_ms)) {
return Err(ErrorCode::kReadTimedOut);
}

if (ec) {
if (did_timeout_) {
return Err(ErrorCode::kReadTimedOut);
}
return Err(ErrorCode::kReadFailed);
}

if (!socket_.is_open()) {
return Err(ErrorCode::kReadTimedOut);
}

std::string line;
std::istream is(&input_buffer_);
std::getline(is, line);

return line;
}

void TcpDriver::Impl::StartDeadline(unsigned int timeout_ms) {
deadline_.expires_from_now(boost::posix_time::milliseconds(timeout_ms));
deadline_.async_wait([this](const boost::system::error_code& e) {
if (e != boost::asio::error::operation_aborted) {
did_timeout_ = true;
boost::system::error_code ignored;
socket_.cancel(ignored);
}
});
did_timeout_ = false;
bool TcpDriver::Impl::Run(unsigned int timeout_ms) {
io_service_.restart();

io_service_.run_for(std::chrono::milliseconds(timeout_ms));

if (!io_service_.stopped()) {
boost::system::error_code ignored;
socket_.cancel(ignored);
io_service_.run();
return false;
}

return true;
}

} // namespace bci::abs::drivers
Loading

0 comments on commit 4dd8fdd

Please sign in to comment.