Skip to content

Commit

Permalink
g3proxy: add tcp_tproxy server
Browse files Browse the repository at this point in the history
  • Loading branch information
zh-jq-b committed Jan 9, 2024
1 parent 9b101e0 commit 9ea3006
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 0 deletions.
1 change: 1 addition & 0 deletions g3proxy/doc/configuration/servers/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Servers

dummy_close
tcp_stream
tcp_tproxy
tls_stream
http_proxy
socks_proxy
Expand Down
32 changes: 32 additions & 0 deletions g3proxy/doc/configuration/servers/tcp_tproxy.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.. _configuration_server_tcp_tproxy:

tcp_tproxy
==========

.. versionadded:: 1.7.34

A simple tcp tproxy server, which will just forward traffic to the targeted remote address.

The following common keys are supported:

* :ref:`escaper <conf_server_common_escaper>`
* :ref:`auditor <conf_server_common_auditor>`
* :ref:`shared_logger <conf_server_common_shared_logger>`
* :ref:`listen_in_worker <conf_server_common_listen_in_worker>`
* :ref:`tcp_sock_speed_limit <conf_server_common_tcp_sock_speed_limit>`
* :ref:`ingress_network_filter <conf_server_common_ingress_network_filter>`
* :ref:`tcp_copy_buffer_size <conf_server_common_tcp_copy_buffer_size>`
* :ref:`tcp_copy_yield_size <conf_server_common_tcp_copy_yield_size>`
* :ref:`tcp_misc_opts <conf_server_common_tcp_misc_opts>`
* :ref:`task_idle_check_duration <conf_server_common_task_idle_check_duration>`
* :ref:`task_idle_max_count <conf_server_common_task_idle_max_count>`
* :ref:`extra_metrics_tags <conf_server_common_extra_metrics_tags>`

listen
------

**required**, **type**: :ref:`tcp listen <conf_value_tcp_listen>`

Set the listen config for this server.

The instance count setting will be ignored if *listen_in_worker* is correctly enabled.
14 changes: 14 additions & 0 deletions g3proxy/src/config/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub(crate) mod http_rproxy;
pub(crate) mod sni_proxy;
pub(crate) mod socks_proxy;
pub(crate) mod tcp_stream;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
pub(crate) mod tcp_tproxy;
pub(crate) mod tls_stream;

