
Commit 10f9bda

feat: [#1096] ban client IP when exceeds connection ID errors limit
The live demo tracker is receiving many UDP requests with wrong connection IDs. Each error is logged (a disk write), which degrades tracker performance. This change counts those errors and bans an IP for 2 minutes once it exceeds 10 of them. We use two levels of counters:

1. First level: a Counting Bloom Filter (CBF). Fast and low on memory, but inaccurate (false positives).
2. Second level: a HashMap with an exact error counter per IP.

CBFs are fast and use little memory, but they produce false positives, meaning some IPs could be banned only because of bucket collisions (IPs sharing the same counter). To avoid banning IPs incorrectly, we introduce the second counter, a HashMap that counts errors precisely; an IP is only banned when this counter exceeds the limit (more than 10 errors). We keep the CBF as a first-level filter: it is a fast IP check that does not affect the tracker's performance and is cheaper than always checking the HashMap for banned IPs. When the CBF reports an IP as over the limit, we double-check against the HashMap. This solution should be good as long as the number of offending IPs is low. We will need another solution for IPv6 anyway, where owning a range of IPs is cheaper.
1 parent 87401e8 commit 10f9bda
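The "low memory consumption" claim above can be sanity-checked with standard Bloom filter sizing math. The following self-contained sketch (textbook formulas, not necessarily how the `bloom` crate sizes its filter internally) estimates the footprint of the first-level Counting Bloom Filter for the parameters used in this commit: roughly 100 expected IPs, a 1% false positive rate, and 4-bit counters.

fn main() {
    let n: f64 = 100.0;            // expected number of distinct IPs
    let p: f64 = 0.01;             // target false positive rate
    let bits_per_counter: f64 = 4.0;

    // Optimal number of slots: m = -n * ln(p) / (ln 2)^2
    let m = -n * p.ln() / (2f64.ln().powi(2));
    // Optimal number of hash functions: k = (m / n) * ln 2
    let k = (m / n) * 2f64.ln();

    println!("slots: {m:.0}");                                    // ~959
    println!("hash functions: {:.0}", k.round());                 // ~7
    println!("memory: ~{:.0} bytes", m * bits_per_counter / 8.0); // ~480 bytes
}

Roughly half a kilobyte for the fast first level, versus one HashMap entry per offending IP for the exact second level.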

7 files changed: +288 -37 lines changed

cSpell.json (+1)

@@ -5,6 +5,7 @@
 "alekitto",
 "appuser",
 "Arvid",
+"ASMS",
 "asyn",
 "autoclean",
 "AUTOINCREMENT",

src/servers/udp/handlers.rs (+15 -1)

@@ -11,12 +11,14 @@ use aquatic_udp_protocol::{
     ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId,
 };
 use bittorrent_primitives::info_hash::InfoHash;
+use tokio::sync::RwLock;
 use torrust_tracker_clock::clock::Time as _;
 use tracing::{instrument, Level};
 use uuid::Uuid;
 use zerocopy::network_endian::I32;
 
 use super::connection_cookie::{check, make};
+use super::server::banning::BanService;
 use super::RawRequest;
 use crate::core::{statistics, PeersWanted, Tracker};
 use crate::servers::udp::error::Error;
@@ -51,12 +53,13 @@ impl CookieTimeValues {
 /// - Delegating the request to the correct handler depending on the request type.
 ///
 /// It will return an `Error` response if the request is invalid.
-#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values), ret(level = Level::TRACE))]
+#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, ban_service), ret(level = Level::TRACE))]
 pub(crate) async fn handle_packet(
     udp_request: RawRequest,
     tracker: &Tracker,
     local_addr: SocketAddr,
     cookie_time_values: CookieTimeValues,
+    ban_service: Arc<RwLock<BanService>>,
 ) -> Response {
     tracing::Span::current().record("request_id", Uuid::new_v4().to_string());
     tracing::debug!("Handling Packets: {udp_request:?}");
@@ -68,6 +71,17 @@ pub(crate) async fn handle_packet(
         Ok(request) => match handle_request(request, udp_request.from, tracker, cookie_time_values.clone()).await {
             Ok(response) => return response,
             Err((e, transaction_id)) => {
+                match &e {
+                    Error::CookieValueNotNormal { .. }
+                    | Error::CookieValueExpired { .. }
+                    | Error::CookieValueFromFuture { .. } => {
+                        // code-review: should we include `RequestParseError` and `BadRequest`?
+                        let mut ban_service = ban_service.write().await;
+                        ban_service.increase_counter(&udp_request.from.ip());
+                    }
+                    _ => {}
+                }
+
                 handle_error(
                     udp_request.from,
                     tracker,

src/servers/udp/server/banning.rs (+150, new file)

@@ -0,0 +1,150 @@
+//! Banning service for UDP tracker.
+//!
+//! It bans clients that send invalid connection id's.
+//!
+//! It uses two levels of filtering:
+//!
+//! 1. First, it uses a Counting Bloom Filter to keep track of the number of
+//!    connection ID errors per ip. That means there can be false positives, but
+//!    not false negatives. 1 out of 100000 requests will be a false positive
+//!    and the client will be banned and not receive a response.
+//! 2. Since we want to avoid false positives (banning a client that is not
+//!    sending invalid connection id's), we use a `HashMap` to keep track of the
+//!    exact number of connection ID errors per ip.
+//!
+//! This two level filtering is to avoid false positives. It has the advantage
+//! of being fast by using a Counting Bloom Filter and not having false
+//! negatives at the cost of increasing the memory usage.
+use std::collections::HashMap;
+use std::net::IpAddr;
+
+use bloom::{CountingBloomFilter, ASMS};
+use tokio::time::Instant;
+use url::Url;
+
+use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
+
+pub struct BanService {
+    max_connection_id_errors_per_ip: u32,
+    fuzzy_error_counter: CountingBloomFilter,
+    accurate_error_counter: HashMap<IpAddr, u32>,
+    local_addr: Url,
+    last_connection_id_errors_reset: Instant,
+}
+
+impl BanService {
+    #[must_use]
+    pub fn new(max_connection_id_errors_per_ip: u32, local_addr: Url) -> Self {
+        Self {
+            max_connection_id_errors_per_ip,
+            local_addr,
+            fuzzy_error_counter: CountingBloomFilter::with_rate(4, 0.01, 100),
+            accurate_error_counter: HashMap::new(),
+            last_connection_id_errors_reset: tokio::time::Instant::now(),
+        }
+    }
+
+    pub fn increase_counter(&mut self, ip: &IpAddr) {
+        self.fuzzy_error_counter.insert(&ip.to_string());
+        *self.accurate_error_counter.entry(*ip).or_insert(0) += 1;
+    }
+
+    #[must_use]
+    pub fn get_count(&self, ip: &IpAddr) -> Option<u32> {
+        self.accurate_error_counter.get(ip).copied()
+    }
+
+    #[must_use]
+    pub fn get_estimate_count(&self, ip: &IpAddr) -> u32 {
+        self.fuzzy_error_counter.estimate_count(&ip.to_string())
+    }
+
+    /// Returns true if the given ip address is banned.
+    #[must_use]
+    pub fn is_banned(&self, ip: &IpAddr) -> bool {
+        // First check if the ip is in the bloom filter (fast check)
+        if self.fuzzy_error_counter.estimate_count(&ip.to_string()) <= self.max_connection_id_errors_per_ip {
+            return false;
+        }
+
+        // Check with the exact counter (to avoid false positives)
+        match self.get_count(ip) {
+            Some(count) => count > self.max_connection_id_errors_per_ip,
+            None => false,
+        }
+    }
+
+    /// Resets the filters and updates the reset timestamp.
+    pub fn reset_bans(&mut self) {
+        self.fuzzy_error_counter.clear();
+
+        self.accurate_error_counter.clear();
+
+        self.last_connection_id_errors_reset = Instant::now();
+
+        let local_addr = self.local_addr.to_string();
+        tracing::info!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (connection id errors filter cleared)");
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::net::IpAddr;
+
+    use super::BanService;
+
+    /// Sample service with one day ban duration.
+    fn ban_service(counter_limit: u32) -> BanService {
+        let udp_tracker_url = "udp://127.0.0.1".parse().unwrap();
+        BanService::new(counter_limit, udp_tracker_url)
+    }
+
+    #[test]
+    fn it_should_increase_the_errors_counter_for_a_given_ip() {
+        let mut ban_service = ban_service(1);
+
+        let ip: IpAddr = "127.0.0.2".parse().unwrap();
+
+        ban_service.increase_counter(&ip);
+
+        assert_eq!(ban_service.get_count(&ip), Some(1));
+    }
+
+    #[test]
+    fn it_should_ban_ips_with_counters_exceeding_a_predefined_limit() {
+        let mut ban_service = ban_service(1);
+
+        let ip: IpAddr = "127.0.0.2".parse().unwrap();
+
+        ban_service.increase_counter(&ip); // Counter = 1
+        ban_service.increase_counter(&ip); // Counter = 2
+
+        println!("Counter: {}", ban_service.get_count(&ip).unwrap());
+
+        assert!(ban_service.is_banned(&ip));
+    }
+
+    #[test]
+    fn it_should_not_ban_ips_whose_counters_do_not_exceed_the_predefined_limit() {
+        let mut ban_service = ban_service(1);
+
+        let ip: IpAddr = "127.0.0.2".parse().unwrap();
+
+        ban_service.increase_counter(&ip);
+
+        assert!(!ban_service.is_banned(&ip));
+    }
+
+    #[test]
+    fn it_should_allow_resetting_all_the_counters() {
+        let mut ban_service = ban_service(1);
+
+        let ip: IpAddr = "127.0.0.2".parse().unwrap();
+
+        ban_service.increase_counter(&ip); // Counter = 1
+
+        ban_service.reset_bans();
+
+        assert_eq!(ban_service.get_estimate_count(&ip), 0);
+    }
+}
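A short usage sketch of the service above, complementing the unit tests: with the limit of 10 used by the launcher (see further below), the ban only triggers on the eleventh error, and `reset_bans` lifts it. The import path is an assumption about the crate layout; the `BanService` API and the strict `>` comparison against the limit come from the code above.

use std::net::IpAddr;

// Assumed crate path; adjust to wherever `banning::BanService` is reachable in your build.
use torrust_tracker::servers::udp::server::banning::BanService;

fn main() {
    // Same limit as MAX_CONNECTION_ID_ERRORS_PER_IP in the launcher.
    let mut ban_service = BanService::new(10, "udp://127.0.0.1:6969".parse().unwrap());
    let ip: IpAddr = "203.0.113.7".parse().unwrap();

    for _ in 0..10 {
        ban_service.increase_counter(&ip);
    }
    // Exactly at the limit (and no other IPs colliding in the filter): not banned yet.
    assert!(!ban_service.is_banned(&ip));

    ban_service.increase_counter(&ip); // the 11th error exceeds the limit
    assert!(ban_service.is_banned(&ip));

    ban_service.reset_bans(); // the launcher's cleaner task does this every 120 s
    assert!(!ban_service.is_banned(&ip));
}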

src/servers/udp/server/launcher.rs (+47 -15)

@@ -6,9 +6,11 @@ use bittorrent_tracker_client::udp::client::check;
 use derive_more::Constructor;
 use futures_util::StreamExt;
 use tokio::select;
-use tokio::sync::oneshot;
+use tokio::sync::{oneshot, RwLock};
+use tokio::time::interval;
 use tracing::instrument;
 
+use super::banning::BanService;
 use super::request_buffer::ActiveRequests;
 use crate::bootstrap::jobs::Started;
 use crate::core::{statistics, Tracker};
@@ -20,6 +22,11 @@ use crate::servers::udp::server::processor::Processor;
 use crate::servers::udp::server::receiver::Receiver;
 use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
 
+/// The maximum number of connection id errors per ip. Clients will be banned if
+/// they exceed this limit.
+const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10;
+const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 120;
+
 /// A UDP server instance launcher.
 #[derive(Constructor)]
 pub struct Launcher;
@@ -115,13 +122,30 @@ impl Launcher {
         let active_requests = &mut ActiveRequests::default();
 
         let addr = receiver.bound_socket_address();
+
         let local_addr = format!("udp://{addr}");
 
         let cookie_lifetime = cookie_lifetime.as_secs_f64();
 
-        loop {
-            let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime);
+        let ban_service = Arc::new(RwLock::new(BanService::new(
+            MAX_CONNECTION_ID_ERRORS_PER_IP,
+            local_addr.parse().unwrap(),
+        )));
+
+        let ban_cleaner = ban_service.clone();
+
+        tokio::spawn(async move {
+            let mut cleaner_interval = interval(Duration::from_secs(IP_BANS_RESET_INTERVAL_IN_SECS));
+
+            cleaner_interval.tick().await;
 
+            loop {
+                cleaner_interval.tick().await;
+                ban_cleaner.write().await.reset_bans();
+            }
+        });
+
+        loop {
             if let Some(req) = {
                 tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
                 receiver.next().await
@@ -149,18 +173,26 @@ impl Launcher {
                 }
             }
 
-            // We spawn the new task even if there active requests buffer is
-            // full. This could seem counterintuitive because we are accepting
-            // more request and consuming more memory even if the server is
-            // already busy. However, we "force_push" the new tasks in the
-            // buffer. That means, in the worst scenario we will abort a
-            // running task to make place for the new task.
-            //
-            // Once concern could be to reach an starvation point were we
-            // are only adding and removing tasks without given them the
-            // chance to finish. However, the buffer is yielding before
-            // aborting one tasks, giving it the chance to finish.
-            let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();
+            if ban_service.read().await.is_banned(&req.from.ip()) {
+                tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)");
+                continue;
+            }
+
+            let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime);
+
+            /* We spawn the new task even if the active requests buffer is
+            full. This could seem counterintuitive because we are accepting
+            more request and consuming more memory even if the server is
+            already busy. However, we "force_push" the new tasks in the
+            buffer. That means, in the worst scenario we will abort a
+            running task to make place for the new task.
+
+            Once concern could be to reach an starvation point were we are
+            only adding and removing tasks without given them the chance to
+            finish. However, the buffer is yielding before aborting one
+            tasks, giving it the chance to finish. */
+            let abort_handle: tokio::task::AbortHandle =
+                tokio::task::spawn(processor.process_request(req, ban_service.clone())).abort_handle();
 
             if abort_handle.is_finished() {
                 continue;
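One detail worth noting in the cleaner task above: a `tokio::time::interval` completes its first tick immediately, which is why the task consumes one tick before entering the loop. A minimal, self-contained sketch of that pattern (the real task calls `reset_bans()` instead of printing; requires tokio with the `rt`, `macros`, and `time` features):

use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut cleaner_interval = tokio::time::interval(Duration::from_secs(120));

    // The first tick resolves immediately, so consume it up front;
    // the ticks inside the loop then fire every 120 seconds.
    cleaner_interval.tick().await;

    loop {
        cleaner_interval.tick().await;
        // The real task resets both counters here:
        // ban_cleaner.write().await.reset_bans();
        println!("connection id error counters cleared");
    }
}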

src/servers/udp/server/mod.rs (+1)

@@ -6,6 +6,7 @@ use thiserror::Error;
 
 use super::RawRequest;
 
+pub mod banning;
 pub mod bound_socket;
 pub mod launcher;
 pub mod processor;

src/servers/udp/server/processor.rs (+6 -2)

@@ -3,8 +3,10 @@ use std::net::{IpAddr, SocketAddr};
 use std::sync::Arc;
 
 use aquatic_udp_protocol::Response;
+use tokio::sync::RwLock;
 use tracing::{instrument, Level};
 
+use super::banning::BanService;
 use super::bound_socket::BoundSocket;
 use crate::core::{statistics, Tracker};
 use crate::servers::udp::handlers::CookieTimeValues;
@@ -25,16 +27,18 @@ impl Processor {
         }
     }
 
-    #[instrument(skip(self, request))]
-    pub async fn process_request(self, request: RawRequest) {
+    #[instrument(skip(self, request, ban_service))]
+    pub async fn process_request(self, request: RawRequest, ban_service: Arc<RwLock<BanService>>) {
         let from = request.from;
         let response = handlers::handle_packet(
            request,
            &self.tracker,
            self.socket.address(),
            CookieTimeValues::new(self.cookie_lifetime),
+           ban_service,
        )
        .await;
+
        self.send_response(from, response).await;
     }
 
