Skip to content

Commit bd976e1

Browse files
committed
Implmement Happy Eyeballs RFC
1 parent def31b3 commit bd976e1

File tree

3 files changed

+185
-45
lines changed

3 files changed

+185
-45
lines changed

src/eyeballs.rs

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
//! A Happy Eyeballs RFC implementation
2+
//!
3+
//! Races interleaved IPv4 and IPv6 connections to provide the fastest connection
4+
//! in cases where certain addresses or address families might be blocked, broken, or slow.
5+
//! (See <https://datatracker.ietf.org/doc/html/rfc8305>)
6+
//!
7+
//! ureq strives for simplicity, and avoids spawning threads where it can,
8+
//! but - like with SOCKS - there's no way around it here.
9+
//! Some mini internal async executor
10+
//! (discussed in https://github.com/algesten/ureq/issues/535#issuecomment-1229433311)
11+
//! wouldn't help - `connect()` is a blocking syscall with no non-blocking alternative.
12+
//! (Big async runtimes like Tokio "solve" this problem by keeping a pool of OS threads
13+
//! around for just these sorts of blocking calls.)
14+
//! We _could_ have some thread pool (a la rayon) to avoid spawning threads
15+
//! on each connection attempt, but spawning a few threads is a cheap operation
16+
//! compared to everything else going on here.
17+
//! (DNS resolution, handshaking across the Internet...)
18+
//!
19+
//! Much of this implementation was inspired by attohttpc's:
20+
//! <https://github.com/sbstp/attohttpc/blob/master/src/happy.rs>
21+
22+
use std::{
23+
io,
24+
iter::FusedIterator,
25+
net::{SocketAddr, TcpStream},
26+
sync::mpsc::{channel, RecvTimeoutError},
27+
thread,
28+
time::Instant,
29+
};
30+
31+
use log::debug;
32+
33+
use crate::timeout::{io_err_timeout, time_until_deadline};
34+
35+
const TIMEOUT_MSG: &str = "timed out connecting";
36+
37+
pub fn connect(
38+
netloc: String,
39+
addrs: &[SocketAddr],
40+
deadline: Option<Instant>,
41+
) -> io::Result<(TcpStream, SocketAddr)> {
42+
assert!(!addrs.is_empty());
43+
44+
// No racing needed if there's a single address.
45+
if let [single] = addrs {
46+
return single_connection(&netloc, *single, deadline);
47+
}
48+
49+
// Interleave IPV4 and IPV6 addresses
50+
let fours = addrs.iter().filter(|a| matches!(a, SocketAddr::V4(_)));
51+
let sixes = addrs.iter().filter(|a| matches!(a, SocketAddr::V6(_)));
52+
let sorted = interleave(fours, sixes);
53+
54+
let (tx, rx) = channel();
55+
let mut first_error = None;
56+
57+
// Race connections!
58+
// The RFC says:
59+
//
60+
// 1. Not to start connections "simultaneously", but since `connect()`
61+
// syscalls don't return until they've connected or timed out,
62+
// we don't have a way to start an attempt without blocking until it finishes.
63+
// (And if we did that, we wouldn't be racing!)
64+
//
65+
// 2. Once we have a successful connection, all other attempts should be cancelled.
66+
// Doing so would require a lot of nasty (and platform-specific) signal handling,
67+
// as it's the only way to interrupt `connect()`.
68+
for s in sorted {
69+
// Instead, make a best effort to not start new connections if we've got one already.
70+
if let Ok(resp) = rx.try_recv() {
71+
match resp {
72+
Ok(c) => return Ok(c),
73+
Err(e) => {
74+
let _ = first_error.get_or_insert(e);
75+
}
76+
}
77+
}
78+
79+
let tx2 = tx.clone();
80+
let nl2 = netloc.clone();
81+
let s2 = *s;
82+
thread::spawn(move || {
83+
// If the receiver was dropped, someone else already won the race.
84+
let _ = tx2.send(single_connection(&nl2, s2, deadline));
85+
});
86+
}
87+
drop(tx);
88+
89+
const UNREACHABLE_MSG: &str =
90+
"Unreachable: All Happy Eyeballs connections failed, but no error";
91+
92+
if let Some(d) = deadline {
93+
// Wait for a successful connection, or for us to run out of time
94+
loop {
95+
let timeout = time_until_deadline(d, TIMEOUT_MSG)?;
96+
match rx.recv_timeout(timeout) {
97+
Ok(Ok(c)) => return Ok(c),
98+
Ok(Err(e)) => {
99+
let _ = first_error.get_or_insert(e);
100+
}
101+
Err(RecvTimeoutError::Timeout) => {
102+
return Err(io_err_timeout(TIMEOUT_MSG.to_string()))
103+
}
104+
// If all the connecting threads hung up and none succeeded,
105+
// return the first error.
106+
Err(RecvTimeoutError::Disconnected) => {
107+
return Err(first_error.expect(UNREACHABLE_MSG))
108+
}
109+
};
110+
}
111+
} else {
112+
// If there's no deadline, just wait around.
113+
let connections = rx.iter();
114+
for c in connections {
115+
match c {
116+
Ok(c) => return Ok(c),
117+
Err(e) => {
118+
let _ = first_error.get_or_insert(e);
119+
}
120+
}
121+
}
122+
// If we got here, everyone failed. Return the first error.
123+
Err(first_error.expect(UNREACHABLE_MSG))
124+
}
125+
}
126+
127+
fn single_connection(
128+
netloc: &str,
129+
addr: SocketAddr,
130+
deadline: Option<Instant>,
131+
) -> io::Result<(TcpStream, SocketAddr)> {
132+
debug!("connecting to {} at {}", netloc, addr);
133+
if let Some(d) = deadline {
134+
let timeout = time_until_deadline(d, TIMEOUT_MSG)?;
135+
Ok((TcpStream::connect_timeout(&addr, timeout)?, addr))
136+
} else {
137+
Ok((TcpStream::connect(addr)?, addr))
138+
}
139+
}
140+
141+
fn interleave<T, A, B>(mut left: A, mut right: B) -> impl Iterator<Item = T>
142+
where
143+
A: FusedIterator<Item = T>,
144+
B: FusedIterator<Item = T>,
145+
{
146+
let mut last_right = None;
147+
148+
std::iter::from_fn(move || {
149+
if let Some(r) = last_right.take() {
150+
return Some(r);
151+
}
152+
153+
match (left.next(), right.next()) {
154+
(Some(l), Some(r)) => {
155+
last_right = Some(r);
156+
Some(l)
157+
}
158+
(Some(l), None) => Some(l),
159+
(None, Some(r)) => Some(r),
160+
(None, None) => None,
161+
}
162+
})
163+
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ mod agent;
358358
mod body;
359359
mod chunked;
360360
mod error;
361+
mod eyeballs;
361362
mod header;
362363
mod middleware;
363364
mod pool;

src/stream.rs

+21-45
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use log::debug;
22
use std::io::{self, BufRead, BufReader, Read, Write};
33
use std::net::SocketAddr;
44
use std::net::TcpStream;
5-
use std::ops::Div;
65
use std::time::Duration;
76
use std::time::Instant;
87
use std::{fmt, io::Cursor};
@@ -12,6 +11,7 @@ use socks::{TargetAddr, ToTargetAddr};
1211

1312
use crate::chunked::Decoder as ChunkDecoder;
1413
use crate::error::ErrorKind;
14+
use crate::eyeballs;
1515
use crate::pool::{PoolKey, PoolReturner};
1616
use crate::proxy::Proxy;
1717
use crate::timeout::{io_err_timeout, time_until_deadline};
@@ -361,59 +361,35 @@ pub(crate) fn connect_host(
361361

362362
let proto = proxy.as_ref().map(|proxy| proxy.proto);
363363

364-
let mut any_err = None;
365-
let mut any_stream_and_addr = None;
366-
// Find the first sock_addr that accepts a connection
367-
let multiple_addrs = sock_addrs.len() > 1;
368-
369-
for sock_addr in sock_addrs {
370-
// ensure connect timeout or overall timeout aren't yet hit.
371-
let timeout = match connect_deadline {
372-
Some(deadline) => {
373-
let mut deadline = time_until_deadline(deadline, TIMEOUT_MSG)?;
374-
if multiple_addrs {
375-
deadline = deadline.div(2);
376-
}
377-
Some(deadline)
378-
}
379-
None => None,
380-
};
381-
382-
debug!("connecting to {} at {}", netloc, &sock_addr);
383-
384-
// connect with a configured timeout.
385-
#[allow(clippy::unnecessary_unwrap)]
386-
let stream = if proto.is_some() && Some(Proto::HTTP) != proto {
387-
connect_socks(
364+
let (mut stream, remote_addr) = if proto.is_some() && Some(Proto::HTTP) != proto {
365+
// SOCKS proxy connections.
366+
// Don't mix that with happy eyeballs
367+
// (where we race multiple connections and take the fastest)
368+
// since we'd be repeatedly connecting to the same proxy server.
369+
let mut stream_and_addr_result = None;
370+
// Find the first sock_addr that accepts a connection
371+
for sock_addr in sock_addrs {
372+
// ensure connect timeout or overall timeout aren't yet hit.
373+
debug!("connecting to {} at {}", netloc, &sock_addr);
374+
375+
// connect with a configured timeout.
376+
#[allow(clippy::unnecessary_unwrap)]
377+
let stream = connect_socks(
388378
unit,
389379
proxy.clone().unwrap(),
390380
connect_deadline,
391381
sock_addr,
392382
hostname,
393383
port,
394384
proto.unwrap(),
395-
)
396-
} else if let Some(timeout) = timeout {
397-
TcpStream::connect_timeout(&sock_addr, timeout)
398-
} else {
399-
TcpStream::connect(sock_addr)
400-
};
401-
402-
if let Ok(stream) = stream {
403-
any_stream_and_addr = Some((stream, sock_addr));
404-
break;
405-
} else if let Err(err) = stream {
406-
any_err = Some(err);
385+
);
386+
stream_and_addr_result = Some(stream.map(|s| (s, sock_addr)));
407387
}
408-
}
409-
410-
let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
411-
stream_and_addr
412-
} else if let Some(e) = any_err {
413-
return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
388+
stream_and_addr_result.expect("unreachable: connected to IPs, but no result")
414389
} else {
415-
panic!("shouldn't happen: failed to connect to all IPs, but no error");
416-
};
390+
eyeballs::connect(netloc, &sock_addrs, connect_deadline)
391+
}
392+
.map_err(|e| ErrorKind::ConnectionFailed.msg("Connect error").src(e))?;
417393

418394
stream.set_nodelay(unit.agent.config.no_delay)?;
419395

0 commit comments

Comments
 (0)