From 0720a3350c918eaab8155845f6434791f06f12e7 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Sun, 26 Jan 2025 21:40:32 +0800 Subject: [PATCH 1/6] fix(client): Fix `HTTP/2` upgrade `WebSocket` --- src/client/legacy/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index d3d5605..fd744d3 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -316,7 +316,7 @@ where } else { origin_form(req.uri_mut()); } - } else if req.method() == Method::CONNECT { + } else if req.method() == Method::CONNECT && !pooled.is_http2() { authority_form(req.uri_mut()); } From 558c0bfa4ea6cf4dafe4db5e860e63551af7b9e2 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 28 Jan 2025 14:20:58 +0800 Subject: [PATCH 2/6] chore: Add client http2 upgrade test --- tests/legacy_client.rs | 62 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index 0f11d77..486ca8f 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -807,6 +807,68 @@ fn client_upgrade() { assert_eq!(vec, b"bar=foo"); } +#[cfg(not(miri))] +#[test] +fn client_http2_upgrade() { + use http::{Method, Response, Version}; + use hyper::service::service_fn; + use tokio::net::TcpListener; + + let _ = pretty_env_logger::try_init(); + let rt = runtime(); + let server = rt + .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))) + .unwrap(); + let addr = server.local_addr().unwrap(); + let mut connector = DebugConnector::new(); + connector.alpn_h2 = true; + + let client = Client::builder(TokioExecutor::new()).build(connector); + + rt.spawn(async move { + let (stream, _) = server.accept().await.expect("accept"); + let stream = TokioIo::new(stream); + let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + // IMPORTANT: This is required to advertise our support for HTTP/2 websockets to the client. + builder.http2().enable_connect_protocol(); + let _ = builder + .serve_connection_with_upgrades( + stream, + service_fn(|req| async move { + assert_eq!(req.headers().get("host"), None); + assert_eq!(req.version(), Version::HTTP_2); + assert_eq!( + req.headers().get(http::header::SEC_WEBSOCKET_VERSION), + Some(&http::header::HeaderValue::from_static("13")) + ); + assert_eq!( + req.extensions().get::(), + Some(&hyper::ext::Protocol::from_static("websocket")) + ); + Ok::<_, hyper::Error>(Response::new(Empty::::default())) + }), + ) + .await + .expect("server"); + }); + + let req = Request::builder() + .method(Method::CONNECT) + .uri(&*format!("http://{}/up", addr)) + .version(Version::HTTP_2) + .header(http::header::SEC_WEBSOCKET_VERSION, "13") + .extension(hyper::ext::Protocol::from_static("websocket")) + .body(Empty::::new()) + .unwrap(); + + let res = client.request(req); + let res = rt.block_on(res).unwrap(); + + assert_eq!(res.status(), 200); + assert_eq!(res.version(), Version::HTTP_2); + let _ = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); +} + #[cfg(not(miri))] #[test] fn alpn_h2() { From 69e57a9947bbcd8ceafb9ed47e0011a0b627d75b Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 28 Jan 2025 14:51:20 +0800 Subject: [PATCH 3/6] test: update client_http2_upgrade test --- tests/legacy_client.rs | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index 486ca8f..fd6d948 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -812,6 +812,7 @@ fn client_upgrade() { fn client_http2_upgrade() { use http::{Method, Response, Version}; use hyper::service::service_fn; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; let _ = pretty_env_logger::try_init(); @@ -834,7 +835,7 @@ fn client_http2_upgrade() { let _ = builder .serve_connection_with_upgrades( stream, - service_fn(|req| async move { + service_fn(|mut req| async move { assert_eq!(req.headers().get("host"), None); assert_eq!(req.version(), Version::HTTP_2); assert_eq!( @@ -845,7 +846,22 @@ fn client_http2_upgrade() { req.extensions().get::(), Some(&hyper::ext::Protocol::from_static("websocket")) ); - Ok::<_, hyper::Error>(Response::new(Empty::::default())) + + let on_upgrade = req.extensions_mut().remove::(); + tokio::spawn(async move { + let on_upgrade = on_upgrade.unwrap(); + let upgraded = on_upgrade.await.unwrap(); + let mut io = TokioIo::new(upgraded); + + let mut vec = vec![]; + io.read_buf(&mut vec).await.unwrap(); + assert_eq!(vec, b"foo=bar"); + io.write_all(b"bar=foo").await.unwrap(); + }); + + Ok::<_, hyper::Error>(Response::new(Full::::new(Bytes::from_static( + b"foobar=ready", + )))) }), ) .await @@ -866,7 +882,14 @@ fn client_http2_upgrade() { assert_eq!(res.status(), 200); assert_eq!(res.version(), Version::HTTP_2); - let _ = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); + + let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); + let mut io = TokioIo::new(upgraded); + + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); + assert_eq!(vec, b"bar=foo"); } #[cfg(not(miri))] From 6a9019446a981be94e8d38d2cb6a09c90b407a35 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 28 Jan 2025 18:48:47 +0800 Subject: [PATCH 4/6] test: update client_http2_upgrade test --- tests/legacy_client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index fd6d948..98b5205 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -859,9 +859,7 @@ fn client_http2_upgrade() { io.write_all(b"bar=foo").await.unwrap(); }); - Ok::<_, hyper::Error>(Response::new(Full::::new(Bytes::from_static( - b"foobar=ready", - )))) + Ok::<_, hyper::Error>(Response::new(Empty::::new())) }), ) .await From 4a9cdc2fa1007bcead10237693763b37770d3515 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 28 Jan 2025 19:01:19 +0800 Subject: [PATCH 5/6] test: update client_http2_upgrade test --- tests/legacy_client.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index 98b5205..f0440c7 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -807,7 +807,7 @@ fn client_upgrade() { assert_eq!(vec, b"bar=foo"); } -#[cfg(not(miri))] +// #[cfg(not(miri))] #[test] fn client_http2_upgrade() { use http::{Method, Response, Version}; @@ -835,7 +835,7 @@ fn client_http2_upgrade() { let _ = builder .serve_connection_with_upgrades( stream, - service_fn(|mut req| async move { + service_fn(|req| async move { assert_eq!(req.headers().get("host"), None); assert_eq!(req.version(), Version::HTTP_2); assert_eq!( @@ -847,9 +847,8 @@ fn client_http2_upgrade() { Some(&hyper::ext::Protocol::from_static("websocket")) ); - let on_upgrade = req.extensions_mut().remove::(); + let on_upgrade = hyper::upgrade::on(req); tokio::spawn(async move { - let on_upgrade = on_upgrade.unwrap(); let upgraded = on_upgrade.await.unwrap(); let mut io = TokioIo::new(upgraded); @@ -869,8 +868,8 @@ fn client_http2_upgrade() { let req = Request::builder() .method(Method::CONNECT) .uri(&*format!("http://{}/up", addr)) - .version(Version::HTTP_2) .header(http::header::SEC_WEBSOCKET_VERSION, "13") + .version(Version::HTTP_2) .extension(hyper::ext::Protocol::from_static("websocket")) .body(Empty::::new()) .unwrap(); @@ -878,7 +877,7 @@ fn client_http2_upgrade() { let res = client.request(req); let res = rt.block_on(res).unwrap(); - assert_eq!(res.status(), 200); + assert_eq!(res.status(), http::StatusCode::OK); assert_eq!(res.version(), Version::HTTP_2); let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); From a2caf7df30fa97a208b2d140e3bf0b4ca4c0791e Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Tue, 28 Jan 2025 19:01:40 +0800 Subject: [PATCH 6/6] test: update client_http2_upgrade test --- tests/legacy_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index f0440c7..95983df 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -807,7 +807,7 @@ fn client_upgrade() { assert_eq!(vec, b"bar=foo"); } -// #[cfg(not(miri))] +#[cfg(not(miri))] #[test] fn client_http2_upgrade() { use http::{Method, Response, Version};