
Commit 14edbf3

[nexus] Explicitly terminate pools for qorb (#6881)
Fixes #6505, integrating usage of the new qorb APIs to terminate pools cleanly: oxidecomputer/qorb#45

# Background

- [qorb](https://github.com/oxidecomputer/qorb) is a connection pooling crate which spins up tokio tasks that attempt to connect to backends.
- When `qorb` was integrated into Omicron in #5876, I used it to connect to our database backend (CockroachDB). This included usage in tests, even with a "single backend host" (one test-only CRDB server) -- I wanted to ensure that we used the same pool and connector logic in tests and prod.

# What Went Wrong

As identified in #6505, we saw some tests failing during **termination**. The specific cause of the failure was a panic from [async-bb8-diesel](https://github.com/oxidecomputer/async-bb8-diesel), where we attempted to spawn tasks on a terminating tokio executor.

This issue stems from async-bb8-diesel's usage of `tokio::task::spawn_blocking`, where the returned `JoinHandle` is immediately awaited and **unwrapped**, with the expectation that "we should always be able to spawn a blocking task". There was a separate discussion about whether or not async-bb8-diesel should be unwrapping in this case (see: oxidecomputer/async-bb8-diesel#77), but instead, I chose to focus on the question:

## Why are we trying to send requests to async-bb8-diesel while the tokio runtime is exiting?

The answer lies in qorb's internals -- qorb itself spawns many tokio tasks to handle ongoing work, including monitoring DNS resolution, checking on backend health, and making connections before they are requested. One of these qorb tasks, which calls `ping_async` to check connection health, used the `async-bb8-diesel` interface that ultimately panicked.
Within qorb, most of these tokio tasks have a drop implementation of the form:

```rust
struct MyQorbStructure {
    handle: tokio::task::JoinHandle<()>,
}

impl Drop for MyQorbStructure {
    fn drop(&mut self) {
        self.handle.abort();
    }
}
```

Tragically, without async drop support in Rust, this is the best we can do implicitly -- signal that the background tasks should stop ASAP -- but that may not be soon enough! Calling `.abort()` on the `JoinHandle` does not terminate the task immediately; it simply signals that the task should shut down at its next yield point.

As a result, we can still see the flake observed in #6505:

- A qorb pool is initialized with background tasks.
- One of the qorb worker tasks is about to make a call to check on the health of a connection to a backend.
- The test finishes and returns. The tokio runtime begins terminating.
- We call `drop` on the `qorb` pool, which signals that the background task should abort.
- The aforementioned qorb worker task makes the call to `ping_async`, which calls `spawn_blocking`. This fails, because the tokio runtime is terminating, and returns a [JoinError::Cancelled](https://buildomat.eng.oxide.computer/wg/0/details/01J9YQVN7X5EQNXFSEY6XJBH8B/zfviqPz9RoJp3bY4TafbyqXTwbhqdr7w4oupqBtVARR00CXF/01J9YQWAXY36WM0R2VG27QMFRK#S6049).
- `async-bb8-diesel` unwraps this `JoinError`, and the test panics.

# How do we mitigate this?

That's where this PR comes in. Using the new qorb APIs, we don't rely on the synchronous drop methods -- we explicitly call `.terminate().await` functions which do the following:

- Use `tokio::sync::oneshot`s as signals to `tokio::task`s that they should exit.
- `.await` the `JoinHandle` for those tasks before returning.

Doing this work explicitly as part of cleanup ensures that no background tasks are attempting to do new work while the tokio runtime is terminating.
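The "signal, then join" shape of `terminate()` can be sketched with std threads rather than tokio; `Worker` and its methods below are hypothetical names for illustration, not qorb's actual API:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a qorb-style background task. This is a
// std-thread analogue of the `terminate().await` pattern: a oneshot-like
// channel signals shutdown, and `terminate` joins the thread before
// returning, so nothing is left running afterwards.
struct Worker {
    shutdown_tx: mpsc::Sender<()>,
    handle: thread::JoinHandle<u32>,
}

impl Worker {
    fn spawn() -> Self {
        let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || {
            let mut health_checks = 0u32;
            loop {
                // Wait briefly for the shutdown signal; on timeout, do one
                // round of (simulated) background work.
                match shutdown_rx.recv_timeout(Duration::from_millis(1)) {
                    Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
                    Err(mpsc::RecvTimeoutError::Timeout) => health_checks += 1,
                }
            }
            health_checks
        });
        Worker { shutdown_tx, handle }
    }

    // Analogue of `.terminate().await`: signal, then join.
    fn terminate(self) -> u32 {
        // Ignore the send error: the worker may already have exited.
        let _ = self.shutdown_tx.send(());
        self.handle.join().expect("worker panicked")
    }
}

fn main() {
    let worker = Worker::spawn();
    thread::sleep(Duration::from_millis(10));
    let checks = worker.terminate();
    // Once terminate() returns, the thread is fully joined -- nothing is
    // still running that could race with process teardown.
    println!("worker performed {checks} background checks");
}
```

By contrast, the implicit `Drop`-based approach only signals without joining, leaving a window where the worker is still running while the rest of the program tears down.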
1 parent 27a4365 commit 14edbf3


70 files changed (+1215 / -1115 lines)

Cargo.lock (+2 -2)

(Generated file; diff not rendered.)

Cargo.toml (+1 -1)

```diff
@@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
 propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
 propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
 proptest = "1.5.0"
-qorb = "0.1.1"
+qorb = "0.1.2"
 quote = "1.0"
 rand = "0.8.5"
 rand_core = "0.6.4"
```

dev-tools/omdb/src/bin/omdb/db.rs (+5 -7)

```diff
@@ -19,7 +19,6 @@ use crate::check_allow_destructive::DestructiveOperationToken;
 use crate::helpers::CONNECTION_OPTIONS_HEADING;
 use crate::helpers::DATABASE_OPTIONS_HEADING;
 use crate::Omdb;
-use anyhow::anyhow;
 use anyhow::bail;
 use anyhow::Context;
 use async_bb8_diesel::AsyncConnection;
@@ -255,10 +254,7 @@ impl DbUrlOptions {
         // doesn't match what we expect. So we use `DataStore::new_unchecked()`
         // here. We will then check the schema version explicitly and warn the
         // user if it doesn't match.
-        let datastore = Arc::new(
-            DataStore::new_unchecked(log.clone(), pool)
-                .map_err(|e| anyhow!(e).context("creating datastore"))?,
-        );
+        let datastore = Arc::new(DataStore::new_unchecked(log.clone(), pool));
         check_schema_version(&datastore).await;
         Ok(datastore)
     }
@@ -785,7 +781,7 @@ impl DbArgs {
     ) -> Result<(), anyhow::Error> {
         let datastore = self.db_url_opts.connect(omdb, log).await?;
         let opctx = OpContext::for_tests(log.clone(), datastore.clone());
-        match &self.command {
+        let res = match &self.command {
             DbCommands::Rack(RackArgs { command: RackCommands::List }) => {
                 cmd_db_rack_list(&opctx, &datastore, &self.fetch_opts).await
             }
@@ -1013,7 +1009,9 @@ impl DbArgs {
             DbCommands::Volumes(VolumeArgs {
                 command: VolumeCommands::List,
             }) => cmd_db_volume_list(&datastore, &self.fetch_opts).await,
-        }
+        };
+        datastore.terminate().await;
+        res
     }
 }
```

live-tests/macros/src/lib.rs (+1 -1)

```diff
@@ -68,7 +68,7 @@ pub fn live_test(_attrs: TokenStream, input: TokenStream) -> TokenStream {
                 #func_ident_string
             ).await.expect("setting up LiveTestContext");
             #func_ident(&ctx).await;
-            ctx.cleanup_successful();
+            ctx.cleanup_successful().await;
         }
     };
     let mut sig = input_func.sig.clone();
```

live-tests/tests/common/mod.rs (+4 -4)

```diff
@@ -46,10 +46,10 @@ impl LiveTestContext {
 
     /// Clean up this `LiveTestContext`
     ///
-    /// This mainly removes log files created by the test. We do this in this
-    /// explicit cleanup function rather than on `Drop` because we want the log
-    /// files preserved on test failure.
-    pub fn cleanup_successful(self) {
+    /// This removes log files and cleans up the [`DataStore`], which
+    /// but be terminated asynchronously.
+    pub async fn cleanup_successful(self) {
+        self.datastore.terminate().await;
         self.logctx.cleanup_successful();
     }
```

nexus-config/src/nexus_config.rs (+15 -2)

```diff
@@ -270,6 +270,7 @@ pub struct MgdConfig {
 #[derive(Clone, Debug, Deserialize, PartialEq)]
 struct UnvalidatedTunables {
     max_vpc_ipv4_subnet_prefix: u8,
+    load_timeout: Option<std::time::Duration>,
 }
 
 /// Tunable configuration parameters, intended for use in test environments or
@@ -282,6 +283,11 @@ pub struct Tunables {
     /// Note that this is the maximum _prefix_ size, which sets the minimum size
     /// of the subnet.
     pub max_vpc_ipv4_subnet_prefix: u8,
+
+    /// How long should we attempt to loop until the schema matches?
+    ///
+    /// If "None", nexus loops forever during initialization.
+    pub load_timeout: Option<std::time::Duration>,
 }
 
 // Convert from the unvalidated tunables, verifying each parameter as needed.
@@ -292,6 +298,7 @@ impl TryFrom<UnvalidatedTunables> for Tunables {
         Tunables::validate_ipv4_prefix(unvalidated.max_vpc_ipv4_subnet_prefix)?;
         Ok(Tunables {
             max_vpc_ipv4_subnet_prefix: unvalidated.max_vpc_ipv4_subnet_prefix,
+            load_timeout: unvalidated.load_timeout,
         })
     }
 }
@@ -341,7 +348,10 @@ pub const MAX_VPC_IPV4_SUBNET_PREFIX: u8 = 26;
 
 impl Default for Tunables {
     fn default() -> Self {
-        Tunables { max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX }
+        Tunables {
+            max_vpc_ipv4_subnet_prefix: MAX_VPC_IPV4_SUBNET_PREFIX,
+            load_timeout: None,
+        }
     }
 }
 
@@ -1003,7 +1013,10 @@ mod test {
                 trusted_root: Utf8PathBuf::from("/path/to/root.json"),
             }),
             schema: None,
-            tunables: Tunables { max_vpc_ipv4_subnet_prefix: 27 },
+            tunables: Tunables {
+                max_vpc_ipv4_subnet_prefix: 27,
+                load_timeout: None
+            },
             dendrite: HashMap::from([(
                 SwitchLocation::Switch0,
                 DpdConfig {
```

nexus/db-queries/src/db/collection_attach.rs (+39 -56)

```diff
@@ -577,15 +577,15 @@ where
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::db::{self, identity::Resource as IdentityResource};
+    use crate::db::datastore::pub_test_utils::TestDatabase;
+    use crate::db::identity::Resource as IdentityResource;
     use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
     use chrono::Utc;
     use db_macros::Resource;
     use diesel::expression_methods::ExpressionMethods;
     use diesel::pg::Pg;
     use diesel::QueryDsl;
     use diesel::SelectableHelper;
-    use nexus_test_utils::db::test_setup_database;
     use omicron_common::api::external::{IdentityMetadataCreateParams, Name};
     use omicron_test_utils::dev;
     use uuid::Uuid;
@@ -869,11 +869,9 @@ mod test {
     async fn test_attach_missing_collection_fails() {
         let logctx =
             dev::test_setup_log("test_attach_missing_collection_fails");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
         let resource_id = uuid::Uuid::new_v4();
@@ -891,16 +889,15 @@ mod test {
 
         assert!(matches!(attach, Err(AttachError::CollectionNotFound)));
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_missing_resource_fails() {
         let logctx = dev::test_setup_log("test_attach_missing_resource_fails");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
 
         let conn = setup_db(&pool).await;
@@ -928,16 +925,15 @@ mod test {
         // The collection should remain unchanged.
         assert_eq!(collection, get_collection(collection_id, &conn).await);
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_once() {
         let logctx = dev::test_setup_log("test_attach_once");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
 
         let conn = setup_db(&pool).await;
@@ -976,16 +972,15 @@ mod test {
         );
         assert_eq!(returned_resource, get_resource(resource_id, &conn).await);
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_once_synchronous() {
         let logctx = dev::test_setup_log("test_attach_once_synchronous");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
 
         let conn = setup_db(&pool).await;
@@ -1025,18 +1020,16 @@ mod test {
         );
         assert_eq!(returned_resource, get_resource(resource_id, &conn).await);
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_multiple_times() {
         let logctx = dev::test_setup_log("test_attach_multiple_times");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         const RESOURCE_COUNT: u32 = 5;
 
@@ -1081,18 +1074,16 @@ mod test {
             );
         }
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_beyond_capacity_fails() {
         let logctx = dev::test_setup_log("test_attach_beyond_capacity_fails");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
 
@@ -1145,18 +1136,16 @@ mod test {
             _ => panic!("Unexpected error: {:?}", err),
         };
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_while_already_attached() {
         let logctx = dev::test_setup_log("test_attach_while_already_attached");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
 
@@ -1252,18 +1241,16 @@ mod test {
             _ => panic!("Unexpected error: {:?}", err),
         };
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_with_filters() {
         let logctx = dev::test_setup_log("test_attach_once");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
         let resource_id = uuid::Uuid::new_v4();
@@ -1307,18 +1294,16 @@ mod test {
         assert_eq!(returned_resource, get_resource(resource_id, &conn).await);
         assert_eq!(returned_resource.description(), "new description");
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_deleted_resource_fails() {
         let logctx = dev::test_setup_log("test_attach_deleted_resource_fails");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
         let resource_id = uuid::Uuid::new_v4();
@@ -1352,18 +1337,16 @@ mod test {
             .await;
         assert!(matches!(attach, Err(AttachError::ResourceNotFound)));
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 
     #[tokio::test]
     async fn test_attach_without_update_filter() {
         let logctx = dev::test_setup_log("test_attach_without_update_filter");
-        let mut db = test_setup_database(&logctx.log).await;
-        let cfg = db::Config { url: db.pg_config().clone() };
-        let pool = db::Pool::new_single_host(&logctx.log, &cfg);
-
-        let conn = setup_db(&pool).await;
+        let db = TestDatabase::new_with_pool(&logctx.log).await;
+        let pool = db.pool();
+        let conn = setup_db(pool).await;
 
         let collection_id = uuid::Uuid::new_v4();
 
@@ -1408,7 +1391,7 @@ mod test {
             .collection_id
             .is_none());
 
-        db.cleanup().await.unwrap();
+        db.terminate().await;
         logctx.cleanup_successful();
     }
 }
```
