Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix services bootstraping #629

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub async fn start(config: &Configuration, tracker: Arc<core::Tracker>) -> Vec<J

// Start the HTTP blocks
for http_tracker_config in &config.http_trackers {
if !http_tracker_config.enabled {
continue;
}

if let Some(job) = http_tracker::start_job(
http_tracker_config,
tracker.clone(),
Expand Down
20 changes: 14 additions & 6 deletions src/bootstrap/jobs/health_check_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,32 @@ pub async fn start_job(config: &HealthCheckApi, register: ServiceRegistry) -> Jo

let (tx_start, rx_start) = oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();
drop(tx_halt);

let protocol = "http";

// Run the API server
let join_handle = tokio::spawn(async move {
info!(target: "Health Check API", "Starting on: http://{}", bind_addr);
info!(target: "Health Check API", "Starting on: {protocol}://{}", bind_addr);

let handle = server::start(bind_addr, tx_start, rx_halt, register);

if let Ok(()) = handle.await {
info!(target: "Health Check API", "Stopped server running on: http://{}", bind_addr);
info!(target: "Health Check API", "Stopped server running on: {protocol}://{}", bind_addr);
}
});

// Wait until the API server job is running
// Wait until the server sends the started message
match rx_start.await {
Ok(msg) => info!(target: "Health Check API", "Started on: http://{}", msg.address),
Ok(msg) => info!(target: "Health Check API", "Started on: {protocol}://{}", msg.address),
Err(e) => panic!("the Health Check API server was dropped: {e}"),
}

join_handle
// Wait until the server finishes
tokio::spawn(async move {
assert!(!tx_halt.is_closed(), "Halt channel for Health Check API should be open");

join_handle
.await
.expect("it should be able to join to the Health Check API server task");
})
}
12 changes: 9 additions & 3 deletions src/servers/apis/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use axum_server::tls_rustls::RustlsConfig;
use axum_server::Handle;
use derive_more::Constructor;
use futures::future::BoxFuture;
use log::{error, info};
use log::{debug, error, info};
use tokio::sync::oneshot::{Receiver, Sender};
use torrust_tracker_configuration::AccessTokens;

Expand Down Expand Up @@ -120,7 +120,12 @@ impl ApiServer<Stopped> {
let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, access_tokens, tx_start, rx_halt).await;
debug!(target: "API", "Starting with launcher in spawned task ...");

let _task = launcher.start(tracker, access_tokens, tx_start, rx_halt).await;

debug!(target: "API", "Started with launcher in spawned task");

launcher
});

Expand Down Expand Up @@ -266,9 +271,10 @@ mod tests {
#[tokio::test]
async fn it_should_be_able_to_start_and_stop() {
let cfg = Arc::new(ephemeral_mode_public());
let tracker = initialize_with_configuration(&cfg);
let config = &cfg.http_api;

let tracker = initialize_with_configuration(&cfg);

let bind_to = config
.bind_address
.parse::<std::net::SocketAddr>()
Expand Down
5 changes: 4 additions & 1 deletion src/servers/health_check_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::routing::get;
use axum::{Json, Router};
use axum_server::Handle;
use futures::Future;
use log::debug;
use serde_json::json;
use tokio::sync::oneshot::{Receiver, Sender};

Expand Down Expand Up @@ -37,10 +38,12 @@ pub fn start(

let handle = Handle::new();

debug!(target: "Health Check API", "Starting service with graceful shutdown in a spawned task ...");

tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("shutting down http server on socket address: {address}"),
format!("Shutting down http server on socket address: {address}"),
));

let running = axum_server::from_tcp(socket)
Expand Down
10 changes: 8 additions & 2 deletions src/servers/registar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use derive_more::Constructor;
use log::debug;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -81,10 +82,15 @@ impl Registar {

/// Inserts a listing into the registry.
async fn insert(&self, rx: tokio::sync::oneshot::Receiver<ServiceRegistration>) {
let listing = rx.await.expect("it should receive the listing");
debug!("Waiting for the started service to send registration data ...");

let service_registration = rx
.await
.expect("it should receive the service registration from the started service");

let mut mutex = self.registry.lock().await;
mutex.insert(listing.binding, listing);

mutex.insert(service_registration.binding, service_registration);
}

/// Returns the [`ServiceRegistry`] of services
Expand Down
10 changes: 10 additions & 0 deletions tests/servers/health_check_api/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

use log::debug;
use tokio::sync::oneshot::{self, Sender};
use tokio::task::JoinHandle;
use torrust_tracker::bootstrap::jobs::Started;
Expand Down Expand Up @@ -50,13 +51,22 @@ impl Environment<Stopped> {

let register = self.registar.entries();

debug!(target: "Health Check API", "Spawning task to launch the service ...");

let server = tokio::spawn(async move {
debug!(target: "Health Check API", "Starting the server in a spawned task ...");

server::start(self.state.bind_to, tx_start, rx_halt, register)
.await
.expect("it should start the health check service");

debug!(target: "Health Check API", "Server started. Sending the binding {} ...", self.state.bind_to);

self.state.bind_to
});

debug!(target: "Health Check API", "Waiting for spawning task to send the binding ...");

let binding = rx_start.await.expect("it should send service binding").address;

Environment {
Expand Down