Skip to content

Commit 621a5a9

Browse files
aumetralalitb
andauthored
Upgrade to hyper/http v1.0 (#1674)
Co-authored-by: Lalit Kumar Bhasin <labhas@microsoft.com>
1 parent 166a127 commit 621a5a9

File tree

26 files changed

+187
-137
lines changed

26 files changed

+187
-137
lines changed

Cargo.toml

+10-8
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,25 @@ criterion = "0.5"
2323
futures-core = "0.3"
2424
futures-executor = "0.3"
2525
futures-util = { version = "0.3", default-features = false }
26-
hyper = { version = "0.14", default-features = false }
27-
http = { version = "0.2", default-features = false }
26+
http = { version = "1.1", default-features = false, features = ["std"] }
27+
http-body-util = "0.1"
28+
hyper = { version = "1.3", default-features = false }
29+
hyper-util = "0.1"
2830
log = "0.4.21"
2931
once_cell = "1.13"
3032
ordered-float = "4.0"
3133
pin-project-lite = "0.2"
32-
prost = "0.12"
33-
prost-build = "0.12"
34-
prost-types = "0.12"
34+
prost = "0.13"
35+
prost-build = "0.13"
36+
prost-types = "0.13"
3537
rand = { version = "0.8", default-features = false }
36-
reqwest = { version = "0.11", default-features = false }
38+
reqwest = { version = "0.12", default-features = false }
3739
serde = { version = "1.0", default-features = false }
3840
serde_json = "1.0"
3941
temp-env = "0.3.6"
4042
thiserror = { version = "1", default-features = false }
41-
tonic = { version = "0.11", default-features = false }
42-
tonic-build = "0.11"
43+
tonic = { version = "0.12", default-features = false }
44+
tonic-build = "0.12"
4345
tokio = { version = "1", default-features = false }
4446
tokio-stream = "0.1.1"
4547
tracing = { version = "0.1", default-features = false }

examples/tracing-http-propagator/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ path = "src/client.rs"
1616
doc = false
1717

1818
[dependencies]
19+
http-body-util = { workspace = true }
1920
hyper = { workspace = true, features = ["full"] }
21+
hyper-util = { workspace = true, features = ["full"] }
2022
tokio = { workspace = true, features = ["full"] }
2123
opentelemetry = { path = "../../opentelemetry" }
2224
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }

examples/tracing-http-propagator/src/client.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use hyper::{body::Body, Client};
1+
use http_body_util::Full;
2+
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
23
use opentelemetry::{
34
global,
45
trace::{SpanKind, TraceContextExt, Tracer},
56
Context, KeyValue,
67
};
7-
use opentelemetry_http::HeaderInjector;
8+
use opentelemetry_http::{Bytes, HeaderInjector};
89
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
910
use opentelemetry_stdout::SpanExporter;
1011

@@ -24,7 +25,7 @@ async fn send_request(
2425
body_content: &str,
2526
span_name: &str,
2627
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
27-
let client = Client::new();
28+
let client = Client::builder(TokioExecutor::new()).build_http();
2829
let tracer = global::tracer("example/client");
2930
let span = tracer
3031
.span_builder(String::from(span_name))
@@ -37,7 +38,7 @@ async fn send_request(
3738
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
3839
});
3940
let res = client
40-
.request(req.body(Body::from(String::from(body_content)))?)
41+
.request(req.body(Full::new(Bytes::from(body_content.to_string())))?)
4142
.await?;
4243

4344
cx.span().add_event(

examples/tracing-http-propagator/src/server.rs

+37-19
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,64 @@
1-
use hyper::{
2-
service::{make_service_fn, service_fn},
3-
Body, Request, Response, Server, StatusCode,
4-
};
1+
use http_body_util::{combinators::BoxBody, BodyExt, Full};
2+
use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode};
3+
use hyper_util::rt::{TokioExecutor, TokioIo};
54
use opentelemetry::{
65
global,
76
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
87
Context, KeyValue,
98
};
10-
use opentelemetry_http::HeaderExtractor;
9+
use opentelemetry_http::{Bytes, HeaderExtractor};
1110
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
1211
use opentelemetry_semantic_conventions::trace;
1312
use opentelemetry_stdout::SpanExporter;
1413
use std::{convert::Infallible, net::SocketAddr};
14+
use tokio::net::TcpListener;
1515

