Skip to content

Commit

Permalink
g3-daemon: use vectored IO in register
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Mar 4, 2025
1 parent 7b76553 commit 194d484
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions lib/g3-daemon/src/register/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

use std::io::IoSlice;
use std::sync::Arc;

use anyhow::anyhow;
use http::{Method, StatusCode};
use serde_json::{Map, Value};
use tokio::io::BufStream;
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::TcpStream;

use g3_http::HttpBodyReader;
Expand Down Expand Up @@ -58,24 +59,25 @@ impl RegisterTask {
}

let body = Value::Object(content).to_string();
let data = format!(
let header = format!(

Check warning on line 62 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L62

Added line #L62 was not covered by tests
"POST {} HTTP/1.1\r\n\
Host: {}\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
Connection: Keep-Alive\r\n\
\r\n{body}",
\r\n",

Check warning on line 68 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L68

Added line #L68 was not covered by tests
self.config.register_path,
self.config.upstream.host(),
body.len()
);

self.write_request(data.as_bytes()).await?;
self.write_request(header.as_bytes(), Some(body.as_bytes()))
.await?;

Check warning on line 75 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L74-L75

Added lines #L74 - L75 were not covered by tests
self.check_response(Method::POST).await
}

pub async fn ping_until_end(&mut self) -> anyhow::Result<()> {
let data = format!(
let header = format!(

Check warning on line 80 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L80

Added line #L80 was not covered by tests
"GET {} HTTP/1.1\r\n\
Host: {}\r\n\
Content-Length: 0\r\n\
Expand All @@ -89,7 +91,7 @@ impl RegisterTask {
loop {
tokio::select! {
_ = interval.tick() => {
self.write_request(data.as_bytes()).await?;
self.write_request(header.as_bytes(), None).await?;

Check warning on line 94 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L94

Added line #L94 was not covered by tests
self.check_response(Method::GET).await?;
}
_ = self.stream.fill_wait_data() => {
Expand All @@ -99,11 +101,23 @@ impl RegisterTask {
}
}

async fn write_request(&mut self, data: &[u8]) -> anyhow::Result<()> {
self.stream
.write_all_flush(data)
.await
.map_err(|e| anyhow!("failed to write data: {e:?}"))
async fn write_request(&mut self, header: &[u8], body: Option<&[u8]>) -> anyhow::Result<()> {
if let Some(body) = body {
let slices = [IoSlice::new(header), IoSlice::new(body)];
self.stream
.write_all_vectored(slices)
.await
.map_err(|e| anyhow!("failed to write request: {e:?}"))?;
self.stream
.flush()
.await
.map_err(|e| anyhow!("failed to write request: {e:?}"))

Check warning on line 114 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L104-L114

Added lines #L104 - L114 were not covered by tests
} else {
self.stream
.write_all_flush(header)
.await
.map_err(|e| anyhow!("failed to write request: {e:?}"))

Check warning on line 119 in lib/g3-daemon/src/register/task.rs

View check run for this annotation

Codecov / codecov/patch

lib/g3-daemon/src/register/task.rs#L116-L119

Added lines #L116 - L119 were not covered by tests
}
}

async fn check_response(&mut self, method: Method) -> anyhow::Result<()> {
Expand Down

0 comments on commit 194d484

Please sign in to comment.