Skip to content

Commit 425206d

Browse files
authored
Merge branch 'master' into max_pending_accept_reset_streams
2 parents 8a54683 + 55b5171 commit 425206d

File tree

6 files changed

+130
-56
lines changed

6 files changed

+130
-56
lines changed

src/client/legacy/client.rs

+39-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use tracing::{debug, trace, warn};
2222
use super::connect::HttpConnector;
2323
use super::connect::{Alpn, Connect, Connected, Connection};
2424
use super::pool::{self, Ver};
25-
use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper};
25+
26+
use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
2627

2728
type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
2829

@@ -632,7 +633,7 @@ where
632633
impl<C: Clone, B> Clone for Client<C, B> {
633634
fn clone(&self) -> Client<C, B> {
634635
Client {
635-
config: self.config.clone(),
636+
config: self.config,
636637
exec: self.exec.clone(),
637638
#[cfg(feature = "http1")]
638639
h1_builder: self.h1_builder.clone(),
@@ -924,7 +925,7 @@ fn set_scheme(uri: &mut Uri, scheme: Scheme) {
924925
uri.scheme().is_none(),
925926
"set_scheme expects no existing scheme"
926927
);
927-
let old = std::mem::replace(uri, Uri::default());
928+
let old = std::mem::take(uri);
928929
let mut parts: ::http::uri::Parts = old.into();
929930
parts.scheme = Some(scheme);
930931
parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
@@ -975,6 +976,7 @@ pub struct Builder {
975976
#[cfg(feature = "http2")]
976977
h2_builder: hyper::client::conn::http2::Builder<Exec>,
977978
pool_config: pool::Config,
979+
pool_timer: Option<timer::Timer>,
978980
}
979981

980982
impl Builder {
@@ -999,13 +1001,34 @@ impl Builder {
9991001
idle_timeout: Some(Duration::from_secs(90)),
10001002
max_idle_per_host: std::usize::MAX,
10011003
},
1004+
pool_timer: None,
10021005
}
10031006
}
10041007
/// Set an optional timeout for idle sockets being kept-alive.
1008+
/// A `Timer` is required for this to take effect. See `Builder::pool_timer`
10051009
///
10061010
/// Pass `None` to disable timeout.
10071011
///
10081012
/// Default is 90 seconds.
1013+
///
1014+
/// # Example
1015+
///
1016+
/// ```
1017+
/// # #[cfg(feature = "tokio")]
1018+
/// # fn run () {
1019+
/// use std::time::Duration;
1020+
/// use hyper_util::client::legacy::Client;
1021+
/// use hyper_util::rt::{TokioExecutor, TokioTimer};
1022+
///
1023+
/// let client = Client::builder(TokioExecutor::new())
1024+
/// .pool_idle_timeout(Duration::from_secs(30))
1025+
/// .pool_timer(TokioTimer::new())
1026+
/// .build_http();
1027+
///
1028+
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1029+
/// # }
1030+
/// # fn main() {}
1031+
/// ```
10091032
pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
10101033
where
10111034
D: Into<Option<Duration>>,
@@ -1130,7 +1153,7 @@ impl Builder {
11301153

11311154
/// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
11321155
///
1133-
/// This mimicks the behaviour of major browsers. You probably don't want this.
1156+
/// This mimics the behaviour of major browsers. You probably don't want this.
11341157
/// You should only want this if you are implementing a proxy whose main
11351158
/// purpose is to sit in front of browsers whose users access arbitrary content
11361159
/// which may be malformed, and they expect everything that works without
@@ -1366,7 +1389,7 @@ impl Builder {
13661389
self
13671390
}
13681391

1369-
/// Provide a timer to be used for timeouts and intervals.
1392+
/// Provide a timer to be used for h2
13701393
///
13711394
/// See the documentation of [`h2::client::Builder::timer`] for more
13721395
/// details.
@@ -1378,7 +1401,15 @@ impl Builder {
13781401
{
13791402
#[cfg(feature = "http2")]
13801403
self.h2_builder.timer(timer);
1381-
// TODO(https://github.com/hyperium/hyper/issues/3167) set for pool as well
1404+
self
1405+
}
1406+
1407+
/// Provide a timer to be used for timeouts and intervals in connection pools.
1408+
pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1409+
where
1410+
M: Timer + Clone + Send + Sync + 'static,
1411+
{
1412+
self.pool_timer = Some(timer::Timer::new(timer.clone()));
13821413
self
13831414
}
13841415

@@ -1447,6 +1478,7 @@ impl Builder {
14471478
B::Data: Send,
14481479
{
14491480
let exec = self.exec.clone();
1481+
let timer = self.pool_timer.clone();
14501482
Client {
14511483
config: self.client_config,
14521484
exec: exec.clone(),
@@ -1455,7 +1487,7 @@ impl Builder {
14551487
#[cfg(feature = "http2")]
14561488
h2_builder: self.h2_builder.clone(),
14571489
connector,
1458-
pool: pool::Pool::new(self.pool_config, exec),
1490+
pool: pool::Pool::new(self.pool_config, exec, timer),
14591491
}
14601492
}
14611493
}

src/client/legacy/connect/http.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -606,9 +606,7 @@ struct ConnectingTcpRemote {
606606

607607
impl ConnectingTcpRemote {
608608
fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self {
609-
let connect_timeout = connect_timeout
610-
.map(|t| t.checked_div(addrs.len() as u32))
611-
.flatten();
609+
let connect_timeout = connect_timeout.and_then(|t| t.checked_div(addrs.len() as u32));
612610

613611
Self {
614612
addrs,

src/client/legacy/pool.rs

+49-30
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@ use std::task::{self, Poll};
1515
use std::time::{Duration, Instant};
1616

1717
use futures_channel::oneshot;
18+
use futures_util::ready;
1819
use tracing::{debug, trace};
1920

20-
use crate::common::{exec, exec::Exec, ready};
21+
use hyper::rt::Sleep;
22+
use hyper::rt::Timer as _;
23+
24+
use crate::common::{exec, exec::Exec, timer::Timer};
2125

2226
// FIXME: allow() required due to `impl Trait` leaking types to this lint
2327
#[allow(missing_debug_implementations)]
@@ -96,6 +100,7 @@ struct PoolInner<T, K: Eq + Hash> {
96100
// the Pool completely drops. That way, the interval can cancel immediately.
97101
idle_interval_ref: Option<oneshot::Sender<Infallible>>,
98102
exec: Exec,
103+
timer: Option<Timer>,
99104
timeout: Option<Duration>,
100105
}
101106

@@ -116,11 +121,13 @@ impl Config {
116121
}
117122

118123
impl<T, K: Key> Pool<T, K> {
119-
pub fn new<E>(config: Config, executor: E) -> Pool<T, K>
124+
pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
120125
where
121126
E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
127+
M: hyper::rt::Timer + Send + Sync + Clone + 'static,
122128
{
123129
let exec = Exec::new(executor);
130+
let timer = timer.map(|t| Timer::new(t));
124131
let inner = if config.is_enabled() {
125132
Some(Arc::new(Mutex::new(PoolInner {
126133
connecting: HashSet::new(),
@@ -129,6 +136,7 @@ impl<T, K: Key> Pool<T, K> {
129136
max_idle_per_host: config.max_idle_per_host,
130137
waiters: HashMap::new(),
131138
exec,
139+
timer,
132140
timeout: config.idle_timeout,
133141
})))
134142
} else {
@@ -380,7 +388,7 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
380388
Some(value) => {
381389
// borrow-check scope...
382390
{
383-
let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
391+
let idle_list = self.idle.entry(key.clone()).or_default();
384392
if self.max_idle_per_host <= idle_list.len() {
385393
trace!("max idle per host for {:?}, dropping connection", key);
386394
return;
@@ -410,31 +418,33 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
410418
self.waiters.remove(key);
411419
}
412420

413-
fn spawn_idle_interval(&mut self, _pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
414-
// TODO
415-
/*
416-
let (dur, rx) = {
417-
if self.idle_interval_ref.is_some() {
418-
return;
419-
}
420-
421-
if let Some(dur) = self.timeout {
422-
let (tx, rx) = oneshot::channel();
423-
self.idle_interval_ref = Some(tx);
424-
(dur, rx)
425-
} else {
426-
return;
427-
}
421+
fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
422+
if self.idle_interval_ref.is_some() {
423+
return;
424+
}
425+
let dur = if let Some(dur) = self.timeout {
426+
dur
427+
} else {
428+
return;
429+
};
430+
let timer = if let Some(timer) = self.timer.clone() {
431+
timer
432+
} else {
433+
return;
428434
};
435+
let (tx, rx) = oneshot::channel();
436+
self.idle_interval_ref = Some(tx);
429437

430438
let interval = IdleTask {
431-
interval: tokio::time::interval(dur),
439+
timer: timer.clone(),
440+
duration: dur,
441+
deadline: Instant::now(),
442+
fut: timer.sleep_until(Instant::now()), // ready at first tick
432443
pool: WeakOpt::downgrade(pool_ref),
433444
pool_drop_notifier: rx,
434445
};
435446

436447
self.exec.execute(interval);
437-
*/
438448
}
439449
}
440450

@@ -754,11 +764,12 @@ impl Expiration {
754764
}
755765
}
756766

757-
/*
758767
pin_project_lite::pin_project! {
759768
struct IdleTask<T, K: Key> {
760-
#[pin]
761-
interval: Interval,
769+
timer: Timer,
770+
duration: Duration,
771+
deadline: Instant,
772+
fut: Pin<Box<dyn Sleep>>,
762773
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
763774
// This allows the IdleTask to be notified as soon as the entire
764775
// Pool is fully dropped, and shutdown. This channel is never sent on,
@@ -783,7 +794,15 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
783794
}
784795
}
785796

786-
ready!(this.interval.as_mut().poll_tick(cx));
797+
ready!(Pin::new(&mut this.fut).poll(cx));
798+
// Set this task to run after the next deadline
799+
// If the poll missed the deadline by a lot, set the deadline
800+
// from the current time instead
801+
*this.deadline = *this.deadline + *this.duration;
802+
if *this.deadline < Instant::now() - Duration::from_millis(5) {
803+
*this.deadline = Instant::now() + *this.duration;
804+
}
805+
*this.fut = this.timer.sleep_until(*this.deadline);
787806

788807
if let Some(inner) = this.pool.upgrade() {
789808
if let Ok(mut inner) = inner.lock() {
@@ -796,7 +815,6 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
796815
}
797816
}
798817
}
799-
*/
800818

801819
impl<T> WeakOpt<T> {
802820
fn none() -> Self {
@@ -822,7 +840,9 @@ mod tests {
822840
use std::time::Duration;
823841

824842
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
825-
use crate::rt::TokioExecutor;
843+
use crate::rt::{TokioExecutor, TokioTimer};
844+
845+
use crate::common::timer;
826846

827847
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
828848
struct KeyImpl(http::uri::Scheme, http::uri::Authority);
@@ -869,6 +889,7 @@ mod tests {
869889
max_idle_per_host: max_idle,
870890
},
871891
TokioExecutor::new(),
892+
Option::<timer::Timer>::None,
872893
);
873894
pool.no_timer();
874895
pool
@@ -959,16 +980,14 @@ mod tests {
959980
}
960981

961982
#[tokio::test]
962-
#[ignore] // TODO
963983
async fn test_pool_timer_removes_expired() {
964-
tokio::time::pause();
965-
966984
let pool = Pool::new(
967985
super::Config {
968986
idle_timeout: Some(Duration::from_millis(10)),
969987
max_idle_per_host: std::usize::MAX,
970988
},
971989
TokioExecutor::new(),
990+
Some(TokioTimer::new()),
972991
);
973992

974993
let key = host_key("foo");
@@ -983,7 +1002,7 @@ mod tests {
9831002
);
9841003

9851004
// Let the timer tick passed the expiration...
986-
tokio::time::advance(Duration::from_millis(30)).await;
1005+
tokio::time::sleep(Duration::from_millis(30)).await;
9871006
// Yield so the Interval can reap...
9881007
tokio::task::yield_now().await;
9891008

src/common/mod.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
11
#![allow(missing_docs)]
22

3-
macro_rules! ready {
4-
($e:expr) => {
5-
match $e {
6-
std::task::Poll::Ready(v) => v,
7-
std::task::Poll::Pending => return std::task::Poll::Pending,
8-
}
9-
};
10-
}
11-
12-
pub(crate) use ready;
133
pub(crate) mod exec;
144
#[cfg(feature = "client")]
155
mod lazy;
166
pub(crate) mod rewind;
177
#[cfg(feature = "client")]
188
mod sync;
9+
pub(crate) mod timer;
1910

2011
#[cfg(feature = "client")]
2112
pub(crate) use exec::Exec;

src/common/timer.rs

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#![allow(dead_code)]
2+
3+
use std::fmt;
4+
use std::pin::Pin;
5+
use std::sync::Arc;
6+
use std::time::Duration;
7+
use std::time::Instant;
8+
9+
use hyper::rt::Sleep;
10+
11+
#[derive(Clone)]
12+
pub(crate) struct Timer(Arc<dyn hyper::rt::Timer + Send + Sync>);
13+
14+
// =====impl Timer=====
15+
impl Timer {
16+
pub(crate) fn new<T>(inner: T) -> Self
17+
where
18+
T: hyper::rt::Timer + Send + Sync + 'static,
19+
{
20+
Self(Arc::new(inner))
21+
}
22+
}
23+
24+
impl fmt::Debug for Timer {
25+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26+
f.debug_struct("Timer").finish()
27+
}
28+
}
29+
30+
impl hyper::rt::Timer for Timer {
31+
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
32+
self.0.sleep(duration)
33+
}
34+
35+
fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
36+
self.0.sleep_until(deadline)
37+
}
38+
}

0 commit comments

Comments
 (0)