Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/N] omdb db sagas: list running and inject fault #7732

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dev-tools/omdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ serde_json.workspace = true
sled-agent-client.workspace = true
slog.workspace = true
slog-error-chain.workspace = true
steno.workspace = true
strum.workspace = true
supports-color.workspace = true
tabled.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ use strum::IntoEnumIterator;
use tabled::Tabled;
use uuid::Uuid;

mod saga;

const NO_ACTIVE_PROPOLIS_MSG: &str = "<no active Propolis>";
const NOT_ON_SLED_MSG: &str = "<not on any sled>";

Expand Down Expand Up @@ -335,6 +337,8 @@ enum DbCommands {
/// Query for information about region snapshot replacements, optionally
/// manually triggering one.
RegionSnapshotReplacement(RegionSnapshotReplacementArgs),
/// Commands for querying and interacting with sagas
Saga(saga::SagaArgs),
/// Print information about sleds
Sleds(SledsArgs),
/// Print information about customer instances.
Expand Down Expand Up @@ -1023,6 +1027,9 @@ impl DbArgs {
)
.await
}
DbCommands::Saga(args) => {
args.exec(&omdb, &opctx, &datastore).await
}
DbCommands::Sleds(args) => {
cmd_db_sleds(&opctx, &datastore, &fetch_opts, args).await
}
Expand Down
323 changes: 323 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db/saga.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! `omdb db saga` subcommands

use crate::Omdb;
use crate::check_allow_destructive::DestructiveOperationToken;
use crate::db::datetime_rfc3339_concise;
use anyhow::Context;
use anyhow::bail;
use async_bb8_diesel::AsyncRunQueryDsl;
use clap::Args;
use clap::Subcommand;
use diesel::prelude::*;
use internal_dns_types::names::ServiceName;
use nexus_db_model::Saga;
use nexus_db_model::SagaNodeEvent;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::db::datastore::DataStoreConnection;
use nexus_db_queries::db::datastore::SQL_BATCH_SIZE;
use nexus_db_queries::db::pagination::Paginator;
use nexus_db_queries::db::pagination::paginated;
use std::collections::HashSet;
use std::sync::Arc;
use tabled::Tabled;
use uuid::Uuid;

use steno::ActionError;
use steno::SagaCachedState;
use steno::SagaNodeEventType;

/// `omdb db saga` subcommand
#[derive(Debug, Args, Clone)]
pub struct SagaArgs {
    // Which saga operation to perform. (Plain comment, not a doc comment:
    // clap turns `///` on fields into CLI help text.)
    #[command(subcommand)]
    command: SagaCommands,
}

#[derive(Debug, Subcommand, Clone)]
enum SagaCommands {
    /// List running sagas
    Running,

    /// Inject an error into a saga's currently running node(s)
    // Destructive: `SagaArgs::exec` gates this behind
    // `Omdb::check_allow_destructive` before any database writes happen.
    InjectError(SagaInjectErrorArgs),
}

#[derive(Clone, Debug, Args)]
struct SagaInjectErrorArgs {
    // ID of the saga to inject errors into (positional argument).
    saga_id: Uuid,

    /// Skip checking if the SEC is up
    #[clap(long, default_value_t = false)]
    bypass_sec_check: bool,
}

impl SagaArgs {
    /// Dispatch the selected `omdb db saga` subcommand.
    pub async fn exec(
        &self,
        omdb: &Omdb,
        opctx: &OpContext,
        datastore: &DataStore,
    ) -> Result<(), anyhow::Error> {
        match &self.command {
            SagaCommands::Running => cmd_sagas_running(opctx, datastore).await,

            SagaCommands::InjectError(args) => {
                // Acquire the destructive-operation token up front so we
                // refuse early when destructive commands aren't allowed.
                let token = omdb.check_allow_destructive()?;
                cmd_sagas_inject_error(omdb, opctx, datastore, args, token)
                    .await
            }
        }
    }
}

