Skip to content

Commit b5244bd

Browse files
Add support for "Expect: 100-continue" header
Change-Id: I7cc1035e9d36e4b2c94edc5ad571b74c11f74c94
1 parent 0955dfe commit b5244bd

File tree

5 files changed

+138
-78
lines changed

5 files changed

+138
-78
lines changed

src/http_crate.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
net::{IpAddr, Ipv4Addr, SocketAddr},
44
};
55

6-
use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
6+
use crate::{header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response};
77

88
/// Converts an [`http::Response`] into a [`Response`].
99
///
@@ -44,7 +44,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
4444
HeaderLine::from(raw_header).into_header().unwrap()
4545
})
4646
.collect::<Vec<_>>(),
47-
reader: Box::new(Cursor::new(value.into_body())),
47+
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
4848
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
4949
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
5050
history: vec![],
@@ -330,10 +330,11 @@ mod tests {
330330
fn convert_to_http_response_bytes() {
331331
use http::Response;
332332
use std::io::Cursor;
333+
use crate::response::PendingReader;
333334

334335
let mut response = super::Response::new(200, "OK", "tbr").unwrap();
335336
// b'\xFF' as invalid UTF-8 character
336-
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
337+
response.reader = PendingReader::Reader(Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef])));
337338
let http_response: Response<Vec<u8>> = response.into();
338339

339340
assert_eq!(

src/http_interop.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::{
55
net::{IpAddr, Ipv4Addr, SocketAddr},
66
};
77

8-
use crate::{header::HeaderLine, response::ResponseStatusIndex, Request, Response};
8+
use crate::{
9+
header::HeaderLine, response::PendingReader, response::ResponseStatusIndex, Request, Response,
10+
};
911

1012
/// Converts an [`http::Response`] into a [`Response`].
1113
///
@@ -47,7 +49,7 @@ impl<T: AsRef<[u8]> + Send + Sync + 'static> From<http::Response<T>> for Respons
4749
HeaderLine::from(raw_header).into_header().unwrap()
4850
})
4951
.collect::<Vec<_>>(),
50-
reader: Box::new(Cursor::new(value.into_body())),
52+
reader: PendingReader::Reader(Box::new(Cursor::new(value.into_body()))),
5153
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 80),
5254
local_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
5355
history: vec![],
@@ -278,6 +280,7 @@ impl From<Request> for http::request::Builder {
278280

279281
#[cfg(test)]
280282
mod tests {
283+
use std::io::Read;
281284
use crate::header::{add_header, get_header_raw, HeaderLine};
282285
use http_02 as http;
283286

@@ -376,7 +379,8 @@ mod tests {
376379

377380
let mut response = super::Response::new(200, "OK", "tbr").unwrap();
378381
// b'\xFF' as invalid UTF-8 character
379-
response.reader = Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef]));
382+
response.reader =
383+
super::PendingReader::Reader(Box::new(Cursor::new(vec![b'\xFF', 0xde, 0xad, 0xbe, 0xef])));
380384
let http_response: Response<Vec<u8>> = response.into();
381385

