forked from torrust/torrust-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmod.rs
169 lines (137 loc) · 4.99 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Module to handle the UDP server instances.
use std::fmt::Debug;
use super::RawRequest;
pub mod bound_socket;
pub mod launcher;
pub mod processor;
pub mod receiver;
pub mod request_buffer;
pub mod spawner;
pub mod states;
/// Error that can occur when starting or stopping the UDP server.
///
/// Some errors triggered while starting the server are:
///
/// - The server cannot bind to the given address.
/// - It cannot get the bound address.
///
/// Some errors triggered while stopping the server are:
///
/// - The [`Server`] cannot send the shutdown signal to the spawned UDP service thread.
#[derive(Debug)]
pub enum UdpError {
/// Any kind of error starting or stopping the server.
Socket(std::io::Error),
Error(String),
}
/// A UDP server.
///
/// It's an state machine. Configurations cannot be changed. This struct
/// represents concrete configuration and state. It allows to start and stop the
/// server but always keeping the same configuration.
///
/// > **NOTICE**: if the configurations changes after running the server it will
/// > reset to the initial value after stopping the server. This struct is not
/// > intended to persist configurations between runs.
#[allow(clippy::module_name_repetitions)]
pub struct Server<S> {
/// The state of the server: `running` or `stopped`.
pub state: S,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use torrust_tracker_test_helpers::configuration::ephemeral_public;
use super::spawner::Spawner;
use super::Server;
use crate::bootstrap::app::initialize_with_configuration;
use crate::servers::registar::Registar;
#[tokio::test]
async fn it_should_be_able_to_start_and_stop() {
let cfg = Arc::new(ephemeral_public());
let tracker = initialize_with_configuration(&cfg);
let udp_trackers = cfg.udp_trackers.clone().expect("missing UDP trackers configuration");
let config = &udp_trackers[0];
let bind_to = config.bind_address;
let register = &Registar::default();
let stopped = Server::new(Spawner::new(bind_to));
let started = stopped
.start(tracker, register.give_form())
.await
.expect("it should start the server");
let stopped = started.stop().await.expect("it should stop the server");
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(stopped.state.spawner.bind_to, bind_to);
}
#[tokio::test]
async fn it_should_be_able_to_start_and_stop_with_wait() {
let cfg = Arc::new(ephemeral_public());
let tracker = initialize_with_configuration(&cfg);
let config = &cfg.udp_trackers.as_ref().unwrap().first().unwrap();
let bind_to = config.bind_address;
let register = &Registar::default();
let stopped = Server::new(Spawner::new(bind_to));
let started = stopped
.start(tracker, register.give_form())
.await
.expect("it should start the server");
tokio::time::sleep(Duration::from_secs(1)).await;
let stopped = started.stop().await.expect("it should stop the server");
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(stopped.state.spawner.bind_to, bind_to);
}
}
/// Todo: submit test to tokio documentation.
#[cfg(test)]
mod test_tokio {
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
#[tokio::test]
async fn test_barrier_with_aborted_tasks() {
// Create a barrier that requires 10 tasks to proceed.
let barrier = Arc::new(Barrier::new(10));
let mut tasks = JoinSet::default();
let mut handles = Vec::default();
// Set Barrier to 9/10.
for _ in 0..9 {
let c = barrier.clone();
handles.push(tasks.spawn(async move {
c.wait().await;
}));
}
// Abort two tasks: Barrier: 7/10.
for _ in 0..2 {
if let Some(handle) = handles.pop() {
handle.abort();
}
}
// Spawn a single task: Barrier 8/10.
let c = barrier.clone();
handles.push(tasks.spawn(async move {
c.wait().await;
}));
// give a chance fro the barrier to release.
tokio::time::sleep(Duration::from_millis(50)).await;
// assert that the barrier isn't removed, i.e. 8, not 10.
for h in &handles {
assert!(!h.is_finished());
}
// Spawn two more tasks to trigger the barrier release: Barrier 10/10.
for _ in 0..2 {
let c = barrier.clone();
handles.push(tasks.spawn(async move {
c.wait().await;
}));
}
// give a chance fro the barrier to release.
tokio::time::sleep(Duration::from_millis(50)).await;
// assert that the barrier has been triggered
for h in &handles {
assert!(h.is_finished());
}
tasks.shutdown().await;
}
}