Skip to content

Commit abb47b2

Browse files
da2ce7josecelano
authored andcommitted
dev: fix udp ring-buffer not looping
My previous version would be limited to a single thread, as `push_overwrite` would keep on returning the last element when the ring-buffer was full. Now the ring-buffer is pre-filled and is looped over with a mutating iterator. New handles are progressively swapped-in when the old entries are finished. Note: I think that this implementation can be replaced with a standard vector with the same effect.
1 parent a1408ad commit abb47b2

File tree

1 file changed

+63
-9
lines changed

1 file changed

+63
-9
lines changed

src/servers/udp/server.rs

+63-9
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use aquatic_udp_protocol::Response;
2525
use derive_more::Constructor;
2626
use log::{debug, error, info, trace};
27-
use ringbuf::traits::{Consumer, Observer, RingBuffer};
27+
use ringbuf::traits::{Consumer, Observer, Producer};
2828
use ringbuf::StaticRb;
2929
use tokio::net::UdpSocket;
3030
use tokio::sync::oneshot;
@@ -202,11 +202,23 @@ impl Launcher {
202202
}
203203
}
204204

205-
#[derive(Default)]
206205
struct ActiveRequests {
207206
rb: StaticRb<AbortHandle, 50>, // the number of requests we handle at the same time.
208207
}
209208

209+
impl ActiveRequests {
210+
/// Creates a new [`ActiveRequests`] filled with finished tasks.
211+
async fn new() -> Self {
212+
let mut rb = StaticRb::default();
213+
214+
let () = while rb.try_push(tokio::task::spawn_blocking(|| ()).abort_handle()).is_ok() {};
215+
216+
task::yield_now().await;
217+
218+
Self { rb }
219+
}
220+
}
221+
210222
impl std::fmt::Debug for ActiveRequests {
211223
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212224
let (left, right) = &self.rb.as_slices();
@@ -280,15 +292,22 @@ impl Udp {
280292
let tracker = tracker.clone();
281293
let socket = socket.clone();
282294

283-
let reqs = &mut ActiveRequests::default();
295+
let reqs = &mut ActiveRequests::new().await;
284296

285-
// Main Waiting Loop, awaits on async [`receive_request`].
286297
loop {
287-
if let Some(h) = reqs.rb.push_overwrite(
288-
Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone())
289-
.abort_handle(),
290-
) {
291-
if !h.is_finished() {
298+
task::yield_now().await;
299+
for h in reqs.rb.iter_mut() {
300+
if h.is_finished() {
301+
std::mem::swap(
302+
h,
303+
&mut Self::spawn_request_processor(
304+
Self::receive_request(socket.clone()).await,
305+
tracker.clone(),
306+
socket.clone(),
307+
)
308+
.abort_handle(),
309+
);
310+
} else {
292311
// the task is still running, lets yield and give it a chance to flush.
293312
tokio::task::yield_now().await;
294313

@@ -299,6 +318,9 @@ impl Udp {
299318
tracing::span!(
300319
target: "UDP TRACKER",
301320
tracing::Level::WARN, "request-aborted", server_socket_addr = %server_socket_addr);
321+
322+
// force-break a single thread, then loop again.
323+
break;
302324
}
303325
}
304326
}
@@ -396,13 +418,45 @@ mod tests {
396418
use std::sync::Arc;
397419
use std::time::Duration;
398420

421+
use ringbuf::traits::{Consumer, Observer, RingBuffer};
399422
use tokio::time::sleep;
400423
use torrust_tracker_test_helpers::configuration::ephemeral_mode_public;
401424

425+
use super::ActiveRequests;
402426
use crate::bootstrap::app::initialize_with_configuration;
403427
use crate::servers::registar::Registar;
404428
use crate::servers::udp::server::{Launcher, UdpServer};
405429

430+
#[tokio::test]
431+
async fn it_should_return_to_the_start_of_the_ring_buffer() {
432+
let mut a_req = ActiveRequests::new().await;
433+
434+
let mut count: usize = 0;
435+
let cap: usize = a_req.rb.capacity().into();
436+
437+
// Add a single pending task to check that the ring-buffer is looping correctly.
438+
a_req
439+
.rb
440+
.push_overwrite(tokio::task::spawn(std::future::pending::<()>()).abort_handle());
441+
442+
count += 1;
443+
444+
for _ in 0..2 {
445+
for h in a_req.rb.iter() {
446+
let first = count % cap;
447+
println!("{count},{first},{}", h.is_finished());
448+
449+
if first == 0 {
450+
assert!(!h.is_finished());
451+
} else {
452+
assert!(h.is_finished());
453+
}
454+
455+
count += 1;
456+
}
457+
}
458+
}
459+
406460
#[tokio::test]
407461
async fn it_should_be_able_to_start_and_stop() {
408462
let cfg = Arc::new(ephemeral_mode_public());

0 commit comments

Comments
 (0)