Skip to content

Commit 208694f

Browse files
committed
Merge #1124: Feat: ban IPs not sending a valid connection ID
29e506d feat: use default aquatic udp port for benchmarking (Jose Celano) 10f9bda feat: [#1096] ban client IP when exceeds connection ID errors limit (Jose Celano) 87401e8 chore(deps): add dependency bloom (Jose Celano) Pull request description: This PR uses a [Counting Bloom Filter](https://docs.rs/bloom/latest/bloom/#counting-bloom-filters) to count IP sending UDP requests with wrong connection IDs. The IP is banned when the tracker receives more than 10 requests from a given IP with a bad connection ID. Bad connection IDs are cookie values that have expired or are from the future. With the current `CountingBloomFilter` configuration (0.01 rate), we would have a **False Positive** for every 10000 IPs, meaning when two IPs have a collision, and one of them is misbehaving, the other one would also be banned. To avoid false positives, we introduced a second counter with a HashMap. This consumes more memory, but it's reset every 120 seconds. The HashMap is only used when the CBF detects a potential bad client. ### TODO - [x] Straightforward implementation - [x] Benchmarking (how much this new feature affects performance) - [x] Add an E2E test - [x] Remove IPs from the banned list every hour - [x] Review filter settings `CountingBloomFilter::with_rate(4, 0.01, 100)` - [x] Refactor: extract the IP ban service from the main loop - [x] Benchmarking after extracting `BanService` ### Questions - [ ] Should we add a configuration option for the maximum number of errors allowed? ### Future PR - [ ] Add a metric to tracker stats for the number of banned IPs. - [ ] Ban subnets ACKs for top commit: josecelano: ACK 29e506d Tree-SHA512: 004959e00eced1b9c1de39de81f8f9f1d8da1b46f5ee38b3b0679e77cc40448525ac197145ace5dd62017c39a72f7175b06f556e6a7eb8cffbdc57f67052a856
2 parents a7e20df + 29e506d commit 208694f

File tree

10 files changed

+306
-38
lines changed

10 files changed

+306
-38
lines changed

Cargo.lock

+16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ axum-server = { version = "0", features = ["tls-rustls-no-provider"] }
3939
bittorrent-http-protocol = { version = "3.0.0-develop", path = "packages/http-protocol" }
4040
bittorrent-primitives = "0.1.0"
4141
bittorrent-tracker-client = { version = "3.0.0-develop", path = "packages/tracker-client" }
42+
bloom = "0.3.2"
4243
blowfish = "0"
4344
camino = { version = "1", features = ["serde", "serde1"] }
4445
chrono = { version = "0", default-features = false, features = ["clock"] }

cSpell.json

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"alekitto",
66
"appuser",
77
"Arvid",
8+
"ASMS",
89
"asyn",
910
"autoclean",
1011
"AUTOINCREMENT",

share/default/config/tracker.udp.benchmarking.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@ persistent_torrent_completed_stat = false
1818
remove_peerless_torrents = false
1919

2020
[[udp_trackers]]
21-
bind_address = "0.0.0.0:6969"
21+
bind_address = "0.0.0.0:3000"

src/servers/udp/handlers.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ use aquatic_udp_protocol::{
1111
ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId,
1212
};
1313
use bittorrent_primitives::info_hash::InfoHash;
14+
use tokio::sync::RwLock;
1415
use torrust_tracker_clock::clock::Time as _;
1516
use tracing::{instrument, Level};
1617
use uuid::Uuid;
1718
use zerocopy::network_endian::I32;
1819

1920
use super::connection_cookie::{check, make};
21+
use super::server::banning::BanService;
2022
use super::RawRequest;
2123
use crate::core::{statistics, PeersWanted, Tracker};
2224
use crate::servers::udp::error::Error;
@@ -51,12 +53,13 @@ impl CookieTimeValues {
5153
/// - Delegating the request to the correct handler depending on the request type.
5254
///
5355
/// It will return an `Error` response if the request is invalid.
54-
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values), ret(level = Level::TRACE))]
56+
#[instrument(fields(request_id), skip(udp_request, tracker, cookie_time_values, ban_service), ret(level = Level::TRACE))]
5557
pub(crate) async fn handle_packet(
5658
udp_request: RawRequest,
5759
tracker: &Tracker,
5860
local_addr: SocketAddr,
5961
cookie_time_values: CookieTimeValues,
62+
ban_service: Arc<RwLock<BanService>>,
6063
) -> Response {
6164
tracing::Span::current().record("request_id", Uuid::new_v4().to_string());
6265
tracing::debug!("Handling Packets: {udp_request:?}");
@@ -68,6 +71,17 @@ pub(crate) async fn handle_packet(
6871
Ok(request) => match handle_request(request, udp_request.from, tracker, cookie_time_values.clone()).await {
6972
Ok(response) => return response,
7073
Err((e, transaction_id)) => {
74+
match &e {
75+
Error::CookieValueNotNormal { .. }
76+
| Error::CookieValueExpired { .. }
77+
| Error::CookieValueFromFuture { .. } => {
78+
// code-review: should we include `RequestParseError` and `BadRequest`?
79+
let mut ban_service = ban_service.write().await;
80+
ban_service.increase_counter(&udp_request.from.ip());
81+
}
82+
_ => {}
83+
}
84+
7185
handle_error(
7286
udp_request.from,
7387
tracker,

src/servers/udp/server/banning.rs

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//! Banning service for UDP tracker.
2+
//!
3+
//! It bans clients that send invalid connection id's.
4+
//!
5+
//! It uses two levels of filtering:
6+
//!
7+
//! 1. First, tt uses a Counting Bloom Filter to keep track of the number of
8+
//! connection ID errors per ip. That means there can be false positives, but
9+
//! not false negatives. 1 out of 100000 requests will be a false positive
10+
//! and the client will be banned and not receive a response.
11+
//! 2. Since we want to avoid false positives (banning a client that is not
12+
//! sending invalid connection id's), we use a `HashMap` to keep track of the
13+
//! exact number of connection ID errors per ip.
14+
//!
15+
//! This two level filtering is to avoid false positives. It has the advantage
16+
//! of being fast by using a Counting Bloom Filter and not having false
17+
//! negatives at the cost of increasing the memory usage.
18+
use std::collections::HashMap;
19+
use std::net::IpAddr;
20+
21+
use bloom::{CountingBloomFilter, ASMS};
22+
use tokio::time::Instant;
23+
use url::Url;
24+
25+
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
26+
27+
pub struct BanService {
28+
max_connection_id_errors_per_ip: u32,
29+
fuzzy_error_counter: CountingBloomFilter,
30+
accurate_error_counter: HashMap<IpAddr, u32>,
31+
local_addr: Url,
32+
last_connection_id_errors_reset: Instant,
33+
}
34+
35+
impl BanService {
36+
#[must_use]
37+
pub fn new(max_connection_id_errors_per_ip: u32, local_addr: Url) -> Self {
38+
Self {
39+
max_connection_id_errors_per_ip,
40+
local_addr,
41+
fuzzy_error_counter: CountingBloomFilter::with_rate(4, 0.01, 100),
42+
accurate_error_counter: HashMap::new(),
43+
last_connection_id_errors_reset: tokio::time::Instant::now(),
44+
}
45+
}
46+
47+
pub fn increase_counter(&mut self, ip: &IpAddr) {
48+
self.fuzzy_error_counter.insert(&ip.to_string());
49+
*self.accurate_error_counter.entry(*ip).or_insert(0) += 1;
50+
}
51+
52+
#[must_use]
53+
pub fn get_count(&self, ip: &IpAddr) -> Option<u32> {
54+
self.accurate_error_counter.get(ip).copied()
55+
}
56+
57+
#[must_use]
58+
pub fn get_estimate_count(&self, ip: &IpAddr) -> u32 {
59+
self.fuzzy_error_counter.estimate_count(&ip.to_string())
60+
}
61+
62+
/// Returns true if the given ip address is banned.
63+
#[must_use]
64+
pub fn is_banned(&self, ip: &IpAddr) -> bool {
65+
// First check if the ip is in the bloom filter (fast check)
66+
if self.fuzzy_error_counter.estimate_count(&ip.to_string()) <= self.max_connection_id_errors_per_ip {
67+
return false;
68+
}
69+
70+
// Check with the exact counter (to avoid false positives)
71+
match self.get_count(ip) {
72+
Some(count) => count > self.max_connection_id_errors_per_ip,
73+
None => false,
74+
}
75+
}
76+
77+
/// Resets the filters and updates the reset timestamp.
78+
pub fn reset_bans(&mut self) {
79+
self.fuzzy_error_counter.clear();
80+
81+
self.accurate_error_counter.clear();
82+
83+
self.last_connection_id_errors_reset = Instant::now();
84+
85+
let local_addr = self.local_addr.to_string();
86+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop (connection id errors filter cleared)");
87+
}
88+
}
89+
90+
#[cfg(test)]
91+
mod tests {
92+
use std::net::IpAddr;
93+
94+
use super::BanService;
95+
96+
/// Sample service with one day ban duration.
97+
fn ban_service(counter_limit: u32) -> BanService {
98+
let udp_tracker_url = "udp://127.0.0.1".parse().unwrap();
99+
BanService::new(counter_limit, udp_tracker_url)
100+
}
101+
102+
#[test]
103+
fn it_should_increase_the_errors_counter_for_a_given_ip() {
104+
let mut ban_service = ban_service(1);
105+
106+
let ip: IpAddr = "127.0.0.2".parse().unwrap();
107+
108+
ban_service.increase_counter(&ip);
109+
110+
assert_eq!(ban_service.get_count(&ip), Some(1));
111+
}
112+
113+
#[test]
114+
fn it_should_ban_ips_with_counters_exceeding_a_predefined_limit() {
115+
let mut ban_service = ban_service(1);
116+
117+
let ip: IpAddr = "127.0.0.2".parse().unwrap();
118+
119+
ban_service.increase_counter(&ip); // Counter = 1
120+
ban_service.increase_counter(&ip); // Counter = 2
121+
122+
println!("Counter: {}", ban_service.get_count(&ip).unwrap());
123+
124+
assert!(ban_service.is_banned(&ip));
125+
}
126+
127+
#[test]
128+
fn it_should_not_ban_ips_whose_counters_do_not_exceed_the_predefined_limit() {
129+
let mut ban_service = ban_service(1);
130+
131+
let ip: IpAddr = "127.0.0.2".parse().unwrap();
132+
133+
ban_service.increase_counter(&ip);
134+
135+
assert!(!ban_service.is_banned(&ip));
136+
}
137+
138+
#[test]
139+
fn it_should_allow_resetting_all_the_counters() {
140+
let mut ban_service = ban_service(1);
141+
142+
let ip: IpAddr = "127.0.0.2".parse().unwrap();
143+
144+
ban_service.increase_counter(&ip); // Counter = 1
145+
146+
ban_service.reset_bans();
147+
148+
assert_eq!(ban_service.get_estimate_count(&ip), 0);
149+
}
150+
}

src/servers/udp/server/launcher.rs

+47-15
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ use bittorrent_tracker_client::udp::client::check;
66
use derive_more::Constructor;
77
use futures_util::StreamExt;
88
use tokio::select;
9-
use tokio::sync::oneshot;
9+
use tokio::sync::{oneshot, RwLock};
10+
use tokio::time::interval;
1011
use tracing::instrument;
1112

13+
use super::banning::BanService;
1214
use super::request_buffer::ActiveRequests;
1315
use crate::bootstrap::jobs::Started;
1416
use crate::core::{statistics, Tracker};
@@ -20,6 +22,11 @@ use crate::servers::udp::server::processor::Processor;
2022
use crate::servers::udp::server::receiver::Receiver;
2123
use crate::servers::udp::UDP_TRACKER_LOG_TARGET;
2224

25+
/// The maximum number of connection id errors per ip. Clients will be banned if
26+
/// they exceed this limit.
27+
const MAX_CONNECTION_ID_ERRORS_PER_IP: u32 = 10;
28+
const IP_BANS_RESET_INTERVAL_IN_SECS: u64 = 120;
29+
2330
/// A UDP server instance launcher.
2431
#[derive(Constructor)]
2532
pub struct Launcher;
@@ -115,13 +122,30 @@ impl Launcher {
115122
let active_requests = &mut ActiveRequests::default();
116123

117124
let addr = receiver.bound_socket_address();
125+
118126
let local_addr = format!("udp://{addr}");
119127

120128
let cookie_lifetime = cookie_lifetime.as_secs_f64();
121129

122-
loop {
123-
let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime);
130+
let ban_service = Arc::new(RwLock::new(BanService::new(
131+
MAX_CONNECTION_ID_ERRORS_PER_IP,
132+
local_addr.parse().unwrap(),
133+
)));
134+
135+
let ban_cleaner = ban_service.clone();
136+
137+
tokio::spawn(async move {
138+
let mut cleaner_interval = interval(Duration::from_secs(IP_BANS_RESET_INTERVAL_IN_SECS));
139+
140+
cleaner_interval.tick().await;
124141

142+
loop {
143+
cleaner_interval.tick().await;
144+
ban_cleaner.write().await.reset_bans();
145+
}
146+
});
147+
148+
loop {
125149
if let Some(req) = {
126150
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)");
127151
receiver.next().await
@@ -149,18 +173,26 @@ impl Launcher {
149173
}
150174
}
151175

152-
// We spawn the new task even if there active requests buffer is
153-
// full. This could seem counterintuitive because we are accepting
154-
// more request and consuming more memory even if the server is
155-
// already busy. However, we "force_push" the new tasks in the
156-
// buffer. That means, in the worst scenario we will abort a
157-
// running task to make place for the new task.
158-
//
159-
// Once concern could be to reach an starvation point were we
160-
// are only adding and removing tasks without given them the
161-
// chance to finish. However, the buffer is yielding before
162-
// aborting one tasks, giving it the chance to finish.
163-
let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle();
176+
if ban_service.read().await.is_banned(&req.from.ip()) {
177+
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop continue: (banned ip)");
178+
continue;
179+
}
180+
181+
let processor = Processor::new(receiver.socket.clone(), tracker.clone(), cookie_lifetime);
182+
183+
/* We spawn the new task even if the active requests buffer is
184+
full. This could seem counterintuitive because we are accepting
185+
more request and consuming more memory even if the server is
186+
already busy. However, we "force_push" the new tasks in the
187+
buffer. That means, in the worst scenario we will abort a
188+
running task to make place for the new task.
189+
190+
Once concern could be to reach an starvation point were we are
191+
only adding and removing tasks without given them the chance to
192+
finish. However, the buffer is yielding before aborting one
193+
tasks, giving it the chance to finish. */
194+
let abort_handle: tokio::task::AbortHandle =
195+
tokio::task::spawn(processor.process_request(req, ban_service.clone())).abort_handle();
164196

165197
if abort_handle.is_finished() {
166198
continue;

src/servers/udp/server/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use thiserror::Error;
66

77
use super::RawRequest;
88

9+
pub mod banning;
910
pub mod bound_socket;
1011
pub mod launcher;
1112
pub mod processor;

src/servers/udp/server/processor.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ use std::net::{IpAddr, SocketAddr};
33
use std::sync::Arc;
44

55
use aquatic_udp_protocol::Response;
6+
use tokio::sync::RwLock;
67
use tracing::{instrument, Level};
78

9+
use super::banning::BanService;
810
use super::bound_socket::BoundSocket;
911
use crate::core::{statistics, Tracker};
1012
use crate::servers::udp::handlers::CookieTimeValues;
@@ -25,16 +27,18 @@ impl Processor {
2527
}
2628
}
2729

28-
#[instrument(skip(self, request))]
29-
pub async fn process_request(self, request: RawRequest) {
30+
#[instrument(skip(self, request, ban_service))]
31+
pub async fn process_request(self, request: RawRequest, ban_service: Arc<RwLock<BanService>>) {
3032
let from = request.from;
3133
let response = handlers::handle_packet(
3234
request,
3335
&self.tracker,
3436
self.socket.address(),
3537
CookieTimeValues::new(self.cookie_lifetime),
38+
ban_service,
3639
)
3740
.await;
41+
3842
self.send_response(from, response).await;
3943
}
4044

0 commit comments

Comments
 (0)