Skip to content

Commit 50e6a13

Browse files
authored
Decouple ObjectStore from Reqwest (#7183)
* Create HttpClient * WIP * Decouple from reqwest * Hook up HttpConnector * WIP * Update other stores * Format * Remove out of date comment * RAT * Test fixes * Lints * Fix feature flags
1 parent a00f9f4 commit 50e6a13

26 files changed

+1334
-501
lines changed

object_store/Cargo.toml

+9-8
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,28 @@ walkdir = { version = "2", optional = true }
4545

4646
# Cloud storage support
4747
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
48+
form_urlencoded = { version = "1.2", optional = true }
49+
http = { version = "1.2.0", optional = true }
50+
http-body-util = { version = "0.1", optional = true }
51+
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
4852
hyper = { version = "1.2", default-features = false, optional = true }
53+
md-5 = { version = "0.10.6", default-features = false, optional = true }
4954
quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true }
50-
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
51-
serde_json = { version = "1.0", default-features = false, optional = true }
5255
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
5356
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true }
5457
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
5558
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
59+
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
60+
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
61+
serde_urlencoded = { version = "0.7", optional = true }
5662
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] }
57-
md-5 = { version = "0.10.6", default-features = false, optional = true }
58-
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
5963

6064
[target.'cfg(target_family="unix")'.dev-dependencies]
6165
nix = { version = "0.29.0", features = ["fs"] }
6266

6367
[features]
6468
default = ["fs"]
65-
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
69+
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body-util", "form_urlencoded", "serde_urlencoded"]
6670
azure = ["cloud", "httparse"]
6771
fs = ["walkdir"]
6872
gcp = ["cloud", "rustls-pemfile"]
@@ -72,16 +76,13 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
7276
integration = []
7377

7478
[dev-dependencies] # In alphabetical order
75-
futures-test = "0.3"
7679
hyper = { version = "1.2", features = ["server"] }
7780
hyper-util = "0.1"
78-
http-body-util = "0.1"
7981
rand = "0.8"
8082
tempfile = "3.1.0"
8183
regex = "1.11.1"
8284
# The "gzip" feature for reqwest is enabled for an integration test.
8385
reqwest = { version = "0.12", features = ["gzip"] }
84-
http = "1.1.0"
8586

8687
[[test]]
8788
name = "get_range_file"

object_store/src/aws/builder.rs

+23-11
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::aws::{
2323
AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
2424
STORE,
2525
};
26-
use crate::client::TokenCredentialProvider;
26+
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
2727
use crate::config::ConfigValue;
2828
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
2929
use base64::prelude::BASE64_STANDARD;
@@ -171,6 +171,8 @@ pub struct AmazonS3Builder {
171171
encryption_customer_key_base64: Option<String>,
172172
/// When set to true, charge requester for bucket operations
173173
request_payer: ConfigValue<bool>,
174+
/// The [`HttpConnector`] to use
175+
http_connector: Option<Arc<dyn HttpConnector>>,
174176
}
175177

176178
/// Configuration keys for [`AmazonS3Builder`]
@@ -882,13 +884,23 @@ impl AmazonS3Builder {
882884
self
883885
}
884886

