From 04caf625422965bec4b680229c90ab0b7754b32d Mon Sep 17 00:00:00 2001 From: Matt Kline Date: Sat, 3 Feb 2024 14:56:06 -0800 Subject: [PATCH 1/2] stream: Provide context-specific timeout messages If we time out while connecting, don't tell the user we timed out reading a response. --- src/lib.rs | 1 + src/stream.rs | 30 +++++++----------------------- src/timeout.rs | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+), 23 deletions(-) create mode 100644 src/timeout.rs diff --git a/src/lib.rs b/src/lib.rs index a99d1a0a..08a831ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -366,6 +366,7 @@ mod request; mod resolve; mod response; mod stream; +mod timeout; mod unit; // rustls is our default tls engine. If the feature is on, it will be diff --git a/src/stream.rs b/src/stream.rs index 8e7a36ee..ab5645db 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -14,6 +14,7 @@ use crate::chunked::Decoder as ChunkDecoder; use crate::error::ErrorKind; use crate::pool::{PoolKey, PoolReturner}; use crate::proxy::Proxy; +use crate::timeout::{io_err_timeout, time_until_deadline}; use crate::unit::Unit; use crate::Response; use crate::{error::Error, proxy::Proto}; @@ -83,7 +84,7 @@ impl From for Stream { impl BufRead for DeadlineStream { fn fill_buf(&mut self) -> io::Result<&[u8]> { if let Some(deadline) = self.deadline { - let timeout = time_until_deadline(deadline)?; + let timeout = time_until_deadline(deadline, "timed out reading response")?; if let Some(socket) = self.stream.socket() { socket.set_read_timeout(Some(timeout))?; socket.set_write_timeout(Some(timeout))?; @@ -130,20 +131,6 @@ impl Read for DeadlineStream { } } -// If the deadline is in the future, return the remaining time until -// then. Otherwise return a TimedOut error. 
-fn time_until_deadline(deadline: Instant) -> io::Result { - let now = Instant::now(); - match deadline.checked_duration_since(now) { - None => Err(io_err_timeout("timed out reading response".to_string())), - Some(duration) => Ok(duration), - } -} - -pub(crate) fn io_err_timeout(error: String) -> io::Error { - io::Error::new(io::ErrorKind::TimedOut, error) -} - #[derive(Debug)] pub(crate) struct ReadOnlyStream(Cursor>); @@ -348,6 +335,7 @@ pub(crate) fn connect_host( hostname: &str, port: u16, ) -> Result<(TcpStream, SocketAddr), Error> { + const TIMEOUT_MSG: &str = "timed out connecting"; let connect_deadline: Option = if let Some(timeout_connect) = unit.agent.config.timeout_connect { Instant::now().checked_add(timeout_connect) @@ -382,7 +370,7 @@ pub(crate) fn connect_host( // ensure connect timeout or overall timeout aren't yet hit. let timeout = match connect_deadline { Some(deadline) => { - let mut deadline = time_until_deadline(deadline)?; + let mut deadline = time_until_deadline(deadline, TIMEOUT_MSG)?; if multiple_addrs { deadline = deadline.div(2); } @@ -430,14 +418,10 @@ pub(crate) fn connect_host( stream.set_nodelay(unit.agent.config.no_delay)?; if let Some(deadline) = unit.deadline { - stream.set_read_timeout(Some(time_until_deadline(deadline)?))?; + stream.set_read_timeout(Some(time_until_deadline(deadline, TIMEOUT_MSG)?))?; + stream.set_write_timeout(Some(time_until_deadline(deadline, TIMEOUT_MSG)?))?; } else { stream.set_read_timeout(unit.agent.config.timeout_read)?; - } - - if let Some(deadline) = unit.deadline { - stream.set_write_timeout(Some(time_until_deadline(deadline)?))?; - } else { stream.set_write_timeout(unit.agent.config.timeout_write)?; } @@ -562,7 +546,7 @@ fn connect_socks( let (lock, cvar) = &*master_signal; let done = lock.lock().unwrap(); - let timeout_connect = time_until_deadline(deadline)?; + let timeout_connect = time_until_deadline(deadline, "SOCKS proxy timed out connecting")?; let done_result = cvar.wait_timeout(done, 
timeout_connect).unwrap(); let done = done_result.0; if *done { diff --git a/src/timeout.rs b/src/timeout.rs new file mode 100644 index 00000000..bc44c2f9 --- /dev/null +++ b/src/timeout.rs @@ -0,0 +1,18 @@ +//! Timeout utilities, mostly used during connecting. + +use std::io; +use std::time::{Duration, Instant}; + +/// If the deadline is in the future, return the remaining time until +/// then. Otherwise return a TimedOut error. +pub fn time_until_deadline>(deadline: Instant, error: S) -> io::Result { + let now = Instant::now(); + match deadline.checked_duration_since(now) { + None => Err(io_err_timeout(error.into())), + Some(duration) => Ok(duration), + } +} + +pub fn io_err_timeout(error: String) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, error) +} From 71f7bab2c7403d06595a1bac7ccb7d244cfba480 Mon Sep 17 00:00:00 2001 From: Matt Kline Date: Sat, 3 Feb 2024 14:57:06 -0800 Subject: [PATCH 2/2] Implement Happy Eyeballs RFC --- src/eyeballs.rs | 163 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/response.rs | 2 +- src/stream.rs | 66 +++++++------------- 4 files changed, 186 insertions(+), 46 deletions(-) create mode 100644 src/eyeballs.rs diff --git a/src/eyeballs.rs b/src/eyeballs.rs new file mode 100644 index 00000000..44cf1c17 --- /dev/null +++ b/src/eyeballs.rs @@ -0,0 +1,163 @@ +//! A Happy Eyeballs RFC implementation +//! +//! Races interleaved IPv4 and IPv6 connections to provide the fastest connection +//! in cases where certain addresses or address families might be blocked, broken, or slow. +//! (See ) +//! +//! ureq strives for simplicity, and avoids spawning threads where it can, +//! but - like with SOCKS - there's no way around it here. +//! Some mini internal async executor +//! (discussed in ) +//! wouldn't help - `connect()` is a blocking syscall with no non-blocking alternative. +//! (Big async runtimes like Tokio "solve" this problem by keeping a pool of OS threads +//! 
around for just these sorts of blocking calls.) +//! We _could_ have some thread pool (a la rayon) to avoid spawning threads +//! on each connection attempt, but spawning a few threads is a cheap operation +//! compared to everything else going on here. +//! (DNS resolution, handshaking across the Internet...) +//! +//! Much of this implementation was inspired by attohttpc's: +//! + +use std::{ + io, + iter::FusedIterator, + net::{SocketAddr, TcpStream}, + sync::mpsc::{channel, RecvTimeoutError}, + thread, + time::Instant, +}; + +use log::debug; + +use crate::timeout::{io_err_timeout, time_until_deadline}; + +const TIMEOUT_MSG: &str = "timed out connecting"; + +pub fn connect( + netloc: String, + addrs: &[SocketAddr], + deadline: Option, +) -> io::Result<(TcpStream, SocketAddr)> { + assert!(!addrs.is_empty()); + + // No racing needed if there's a single address. + if let [single] = addrs { + return single_connection(&netloc, *single, deadline); + } + + // Interleave IPV4 and IPV6 addresses + let fours = addrs.iter().filter(|a| matches!(a, SocketAddr::V4(_))); + let sixes = addrs.iter().filter(|a| matches!(a, SocketAddr::V6(_))); + let sorted = interleave(fours, sixes); + + let (tx, rx) = channel(); + let mut first_error = None; + + // Race connections! + // The RFC says: + // + // 1. Not to start connections "simultaneously", but since `connect()` + // syscalls don't return until they've connected or timed out, + // we don't have a way to start an attempt without blocking until it finishes. + // (And if we did that, we wouldn't be racing!) + // + // 2. Once we have a successful connection, all other attempts should be cancelled. + // Doing so would require a lot of nasty (and platform-specific) signal handling, + // as it's the only way to interrupt `connect()`. + for s in sorted { + // Instead, make a best effort to not start new connections if we've got one already. 
+ if let Ok(resp) = rx.try_recv() { + match resp { + Ok(c) => return Ok(c), + Err(e) => { + let _ = first_error.get_or_insert(e); + } + } + } + + let tx2 = tx.clone(); + let nl2 = netloc.clone(); + let s2 = *s; + thread::spawn(move || { + // If the receiver was dropped, someone else already won the race. + let _ = tx2.send(single_connection(&nl2, s2, deadline)); + }); + } + drop(tx); + + const UNREACHABLE_MSG: &str = + "Unreachable: All Happy Eyeballs connections failed, but no error"; + + if let Some(d) = deadline { + // Wait for a successful connection, or for us to run out of time + loop { + let timeout = time_until_deadline(d, TIMEOUT_MSG)?; + match rx.recv_timeout(timeout) { + Ok(Ok(c)) => return Ok(c), + Ok(Err(e)) => { + let _ = first_error.get_or_insert(e); + } + Err(RecvTimeoutError::Timeout) => { + return Err(io_err_timeout(TIMEOUT_MSG.to_string())) + } + // If all the connecting threads hung up and none succeeded, + // return the first error. + Err(RecvTimeoutError::Disconnected) => { + return Err(first_error.expect(UNREACHABLE_MSG)) + } + }; + } + } else { + // If there's no deadline, just wait around. + let connections = rx.iter(); + for c in connections { + match c { + Ok(c) => return Ok(c), + Err(e) => { + let _ = first_error.get_or_insert(e); + } + } + } + // If we got here, everyone failed. Return the first error. 
+ Err(first_error.expect(UNREACHABLE_MSG)) + } +} + +fn single_connection( + netloc: &str, + addr: SocketAddr, + deadline: Option, +) -> io::Result<(TcpStream, SocketAddr)> { + debug!("connecting to {} at {}", netloc, addr); + if let Some(d) = deadline { + let timeout = time_until_deadline(d, TIMEOUT_MSG)?; + Ok((TcpStream::connect_timeout(&addr, timeout)?, addr)) + } else { + Ok((TcpStream::connect(addr)?, addr)) + } +} + +fn interleave(mut left: A, mut right: B) -> impl Iterator +where + A: FusedIterator, + B: FusedIterator, +{ + let mut last_right = None; + + std::iter::from_fn(move || { + if let Some(r) = last_right.take() { + return Some(r); + } + + match (left.next(), right.next()) { + (Some(l), Some(r)) => { + last_right = Some(r); + Some(l) + } + (Some(l), None) => Some(l), + (None, Some(r)) => Some(r), + (None, None) => None, + } + }) +} diff --git a/src/lib.rs b/src/lib.rs index 08a831ca..e4104ce3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -358,6 +358,7 @@ mod agent; mod body; mod chunked; mod error; +mod eyeballs; mod header; mod middleware; mod pool; diff --git a/src/response.rs b/src/response.rs index 0223068a..0b13ec27 100644 --- a/src/response.rs +++ b/src/response.rs @@ -530,7 +530,7 @@ impl Response { /// ``` #[cfg(feature = "json")] pub fn into_json(self) -> io::Result { - use crate::stream::io_err_timeout; + use crate::timeout::io_err_timeout; let reader = self.into_reader(); serde_json::from_reader(reader).map_err(|e| { diff --git a/src/stream.rs b/src/stream.rs index ab5645db..03543279 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -2,7 +2,6 @@ use log::debug; use std::io::{self, BufRead, BufReader, Read, Write}; use std::net::SocketAddr; use std::net::TcpStream; -use std::ops::Div; use std::time::Duration; use std::time::Instant; use std::{fmt, io::Cursor}; @@ -12,6 +11,7 @@ use socks::{TargetAddr, ToTargetAddr}; use crate::chunked::Decoder as ChunkDecoder; use crate::error::ErrorKind; +use crate::eyeballs; use crate::pool::{PoolKey, 
PoolReturner}; use crate::proxy::Proxy; use crate::timeout::{io_err_timeout, time_until_deadline}; @@ -361,30 +361,20 @@ pub(crate) fn connect_host( let proto = proxy.as_ref().map(|proxy| proxy.proto); - let mut any_err = None; - let mut any_stream_and_addr = None; - // Find the first sock_addr that accepts a connection - let multiple_addrs = sock_addrs.len() > 1; - - for sock_addr in sock_addrs { - // ensure connect timeout or overall timeout aren't yet hit. - let timeout = match connect_deadline { - Some(deadline) => { - let mut deadline = time_until_deadline(deadline, TIMEOUT_MSG)?; - if multiple_addrs { - deadline = deadline.div(2); - } - Some(deadline) - } - None => None, - }; - - debug!("connecting to {} at {}", netloc, &sock_addr); - - // connect with a configured timeout. - #[allow(clippy::unnecessary_unwrap)] - let stream = if proto.is_some() && Some(Proto::HTTP) != proto { - connect_socks( + let (mut stream, remote_addr) = if proto.is_some() && Some(Proto::HTTP) != proto { + // SOCKS proxy connections. + // Don't mix that with happy eyeballs + // (where we race multiple connections and take the fastest) + // since we'd be repeatedly connecting to the same proxy server. + let mut stream_and_addr_result = None; + // Find the first sock_addr that accepts a connection + for sock_addr in sock_addrs { + // ensure connect timeout or overall timeout aren't yet hit. + debug!("connecting to {} at {}", netloc, &sock_addr); + + // connect with a configured timeout. 
+ #[allow(clippy::unnecessary_unwrap)] + let stream = connect_socks( unit, proxy.clone().unwrap(), connect_deadline, @@ -392,28 +382,14 @@ pub(crate) fn connect_host( hostname, port, proto.unwrap(), - ) - } else if let Some(timeout) = timeout { - TcpStream::connect_timeout(&sock_addr, timeout) - } else { - TcpStream::connect(sock_addr) - }; - - if let Ok(stream) = stream { - any_stream_and_addr = Some((stream, sock_addr)); - break; - } else if let Err(err) = stream { - any_err = Some(err); + ); + stream_and_addr_result = Some(stream.map(|s| (s, sock_addr))); } - } - - let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr { - stream_and_addr - } else if let Some(e) = any_err { - return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e)); + stream_and_addr_result.expect("unreachable: connected to IPs, but no result") } else { - panic!("shouldn't happen: failed to connect to all IPs, but no error"); - }; + eyeballs::connect(netloc, &sock_addrs, connect_deadline) + } + .map_err(|e| ErrorKind::ConnectionFailed.msg("Connect error").src(e))?; stream.set_nodelay(unit.agent.config.no_delay)?;