diff --git a/lib/g3-daemon/src/register/task.rs b/lib/g3-daemon/src/register/task.rs index 8b01ae38..f495602c 100644 --- a/lib/g3-daemon/src/register/task.rs +++ b/lib/g3-daemon/src/register/task.rs @@ -14,12 +14,13 @@ * limitations under the License. */ +use std::io::{self, IoSlice}; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use http::{Method, StatusCode}; use serde_json::{Map, Value}; -use tokio::io::BufStream; +use tokio::io::{AsyncWriteExt, BufStream}; use tokio::net::TcpStream; use g3_http::HttpBodyReader; @@ -58,24 +59,26 @@ impl RegisterTask { } let body = Value::Object(content).to_string(); - let data = format!( + let header = format!( "POST {} HTTP/1.1\r\n\ Host: {}\r\n\ Content-Type: application/json\r\n\ Content-Length: {}\r\n\ Connection: Keep-Alive\r\n\ - \r\n{body}", + \r\n", self.config.register_path, self.config.upstream.host(), body.len() ); - self.write_request(data.as_bytes()).await?; + self.write_request(header.as_bytes(), Some(body.as_bytes())) + .await + .context("failed to send request")?; self.check_response(Method::POST).await } pub async fn ping_until_end(&mut self) -> anyhow::Result<()> { - let data = format!( + let header = format!( "GET {} HTTP/1.1\r\n\ Host: {}\r\n\ Content-Length: 0\r\n\ @@ -89,7 +92,9 @@ impl RegisterTask { loop { tokio::select! { _ = interval.tick() => { - self.write_request(data.as_bytes()).await?; + self.write_request(header.as_bytes(), None) + .await + .context("failed to send request")?; self.check_response(Method::GET).await?; } _ = self.stream.fill_wait_data() => { @@ -99,11 +104,15 @@ impl RegisterTask { } } - async fn write_request(&mut self, data: &[u8]) -> anyhow::Result<()> { - self.stream - .write_all_flush(data) - .await - .map_err(|e| anyhow!("failed to write data: {e:?}")) + async fn write_request(&mut self, header: &[u8], body: Option<&[u8]>) -> io::Result<()> { + if let Some(body) = body { + self.stream + .write_all_vectored([IoSlice::new(header), IoSlice::new(body)]) + .await?; + self.stream.flush().await + } else { + self.stream.write_all_flush(header).await + } } async fn check_response(&mut self, method: Method) -> anyhow::Result<()> {