/// Implements `omdb db saga running`: print a table of every saga currently
/// in the `Running` cached state, most recently created first.
async fn cmd_sagas_running(
    _opctx: &OpContext,
    datastore: &DataStore,
) -> Result<(), anyhow::Error> {
    let conn = datastore.pool_connection_for_tests().await?;

    let running =
        get_all_sagas_in_state(&conn, SagaCachedState::Running).await?;

    // One output row per saga; field order determines column order.
    #[derive(Tabled)]
    struct SagaRow {
        id: Uuid,
        current_sec: String,
        time_created: String,
        name: String,
        state: String,
    }

    let mut rows = Vec::with_capacity(running.len());
    for saga in running {
        // A saga may have no assigned SEC; render that as "-".
        let current_sec = match saga.current_sec {
            Some(sec) => sec.0.to_string(),
            None => String::from("-"),
        };

        rows.push(SagaRow {
            id: saga.id.0.into(),
            current_sec,
            time_created: datetime_rfc3339_concise(&saga.time_created),
            name: saga.name,
            state: format!("{:?}", saga.saga_state),
        });
    }

    let table = tabled::Table::new(rows)
        .with(tabled::settings::Style::psql())
        .to_string();

    println!("{}", table);

    Ok(())
}

async fn cmd_sagas_inject_error(
omdb: &Omdb,
opctx: &OpContext,
datastore: &DataStore,
args: &SagaInjectErrorArgs,
_destruction_token: DestructiveOperationToken,
) -> Result<(), anyhow::Error> {
let conn = datastore.pool_connection_for_tests().await?;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's a bunch more work but I think it's worth adding a safety check here (overrideable, I guess) that tries to contact the Nexus that we think is running this saga and fails if that succeeds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in b1108a3

// Before doing anything: find the current SEC for the saga, and ping it to
// ensure that the Nexus is down.
if !args.bypass_sec_check {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about printing a scary warning here if this is set?

By use request, skipping check of whether the Nexus assigned to this saga is running. If this Nexus is running, the control plane state managed by this saga may become corrupted!

And if it is set:

Attempting to verify that the Nexus assigned to this saga is not running before proceeding.

(or something like that)

let saga: Saga = {
use db::schema::saga::dsl;
dsl::saga
.filter(dsl::id.eq(args.saga_id))
.first_async(&*conn)
.await?
};

match saga.current_sec {
None => {
// If there's no current SEC, then we don't need to check if
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Print a warning here? Something like:

warn: saga has no assigned SEC, so cannot verify that the saga is not still running

I believe this case is impossible. I think the schema allowed this for takeover, but I think today a saga always has this set.

// it's up. Would we see this if the saga was Requested but not
// started?
}

Some(current_sec) => {
let resolver = omdb.dns_resolver(opctx.log.clone()).await?;
let srv = resolver.lookup_srv(ServiceName::Nexus).await?;

let Some((target, port)) = srv
.iter()
.find(|(name, _)| name.contains(&current_sec.to_string()))
else {
bail!("dns lookup for {current_sec} found nothing");
};

let Some(addr) = resolver.ipv6_lookup(&target).await? else {
bail!("dns lookup for {target} found nothing");
};

let client = nexus_client::Client::new(
&format!("http://[{addr}]:{port}/"),
opctx.log.clone(),
);

match client.ping().await {
Ok(_) => {
bail!("{current_sec} answered a ping");
}

Err(e) => match e {
nexus_client::Error::InvalidRequest(_)
| nexus_client::Error::InvalidUpgrade(_)
| nexus_client::Error::ErrorResponse(_)
| nexus_client::Error::ResponseBodyError(_)
| nexus_client::Error::InvalidResponsePayload(_, _)
| nexus_client::Error::UnexpectedResponse(_)
| nexus_client::Error::PreHookError(_)
| nexus_client::Error::PostHookError(_) => {
bail!("{current_sec} failed a ping with {e}");
}

nexus_client::Error::CommunicationError(_) => {
// Assume communication error means that it could
// not be contacted.
//
// Note: this could be seen if Nexus is up but
// unreachable from where omdb is run!
}
},
}
Comment on lines +158 to +192
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these errors need more context. I'm assuming they might be seen by support engineers on a bad day and we want to be really clear with what's going on.

failed to verify that the Nexus instance running this saga is not currently running: found no DNS record for that Nexus instance

The Nexus instance running this saga appears to be still running. Injecting errors into running sagas is not safe. Please ensure Nexus is stopped before proceeding.

}
}
}

// Find all the nodes where there is a started record but not a done record

let started_nodes: Vec<SagaNodeEvent> = {
use db::schema::saga_node_event::dsl;

dsl::saga_node_event
.filter(dsl::saga_id.eq(args.saga_id))
.filter(dsl::event_type.eq(SagaNodeEventType::Started.label()))
.load_async(&*conn)
.await?
};

let complete_nodes: Vec<SagaNodeEvent> = {
use db::schema::saga_node_event::dsl;

// Note the actual enum contents don't matter in both these cases, it
// won't affect the label string
let succeeded_label =
SagaNodeEventType::Succeeded(Arc::new(serde_json::Value::Null))
.label();

let failed_label =
SagaNodeEventType::Failed(ActionError::InjectedError).label();

dsl::saga_node_event
.filter(dsl::saga_id.eq(args.saga_id))
.filter(
dsl::event_type
.eq(succeeded_label)
.or(dsl::event_type.eq(failed_label)),
)
.load_async(&*conn)
.await?
};

let incomplete_nodes: HashSet<u32> = {
let started_node_ids: HashSet<u32> =
started_nodes.iter().map(|node| node.node_id.0.into()).collect();
let complete_node_ids: HashSet<u32> =
complete_nodes.iter().map(|node| node.node_id.0.into()).collect();

started_node_ids.difference(&complete_node_ids).cloned().collect()
};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we print out exactly what it's doing? Something like:

injecting fault into running node ${node_id}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in b1108a3

let incomplete_nodes: Vec<&SagaNodeEvent> = {
let mut result = vec![];

for node_id in incomplete_nodes {
let Some(node) = started_nodes
.iter()
.find(|node| node.node_id.0 == node_id.into())
else {
bail!("could not find node?");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a panic? When would this ever happen?

};

result.push(node);
}

result
};

// Inject an error for those nodes, which will cause the saga to unwind
for node in incomplete_nodes {
let action_error = ActionError::action_failed(String::from(
"error injected with omdb",
));

let fault = SagaNodeEvent {
saga_id: node.saga_id,
node_id: node.node_id,
event_type: SagaNodeEventType::Failed(action_error.clone())
.label()
.to_string(),
data: Some(serde_json::to_value(action_error)?),
event_time: chrono::Utc::now(),
creator: crate::OMDB_UUID.into(),
};

eprintln!(
"injecting error for saga {:?} node {:?}",
node.saga_id, node.node_id,
);

{
use db::schema::saga_node_event::dsl;

diesel::insert_into(dsl::saga_node_event)
.values(fault.clone())
.execute_async(&*conn)
.await?;
}
}

Ok(())
}