382386
assert_eq!(

src/response.rs

+107-69
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ enum BodyType {
4949
CloseDelimited,
5050
}
5151

52+
pub(crate) enum PendingReader {
53+
BeforeBodyStart,
54+
Reader(Box<dyn Read + Send + Sync + 'static>),
55+
}
56+
5257
/// Response instances are created as results of firing off requests.
5358
///
5459
/// The `Response` is used to read response headers and decide what to do with the body.
@@ -81,7 +86,7 @@ pub struct Response {
8186
pub(crate) index: ResponseStatusIndex,
8287
pub(crate) status: u16,
8388
pub(crate) headers: Vec<Header>,
84-
pub(crate) reader: Box<dyn Read + Send + Sync + 'static>,
89+
pub(crate) reader: PendingReader,
8590
/// The socket address of the server that sent the response.
8691
pub(crate) remote_addr: SocketAddr,
8792
/// The socket address of the client that sent the request.
@@ -282,7 +287,12 @@ impl Response {
282287
/// # }
283288
/// ```
284289
pub fn into_reader(self) -> Box<dyn Read + Send + Sync + 'static> {
285-
self.reader
290+
match self.reader {
291+
PendingReader::Reader(reader) => reader,
292+
_ => panic!(
293+
"It is not valid to call into_reader before Request::stream_to_reader is called"
294+
),
295+
}
286296
}
287297

288298
// Determine what to do with the connection after we've read the body.
@@ -548,39 +558,33 @@ impl Response {
548558
})
549559
}
550560

551-
/// Create a response from a Read trait impl.
552-
///
553-
/// This is hopefully useful for unit tests.
554-
///
555-
/// Example:
561+
/// Create a response from a DeadlineStream, reading and parsing only the status line, headers
562+
/// and its following CRLF.
556563
///
557-
/// use std::io::Cursor;
558-
///
559-
/// let text = "HTTP/1.1 401 Authorization Required\r\n\r\nPlease log in\n";
560-
/// let read = Cursor::new(text.to_string().into_bytes());
561-
/// let resp = ureq::Response::do_from_read(read);
562-
///
563-
/// assert_eq!(resp.status(), 401);
564-
pub(crate) fn do_from_stream(stream: Stream, unit: &Unit) -> Result<Response, Error> {
565-
let remote_addr = stream.remote_addr;
564+
/// Since this function only reads the status line, header and the following CRLF, the returned
565+
/// Response will have an empty reader and does not take ownership of DeadlineStream.
566+
/// To read the following data, the DeadlineStream can be read again after the call to this
567+
/// function.
568+
pub(crate) fn read_response_head(
569+
stream: &mut DeadlineStream,
570+
unit: &Unit,
571+
) -> Result<Response, Error> {
572+
let mut bytes_read = 0;
573+
let remote_addr = stream.inner_ref().remote_addr;
566574

567-
let local_addr = match stream.socket() {
575+
let local_addr = match stream.inner_ref().socket() {
568576
Some(sock) => sock.local_addr().map_err(Error::from)?,
569577
None => std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 0, 0, 1), 0).into(),
570578
};
571579

572-
//
573-
// HTTP/1.1 200 OK\r\n
574-
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
575-
576580
// The status line we can ignore non-utf8 chars and parse as_str_lossy().
577-
let status_line = read_next_line(&mut stream, "the status line")?.into_string_lossy();
581+
let status_line =
582+
read_next_line(stream, "the status line", &mut bytes_read)?.into_string_lossy();
578583
let (index, status) = parse_status_line(status_line.as_str())?;
579-
let http_version = &status_line.as_str()[0..index.http_version];
580584

581585
let mut headers: Vec<Header> = Vec::new();
582586
while headers.len() <= MAX_HEADER_COUNT {
583-
let line = read_next_line(&mut stream, "a header")?;
587+
let line = read_next_line(stream, "a header", &mut bytes_read)?;
584588
if line.is_empty() {
585589
break;
586590
}
@@ -595,23 +599,8 @@ impl Response {
595599
));
596600
}
597601

598-
let compression =
599-
get_header(&headers, "content-encoding").and_then(Compression::from_header_value);
600-
601-
let connection_option =
602-
Self::connection_option(http_version, get_header(&headers, "connection"));
603-
604-
let body_type = Self::body_type(&unit.method, status, http_version, &headers);
605-
606-
// remove Content-Encoding and length due to automatic decompression
607-
if compression.is_some() {
608-
headers.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
609-
}
610-
611-
let reader =
612-
Self::stream_to_reader(stream, unit, body_type, compression, connection_option);
613-
614602
let url = unit.url.clone();
603+
let reader = PendingReader::BeforeBodyStart;
615604

616605
let response = Response {
617606
url,
@@ -627,6 +616,50 @@ impl Response {
627616
Ok(response)
628617
}
629618

619+
/// Create a Response from a DeadlineStream
620+
///
621+
/// Parses and comsumes the header from the stream and creates a Response with
622+
/// the stream as the reader. The response reader also uncompresses the body
623+
/// if it is compressed.
624+
pub(crate) fn do_from_stream(
625+
mut stream: DeadlineStream,
626+
unit: &Unit,
627+
) -> Result<Response, Error> {
628+
let mut response = Self::read_response_head(&mut stream, unit)?;
629+
630+
let compression = get_header(&response.headers, "content-encoding")
631+
.and_then(Compression::from_header_value);
632+
633+
let connection_option = Self::connection_option(
634+
response.http_version(),
635+
get_header(&response.headers, "connection"),
636+
);
637+
638+
let body_type = Self::body_type(
639+
&unit.method,
640+
response.status(),
641+
response.http_version(),
642+
&response.headers,
643+
);
644+
645+
// remove Content-Encoding and length due to automatic decompression
646+
if compression.is_some() {
647+
response
648+
.headers
649+
.retain(|h| !h.is_name("content-encoding") && !h.is_name("content-length"));
650+
}
651+
652+
response.reader = PendingReader::Reader(Self::stream_to_reader(
653+
stream,
654+
unit,
655+
body_type,
656+
compression,
657+
connection_option,
658+
));
659+
660+
Ok(response)
661+
}
662+
630663
#[cfg(test)]
631664
pub fn set_url(&mut self, url: Url) {
632665
self.url = url;
@@ -766,16 +799,23 @@ impl FromStr for Response {
766799
&request_reader,
767800
None,
768801
);
802+
let stream = stream::DeadlineStream::new(stream, unit.deadline);
769803
Self::do_from_stream(stream, &unit)
770804
}
771805
}
772806

773-
fn read_next_line(reader: &mut impl BufRead, context: &str) -> io::Result<HeaderLine> {
807+
fn read_next_line(
808+
reader: &mut impl BufRead,
809+
context: &str,
810+
running_total: &mut usize,
811+
) -> io::Result<HeaderLine> {
774812
let mut buf = Vec::new();
775813
let result = reader
776814
.take((MAX_HEADER_SIZE + 1) as u64)
777815
.read_until(b'\n', &mut buf);
778816

817+
*running_total += buf.len();
818+
779819
match result {
780820
Ok(0) => Err(io::Error::new(
781821
io::ErrorKind::ConnectionAborted,
@@ -1078,7 +1118,8 @@ mod tests {
10781118
const LEN: usize = MAX_HEADER_SIZE + 1;
10791119
let s = format!("Long-Header: {}\r\n", "A".repeat(LEN),);
10801120
let mut cursor = Cursor::new(s);
1081-
let result = read_next_line(&mut cursor, "some context");
1121+
let mut bytes_read = 0;
1122+
let result = read_next_line(&mut cursor, "some context", &mut bytes_read);
10821123
let err = result.expect_err("did not error on too-large header");
10831124
assert_eq!(err.kind(), io::ErrorKind::Other);
10841125
assert_eq!(
@@ -1117,9 +1158,9 @@ mod tests {
11171158
encoding_rs::WINDOWS_1252.encode("HTTP/1.1 302 Déplacé Temporairement\r\n");
11181159
let bytes = cow.to_vec();
11191160
let mut reader = io::BufReader::new(io::Cursor::new(bytes));
1120-
let r = read_next_line(&mut reader, "test status line");
1121-
let h = r.unwrap();
1122-
assert_eq!(h.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
1161+
let mut bytes_read = 0;
1162+
let header = read_next_line(&mut reader, "test status line", &mut bytes_read).unwrap();
1163+
assert_eq!(header.to_string(), "HTTP/1.1 302 D�plac� Temporairement");
11231164
}
11241165

11251166
#[test]
@@ -1150,6 +1191,7 @@ mod tests {
11501191
&request_reader,
11511192
None,
11521193
);
1194+
let s = stream::DeadlineStream::new(s, unit.deadline);
11531195
let resp = Response::do_from_stream(s.into(), &unit).unwrap();
11541196
assert_eq!(resp.status(), 200);
11551197
assert_eq!(resp.header("x-geo-header"), None);
@@ -1204,18 +1246,16 @@ mod tests {
12041246
"1.1.1.1:4343".parse().unwrap(),
12051247
PoolReturner::new(&agent, PoolKey::from_parts("https", "example.com", 443)),
12061248
);
1207-
Response::do_from_stream(
1208-
stream,
1209-
&Unit::new(
1210-
&agent,
1211-
"GET",
1212-
&"https://example.com/".parse().unwrap(),
1213-
vec![],
1214-
&Payload::Empty.into_read(),
1215-
None,
1216-
),
1217-
)
1218-
.unwrap();
1249+
let unit = &Unit::new(
1250+
&agent,
1251+
"GET",
1252+
&"https://example.com/".parse().unwrap(),
1253+
vec![],
1254+
&Payload::Empty.into_read(),
1255+
None,
1256+
);
1257+
let stream = stream::DeadlineStream::new(stream, unit.deadline);
1258+
Response::do_from_stream(stream, unit).unwrap();
12191259
assert_eq!(agent2.state.pool.len(), 1);
12201260
}
12211261

@@ -1236,18 +1276,16 @@ mod tests {
12361276
"1.1.1.1:4343".parse().unwrap(),
12371277
PoolReturner::none(),
12381278
);
1239-
let resp = Response::do_from_stream(
1240-
stream,
1241-
&Unit::new(
1242-
&agent,
1243-
"GET",
1244-
&"https://example.com/".parse().unwrap(),
1245-
vec![],
1246-
&Payload::Empty.into_read(),
1247-
None,
1248-
),
1249-
)
1250-
.unwrap();
1279+
let unit = &Unit::new(
1280+
&agent,
1281+
"GET",
1282+
&"https://example.com/".parse().unwrap(),
1283+
vec![],
1284+
&Payload::Empty.into_read(),
1285+
None,
1286+
);
1287+
let stream = stream::DeadlineStream::new(stream, unit.deadline);
1288+
let resp = Response::do_from_stream(stream, unit).unwrap();
12511289
let body = resp.into_string().unwrap();
12521290
assert_eq!(body, "hi\n");
12531291
}

src/stream.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,9 @@ pub(crate) fn connect_host(
455455
let s = stream.try_clone()?;
456456
let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
457457
let pool_returner = PoolReturner::new(&unit.agent, pool_key);
458-
let s = Stream::new(s, remote_addr, pool_returner);
459-
let response = Response::do_from_stream(s, unit)?;
458+
let stream = Stream::new(s, remote_addr, pool_returner);
459+
let stream = DeadlineStream::new(stream, unit.deadline);
460+
let response = Response::do_from_stream(stream, unit)?;
460461
Proxy::verify_response(&response)?;
461462
}
462463
}

src/unit.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,13 @@ fn connect_inner(
261261
debug!("sending request {} {}", method, url);
262262
}
263263

264+
let mut expect_100_continue = false;
265+
for header in &unit.headers {
266+
if header.name() == "Expect" && header.value() == Some("100-continue") {
267+
expect_100_continue = true;
268+
}
269+
}
270+
264271
let send_result = send_prelude(unit, &mut stream);
265272

266273
if let Err(err) = send_result {
@@ -277,8 +284,17 @@ fn connect_inner(
277284
}
278285
let retryable = unit.is_retryable(&body);
279286

287+
let mut stream = stream::DeadlineStream::new(stream, unit.deadline);
288+
289+
if expect_100_continue {
290+
let response = Response::read_response_head(&mut stream, unit)?;
291+
if response.status() != 100 {
292+
return Err(Error::Status(response.status(), response));
293+
}
294+
}
295+
280296
// send the body (which can be empty now depending on redirects)
281-
body::send_body(body, unit.is_chunked, &mut stream)?;
297+
body::send_body(body, unit.is_chunked, stream.inner_mut())?;
282298

283299
// start reading the response to process cookies and redirects.
284300
let result = Response::do_from_stream(stream, unit);

0 commit comments

Comments
 (0)