Skip to content

Commit 31ab3a9

Browse files
committed
Merge #846: dev: fix udp ring-buffer not looping
9e01f7f dev: fix udp ring-buffer not looping (Cameron Garnham) Pull request description: @da2ce7's previous version would be limited to a single thread, as `push_overwrite` would keep on returning the last element when the ring-buffer was full. Now, the ring-buffer is pre-filled and is looped over with a mutating iterator. New handles are progressively swapped-in when the old entries are finished. Note: I think that this implementation can be replaced with a standard vector with the same effect. Top commit has no ACKs. Tree-SHA512: 362ea9148c7a7c6c990b881fe34087f4f61fdd80aa6aecfe3d11f3da3d0440fa6db38e97bb93fc8e57ed55cc05d229260bf353b5d06c7541b88a710ddbe5bba4
2 parents 218fbbe + 9e01f7f commit 31ab3a9

File tree

1 file changed

+66
-11
lines changed

1 file changed

+66
-11
lines changed

src/servers/udp/server.rs

+66-11
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use aquatic_udp_protocol::Response;
2525
use derive_more::Constructor;
2626
use log::{debug, error, info, trace};
27-
use ringbuf::traits::{Consumer, Observer, RingBuffer};
27+
use ringbuf::traits::{Consumer, Observer, Producer};
2828
use ringbuf::StaticRb;
2929
use tokio::net::UdpSocket;
3030
use tokio::sync::oneshot;
@@ -202,11 +202,23 @@ impl Launcher {
202202
}
203203
}
204204

205-
#[derive(Default)]
206205
struct ActiveRequests {
207206
rb: StaticRb<AbortHandle, 50>, // the number of requests we handle at the same time.
208207
}
209208

209+
impl ActiveRequests {
210+
/// Creates a new [`ActiveRequests`] filled with finished tasks.
211+
async fn new() -> Self {
212+
let mut rb = StaticRb::default();
213+
214+
let () = while rb.try_push(tokio::task::spawn_blocking(|| ()).abort_handle()).is_ok() {};
215+
216+
task::yield_now().await;
217+
218+
Self { rb }
219+
}
220+
}
221+
210222
impl std::fmt::Debug for ActiveRequests {
211223
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212224
let (left, right) = &self.rb.as_slices();
@@ -280,15 +292,22 @@ impl Udp {
280292
let tracker = tracker.clone();
281293
let socket = socket.clone();
282294

283-
let reqs = &mut ActiveRequests::default();
295+
let reqs = &mut ActiveRequests::new().await;
284296

285-
// Main Waiting Loop, awaits on async [`receive_request`].
286297
loop {
287-
if let Some(h) = reqs.rb.push_overwrite(
288-
Self::spawn_request_processor(Self::receive_request(socket.clone()).await, tracker.clone(), socket.clone())
289-
.abort_handle(),
290-
) {
291-
if !h.is_finished() {
298+
task::yield_now().await;
299+
for h in reqs.rb.iter_mut() {
300+
if h.is_finished() {
301+
std::mem::swap(
302+
h,
303+
&mut Self::spawn_request_processor(
304+
Self::receive_request(socket.clone()).await,
305+
tracker.clone(),
306+
socket.clone(),
307+
)
308+
.abort_handle(),
309+
);
310+
} else {
292311
// the task is still running, lets yield and give it a chance to flush.
293312
tokio::task::yield_now().await;
294313

@@ -299,6 +318,9 @@ impl Udp {
299318
tracing::span!(
300319
target: "UDP TRACKER",
301320
tracing::Level::WARN, "request-aborted", server_socket_addr = %server_socket_addr);
321+
322+
// force-break a single thread, then loop again.
323+
break;
302324
}
303325
}
304326
}
@@ -396,13 +418,46 @@ mod tests {
396418
use std::sync::Arc;
397419
use std::time::Duration;
398420

399-
use tokio::time::sleep;
421+
use ringbuf::traits::{Consumer, Observer, RingBuffer};
400422
use torrust_tracker_test_helpers::configuration::ephemeral_mode_public;
401423

424+
use super::ActiveRequests;
402425
use crate::bootstrap::app::initialize_with_configuration;
403426
use crate::servers::registar::Registar;
404427
use crate::servers::udp::server::{Launcher, UdpServer};
405428

429+
#[tokio::test]
430+
async fn it_should_return_to_the_start_of_the_ring_buffer() {
431+
let mut a_req = ActiveRequests::new().await;
432+
433+
tokio::time::sleep(Duration::from_millis(10)).await;
434+
435+
let mut count: usize = 0;
436+
let cap: usize = a_req.rb.capacity().into();
437+
438+
// Add a single pending task to check that the ring-buffer is looping correctly.
439+
a_req
440+
.rb
441+
.push_overwrite(tokio::task::spawn(std::future::pending::<()>()).abort_handle());
442+
443+
count += 1;
444+
445+
for _ in 0..2 {
446+
for h in a_req.rb.iter() {
447+
let first = count % cap;
448+
println!("{count},{first},{}", h.is_finished());
449+
450+
if first == 0 {
451+
assert!(!h.is_finished());
452+
} else {
453+
assert!(h.is_finished());
454+
}
455+
456+
count += 1;
457+
}
458+
}
459+
}
460+
406461
#[tokio::test]
407462
async fn it_should_be_able_to_start_and_stop() {
408463
let cfg = Arc::new(ephemeral_mode_public());
@@ -423,7 +478,7 @@ mod tests {
423478
.expect("it should start the server");
424479
let stopped = started.stop().await.expect("it should stop the server");
425480

426-
sleep(Duration::from_secs(1)).await;
481+
tokio::time::sleep(Duration::from_secs(1)).await;
427482

428483
assert_eq!(stopped.state.launcher.bind_to, bind_to);
429484
}

0 commit comments

Comments
 (0)