Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pingora tcp proxy #12

Merged
merged 4 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
956 changes: 802 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dockerfile.operator
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build
WORKDIR /app

RUN apt update
RUN apt install -y pkg-config libssl-dev
RUN apt install -y build-essential pkg-config libssl-dev cmake

COPY ./Cargo.toml ./Cargo.toml
COPY ./operator ./operator
Expand Down
2 changes: 1 addition & 1 deletion docker/dockerfile.proxy
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build
WORKDIR /app

RUN apt update
RUN apt install -y pkg-config libssl-dev
RUN apt install -y build-essential pkg-config libssl-dev cmake

COPY ./Cargo.toml ./Cargo.toml
COPY ./operator ./operator
Expand Down
4 changes: 2 additions & 2 deletions operator/yaml/port.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ metadata:
name: mainnet-user
namespace: prj-mainnet-test
spec:
network: "mainnet"
version: "stable"
network: "preview"
version: "v1"
throughputTier: "1"
8 changes: 3 additions & 5 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ edition = "2021"

[dependencies]
operator = { path = "../operator" }
futures = "0.3.30"
native-tls = "0.2.11"
rustls = "0.22.2"
rustls-pemfile = "2.1.0"
tokio = { version = "1.36.0", features = ["full"] }
tokio-rustls = "0.25.0"
regex = "1.10.3"
dotenv = "0.15.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
futures-util = "0.3.30"
pingora = "0.1.0"
prometheus = "0.13.3"
async-trait = "0.1.77"
4 changes: 4 additions & 0 deletions proxy/examples/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ spec:
env:
- name: PROXY_ADDR
value: "0.0.0.0:80"
- name: PROMETHEUS_ADDR
value: "0.0.0.0:9187"
- name: NODE_PORT
value: "80"
- name: NODE_DNS
Expand Down Expand Up @@ -365,6 +367,7 @@ metadata:
spec:
network: "mainnet"
version: "stable"
throughputTier: "1"
---
# Cardano Node Port 2
apiVersion: demeter.run/v1alpha1
Expand All @@ -375,3 +378,4 @@ metadata:
spec:
network: "mainnet"
version: "stable"
throughputTier: "1"
87 changes: 51 additions & 36 deletions proxy/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use futures_util::TryStreamExt;
use operator::{
kube::{
Expand All @@ -12,57 +13,71 @@ use operator::{
},
CardanoNodePort,
};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use tokio::{pin, sync::RwLock};
use tracing::error;

use crate::{Consumer, State};

pub async fn start(state: Arc<RwLock<State>>) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<CardanoNodePort>::all(client.clone());
update_auth(state.clone(), api.clone()).await;

let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
pin!(stream);
#[derive(Debug)]
pub struct AuthBackgroundService {
state: Arc<RwLock<State>>,
}
impl AuthBackgroundService {
pub fn new(state: Arc<RwLock<State>>) -> Self {
Self { state }
}

loop {
let result = stream.try_next().await;
async fn update_auth(&self, api: Api<CardanoNodePort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}

update_auth(state.clone(), api.clone()).await;
let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version.clone();
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);

consumers.insert(hash_key, consumer);
}
}
self.state.write().await.consumers = consumers;
}
}

async fn update_auth(state: Arc<RwLock<State>>, api: Api<CardanoNodePort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}
#[async_trait]
impl BackgroundService for AuthBackgroundService {
async fn start(&self, mut _shutdown: ShutdownWatch) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<CardanoNodePort>::all(client.clone());
self.update_auth(api.clone()).await;

let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version.clone();
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();
let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
pin!(stream);

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);
loop {
let result = stream.try_next().await;
if let Err(err) = result {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
}

consumers.insert(hash_key, consumer);
self.update_auth(api.clone()).await;
}
}
state.write().await.consumers = consumers;
}
22 changes: 12 additions & 10 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use std::{env, path::PathBuf};
use std::env;

