
[1/N] omdb db sagas: list running and inject fault #7732

Open
wants to merge 9 commits into base: main
217 changes: 179 additions & 38 deletions dev-tools/omdb/src/bin/omdb/db/saga.rs
@@ -8,10 +8,12 @@ use crate::Omdb;
 use crate::check_allow_destructive::DestructiveOperationToken;
 use crate::db::datetime_rfc3339_concise;
 use anyhow::Context;
+use anyhow::bail;
 use async_bb8_diesel::AsyncRunQueryDsl;
 use clap::Args;
 use clap::Subcommand;
 use diesel::prelude::*;
+use internal_dns_types::names::ServiceName;
 use nexus_db_model::Saga;
 use nexus_db_model::SagaNodeEvent;
 use nexus_db_queries::context::OpContext;
@@ -21,9 +23,15 @@ use nexus_db_queries::db::datastore::DataStoreConnection;
 use nexus_db_queries::db::datastore::SQL_BATCH_SIZE;
 use nexus_db_queries::db::pagination::Paginator;
 use nexus_db_queries::db::pagination::paginated;
+use std::collections::HashSet;
+use std::sync::Arc;
 use tabled::Tabled;
 use uuid::Uuid;
+
+use steno::ActionError;
+use steno::SagaCachedState;
+use steno::SagaNodeEventType;
 
 /// `omdb db saga` subcommand
 #[derive(Debug, Args, Clone)]
 pub struct SagaArgs {
@@ -36,13 +44,17 @@ enum SagaCommands {
     /// List running sagas
     Running,
 
-    /// Inject an error into a saga's currently running node
-    Fault(SagaFaultArgs),
+    /// Inject an error into a saga's currently running node(s)
+    InjectError(SagaInjectErrorArgs),
 }
 
 #[derive(Clone, Debug, Args)]
-struct SagaFaultArgs {
+struct SagaInjectErrorArgs {
     saga_id: Uuid,
+
+    /// Skip checking if the SEC is up
+    #[clap(long, default_value_t = false)]
+    bypass_sec_check: bool,
 }
 
 impl SagaArgs {
@@ -55,9 +67,10 @@ impl SagaArgs {
         match &self.command {
             SagaCommands::Running => cmd_sagas_running(opctx, datastore).await,
 
-            SagaCommands::Fault(args) => {
+            SagaCommands::InjectError(args) => {
                 let token = omdb.check_allow_destructive()?;
-                cmd_sagas_fault(opctx, datastore, args, token).await
+                cmd_sagas_inject_error(omdb, opctx, datastore, args, token)
+                    .await
             }
         }
     }
@@ -69,13 +82,12 @@ async fn cmd_sagas_running(
 ) -> Result<(), anyhow::Error> {
     let conn = datastore.pool_connection_for_tests().await?;
 
-    let sagas =
-        get_all_sagas_in_state(&conn, steno::SagaCachedState::Running).await?;
+    let sagas = get_all_sagas_in_state(&conn, SagaCachedState::Running).await?;
 
     #[derive(Tabled)]
     struct SagaRow {
         id: Uuid,
-        creator_id: Uuid,
+        current_sec: String,
         time_created: String,
         name: String,
         state: String,
@@ -85,7 +97,11 @@ async fn cmd_sagas_running(
         .into_iter()
         .map(|saga: Saga| SagaRow {
             id: saga.id.0.into(),
-            creator_id: saga.creator.0,
+            current_sec: if let Some(current_sec) = saga.current_sec {
+                current_sec.0.to_string()
+            } else {
+                String::from("-")
+            },
             time_created: datetime_rfc3339_concise(&saga.time_created),
             name: saga.name,
             state: format!("{:?}", saga.saga_state),
@@ -101,49 +117,174 @@
     Ok(())
 }
 
-async fn cmd_sagas_fault(
-    _opctx: &OpContext,
+async fn cmd_sagas_inject_error(
+    omdb: &Omdb,
+    opctx: &OpContext,
     datastore: &DataStore,
-    args: &SagaFaultArgs,
+    args: &SagaInjectErrorArgs,
     _destruction_token: DestructiveOperationToken,
 ) -> Result<(), anyhow::Error> {
     let conn = datastore.pool_connection_for_tests().await?;

Review comment (Collaborator):
I know it's a bunch more work but I think it's worth adding a safety check here (overrideable, I guess) that tries to contact the Nexus that we think is running this saga and fails if that succeeds.

Reply (Contributor Author):
done in b1108a3

-    // Find the most recent node for a given saga
-    let most_recent_node: SagaNodeEvent = {
+    // Before doing anything: find the current SEC for the saga, and ping it to
+    // ensure that the Nexus is down.
+    if !args.bypass_sec_check {
Review comment (Collaborator):
How about printing a scary warning here if this flag is set?

    By user request, skipping check of whether the Nexus assigned to this saga is running. If this Nexus is running, the control plane state managed by this saga may become corrupted!

And if it is not set:

    Attempting to verify that the Nexus assigned to this saga is not running before proceeding.

(or something like that)

Reply (Contributor Author):
see also 6cde273
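
A minimal sketch of how those two messages could be emitted; the wording is taken from the review suggestion above, not necessarily from what 6cde273 actually landed:

    if args.bypass_sec_check {
        eprintln!(
            "WARNING: by user request, skipping check of whether the Nexus \
            assigned to this saga is running. If this Nexus is running, the \
            control plane state managed by this saga may become corrupted!"
        );
    } else {
        eprintln!(
            "Attempting to verify that the Nexus assigned to this saga is \
            not running before proceeding."
        );
        // the SEC liveness check in the hunk below runs here
    }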

+        let saga: Saga = {
+            use db::schema::saga::dsl;
+            dsl::saga
+                .filter(dsl::id.eq(args.saga_id))
+                .first_async(&*conn)
+                .await?
+        };
+
+        match saga.current_sec {
+            None => {
+                // If there's no current SEC, then we don't need to check if
Review comment (Collaborator):
Print a warning here? Something like:

    warn: saga has no assigned SEC, so cannot verify that the saga is not still running

I believe this case is impossible. I think the schema allowed this for takeover, but I think today a saga always has this set.

Reply (Contributor Author):
see also 6cde273
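
A matching warning in the None arm could look like the following sketch, again using the reviewer's suggested text rather than the final wording from 6cde273:

    None => {
        eprintln!(
            "warn: saga has no assigned SEC, so cannot verify that the \
            saga is not still running"
        );
    }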

+                // it's up. Would we see this if the saga was Requested but not
+                // started?
+            }
+
+            Some(current_sec) => {
+                let resolver = omdb.dns_resolver(opctx.log.clone()).await?;
+                let srv = resolver.lookup_srv(ServiceName::Nexus).await?;
+
+                let Some((target, port)) = srv
+                    .iter()
+                    .find(|(name, _)| name.contains(&current_sec.to_string()))
+                else {
+                    bail!("dns lookup for {current_sec} found nothing");
+                };
+
+                let Some(addr) = resolver.ipv6_lookup(&target).await? else {
+                    bail!("dns lookup for {target} found nothing");
+                };
+
+                let client = nexus_client::Client::new(
+                    &format!("http://[{addr}]:{port}/"),
+                    opctx.log.clone(),
+                );
+
+                match client.ping().await {
+                    Ok(_) => {
+                        bail!("{current_sec} answered a ping");
+                    }
+
+                    Err(e) => match e {
+                        nexus_client::Error::InvalidRequest(_)
+                        | nexus_client::Error::InvalidUpgrade(_)
+                        | nexus_client::Error::ErrorResponse(_)
+                        | nexus_client::Error::ResponseBodyError(_)
+                        | nexus_client::Error::InvalidResponsePayload(_, _)
+                        | nexus_client::Error::UnexpectedResponse(_)
+                        | nexus_client::Error::PreHookError(_)
+                        | nexus_client::Error::PostHookError(_) => {
+                            bail!("{current_sec} failed a ping with {e}");
+                        }
+
+                        nexus_client::Error::CommunicationError(_) => {
+                            // Assume communication error means that it could
+                            // not be contacted.
+                            //
+                            // Note: this could be seen if Nexus is up but
+                            // unreachable from where omdb is run!
+                        }
+                    },
+                }
Review comment (Collaborator):
I think these errors need more context. I'm assuming they might be seen by support engineers on a bad day, and we want to be really clear about what's going on.

    failed to verify that the Nexus instance running this saga is not currently running: found no DNS record for that Nexus instance

    The Nexus instance running this saga appears to be still running. Injecting errors into running sagas is not safe. Please ensure Nexus is stopped before proceeding.

Reply (Contributor Author):
put more descriptive errors in 6cde273, let me know what you think!
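
One sketch of how the two failure points in this hunk could carry those messages, reusing the variable names from the code above (the exact wording that landed is in 6cde273, not here):

    let Some((target, port)) = srv
        .iter()
        .find(|(name, _)| name.contains(&current_sec.to_string()))
    else {
        bail!(
            "failed to verify that the Nexus instance running this saga is \
            not currently running: found no DNS record for that Nexus \
            instance"
        );
    };

    match client.ping().await {
        Ok(_) => {
            bail!(
                "The Nexus instance running this saga appears to be still \
                running. Injecting errors into running sagas is not safe. \
                Please ensure Nexus is stopped before proceeding."
            );
        }
        Err(nexus_client::Error::CommunicationError(_)) => {
            // could not be contacted; assume it is down and proceed
        }
        Err(e) => {
            bail!("{current_sec} failed a ping with {e}");
        }
    }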

+            }
+        }
+    }
+
+    // Find all the nodes where there is a started record but not a done record
+
+    let started_nodes: Vec<SagaNodeEvent> = {
+        use db::schema::saga_node_event::dsl;
+
+        dsl::saga_node_event
+            .filter(dsl::saga_id.eq(args.saga_id))
+            .filter(dsl::event_type.eq(SagaNodeEventType::Started.label()))
+            .load_async(&*conn)
+            .await?
+    };
+
+    let complete_nodes: Vec<SagaNodeEvent> = {
+        use db::schema::saga_node_event::dsl;
+
+        // Note the actual enum contents don't matter in both these cases, it
+        // won't affect the label string
+        let succeeded_label =
+            SagaNodeEventType::Succeeded(Arc::new(serde_json::Value::Null))
+                .label();
+
+        let failed_label =
+            SagaNodeEventType::Failed(ActionError::InjectedError).label();
+
         dsl::saga_node_event
             .filter(dsl::saga_id.eq(args.saga_id))
-            .order(dsl::event_time.desc())
-            .limit(1)
-            .first_async(&*conn)
+            .filter(
+                dsl::event_type
+                    .eq(succeeded_label)
+                    .or(dsl::event_type.eq(failed_label)),
+            )
+            .load_async(&*conn)
             .await?
     };
 
-    // Inject a fault for that node, which will cause the saga to unwind
-    let action_error = steno::ActionError::action_failed(String::from(
-        "error injected with omdb",
-    ));
-
-    let fault = SagaNodeEvent {
-        saga_id: most_recent_node.saga_id,
-        node_id: most_recent_node.node_id,
-        event_type: steno::SagaNodeEventType::Failed(action_error.clone())
-            .label()
-            .to_string(),
-        data: Some(serde_json::to_value(action_error)?),
-        event_time: chrono::Utc::now(),
-        creator: most_recent_node.creator,
+    let incomplete_nodes: HashSet<u32> = {
+        let started_node_ids: HashSet<u32> =
+            started_nodes.iter().map(|node| node.node_id.0.into()).collect();
+        let complete_node_ids: HashSet<u32> =
+            complete_nodes.iter().map(|node| node.node_id.0.into()).collect();
+
+        started_node_ids.difference(&complete_node_ids).cloned().collect()
     };

Review comment (Collaborator):
Can we print out exactly what it's doing? Something like:

    injecting fault into running node ${node_id}

Reply (Contributor Author):
done in b1108a3

-    {
-        use db::schema::saga_node_event::dsl;
+    let incomplete_nodes: Vec<&SagaNodeEvent> = {
+        let mut result = vec![];
+
+        for node_id in incomplete_nodes {
+            let Some(node) = started_nodes
+                .iter()
+                .find(|node| node.node_id.0 == node_id.into())
+            else {
+                bail!("could not find node?");
Review comment (Collaborator):
Should this be a panic? When would this ever happen?

Reply (Contributor Author):
I don't think so. 2df69c2 changes this to unwrap (plus a comment explaining why I don't think so haha)
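
For reference, a sketch of the unwrap-plus-comment shape the author describes (the comment text here is paraphrased, not quoted from 2df69c2):

    // Every id in `incomplete_nodes` came from `started_nodes` (it is the
    // set difference of ids computed above), so this lookup cannot miss; a
    // failure here would be a bug in this function, not bad data.
    let node = started_nodes
        .iter()
        .find(|node| node.node_id.0 == node_id.into())
        .unwrap();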

+            };
 
-        diesel::insert_into(dsl::saga_node_event)
-            .values(fault.clone())
-            .execute_async(&*conn)
-            .await?;
+            result.push(node);
+        }
+
+        result
+    };
+
+    // Inject an error for those nodes, which will cause the saga to unwind
+    for node in incomplete_nodes {
+        let action_error = ActionError::action_failed(String::from(
+            "error injected with omdb",
+        ));
+
+        let fault = SagaNodeEvent {
+            saga_id: node.saga_id,
+            node_id: node.node_id,
+            event_type: SagaNodeEventType::Failed(action_error.clone())
+                .label()
+                .to_string(),
+            data: Some(serde_json::to_value(action_error)?),
+            event_time: chrono::Utc::now(),
+            creator: crate::OMDB_UUID.into(),
+        };
+
+        eprintln!(
+            "injecting error for saga {:?} node {:?}",
+            node.saga_id, node.node_id,
+        );
+
+        {
+            use db::schema::saga_node_event::dsl;
+
+            diesel::insert_into(dsl::saga_node_event)
+                .values(fault.clone())
+                .execute_async(&*conn)
+                .await?;
+        }
+    }
 
     Ok(())
@@ -153,7 +294,7 @@ async fn cmd_sagas_fault(
 
 async fn get_all_sagas_in_state(
     conn: &DataStoreConnection,
-    state: steno::SagaCachedState,
+    state: SagaCachedState,
 ) -> Result<Vec<Saga>, anyhow::Error> {
     let mut sagas = Vec::new();
     let mut paginator = Paginator::new(SQL_BATCH_SIZE);
3 changes: 3 additions & 0 deletions dev-tools/omdb/src/bin/omdb/main.rs
@@ -46,6 +46,7 @@ use omicron_common::address::Ipv6Subnet;
 use std::net::SocketAddr;
 use std::net::SocketAddrV6;
 use tokio::net::TcpSocket;
+use uuid::Uuid;
 
 mod crucible_agent;
 mod db;
@@ -57,6 +58,8 @@ mod oxql;
 mod reconfigurator;
 mod sled_agent;
 
+const OMDB_UUID: Uuid = Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128);
Review comment (Collaborator):
When I suggested omdb having a uuid I was only thinking of that specific purpose (a creator for saga node events), so I think this can live in db/saga.rs unless you think it will be more generally useful? Either way, let's document it.

Reply (Contributor Author):
done in a6e1d38
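
A documented version, moved into db/saga.rs as suggested, might read like this (a sketch; the doc wording is illustrative, not copied from a6e1d38):

    /// Fixed UUID recorded as the `creator` of the saga node events that
    /// omdb inserts, so injected events can be told apart from events
    /// written by a real SEC.
    const OMDB_UUID: Uuid =
        Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128);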


 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
     let args = Omdb::parse();
6 changes: 3 additions & 3 deletions dev-tools/omdb/tests/usage_errors.out
@@ -386,9 +386,9 @@ Commands for querying and interacting with sagas
 Usage: omdb db saga [OPTIONS] <COMMAND>
 
 Commands:
-  running  List running sagas
-  fault    Inject an error into a saga's currently running node
-  help     Print this message or the help of the given subcommand(s)
+  running       List running sagas
+  inject-error  Inject an error into a saga's currently running node(s)
+  help          Print this message or the help of the given subcommand(s)
 
 Options:
   --log-level <LOG_LEVEL>  log level filter [env: LOG_LEVEL=] [default: warn]
13 changes: 13 additions & 0 deletions internal-dns/resolver/src/resolver.rs
@@ -385,6 +385,19 @@ impl Resolver {
         })
         .flatten()
     }
+
+    pub async fn ipv6_lookup(
Review comment (Collaborator):
Could you document this and make sure to add a comment that it's for omdb and that it's generally not the right thing for deployed software? (I'm worried people will reach for this when they should be using one of the ServiceName variants that uses the SRV records.)

Reply (Contributor Author):
I put something in 8d2de09

+        &self,
+        query: &str,
+    ) -> Result<Option<Ipv6Addr>, ResolveError> {
+        Ok(self
+            .resolver
+            .ipv6_lookup(query)
+            .await?
+            .into_iter()
+            .next()
+            .map(move |aaaa| aaaa.into()))
+    }
 }
 
 #[cfg(test)]
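The documentation requested in the review above might look something like this sketch (the comment wording is illustrative; the text that actually landed is in 8d2de09):

    /// Look up a single AAAA record by name.
    ///
    /// This exists for omdb, which needs to resolve one specific host.
    /// Deployed software should generally not use this; prefer the
    /// `ServiceName`-based SRV lookups instead.
    pub async fn ipv6_lookup(
        &self,
        query: &str,
    ) -> Result<Option<Ipv6Addr>, ResolveError> {
        // body unchanged from the diff above
        unimplemented!()
    }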
15 changes: 15 additions & 0 deletions nexus/internal-api/src/lib.rs
@@ -17,6 +17,8 @@ use nexus_types::{
     external_api::{
         params::{PhysicalDiskPath, SledSelector, UninitializedSledId},
         shared::{ProbeInfo, UninitializedSled},
+        views::Ping,
+        views::PingStatus,
         views::SledPolicy,
     },
     internal_api::{
@@ -49,6 +51,19 @@ const RACK_INITIALIZATION_REQUEST_MAX_BYTES: usize = 10 * 1024 * 1024;
 pub trait NexusInternalApi {
     type Context;
 
+    /// Ping API
+    ///
+    /// Always responds with Ok if it responds at all.
+    #[endpoint {
+        method = GET,
+        path = "/v1/ping",
+    }]
+    async fn ping(
+        _rqctx: RequestContext<Self::Context>,
+    ) -> Result<HttpResponseOk<Ping>, HttpError> {
+        Ok(HttpResponseOk(Ping { status: PingStatus::Ok }))
+    }
+
     /// Return information about the given sled agent
     #[endpoint {
         method = GET,