Skip to content

Commit 25f9213

Browse files
committed
Add opaque wrapper around Full body, use wrapper
1 parent 19ddbd8 commit 25f9213

File tree

1 file changed

+38
-13
lines changed

1 file changed

+38
-13
lines changed

opentelemetry-http/src/lib.rs

+38-13
Original file line numberDiff line numberDiff line change
@@ -105,23 +105,51 @@ pub mod hyper {
105105

106106
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
107107
use http::HeaderValue;
108-
use http_body_util::BodyExt;
109-
use hyper::body::Body;
108+
use http_body_util::{BodyExt, Full};
109+
use hyper::body::{Body as HttpBody, Frame};
110110
use hyper_util::client::legacy::{connect::Connect, Client};
111-
use std::error::Error;
111+
use std::convert::Infallible;
112112
use std::fmt::Debug;
113+
use std::pin::Pin;
114+
use std::task::{self, Poll};
113115
use std::time::Duration;
114116
use tokio::time;
115117

118+
pub struct Body(Full<Bytes>);
119+
120+
impl HttpBody for Body {
121+
type Data = Bytes;
122+
type Error = Infallible;
123+
124+
#[inline]
125+
fn poll_frame(
126+
self: Pin<&mut Self>,
127+
cx: &mut task::Context<'_>,
128+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
129+
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
130+
inner_body.poll_frame(cx)
131+
}
132+
133+
#[inline]
134+
fn is_end_stream(&self) -> bool {
135+
self.0.is_end_stream()
136+
}
137+
138+
#[inline]
139+
fn size_hint(&self) -> hyper::body::SizeHint {
140+
self.0.size_hint()
141+
}
142+
}
143+
116144
#[derive(Debug, Clone)]
117-
pub struct HyperClient<C, B> {
118-
inner: Client<C, B>,
145+
pub struct HyperClient<C> {
146+
inner: Client<C, Body>,
119147
timeout: Duration,
120148
authorization: Option<HeaderValue>,
121149
}
122150

123-
impl<C, B> HyperClient<C, B> {
124-
pub fn new_with_timeout(inner: Client<C, B>, timeout: Duration) -> Self {
151+
impl<C> HyperClient<C> {
152+
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {
125153
Self {
126154
inner,
127155
timeout,
@@ -130,7 +158,7 @@ pub mod hyper {
130158
}
131159

132160
pub fn new_with_timeout_and_authorization_header(
133-
inner: Client<C, B>,
161+
inner: Client<C, Body>,
134162
timeout: Duration,
135163
authorization: HeaderValue,
136164
) -> Self {
@@ -143,16 +171,13 @@ pub mod hyper {
143171
}
144172

145173
#[async_trait]
146-
impl<C, B> HttpClient for HyperClient<C, B>
174+
impl<C> HttpClient for HyperClient<C>
147175
where
148176
C: Connect + Send + Sync + Clone + Debug + 'static,
149-
B: From<Vec<u8>> + Body + Send + Sync + Debug + Unpin + 'static,
150-
<B as Body>::Data: Send,
151-
<B as Body>::Error: Into<Box<dyn Error + Send + Sync>>,
152177
{
153178
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
154179
let (parts, body) = request.into_parts();
155-
let mut request = Request::from_parts(parts, B::from(body));
180+
let mut request = Request::from_parts(parts, Body(Full::from(body)));
156181
if let Some(ref authorization) = self.authorization {
157182
request
158183
.headers_mut()

0 commit comments

Comments
 (0)