Skip to content

Commit

Permalink
g3-daemon: use vectored IO in register
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Mar 4, 2025
1 parent 7b76553 commit e48c6fd
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions lib/g3-daemon/src/register/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

use std::io::{self, IoSlice};
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::{Context, anyhow};
use http::{Method, StatusCode};
use serde_json::{Map, Value};
use tokio::io::BufStream;
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

use g3_http::HttpBodyReader;
Expand Down Expand Up @@ -58,24 +59,26 @@ impl RegisterTask {
}

let body = Value::Object(content).to_string();
let data = format!(
let header = format!(
"POST {} HTTP/1.1\r\n\
Host: {}\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
Connection: Keep-Alive\r\n\
\r\n{body}",
\r\n",
self.config.register_path,
self.config.upstream.host(),
body.len()
);

self.write_request(data.as_bytes()).await?;
self.write_request(header.as_bytes(), Some(body.as_bytes()))
.await
.context("failed to send request")?;
self.check_response(Method::POST).await
}

pub async fn ping_until_end(&mut self) -> anyhow::Result<()> {
let data = format!(
let header = format!(
"GET {} HTTP/1.1\r\n\
Host: {}\r\n\
Content-Length: 0\r\n\
Expand All @@ -89,7 +92,9 @@ impl RegisterTask {
loop {
tokio::select! {
_ = interval.tick() => {
self.write_request(data.as_bytes()).await?;
self.write_request(header.as_bytes(), None)
.await
.context("failed to send request")?;
self.check_response(Method::GET).await?;
}
_ = self.stream.fill_wait_data() => {
Expand All @@ -99,11 +104,15 @@ impl RegisterTask {
}
}

async fn write_request(&mut self, data: &[u8]) -> anyhow::Result<()> {
self.stream
.write_all_flush(data)
.await
.map_err(|e| anyhow!("failed to write data: {e:?}"))
async fn write_request(&mut self, header: &[u8], body: Option<&[u8]>) -> io::Result<()> {
if let Some(body) = body {
self.stream
.write_all_vectored([IoSlice::new(header), IoSlice::new(body)])
.await?;
self.stream.flush().await
} else {
self.stream.write_all_flush(header).await
}
}

async fn check_response(&mut self, method: Method) -> anyhow::Result<()> {
Expand Down

0 comments on commit e48c6fd

Please sign in to comment.