From 90dff553d4f262c63e0793ccaab5d211c8248873 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 5 Mar 2024 11:44:57 -0300 Subject: [PATCH 1/2] fix(proxy): adjusted proxy events duplex --- .gitignore | 1 + proxy/src/proxy.rs | 25 +++++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index ea8c4bf..e6aabb4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +proxy.socket \ No newline at end of file diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 43743fe..bc90247 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -47,18 +47,35 @@ impl ProxyApp { ) { let mut upstream_buf = [0; 1024]; let mut downstream_buf = [0; 1024]; + loop { let downstream_read = server_session.read(&mut upstream_buf); let upstream_read = client_session.read(&mut downstream_buf); let event: DuplexEvent; select! { - n = downstream_read => event - = DuplexEvent::DownstreamRead(n.unwrap()), - n = upstream_read => event - = DuplexEvent::UpstreamRead(n.unwrap()), + n = downstream_read => { + if let Err(err) = &n { + error!(error=err.to_string(), "Downstream error"); + return; + } + event = DuplexEvent::DownstreamRead(n.unwrap()) + }, + n = upstream_read => { + if let Err(err) = &n { + error!(error=err.to_string(), "Upstream error"); + return; + } + event = DuplexEvent::UpstreamRead(n.unwrap()) + }, } match event { + DuplexEvent::DownstreamRead(0) => { + return; + } + DuplexEvent::UpstreamRead(0) => { + return; + } DuplexEvent::DownstreamRead(n) => { state.metrics.count_total_packages_bytes(&consumer, n); From a0968081a81b543f863726ee95cce58904505ef4 Mon Sep 17 00:00:00 2001 From: paulobressan Date: Tue, 5 Mar 2024 18:45:17 -0300 Subject: [PATCH 2/2] feat(proxy): improved metrics information --- proxy/src/config.rs | 2 + proxy/src/main.rs | 14 +++-- proxy/src/proxy.rs | 128 +++++++++++++++++++++----------------------- 3 files changed, 73 insertions(+), 71 deletions(-) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 7f679c2..5d774db 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -3,6 +3,7 @@ use std::env; #[derive(Debug, Clone)] pub struct Config { pub proxy_addr: String, + pub proxy_namespace: String, pub prometheus_addr: String, pub ssl_crt_path: String, pub ssl_key_path: String, @@ -13,6 +14,7 @@ impl Config { pub fn new() -> Self { Self { proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"), + proxy_namespace: env::var("PROXY_NAMESPACE").expect("PROXY_NAMESPACE must be set"), prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"), ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"), ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"), diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 14b4483..46b5bed 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -102,7 +102,7 @@ impl Metrics { pub fn new() -> Self { let total_packages_bytes = register_int_counter_vec!( opts!("node_proxy_total_packages_bytes", "Total bytes transferred",), - &["consumer"] + &["consumer", "namespace", "instance"] ) .unwrap(); @@ -111,9 +111,17 @@ impl Metrics { } } - pub fn count_total_packages_bytes(&self, consumer: &Consumer, value: usize) { + pub fn count_total_packages_bytes( + &self, + consumer: &Consumer, + namespace: &str, + instance: &str, + value: usize, + ) { + let consumer = &consumer.to_string(); + self.total_packages_bytes - .with_label_values(&[&consumer.to_string()]) + .with_label_values(&[consumer, namespace, instance]) .inc_by(value as u64) } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index bc90247..7eb5045 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -14,7 +14,7 @@ use tokio::{ }; use tracing::error; -use crate::{config::Config, Consumer, State}; +use crate::{config::Config, State}; pub struct ProxyApp { client_connector: TransportConnector, @@ -37,75 +37,18 @@ impl ProxyApp { state, } } - - async fn duplex( - &self, - state: State, - consumer: Consumer, - mut server_session: Stream, - mut client_session: Stream, - ) { - let mut upstream_buf = [0; 1024]; - let mut downstream_buf = [0; 1024]; - - loop { - let downstream_read = server_session.read(&mut upstream_buf); - let upstream_read = client_session.read(&mut downstream_buf); - let event: DuplexEvent; - select! { - n = downstream_read => { - if let Err(err) = &n { - error!(error=err.to_string(), "Downstream error"); - return; - } - event = DuplexEvent::DownstreamRead(n.unwrap()) - }, - n = upstream_read => { - if let Err(err) = &n { - error!(error=err.to_string(), "Upstream error"); - return; - } - event = DuplexEvent::UpstreamRead(n.unwrap()) - }, - } - - match event { - DuplexEvent::DownstreamRead(0) => { - return; - } - DuplexEvent::UpstreamRead(0) => { - return; - } - DuplexEvent::DownstreamRead(n) => { - state.metrics.count_total_packages_bytes(&consumer, n); - - client_session.write_all(&upstream_buf[0..n]).await.unwrap(); - client_session.flush().await.unwrap(); - } - DuplexEvent::UpstreamRead(n) => { - state.metrics.count_total_packages_bytes(&consumer, n); - - server_session - .write_all(&downstream_buf[0..n]) - .await - .unwrap(); - server_session.flush().await.unwrap(); - } - } - } - } } #[async_trait] impl ServerApp for ProxyApp { async fn process_new( self: &Arc, - io: Stream, + mut io_server: Stream, _shutdown: &ShutdownWatch, ) -> Option { let state = self.state.read().await.clone(); - let hostname = io.get_ssl()?.servername(NameType::HOST_NAME); + let hostname = io_server.get_ssl()?.servername(NameType::HOST_NAME); if hostname.is_none() { error!("hostname is not present in the certificate"); return None; @@ -121,15 +64,16 @@ impl ServerApp for ProxyApp { let token = captures.get(1)?.as_str().to_string(); let network = captures.get(2)?.as_str().to_string(); let version = captures.get(3)?.as_str().to_string(); - + let namespace = self.config.proxy_namespace.clone(); + let consumer = state.get_consumer(&network, &version, &token)?; - let node_host = format!( + let instance = format!( "node-{network}-{version}.{}:{}", self.config.node_dns, self.config.node_port ); - let lookup_result = lookup_host(node_host).await; + let lookup_result = lookup_host(&instance).await; if let Err(err) = lookup_result { error!(error = err.to_string(), "fail to lookup ip"); return None; @@ -139,12 +83,60 @@ impl ServerApp for ProxyApp { let proxy_to = BasicPeer::new(&node_addr.to_string()); - let client_session = self.client_connector.new_stream(&proxy_to).await; + let io_client = self.client_connector.new_stream(&proxy_to).await; + + match io_client { + Ok(mut io_client) => { + let mut upstream_buf = [0; 1024]; + let mut downstream_buf = [0; 1024]; + + loop { + let downstream_read = io_server.read(&mut upstream_buf); + let upstream_read = io_client.read(&mut downstream_buf); + let event: DuplexEvent; + + select! { + n = downstream_read => { + if let Err(err) = &n { + error!(error=err.to_string(), "Downstream error"); + return None; + } + event = DuplexEvent::DownstreamRead(n.unwrap()) + }, + n = upstream_read => { + if let Err(err) = &n { + error!(error=err.to_string(), "Upstream error"); + return None; + } + event = DuplexEvent::UpstreamRead(n.unwrap()) + }, + } - match client_session { - Ok(client_session) => { - self.duplex(state, consumer, io, client_session).await; - None + match event { + DuplexEvent::DownstreamRead(0) => { + return None; + } + DuplexEvent::UpstreamRead(0) => { + return None; + } + DuplexEvent::DownstreamRead(n) => { + state + .metrics + .count_total_packages_bytes(&consumer, &namespace, &instance, n); + + io_client.write_all(&upstream_buf[0..n]).await.unwrap(); + io_client.flush().await.unwrap(); + } + DuplexEvent::UpstreamRead(n) => { + state + .metrics + .count_total_packages_bytes(&consumer, &namespace, &instance, n); + + io_server.write_all(&downstream_buf[0..n]).await.unwrap(); + io_server.flush().await.unwrap(); + } + } + } } Err(e) => { error!("failed to create client session: {}", e);