1616
// Utility function to extract the context from the incoming request headers
17-
fn extract_context_from_request(req: &Request<Body>) -> Context {
17+
fn extract_context_from_request(req: &Request<Incoming>) -> Context {
1818
global::get_text_map_propagator(|propagator| {
1919
propagator.extract(&HeaderExtractor(req.headers()))
2020
})
2121
}
2222

2323
// Separate async function for the handle endpoint
24-
async fn handle_health_check(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
24+
async fn handle_health_check(
25+
_req: Request<Incoming>,
26+
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
2527
let tracer = global::tracer("example/server");
2628
let mut span = tracer
2729
.span_builder("health_check")
2830
.with_kind(SpanKind::Internal)
2931
.start(&tracer);
3032
span.add_event("Health check accessed", vec![]);
31-
let res = Response::new(Body::from("Server is up and running!"));
33+
34+
let res = Response::new(
35+
Full::new(Bytes::from_static(b"Server is up and running!"))
36+
.map_err(|err| match err {})
37+
.boxed(),
38+
);
39+
3240
Ok(res)
3341
}
3442

3543
// Separate async function for the echo endpoint
36-
async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
44+
async fn handle_echo(
45+
req: Request<Incoming>,
46+
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
3747
let tracer = global::tracer("example/server");
3848
let mut span = tracer
3949
.span_builder("echo")
4050
.with_kind(SpanKind::Internal)
4151
.start(&tracer);
4252
span.add_event("Echoing back the request", vec![]);
43-
let res = Response::new(req.into_body());
53+
54+
let res = Response::new(req.into_body().boxed());
55+
4456
Ok(res)
4557
}
4658

47-
async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
59+
async fn router(
60+
req: Request<Incoming>,
61+
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
4862
// Extract the context from the incoming request headers
4963
let parent_cx = extract_context_from_request(&req);
5064
let response = {
@@ -64,12 +78,13 @@ async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
6478
_ => {
6579
cx.span()
6680
.set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404));
67-
let mut not_found = Response::default();
81+
let mut not_found = Response::new(BoxBody::default());
6882
*not_found.status_mut() = StatusCode::NOT_FOUND;
6983
Ok(not_found)
7084
}
7185
}
7286
};
87+
7388
response
7489
}
7590

@@ -87,15 +102,18 @@ fn init_tracer() {
87102

88103
#[tokio::main]
89104
async fn main() {
105+
use hyper_util::server::conn::auto::Builder;
106+
90107
init_tracer();
91108
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
109+
let listener = TcpListener::bind(addr).await.unwrap();
92110

93-
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) });
94-
95-
let server = Server::bind(&addr).serve(make_svc);
96-
97-
println!("Listening on {addr}");
98-
if let Err(e) = server.await {
99-
eprintln!("server error: {e}");
111+
while let Ok((stream, _addr)) = listener.accept().await {
112+
if let Err(err) = Builder::new(TokioExecutor::new())
113+
.serve_connection(TokioIo::new(stream), service_fn(router))
114+
.await
115+
{
116+
eprintln!("{err}");
117+
}
100118
}
101119
}

opentelemetry-appender-tracing/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ rust-version = "1.65"
1212

1313
[dependencies]
1414
log = { workspace = true, optional = true }
15-
once_cell = { workspace = true }
1615
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["logs"] }
1716
tracing = { workspace = true, features = ["std"]}
1817
tracing-core = { workspace = true }

opentelemetry-http/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842)
66
- **Breaking** Remove support for the `isahc` HTTP client [#1924](https://github.com/open-telemetry/opentelemetry-rust/pull/1924)
7+
- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)
78

89
## v0.12.0
910

opentelemetry-http/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@ edition = "2021"
1010
rust-version = "1.65"
1111

1212
[features]
13+
hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"]
1314
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
1415
reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]
1516

1617
[dependencies]
1718
async-trait = { workspace = true }
1819
bytes = { workspace = true }
1920
http = { workspace = true }
20-
hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true }
21+
http-body-util = { workspace = true, optional = true }
22+
hyper = { workspace = true, optional = true }
23+
hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true }
2124
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] }
2225
reqwest = { workspace = true, features = ["blocking"], optional = true }
2326
tokio = { workspace = true, features = ["time"], optional = true }

opentelemetry-http/src/lib.rs