mod registry;
Expand Down Expand Up @@ -131,6 +133,8 @@ pub(crate) enum AnyServerConfig {
PlainQuicPort(plain_quic_port::PlainQuicPortConfig),
IntelliProxy(intelli_proxy::IntelliProxyConfig),
TcpStream(Box<tcp_stream::TcpStreamServerConfig>),
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
TcpTProxy(tcp_tproxy::TcpTProxyServerConfig),
TlsStream(Box<tls_stream::TlsStreamServerConfig>),
SniProxy(Box<sni_proxy::SniProxyServerConfig>),
SocksProxy(Box<socks_proxy::SocksProxyServerConfig>),
Expand All @@ -150,6 +154,8 @@ macro_rules! impl_transparent0 {
AnyServerConfig::PlainQuicPort(s) => s.$f(),
AnyServerConfig::IntelliProxy(s) => s.$f(),
AnyServerConfig::TcpStream(s) => s.$f(),
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
AnyServerConfig::TcpTProxy(s) => s.$f(),
AnyServerConfig::TlsStream(s) => s.$f(),
AnyServerConfig::SniProxy(s) => s.$f(),
AnyServerConfig::SocksProxy(s) => s.$f(),
Expand All @@ -172,6 +178,8 @@ macro_rules! impl_transparent1 {
AnyServerConfig::PlainQuicPort(s) => s.$f(p),
AnyServerConfig::IntelliProxy(s) => s.$f(p),
AnyServerConfig::TcpStream(s) => s.$f(p),
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
AnyServerConfig::TcpTProxy(s) => s.$f(p),
AnyServerConfig::TlsStream(s) => s.$f(p),
AnyServerConfig::SniProxy(s) => s.$f(p),
AnyServerConfig::SocksProxy(s) => s.$f(p),
Expand Down Expand Up @@ -259,6 +267,12 @@ fn load_server(
.context("failed to load this TcpStream server")?;
Ok(AnyServerConfig::TcpStream(Box::new(server)))
}
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
"tcp_tproxy" | "tcptproxy" => {
let server = tcp_tproxy::TcpTProxyServerConfig::parse(map, position)
.context("failed to load this TcpTProxy server")?;
Ok(AnyServerConfig::TcpTProxy(server))
}
"tls_stream" | "tlsstream" => {
let server = tls_stream::TlsStreamServerConfig::parse(map, position)
.context("failed to load this TLsStream server")?;
Expand Down
239 changes: 239 additions & 0 deletions g3proxy/src/config/server/tcp_tproxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use ascii::AsciiString;
use yaml_rust::{yaml, Yaml};

use g3_io_ext::LimitedCopyConfig;
use g3_types::acl::AclNetworkRuleBuilder;
use g3_types::metrics::{MetricsName, StaticMetricsTags};
use g3_types::net::{TcpListenConfig, TcpMiscSockOpts, TcpSockSpeedLimitConfig};
use g3_yaml::YamlDocPosition;

use super::{AnyServerConfig, ServerConfig, ServerConfigDiffAction, IDLE_CHECK_MAXIMUM_DURATION};

const SERVER_CONFIG_TYPE: &str = "TcpTProxy";

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct TcpTProxyServerConfig {
name: MetricsName,
position: Option<YamlDocPosition>,
pub(crate) escaper: MetricsName,
pub(crate) auditor: MetricsName,
pub(crate) shared_logger: Option<AsciiString>,
pub(crate) listen: TcpListenConfig,
pub(crate) listen_in_worker: bool,
pub(crate) ingress_net_filter: Option<AclNetworkRuleBuilder>,
pub(crate) tcp_sock_speed_limit: TcpSockSpeedLimitConfig,
pub(crate) task_idle_check_duration: Duration,
pub(crate) task_idle_max_count: i32,
pub(crate) tcp_copy: LimitedCopyConfig,
pub(crate) tcp_misc_opts: TcpMiscSockOpts,
pub(crate) extra_metrics_tags: Option<Arc<StaticMetricsTags>>,
}

impl TcpTProxyServerConfig {
fn new(position: Option<YamlDocPosition>) -> Self {
TcpTProxyServerConfig {
name: MetricsName::default(),
position,
escaper: MetricsName::default(),
auditor: MetricsName::default(),
shared_logger: None,
listen: TcpListenConfig::default(),
listen_in_worker: false,
ingress_net_filter: None,
tcp_sock_speed_limit: TcpSockSpeedLimitConfig::default(),
task_idle_check_duration: Duration::from_secs(300),
task_idle_max_count: 1,
tcp_copy: Default::default(),
tcp_misc_opts: Default::default(),
extra_metrics_tags: None,
}
}

pub(crate) fn parse(
map: &yaml::Hash,
position: Option<YamlDocPosition>,
) -> anyhow::Result<Self> {
let mut server = TcpTProxyServerConfig::new(position);

g3_yaml::foreach_kv(map, |k, v| server.set(k, v))?;

server.check()?;
Ok(server)
}

fn set(&mut self, k: &str, v: &Yaml) -> anyhow::Result<()> {
match g3_yaml::key::normalize(k).as_str() {
super::CONFIG_KEY_SERVER_TYPE => Ok(()),
super::CONFIG_KEY_SERVER_NAME => {
self.name = g3_yaml::value::as_metrics_name(v)?;
Ok(())
}
"escaper" => {
self.escaper = g3_yaml::value::as_metrics_name(v)?;
Ok(())
}
"auditor" => {
self.auditor = g3_yaml::value::as_metrics_name(v)?;
Ok(())
}
"shared_logger" => {
let name = g3_yaml::value::as_ascii(v)?;
self.shared_logger = Some(name);
Ok(())
}
"extra_metrics_tags" => {
let tags = g3_yaml::value::as_static_metrics_tags(v)
.context(format!("invalid static metrics tags value for key {k}"))?;
self.extra_metrics_tags = Some(Arc::new(tags));
Ok(())
}
"listen" => {
self.listen = g3_yaml::value::as_tcp_listen_config(v)
.context(format!("invalid tcp listen config value for key {k}"))?;
Ok(())
}
"listen_in_worker" => {
self.listen_in_worker = g3_yaml::value::as_bool(v)?;
Ok(())
}
"ingress_network_filter" | "ingress_net_filter" => {
let filter = g3_yaml::value::acl::as_ingress_network_rule_builder(v).context(
format!("invalid ingress network acl rule value for key {k}"),
)?;
self.ingress_net_filter = Some(filter);
Ok(())
}
"tcp_sock_speed_limit" | "tcp_conn_speed_limit" | "tcp_conn_limit" | "conn_limit" => {
self.tcp_sock_speed_limit = g3_yaml::value::as_tcp_sock_speed_limit(v)
.context(format!("invalid tcp socket speed limit value for key {k}"))?;
Ok(())
}
"tcp_copy_buffer_size" => {
let buffer_size = g3_yaml::humanize::as_usize(v)
.context(format!("invalid humanize usize value for key {k}"))?;
self.tcp_copy.set_buffer_size(buffer_size);
Ok(())
}
"tcp_copy_yield_size" => {
let yield_size = g3_yaml::humanize::as_usize(v)
.context(format!("invalid humanize usize value for key {k}"))?;
self.tcp_copy.set_yield_size(yield_size);
Ok(())
}
"tcp_misc_opts" => {
self.tcp_misc_opts = g3_yaml::value::as_tcp_misc_sock_opts(v)
.context(format!("invalid tcp misc sock opts value for key {k}"))?;
Ok(())
}
"task_idle_check_duration" => {
self.task_idle_check_duration = g3_yaml::humanize::as_duration(v)
.context(format!("invalid humanize duration value for key {k}"))?;
Ok(())
}
"task_idle_max_count" => {
self.task_idle_max_count =
g3_yaml::value::as_i32(v).context(format!("invalid i32 value for key {k}"))?;
Ok(())
}
_ => Err(anyhow!("invalid key {k}")),
}
}

fn check(&mut self) -> anyhow::Result<()> {
if self.name.is_empty() {
return Err(anyhow!("name is not set"));
}
if self.escaper.is_empty() {
return Err(anyhow!("escaper is not set"));
}
if self.task_idle_check_duration > IDLE_CHECK_MAXIMUM_DURATION {
self.task_idle_check_duration = IDLE_CHECK_MAXIMUM_DURATION;
}

#[cfg(target_os = "linux")]
self.listen.set_transparent();
self.listen.check()?;

Ok(())
}
}

impl ServerConfig for TcpTProxyServerConfig {
fn name(&self) -> &MetricsName {
&self.name
}

fn position(&self) -> Option<YamlDocPosition> {
self.position.clone()
}

fn server_type(&self) -> &'static str {
SERVER_CONFIG_TYPE
}

fn escaper(&self) -> &MetricsName {
&self.escaper
}

fn user_group(&self) -> &MetricsName {
Default::default()
}

fn auditor(&self) -> &MetricsName {
&self.auditor
}

fn diff_action(&self, new: &AnyServerConfig) -> ServerConfigDiffAction {
let new = match new {
AnyServerConfig::TcpTProxy(config) => config,
_ => return ServerConfigDiffAction::SpawnNew,
};

if self.eq(new) {
return ServerConfigDiffAction::NoAction;
}

if self.listen != new.listen {
return ServerConfigDiffAction::ReloadAndRespawn;
}

ServerConfigDiffAction::ReloadOnlyConfig
}

fn shared_logger(&self) -> Option<&str> {
self.shared_logger.as_ref().map(|s| s.as_str())
}

#[inline]
fn limited_copy_config(&self) -> LimitedCopyConfig {
self.tcp_copy
}
#[inline]
fn task_idle_check_duration(&self) -> Duration {
self.task_idle_check_duration
}
#[inline]
fn task_max_idle_count(&self) -> i32 {
self.task_idle_max_count
}
}
2 changes: 2 additions & 0 deletions g3proxy/src/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod http_rproxy;
mod sni_proxy;
mod socks_proxy;
mod tcp_stream;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
mod tcp_tproxy;
mod tls_stream;

mod error;
Expand Down
4 changes: 4 additions & 0 deletions g3proxy/src/serve/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use super::http_rproxy::HttpRProxyServer;
use super::sni_proxy::SniProxyServer;
use super::socks_proxy::SocksProxyServer;
use super::tcp_stream::TcpStreamServer;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
use super::tcp_tproxy::TcpTProxyServer;
use super::tls_stream::TlsStreamServer;

static SERVER_OPS_LOCK: Mutex<()> = Mutex::const_new(());
Expand Down Expand Up @@ -294,6 +296,8 @@ fn spawn_new_unlocked(config: AnyServerConfig) -> anyhow::Result<()> {
AnyServerConfig::PlainQuicPort(c) => PlainQuicPort::prepare_initial(c)?,
AnyServerConfig::IntelliProxy(c) => IntelliProxy::prepare_initial(c)?,
AnyServerConfig::TcpStream(c) => TcpStreamServer::prepare_initial(*c)?,
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
AnyServerConfig::TcpTProxy(c) => TcpTProxyServer::prepare_initial(c)?,
AnyServerConfig::TlsStream(c) => TlsStreamServer::prepare_initial(*c)?,
AnyServerConfig::SniProxy(c) => SniProxyServer::prepare_initial(*c)?,
AnyServerConfig::SocksProxy(c) => SocksProxyServer::prepare_initial(*c)?,
Expand Down
Loading

0 comments on commit 9ea3006

Please sign in to comment.