#[derive(Debug, Clone)]
pub struct Config {
pub proxy_addr: String,
pub ssl_crt_path: PathBuf,
pub ssl_key_path: PathBuf,
pub prometheus_addr: String,
pub ssl_crt_path: String,
pub ssl_key_path: String,
pub node_port: u16,
pub node_dns: String,
}

impl Config {
pub fn new() -> Self {
Self {
proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH")
.map(|e| e.into())
.expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH")
.map(|e| e.into())
.expect("SSL_KEY_PATH must be set"),
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),
node_port: env::var("NODE_PORT")
.expect("NODE_PORT must be set")
.parse()
Expand All @@ -27,3 +24,8 @@ impl Config {
}
}
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
98 changes: 76 additions & 22 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use std::{collections::HashMap, fmt::Display, sync::Arc};

use auth::AuthBackgroundService;
use dotenv::dotenv;
use regex::Regex;
use std::{collections::HashMap, error::Error, fmt::Display, sync::Arc};
use pingora::{
listeners::Listeners,
server::{configuration::Opt, Server},
services::{background::background_service, listening::Service},
};
use prometheus::{opts, register_int_counter_vec};
use proxy::ProxyApp;
use tokio::sync::RwLock;
use tracing::Level;

Expand All @@ -10,43 +18,60 @@ mod auth;
mod config;
mod proxy;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() {
dotenv().ok();

tracing_subscriber::fmt().with_max_level(Level::INFO).init();
let state = Arc::new(RwLock::new(State::try_new()?));

let auth = auth::start(state.clone());
let proxy_server = proxy::start(state.clone());
let config: Arc<Config> = Arc::default();
let state: Arc<RwLock<State>> = Arc::default();

let opt = Opt::default();
let mut server = Server::new(Some(opt)).unwrap();
server.bootstrap();

let auth_background_service = background_service(
"K8S Auth Service",
AuthBackgroundService::new(state.clone()),
);
server.add_service(auth_background_service);

tokio::join!(auth, proxy_server);
let tls_proxy_service = Service::with_listeners(
"TLS Proxy Service".to_string(),
Listeners::tls(
&config.proxy_addr,
&config.ssl_crt_path,
&config.ssl_key_path,
)
.unwrap(),
Arc::new(ProxyApp::new(config.clone(), state)),
);
server.add_service(tls_proxy_service);

Ok(())
let mut prometheus_service_http =
pingora::services::listening::Service::prometheus_http_service();
prometheus_service_http.add_tcp(&config.prometheus_addr);
server.add_service(prometheus_service_http);

server.run_forever();
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct State {
config: Config,
host_regex: Regex,
metrics: Metrics,
consumers: HashMap<String, Consumer>,
}
impl State {
pub fn try_new() -> Result<Self, Box<dyn Error>> {
let config = Config::new();
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)\.([\w]+)-([\w\d]+).+")?;
pub fn new() -> Self {
let metrics = Metrics::new();
let consumers = HashMap::new();

Ok(Self {
config,
host_regex,
consumers,
})
Self { metrics, consumers }
}

pub fn is_authenticated(&self, network: &str, version: &str, token: &str) -> bool {
pub fn get_consumer(&self, network: &str, version: &str, token: &str) -> Option<Consumer> {
let hash_key = format!("{}.{}.{}", network, version, token);
self.consumers.get(&hash_key).is_some()
self.consumers.get(&hash_key).cloned()
}
}

Expand All @@ -68,3 +93,32 @@ impl Display for Consumer {
write!(f, "{}.{}", self.namespace, self.port_name)
}
}

#[derive(Debug, Clone)]
pub struct Metrics {
total_packages_bytes: prometheus::IntCounterVec,
}
impl Metrics {
pub fn new() -> Self {
let total_packages_bytes = register_int_counter_vec!(
opts!("node_proxy_total_packages_bytes", "Total bytes transferred",),
&["consumer"]
)
.unwrap();

Self {
total_packages_bytes,
}
}

pub fn count_total_packages_bytes(&self, consumer: &Consumer, value: usize) {
self.total_packages_bytes
.with_label_values(&[&consumer.to_string()])
.inc_by(value as u64)
}
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
Loading
Loading