run PHD tests in parallel by default #882

Draft · wants to merge 5 commits into base: master
15 changes: 14 additions & 1 deletion .github/buildomat/phd-run-with-args.sh
@@ -54,6 +54,7 @@ args=(
'--artifact-toml-path' $artifacts
'--tmp-directory' $tmpdir
'--artifact-directory' $artifactdir
'--parallelism' 2
$@
)

@@ -64,8 +65,20 @@ set +e
failcount=$?
set -e

# Disable errexit again because we may try collecting logs from runners that ran
# no tests (in which case *.log doesn't expand and tar will fail to find a file
# by that literal name)
set +e
tar -czvf /tmp/phd-tmp-files.tar.gz \
-C /tmp/propolis-phd /tmp/propolis-phd/*.log
-C $tmpdir $tmpdir/*.log
for runnerdir in $tmpdir/runner-*; do
if [ -d "$runnerdir" ]; then
tar -rzvf /tmp/phd-tmp-files.tar.gz \
-C $runnerdir $runnerdir/*.log
fi
done
set -e


exitcode=0
if [ $failcount -eq 0 ]; then
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/bhyve-api/src/lib.rs
@@ -125,6 +125,7 @@ impl AsRawFd for VmmCtlFd {
}
}

#[derive(Debug)]
pub enum ReservoirError {
/// Resizing operation was interrupted, but if a non-zero chunk size was
/// specified, one or more chunk-sized adjustments to the reservoir size may
1 change: 1 addition & 0 deletions crates/cpuid-utils/Cargo.toml
@@ -10,6 +10,7 @@ bhyve_api.workspace = true
propolis_api_types = {workspace = true, optional = true}
propolis_types.workspace = true
thiserror.workspace = true
uuid.workspace = true

[dev-dependencies]
proptest.workspace = true
4 changes: 3 additions & 1 deletion crates/cpuid-utils/src/host.rs
@@ -5,6 +5,7 @@
use bhyve_api::{VmmCtlFd, VmmFd};
use propolis_types::{CpuidIdent, CpuidValues, CpuidVendor};
use thiserror::Error;
use uuid::Uuid;

use crate::{
bits::{
@@ -32,7 +33,8 @@ struct Vm(bhyve_api::VmmFd);

impl Vm {
fn new() -> Result<Self, GetHostCpuidError> {
let name = format!("cpuid-gen-{}", std::process::id());
let name =
format!("cpuid-gen-{}-{}", std::process::id(), Uuid::new_v4());
let mut req = bhyve_api::vm_create_req::new(name.as_bytes())
.expect("valid VM name");

3 changes: 3 additions & 0 deletions phd-tests/runner/Cargo.toml
@@ -12,8 +12,11 @@ doctest = false
[dependencies]
anyhow.workspace = true
backtrace.workspace = true
bhyve_api.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["derive"] }
crossbeam-channel.workspace = true
libc.workspace = true
phd-framework.workspace = true
phd-tests.workspace = true
tokio = { workspace = true, features = ["full"] }
6 changes: 6 additions & 0 deletions phd-tests/runner/src/config.rs
@@ -156,6 +156,12 @@ pub struct RunOptions {
#[clap(long, default_value = "file")]
pub server_logging_mode: ServerLogMode,

/// The parallelism with which to run PHD tests. If not provided, phd-runner
/// will guess a reasonable number from the test environment's number of
/// CPUs and available memory.
#[clap(long, value_parser)]
pub parallelism: Option<u16>,

/// The number of CPUs to assign to the guest in tests where the test is
/// using the default machine configuration.
#[clap(long, value_parser, default_value = "2")]
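
A note on the new flag: the doc comment above says that when `--parallelism` is not given, phd-runner guesses a worker count from the environment's CPUs and available memory (the new `bhyve_api` and `libc` dependencies in the runner's Cargo.toml are consistent with that). The guessing code itself is not part of this excerpt, so the following is only a rough sketch of such a heuristic; `guess_parallelism`, the 512 MiB per-guest figure, and the use of `sysconf` are assumptions for illustration, not the PR's implementation.

```rust
// Hypothetical sketch (not code from this PR): derive a default parallelism
// from host CPU and memory headroom, assuming each PHD guest needs
// `guest_cpus` vCPUs and `guest_mem_mib` MiB of memory.
fn guess_parallelism(guest_cpus: u64, guest_mem_mib: u64) -> u16 {
    // sysconf returns -1 on error; clamp to sane minimums before dividing.
    let host_cpus =
        unsafe { libc::sysconf(libc::_SC_NPROCESSORS_ONLN) }.max(1) as u64;
    let pages = unsafe { libc::sysconf(libc::_SC_PHYS_PAGES) }.max(1) as u64;
    let page = unsafe { libc::sysconf(libc::_SC_PAGESIZE) }.max(1) as u64;
    let host_mem_mib = (pages * page) / (1024 * 1024);

    // Bound by whichever resource runs out first, but always run at least
    // one worker.
    let by_cpu = host_cpus / guest_cpus.max(1);
    let by_mem = host_mem_mib / guest_mem_mib.max(1);
    by_cpu.min(by_mem).clamp(1, u64::from(u16::MAX)) as u16
}

fn main() {
    // 2 vCPUs matches the runner's default guest CPU count; 512 MiB is a
    // placeholder guess for the default guest memory size.
    println!("parallelism guess: {}", guess_parallelism(2, 512));
}
```
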
202 changes: 114 additions & 88 deletions phd-tests/runner/src/execute.rs
@@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use phd_tests::phd_testcase::{Framework, TestCase, TestOutcome};
@@ -37,21 +37,9 @@ pub struct ExecutionStats {
pub failed_test_cases: Vec<&'static TestCase>,
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
Ran(TestOutcome),
NotRun,
}

struct Execution {
tc: &'static TestCase,
status: Status,
}

/// Executes a set of tests using the supplied test context.
pub async fn run_tests_with_ctx(
ctx: &Arc<Framework>,
mut fixtures: TestFixtures,
ctx: &mut Vec<(Arc<Framework>, TestFixtures)>,
run_opts: &RunOptions,
) -> ExecutionStats {
let mut executions = Vec::new();
@@ -60,10 +48,10 @@ pub async fn run_tests_with_ctx(
&run_opts.include_filter,
&run_opts.exclude_filter,
) {
executions.push(Execution { tc, status: Status::NotRun });
executions.push(tc);
}

let mut stats = ExecutionStats {
let stats = ExecutionStats {
tests_passed: 0,
tests_failed: 0,
tests_skipped: 0,
@@ -77,90 +65,128 @@
return stats;
}

fixtures.execution_setup().unwrap();
let sigint_rx = set_sigint_handler();
info!("Running {} test(s)", executions.len());
let start_time = Instant::now();
for execution in &mut executions {
if *sigint_rx.borrow() {
info!("Test run interrupted by SIGINT");
break;
}
let stats = Arc::new(Mutex::new(stats));

info!("Starting test {}", execution.tc.fully_qualified_name());
async fn run_tests(
execution_rx: crossbeam_channel::Receiver<&'static TestCase>,
test_ctx: Arc<Framework>,
mut fixtures: TestFixtures,
stats: Arc<Mutex<ExecutionStats>>,
sigint_rx: watch::Receiver<bool>,
) -> Result<(), ()> {
fixtures.execution_setup().unwrap();

// Failure to run a setup fixture is fatal to the rest of the run, but
// it's still possible to report results, so return gracefully instead
// of panicking.
if let Err(e) = fixtures.test_setup() {
error!("Error running test setup fixture: {}", e);
break;
}
loop {
// Check for SIGINT only at the top of the loop. Although waiting
// for a new testcase is theoretically a blocking operation, it
// won't block meaningfully for our use: recv() returns
// immediately because either there are more testcases to run or
// the sender is closed. The only long blocking operation to
// check against in this loop is the test run itself.
if *sigint_rx.borrow() {
info!("Test run interrupted by SIGINT");
break;
}

stats.tests_not_run -= 1;
let test_ctx = ctx.clone();
let tc = execution.tc;
let mut sigint_rx_task = sigint_rx.clone();
let test_outcome = tokio::spawn(async move {
tokio::select! {
// Ensure interrupt signals are always handled instead of
// continuing to run the test.
biased;
result = sigint_rx_task.changed() => {
assert!(
result.is_ok(),
"SIGINT channel shouldn't drop while tests are running"
);

TestOutcome::Failed(
Some("test interrupted by SIGINT".to_string())
)
let tc = match execution_rx.recv() {
Ok(tc) => tc,
Err(_) => {
// RecvError means the channel is closed, so we're all
// done.
break;
}
outcome = tc.run(test_ctx.as_ref()) => outcome
};

info!("Starting test {}", tc.fully_qualified_name());

// Failure to run a setup fixture is fatal to the rest of the
// run, but it's still possible to report results, so return
// gracefully instead of panicking.
if let Err(e) = fixtures.test_setup() {
error!("Error running test setup fixture: {}", e);
// TODO: set this on stats too
break;
}
})
.await
.unwrap_or_else(|_| {
TestOutcome::Failed(Some(
"test task panicked, see test logs".to_string(),
))
});

info!(
"test {} ... {}{}",
execution.tc.fully_qualified_name(),
match test_outcome {
TestOutcome::Passed => "ok",
TestOutcome::Failed(_) => "FAILED: ",
TestOutcome::Skipped(_) => "skipped: ",
},
match &test_outcome {
TestOutcome::Failed(Some(s))
| TestOutcome::Skipped(Some(s)) => s,
TestOutcome::Failed(None) | TestOutcome::Skipped(None) =>
"[no message]",
_ => "",

{
let mut stats = stats.lock().unwrap();
stats.tests_not_run -= 1;
}
);

match test_outcome {
TestOutcome::Passed => stats.tests_passed += 1,
TestOutcome::Failed(_) => {
stats.tests_failed += 1;
stats.failed_test_cases.push(execution.tc);
let test_outcome = tc.run(test_ctx.as_ref()).await;

info!(
"test {} ... {}{}",
tc.fully_qualified_name(),
match test_outcome {
TestOutcome::Passed => "ok",
TestOutcome::Failed(_) => "FAILED: ",
TestOutcome::Skipped(_) => "skipped: ",
},
match &test_outcome {
TestOutcome::Failed(Some(s))
| TestOutcome::Skipped(Some(s)) => s,
TestOutcome::Failed(None) | TestOutcome::Skipped(None) =>
"[no message]",
_ => "",
}
);

{
let mut stats = stats.lock().unwrap();
match test_outcome {
TestOutcome::Passed => stats.tests_passed += 1,
TestOutcome::Failed(_) => {
stats.tests_failed += 1;
stats.failed_test_cases.push(tc);
}
TestOutcome::Skipped(_) => stats.tests_skipped += 1,
}
}
TestOutcome::Skipped(_) => stats.tests_skipped += 1,
}

execution.status = Status::Ran(test_outcome);
if let Err(e) = fixtures.test_cleanup().await {
error!("Error running cleanup fixture: {}", e);
break;
if let Err(e) = fixtures.test_cleanup().await {
error!("Error running cleanup fixture: {}", e);
// TODO: set this on stats
break;
}
}

fixtures.execution_cleanup().unwrap();

Ok(())
}
stats.duration = start_time.elapsed();

fixtures.execution_cleanup().unwrap();
let sigint_rx = set_sigint_handler();
info!("Running {} test(s)", executions.len());
let start_time = Instant::now();

let (execution_tx, execution_rx) =
crossbeam_channel::unbounded::<&'static TestCase>();

let mut test_runners = tokio::task::JoinSet::new();

for (ctx, fixtures) in ctx.drain(..) {
test_runners.spawn(run_tests(
execution_rx.clone(),
ctx,
fixtures,
Arc::clone(&stats),
sigint_rx.clone(),
));
}

for execution in &mut executions {
execution_tx.send(execution).expect("ok");
}
std::mem::drop(execution_tx);

let _ = test_runners.join_all().await;

let mut stats =
Mutex::into_inner(Arc::into_inner(stats).expect("only one ref"))
.expect("lock not panicked");
stats.duration = start_time.elapsed();

stats
}
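
Stripped of the PHD-specific types, the new executor's shape is: clone one crossbeam receiver per runner, spawn the runners on a tokio JoinSet, push every test case into the channel, drop the sender so the runners drain the queue and exit, then recover the shared stats from the Arc<Mutex<..>>. A minimal, self-contained sketch of that pattern (the WORKERS constant and test names are illustrative, not from the PR):

```rust
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    const WORKERS: usize = 2;
    let (tx, rx) = crossbeam_channel::unbounded::<&'static str>();
    let stats = Arc::new(Mutex::new(0usize));

    let mut workers = tokio::task::JoinSet::new();
    for _ in 0..WORKERS {
        let rx = rx.clone();
        let stats = Arc::clone(&stats);
        workers.spawn(async move {
            // recv() yields queued work until every sender is dropped, then
            // returns Err and the worker's loop ends.
            while let Ok(test) = rx.recv() {
                println!("running {test}");
                *stats.lock().unwrap() += 1;
            }
        });
    }

    for test in ["boot", "migrate", "cpuid"] {
        tx.send(test).expect("a worker still holds a receiver");
    }
    // Dropping the sender closes the channel, so workers exit once the
    // queue is drained.
    drop(tx);

    workers.join_all().await;
    println!("tests run: {}", stats.lock().unwrap());
}
```

Using a blocking crossbeam recv() inside async tasks is acceptable here for the reason the diff's comment gives: by the time a worker waits, either more work is immediately available or the channel is already closed, so the call never parks a runtime thread for long.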