887+
/// Overrides the [`HttpConnector`], by default uses [`ReqwestConnector`]
888+
pub fn with_http_connector<C: HttpConnector>(mut self, connector: C) -> Self {
889+
self.http_connector = Some(Arc::new(connector));
890+
self
891+
}
892+
885893
/// Create a [`AmazonS3`] instance from the provided values,
886894
/// consuming `self`.
887895
pub fn build(mut self) -> Result<AmazonS3> {
888896
if let Some(url) = self.url.take() {
889897
self.parse_url(&url)?;
890898
}
891899

900+
let http = self
901+
.http_connector
902+
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
903+
892904
let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
893905
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
894906
let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
@@ -925,11 +937,7 @@ impl AmazonS3Builder {
925937
let endpoint = format!("https://sts.{region}.amazonaws.com");
926938

927939
// Disallow non-HTTPs requests
928-
let client = self
929-
.client_options
930-
.clone()
931-
.with_allow_http(false)
932-
.client()?;
940+
let options = self.client_options.clone().with_allow_http(false);
933941

934942
let token = WebIdentityProvider {
935943
token_path,
@@ -940,16 +948,19 @@ impl AmazonS3Builder {
940948

941949
Arc::new(TokenCredentialProvider::new(
942950
token,
943-
client,
951+
http.connect(&options)?,
944952
self.retry_config.clone(),
945953
)) as _
946954
} else if let Some(uri) = self.container_credentials_relative_uri {
947955
info!("Using Task credential provider");
956+
957+
let options = self.client_options.clone().with_allow_http(true);
958+
948959
Arc::new(TaskCredentialProvider {
949960
url: format!("http://169.254.170.2{uri}"),
950961
retry: self.retry_config.clone(),
951962
// The instance metadata endpoint is access over HTTP
952-
client: self.client_options.clone().with_allow_http(true).client()?,
963+
client: http.connect(&options)?,
953964
cache: Default::default(),
954965
}) as _
955966
} else {
@@ -964,7 +975,7 @@ impl AmazonS3Builder {
964975

965976
Arc::new(TokenCredentialProvider::new(
966977
token,
967-
self.client_options.metadata_client()?,
978+
http.connect(&self.client_options.metadata_options())?,
968979
self.retry_config.clone(),
969980
)) as _
970981
};
@@ -986,7 +997,7 @@ impl AmazonS3Builder {
986997
region: region.clone(),
987998
credentials: Arc::clone(&credentials),
988999
},
989-
self.client_options.client()?,
1000+
http.connect(&self.client_options)?,
9901001
self.retry_config.clone(),
9911002
)
9921003
.with_min_ttl(Duration::from_secs(60)), // Credentials only valid for 5 minutes
@@ -1039,7 +1050,8 @@ impl AmazonS3Builder {
10391050
request_payer: self.request_payer.get()?,
10401051
};
10411052

1042-
let client = Arc::new(S3Client::new(config)?);
1053+
let http_client = http.connect(&config.client_options)?;
1054+
let client = Arc::new(S3Client::new(config, http_client));
10431055

10441056
Ok(AmazonS3 { client })
10451057
}

object_store/src/aws/client.rs

+35-29
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::aws::{
2222
AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, COPY_SOURCE_HEADER,
2323
STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER,
2424
};
25+
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
2526
use crate::client::get::GetClient;
2627
use crate::client::header::{get_etag, HeaderConfig};
2728
use crate::client::header::{get_put_result, get_version};
@@ -31,7 +32,7 @@ use crate::client::s3::{
3132
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
3233
InitiateMultipartUploadResult, ListResponse, PartMetadata,
3334
};
34-
use crate::client::GetOptionsExt;
35+
use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse};
3536
use crate::multipart::PartId;
3637
use crate::path::DELIMITER;
3738
use crate::{
@@ -42,17 +43,15 @@ use async_trait::async_trait;
4243
use base64::prelude::BASE64_STANDARD;
4344
use base64::Engine;
4445
use bytes::{Buf, Bytes};
45-
use hyper::header::{
46+
use http::header::{
4647
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH,
4748
CONTENT_TYPE,
4849
};
49-
use hyper::http::HeaderName;
50-
use hyper::{http, HeaderMap};
50+
use http::{HeaderMap, HeaderName, Method};
5151
use itertools::Itertools;
5252
use md5::{Digest, Md5};
5353
use percent_encoding::{utf8_percent_encode, PercentEncode};
5454
use quick_xml::events::{self as xml_events};
55-
use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response};
5655
use ring::digest;
5756
use ring::digest::Context;
5857
use serde::{Deserialize, Serialize};
@@ -67,7 +66,9 @@ const ALGORITHM: &str = "x-amz-checksum-algorithm";
6766
#[derive(Debug, thiserror::Error)]
6867
pub(crate) enum Error {
6968
#[error("Error performing DeleteObjects request: {}", source)]
70-
DeleteObjectsRequest { source: crate::client::retry::Error },
69+
DeleteObjectsRequest {
70+
source: crate::client::retry::RetryError,
71+
},
7172

7273
#[error(
7374
"DeleteObjects request failed for key {}: {} (code: {})",
@@ -82,30 +83,32 @@ pub(crate) enum Error {
8283
},
8384

8485
#[error("Error getting DeleteObjects response body: {}", source)]
85-
DeleteObjectsResponse { source: reqwest::Error },
86+
DeleteObjectsResponse { source: HttpError },
8687

8788
#[error("Got invalid DeleteObjects response: {}", source)]
8889
InvalidDeleteObjectsResponse {
8990
source: Box<dyn std::error::Error + Send + Sync + 'static>,
9091
},
9192

9293
#[error("Error performing list request: {}", source)]
93-
ListRequest { source: crate::client::retry::Error },
94+
ListRequest {
95+
source: crate::client::retry::RetryError,
96+
},
9497

9598
#[error("Error getting list response body: {}", source)]
96-
ListResponseBody { source: reqwest::Error },
99+
ListResponseBody { source: HttpError },
97100

98101
#[error("Error getting create multipart response body: {}", source)]
99-
CreateMultipartResponseBody { source: reqwest::Error },
102+
CreateMultipartResponseBody { source: HttpError },
100103

101104
#[error("Error performing complete multipart request: {}: {}", path, source)]
102105
CompleteMultipartRequest {
103-
source: crate::client::retry::Error,
106+
source: crate::client::retry::RetryError,
104107
path: String,
105108
},
106109

107110
#[error("Error getting complete multipart response body: {}", source)]
108-
CompleteMultipartResponseBody { source: reqwest::Error },
111+
CompleteMultipartResponseBody { source: HttpError },
109112

110113
#[error("Got invalid list response: {}", source)]
111114
InvalidListResponse { source: quick_xml::de::DeError },
@@ -272,7 +275,7 @@ pub enum RequestError {
272275

273276
#[error("Retry")]
274277
Retry {
275-
source: crate::client::retry::Error,
278+
source: crate::client::retry::RetryError,
276279
path: String,
277280
},
278281
}
@@ -290,7 +293,7 @@ impl From<RequestError> for crate::Error {
290293
pub(crate) struct Request<'a> {
291294
path: &'a Path,
292295
config: &'a S3Config,
293-
builder: RequestBuilder,
296+
builder: HttpRequestBuilder,
294297
payload_sha256: Option<digest::Digest>,
295298
payload: Option<PutPayload>,
296299
use_session_creds: bool,
@@ -307,8 +310,8 @@ impl Request<'_> {
307310

308311
pub(crate) fn header<K>(self, k: K, v: &str) -> Self
309312
where
310-
HeaderName: TryFrom<K>,
311-
<HeaderName as TryFrom<K>>::Error: Into<http::Error>,
313+
K: TryInto<HeaderName>,
314+
K::Error: Into<RequestBuilderError>,
312315
{
313316
let builder = self.builder.header(k, v);
314317
Self { builder, ..self }
@@ -408,7 +411,7 @@ impl Request<'_> {
408411
self
409412
}
410413

411-
pub(crate) async fn send(self) -> Result<Response, RequestError> {
414+
pub(crate) async fn send(self) -> Result<HttpResponse, RequestError> {
412415
let credential = match self.use_session_creds {
413416
true => self.config.get_session_credential().await?,
414417
false => SessionCredential {
@@ -446,13 +449,12 @@ impl Request<'_> {
446449
#[derive(Debug)]
447450
pub(crate) struct S3Client {
448451
pub config: S3Config,
449-
pub client: ReqwestClient,
452+
pub client: HttpClient,
450453
}
451454

452455
impl S3Client {
453-
pub(crate) fn new(config: S3Config) -> Result<Self> {
454-
let client = config.client_options.client()?;
455-
Ok(Self { config, client })
456+
pub(crate) fn new(config: S3Config, client: HttpClient) -> Self {
457+
Self { config, client }
456458
}
457459

458460
pub(crate) fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
@@ -544,6 +546,7 @@ impl S3Client {
544546
.send_retry(&self.config.retry_config)
545547
.await
546548
.map_err(|source| Error::DeleteObjectsRequest { source })?
549+
.into_body()
547550
.bytes()
548551
.await
549552
.map_err(|source| Error::DeleteObjectsResponse { source })?;
@@ -641,6 +644,7 @@ impl S3Client {
641644
.idempotent(true)
642645
.send()
643646
.await?
647+
.into_body()
644648
.bytes()
645649
.await
646650
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
@@ -683,17 +687,17 @@ impl S3Client {
683687
// If SSE-C is used, we must include the encryption headers in every upload request.
684688
request = request.with_encryption_headers();
685689
}
686-
let response = request.send().await?;
687-
let checksum_sha256 = response
688-
.headers()
690+
let (parts, body) = request.send().await?.into_parts();
691+
let checksum_sha256 = parts
692+
.headers
689693
.get(SHA256_CHECKSUM)
690694
.and_then(|v| v.to_str().ok())
691695
.map(|v| v.to_string());
692696

693697
let e_tag = match is_copy {
694-
false => get_etag(response.headers()).map_err(|source| Error::Metadata { source })?,
698+
false => get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?,
695699
true => {
696-
let response = response
700+
let response = body
697701
.bytes()
698702
.await
699703
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
@@ -756,7 +760,7 @@ impl S3Client {
756760

757761
let request = self
758762
.client
759-
.request(Method::POST, url)
763+
.post(url)
760764
.query(&[("uploadId", upload_id)])
761765
.body(body)
762766
.with_aws_sigv4(credential.authorizer(), None);
@@ -781,6 +785,7 @@ impl S3Client {
781785
.map_err(|source| Error::Metadata { source })?;
782786

783787
let data = response
788+
.into_body()
784789
.bytes()
785790
.await
786791
.map_err(|source| Error::CompleteMultipartResponseBody { source })?;
@@ -795,7 +800,7 @@ impl S3Client {
795800
}
796801

797802
#[cfg(test)]
798-
pub(crate) async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
803+
pub(crate) async fn get_object_tagging(&self, path: &Path) -> Result<HttpResponse> {
799804
let credential = self.config.get_session_credential().await?;
800805
let url = format!("{}?tagging", self.config.path_url(path));
801806
let response = self
@@ -821,7 +826,7 @@ impl GetClient for S3Client {
821826
};
822827

823828
/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
824-
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
829+
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
825830
let credential = self.config.get_session_credential().await?;
826831
let url = self.config.path_url(path);
827832
let method = match options.head {
@@ -895,6 +900,7 @@ impl ListClient for Arc<S3Client> {
895900
.send_retry(&self.config.retry_config)
896901
.await
897902
.map_err(|source| Error::ListRequest { source })?
903+
.into_body()
898904
.bytes()
899905
.await
900906
.map_err(|source| Error::ListResponseBody { source })?;

0 commit comments

Comments
 (0)