+37-7
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,24 @@ pub mod hyper {
105105

106106
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
107107
use http::HeaderValue;
108-
use hyper::client::connect::Connect;
109-
use hyper::Client;
108+
use http_body_util::{BodyExt, Full};
109+
use hyper::body::{Body as HttpBody, Frame};
110+
use hyper_util::client::legacy::{connect::Connect, Client};
110111
use std::fmt::Debug;
112+
use std::pin::Pin;
113+
use std::task::{self, Poll};
111114
use std::time::Duration;
112115
use tokio::time;
113116

114117
#[derive(Debug, Clone)]
115118
pub struct HyperClient<C> {
116-
inner: Client<C>,
119+
inner: Client<C, Body>,
117120
timeout: Duration,
118121
authorization: Option<HeaderValue>,
119122
}
120123

121124
impl<C> HyperClient<C> {
122-
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
125+
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
123126
Self {
124127
inner,
125128
timeout,
@@ -128,7 +131,7 @@ pub mod hyper {
128131
}
129132

130133
pub fn new_with_timeout_and_authorization_header(
131-
inner: Client<C>,
134+
inner: Client<C, Body>,
132135
timeout: Duration,
133136
authorization: HeaderValue,
134137
) -> Self {
@@ -147,22 +150,49 @@ pub mod hyper {
147150
{
148151
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
149152
let (parts, body) = request.into_parts();
150-
let mut request = Request::from_parts(parts, body.into());
153+
let mut request = Request::from_parts(parts, Body(Full::from(body)));
151154
if let Some(ref authorization) = self.authorization {
152155
request
153156
.headers_mut()
154157
.insert(http::header::AUTHORIZATION, authorization.clone());
155158
}
156159
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
157160
let headers = std::mem::take(response.headers_mut());
161+
158162
let mut http_response = Response::builder()
159163
.status(response.status())
160-
.body(hyper::body::to_bytes(response.into_body()).await?)?;
164+
.body(response.into_body().collect().await?.to_bytes())?;
161165
*http_response.headers_mut() = headers;
162166

163167
Ok(http_response.error_for_status()?)
164168
}
165169
}
170+
171+
pub struct Body(Full<Bytes>);
172+
173+
impl HttpBody for Body {
174+
type Data = Bytes;
175+
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
176+
177+
#[inline]
178+
fn poll_frame(
179+
self: Pin<&mut Self>,
180+
cx: &mut task::Context<'_>,
181+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
182+
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
183+
inner_body.poll_frame(cx).map_err(Into::into)
184+
}
185+
186+
#[inline]
187+
fn is_end_stream(&self) -> bool {
188+
self.0.is_end_stream()
189+
}
190+
191+
#[inline]
192+
fn size_hint(&self) -> hyper::body::SizeHint {
193+
self.0.size_hint()
194+
}
195+
}
166196
}
167197

168198
/// Methods to make working with responses from the [`HttpClient`] trait easier.

opentelemetry-otlp/CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
2020
previous release.
2121
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
2222
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
23-
23+
- **Breaking** Update to `http` v1 and `tonic` v0.12 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)
2424

2525
## v0.16.0
2626

opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ publish = false
88
[features]
99
default = ["reqwest"]
1010
reqwest = ["opentelemetry-otlp/reqwest-client"]
11-
hyper = ["dep:async-trait", "dep:http", "dep:hyper", "dep:opentelemetry-http", "dep:bytes"]
11+
hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"]
1212

1313

1414
[dependencies]
@@ -23,7 +23,9 @@ opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-c
2323
async-trait = { workspace = true, optional = true }
2424
bytes = { workspace = true, optional = true }
2525
http = { workspace = true, optional = true }
26+
http-body-util = { workspace = true, optional = true }
2627
hyper = { workspace = true, features = ["client"], optional = true }
28+
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
2729
tokio = { workspace = true, features = ["full"] }
2830
tracing = { workspace = true, features = ["std"]}
2931
tracing-core = { workspace = true }

opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs

+11-7
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
use async_trait::async_trait;
22
use bytes::Bytes;
33
use http::{Request, Response};
4-
use hyper::{
5-
client::{connect::Connect, HttpConnector},
6-
Body, Client,
4+
use http_body_util::{BodyExt, Full};
5+
use hyper_util::{
6+
client::legacy::{
7+
connect::{Connect, HttpConnector},
8+
Client,
9+
},
10+
rt::TokioExecutor,
711
};
812
use opentelemetry_http::{HttpClient, HttpError, ResponseExt};
913

1014
pub struct HyperClient<C> {
11-
inner: hyper::Client<C>,
15+
inner: hyper_util::client::legacy::Client<C, Full<Bytes>>,
1216
}
1317

1418
impl Default for HyperClient<HttpConnector> {
1519
fn default() -> Self {
1620
Self {
17-
inner: Client::new(),
21+
inner: Client::builder(TokioExecutor::new()).build_http(),
1822
}
1923
}
2024
}
@@ -30,15 +34,15 @@ impl<C> std::fmt::Debug for HyperClient<C> {
3034
#[async_trait]
3135
impl<C: Connect + Clone + Send + Sync + 'static> HttpClient for HyperClient<C> {
3236
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
33-
let request = request.map(Body::from);
37+
let request = request.map(|body| Full::new(Bytes::from(body)));
3438

3539
let (parts, body) = self
3640
.inner
3741
.request(request)
3842
.await?
3943
.error_for_status()?
4044
.into_parts();
41-
let body = hyper::body::to_bytes(body).await?;
45+
let body = body.collect().await?.to_bytes();
4246

4347
Ok(Response::from_parts(parts, body))
4448
}

0 commit comments

Comments
 (0)