// helper functions

/// Fetch every saga whose cached state matches `state`.
///
/// Pages through the `saga` table in batches of `SQL_BATCH_SIZE`, then
/// returns the results ordered by `time_created`, newest first
/// (equivalently: shortest-running first).
async fn get_all_sagas_in_state(
    conn: &DataStoreConnection,
    state: SagaCachedState,
) -> Result<Vec<Saga>, anyhow::Error> {
    use db::schema::saga::dsl;

    let mut results: Vec<Saga> = Vec::new();
    let mut paginator = Paginator::new(SQL_BATCH_SIZE);
    while let Some(page) = paginator.next() {
        let batch = paginated(dsl::saga, dsl::id, &page.current_pagparams())
            .filter(dsl::saga_state.eq(nexus_db_model::SagaCachedState(state)))
            .select(Saga::as_select())
            .load_async(&**conn)
            .await
            .context("fetching sagas")?;

        paginator = page.found_batch(&batch, &|s: &Saga| s.id);
        results.extend(batch);
    }

    // Sort ascending by creation time, then reverse so the most recently
    // created sagas come first.
    results.sort_by_key(|s| s.time_created);
    results.reverse();

    Ok(results)
}
3 changes: 3 additions & 0 deletions dev-tools/omdb/src/bin/omdb/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use omicron_common::address::Ipv6Subnet;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use tokio::net::TcpSocket;
use uuid::Uuid;

mod crucible_agent;
mod db;
Expand All @@ -57,6 +58,8 @@ mod oxql;
mod reconfigurator;
mod sled_agent;

/// Fixed UUID recorded as the `creator` of database records that omdb itself
/// writes (e.g. the saga node events inserted by `omdb db saga inject-error`),
/// distinguishing them from records created by a Nexus instance.
const OMDB_UUID: Uuid = Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I suggested omdb having a uuid I was only thinking of that specific purpose (a creator for saga node events) so I think this can live in db/saga.rs unless you think it will be more generally useful? Either way, let's document it.


#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let args = Omdb::parse();
Expand Down
1 change: 1 addition & 0 deletions dev-tools/omdb/tests/test_all_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async fn test_omdb_usage_errors() {
&["db", "dns", "diff"],
&["db", "dns", "names"],
&["db", "sleds", "--help"],
&["db", "saga"],
&["db", "snapshots"],
&["db", "network"],
&["mgs"],
Expand Down
Loading
Loading