From 167f3c6066893337e91846079a113ad02e22489a Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Wed, 5 Mar 2025 17:29:20 +0000 Subject: [PATCH 1/8] [1/N] omdb db sagas: list running and inject fault Breaking apart #4378 and copying the structure of #7695, add `omdb db saga` as a command and implement the following sub-commands: Usage: omdb db saga [OPTIONS] Commands: running List running sagas fault Inject an error into a saga's currently running node This addresses part of the minimum amount required during a release deployment: 1. after quiescing (#6804), omdb can query if there are any running sagas. 2. if those running sagas are stuck in a loop and cannot be drained (#7623), and the release contains a change to the DAG that causes Nexus to panic after an upgrade (#7730), then omdb can inject a fault into the database that would cause that saga to unwind when the affected Nexus is restarted Note for 2, unwinding a saga that is stuck in this way may not be valid if there were significant changes between releases. 
--- Cargo.lock | 1 + dev-tools/omdb/Cargo.toml | 1 + dev-tools/omdb/src/bin/omdb/db.rs | 7 + dev-tools/omdb/src/bin/omdb/db/saga.rs | 182 ++++++++++++++++++++++++ dev-tools/omdb/tests/test_all_output.rs | 1 + dev-tools/omdb/tests/usage_errors.out | 35 +++++ 6 files changed, 227 insertions(+) create mode 100644 dev-tools/omdb/src/bin/omdb/db/saga.rs diff --git a/Cargo.lock b/Cargo.lock index 679088b7693..55f3600bb94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7170,6 +7170,7 @@ dependencies = [ "sled-agent-client", "slog", "slog-error-chain", + "steno", "strum", "subprocess", "supports-color", diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 159be9d288a..1809da19142 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -52,6 +52,7 @@ serde_json.workspace = true sled-agent-client.workspace = true slog.workspace = true slog-error-chain.workspace = true +steno.workspace = true strum.workspace = true supports-color.workspace = true tabled.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 6ffc8aec1bf..eb6c0901660 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -151,6 +151,8 @@ use strum::IntoEnumIterator; use tabled::Tabled; use uuid::Uuid; +mod saga; + const NO_ACTIVE_PROPOLIS_MSG: &str = ""; const NOT_ON_SLED_MSG: &str = ""; @@ -335,6 +337,8 @@ enum DbCommands { /// Query for information about region snapshot replacements, optionally /// manually triggering one. RegionSnapshotReplacement(RegionSnapshotReplacementArgs), + /// Commands for querying and interacting with sagas + Saga(saga::SagaArgs), /// Print information about sleds Sleds(SledsArgs), /// Print information about customer instances. 
@@ -1023,6 +1027,9 @@ impl DbArgs { ) .await } + DbCommands::Saga(args) => { + args.exec(&omdb, &opctx, &datastore).await + } DbCommands::Sleds(args) => { cmd_db_sleds(&opctx, &datastore, &fetch_opts, args).await } diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs new file mode 100644 index 00000000000..dd21bcdfa90 --- /dev/null +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -0,0 +1,182 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! `omdb db saga` subcommands + +use crate::Omdb; +use crate::check_allow_destructive::DestructiveOperationToken; +use crate::db::datetime_rfc3339_concise; +use anyhow::Context; +use async_bb8_diesel::AsyncRunQueryDsl; +use clap::Args; +use clap::Subcommand; +use diesel::prelude::*; +use nexus_db_model::Saga; +use nexus_db_model::SagaNodeEvent; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db; +use nexus_db_queries::db::DataStore; +use nexus_db_queries::db::datastore::DataStoreConnection; +use nexus_db_queries::db::datastore::SQL_BATCH_SIZE; +use nexus_db_queries::db::pagination::Paginator; +use nexus_db_queries::db::pagination::paginated; +use tabled::Tabled; +use uuid::Uuid; + +/// `omdb db saga` subcommand +#[derive(Debug, Args, Clone)] +pub struct SagaArgs { + #[command(subcommand)] + command: SagaCommands, +} + +#[derive(Debug, Subcommand, Clone)] +enum SagaCommands { + /// List running sagas + Running, + + /// Inject an error into a saga's currently running node + Fault(SagaFaultArgs), +} + +#[derive(Clone, Debug, Args)] +struct SagaFaultArgs { + saga_id: Uuid, +} + +impl SagaArgs { + pub async fn exec( + &self, + omdb: &Omdb, + opctx: &OpContext, + datastore: &DataStore, + ) -> Result<(), anyhow::Error> { + match &self.command { + SagaCommands::Running => cmd_sagas_running(opctx, datastore).await, + + 
SagaCommands::Fault(args) => { + let token = omdb.check_allow_destructive()?; + cmd_sagas_fault(opctx, datastore, args, token).await + } + } + } +} + +async fn cmd_sagas_running( + _opctx: &OpContext, + datastore: &DataStore, +) -> Result<(), anyhow::Error> { + let conn = datastore.pool_connection_for_tests().await?; + + let sagas = + get_all_sagas_in_state(&conn, steno::SagaCachedState::Running).await?; + + #[derive(Tabled)] + struct SagaRow { + id: Uuid, + creator_id: Uuid, + time_created: String, + name: String, + state: String, + } + + let rows: Vec<_> = sagas + .into_iter() + .map(|saga: Saga| SagaRow { + id: saga.id.0.into(), + creator_id: saga.creator.0, + time_created: datetime_rfc3339_concise(&saga.time_created), + name: saga.name, + state: format!("{:?}", saga.saga_state), + }) + .collect(); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::psql()) + .to_string(); + + println!("{}", table); + + Ok(()) +} + +async fn cmd_sagas_fault( + _opctx: &OpContext, + datastore: &DataStore, + args: &SagaFaultArgs, + _destruction_token: DestructiveOperationToken, +) -> Result<(), anyhow::Error> { + let conn = datastore.pool_connection_for_tests().await?; + + // Find the most recent node for a given saga + let most_recent_node: SagaNodeEvent = { + use db::schema::saga_node_event::dsl; + + dsl::saga_node_event + .filter(dsl::saga_id.eq(args.saga_id)) + .order(dsl::event_time.desc()) + .limit(1) + .first_async(&*conn) + .await? 
+ }; + + // Inject a fault for that node, which will cause the saga to unwind + let action_error = steno::ActionError::action_failed(String::from( + "error injected with omdb", + )); + + let fault = SagaNodeEvent { + saga_id: most_recent_node.saga_id, + node_id: most_recent_node.node_id, + event_type: steno::SagaNodeEventType::Failed(action_error.clone()) + .label() + .to_string(), + data: Some(serde_json::to_value(action_error)?), + event_time: chrono::Utc::now(), + creator: most_recent_node.creator, + }; + + { + use db::schema::saga_node_event::dsl; + + diesel::insert_into(dsl::saga_node_event) + .values(fault.clone()) + .execute_async(&*conn) + .await?; + } + + Ok(()) +} + +// helper functions + +async fn get_all_sagas_in_state( + conn: &DataStoreConnection, + state: steno::SagaCachedState, +) -> Result, anyhow::Error> { + let mut sagas = Vec::new(); + let mut paginator = Paginator::new(SQL_BATCH_SIZE); + while let Some(p) = paginator.next() { + use db::schema::saga::dsl; + let records_batch = + paginated(dsl::saga, dsl::id, &p.current_pagparams()) + .filter( + dsl::saga_state.eq(nexus_db_model::SagaCachedState(state)), + ) + .select(Saga::as_select()) + .load_async(&**conn) + .await + .context("fetching sagas")?; + + paginator = p.found_batch(&records_batch, &|s: &Saga| s.id); + + sagas.extend(records_batch); + } + + // Sort them by creation time (equivalently: how long they've been running) + sagas.sort_by_key(|s| s.time_created); + sagas.reverse(); + + Ok(sagas) +} diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 146965273d3..f0d9902e509 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -80,6 +80,7 @@ async fn test_omdb_usage_errors() { &["db", "dns", "diff"], &["db", "dns", "names"], &["db", "sleds", "--help"], + &["db", "saga"], &["db", "snapshots"], &["db", "network"], &["mgs"], diff --git a/dev-tools/omdb/tests/usage_errors.out 
b/dev-tools/omdb/tests/usage_errors.out index 44b7d13aed7..c09414c9a8f 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -125,6 +125,7 @@ Commands: triggering one region-snapshot-replacement Query for information about region snapshot replacements, optionally manually triggering one + saga Commands for querying and interacting with sagas sleds Print information about sleds instance Print information about customer instances instances Alias to `omdb instance list` @@ -177,6 +178,7 @@ Commands: triggering one region-snapshot-replacement Query for information about region snapshot replacements, optionally manually triggering one + saga Commands for querying and interacting with sagas sleds Print information about sleds instance Print information about customer instances instances Alias to `omdb instance list` @@ -373,6 +375,39 @@ Safety Options: --------------------------------------------- stderr: ============================================= +EXECUTING COMMAND: omdb ["db", "saga"] +termination: Exited(2) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +Commands for querying and interacting with sagas + +Usage: omdb db saga [OPTIONS] + +Commands: + running List running sagas + fault Inject an error into a saga's currently running node + help Print this message or the help of the given subcommand(s) + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + --color Color output [default: auto] [possible values: auto, always, never] + -h, --help Print help + +Connection Options: + --db-url URL of the database SQL interface [env: OMDB_DB_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Database Options: + --fetch-limit limit to apply to queries that fetch rows [env: + OMDB_FETCH_LIMIT=] [default: 500] + --include-deleted whether to include soft-deleted records when enumerating objects + that can be soft-deleted + +Safety Options: + -w, 
--destructive Allow potentially-destructive subcommands +============================================= EXECUTING COMMAND: omdb ["db", "snapshots"] termination: Exited(2) --------------------------------------------- From b1108a38aa3cc9703825f2c2333b13ad6f26a272 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 11 Mar 2025 15:18:19 +0000 Subject: [PATCH 2/8] Responding to PR feedback - change fault to inject-error - show the current sec for a saga instead of the creator - inject an error for all started (but not completed) nodes of a saga: remember, it's a dag! - add a /v1/ping endpoint to the internal api, and ping to see if the current sec is up - it's not normally safe to inject an error while the saga is running - add a bypass for this check - clearly state what errors we're injecting - inject errors using a specific uuid for omdb --- dev-tools/omdb/src/bin/omdb/db/saga.rs | 216 ++++++++++++++++++++----- dev-tools/omdb/src/bin/omdb/main.rs | 3 + internal-dns/resolver/src/resolver.rs | 13 ++ nexus/internal-api/src/lib.rs | 15 ++ openapi/nexus-internal.json | 47 ++++++ 5 files changed, 256 insertions(+), 38 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index dd21bcdfa90..fa3811d2063 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -8,10 +8,12 @@ use crate::Omdb; use crate::check_allow_destructive::DestructiveOperationToken; use crate::db::datetime_rfc3339_concise; use anyhow::Context; +use anyhow::bail; use async_bb8_diesel::AsyncRunQueryDsl; use clap::Args; use clap::Subcommand; use diesel::prelude::*; +use internal_dns_types::names::ServiceName; use nexus_db_model::Saga; use nexus_db_model::SagaNodeEvent; use nexus_db_queries::context::OpContext; @@ -21,9 +23,15 @@ use nexus_db_queries::db::datastore::DataStoreConnection; use nexus_db_queries::db::datastore::SQL_BATCH_SIZE; use nexus_db_queries::db::pagination::Paginator; use 
nexus_db_queries::db::pagination::paginated; +use std::collections::HashSet; +use std::sync::Arc; use tabled::Tabled; use uuid::Uuid; +use steno::ActionError; +use steno::SagaCachedState; +use steno::SagaNodeEventType; + /// `omdb db saga` subcommand #[derive(Debug, Args, Clone)] pub struct SagaArgs { @@ -36,13 +44,17 @@ enum SagaCommands { /// List running sagas Running, - /// Inject an error into a saga's currently running node - Fault(SagaFaultArgs), + /// Inject an error into a saga's currently running node(s) + InjectError(SagaInjectErrorArgs), } #[derive(Clone, Debug, Args)] -struct SagaFaultArgs { +struct SagaInjectErrorArgs { saga_id: Uuid, + + /// Skip checking if the SEC is up + #[clap(long, default_value_t = false)] + bypass_sec_check: bool, } impl SagaArgs { @@ -55,9 +67,10 @@ impl SagaArgs { match &self.command { SagaCommands::Running => cmd_sagas_running(opctx, datastore).await, - SagaCommands::Fault(args) => { + SagaCommands::InjectError(args) => { let token = omdb.check_allow_destructive()?; - cmd_sagas_fault(opctx, datastore, args, token).await + cmd_sagas_inject_error(omdb, opctx, datastore, args, token) + .await } } } @@ -69,13 +82,12 @@ async fn cmd_sagas_running( ) -> Result<(), anyhow::Error> { let conn = datastore.pool_connection_for_tests().await?; - let sagas = - get_all_sagas_in_state(&conn, steno::SagaCachedState::Running).await?; + let sagas = get_all_sagas_in_state(&conn, SagaCachedState::Running).await?; #[derive(Tabled)] struct SagaRow { id: Uuid, - creator_id: Uuid, + current_sec: String, time_created: String, name: String, state: String, @@ -85,7 +97,11 @@ async fn cmd_sagas_running( .into_iter() .map(|saga: Saga| SagaRow { id: saga.id.0.into(), - creator_id: saga.creator.0, + current_sec: if let Some(current_sec) = saga.current_sec { + current_sec.0.to_string() + } else { + String::from("-") + }, time_created: datetime_rfc3339_concise(&saga.time_created), name: saga.name, state: format!("{:?}", saga.saga_state), @@ -101,49 +117,173 
@@ async fn cmd_sagas_running( Ok(()) } -async fn cmd_sagas_fault( - _opctx: &OpContext, +async fn cmd_sagas_inject_error( + omdb: &Omdb, + opctx: &OpContext, datastore: &DataStore, - args: &SagaFaultArgs, + args: &SagaInjectErrorArgs, _destruction_token: DestructiveOperationToken, ) -> Result<(), anyhow::Error> { let conn = datastore.pool_connection_for_tests().await?; - // Find the most recent node for a given saga - let most_recent_node: SagaNodeEvent = { + // Find all the nodes where there is a started record but not a done record + + let started_nodes: Vec = { use db::schema::saga_node_event::dsl; dsl::saga_node_event .filter(dsl::saga_id.eq(args.saga_id)) - .order(dsl::event_time.desc()) - .limit(1) - .first_async(&*conn) + .filter(dsl::event_type.eq(SagaNodeEventType::Started.label())) + .load_async(&*conn) .await? }; - // Inject a fault for that node, which will cause the saga to unwind - let action_error = steno::ActionError::action_failed(String::from( - "error injected with omdb", - )); - - let fault = SagaNodeEvent { - saga_id: most_recent_node.saga_id, - node_id: most_recent_node.node_id, - event_type: steno::SagaNodeEventType::Failed(action_error.clone()) - .label() - .to_string(), - data: Some(serde_json::to_value(action_error)?), - event_time: chrono::Utc::now(), - creator: most_recent_node.creator, + let complete_nodes: Vec = { + use db::schema::saga_node_event::dsl; + + // Note the actual enum contents don't matter in both these cases, it + // won't affect the label string + let succeeded_label = + SagaNodeEventType::Succeeded(Arc::new(serde_json::Value::Null)) + .label(); + + let failed_label = + SagaNodeEventType::Failed(ActionError::InjectedError).label(); + + dsl::saga_node_event + .filter(dsl::saga_id.eq(args.saga_id)) + .filter( + dsl::event_type + .eq(succeeded_label) + .or(dsl::event_type.eq(failed_label)), + ) + .load_async(&*conn) + .await? 
}; - { - use db::schema::saga_node_event::dsl; + let incomplete_nodes: HashSet = { + let started_node_ids: HashSet = + started_nodes.iter().map(|node| node.node_id.0.into()).collect(); + let complete_node_ids: HashSet = + complete_nodes.iter().map(|node| node.node_id.0.into()).collect(); + + started_node_ids.difference(&complete_node_ids).cloned().collect() + }; + + let incomplete_nodes: Vec<&SagaNodeEvent> = { + let mut result = vec![]; - diesel::insert_into(dsl::saga_node_event) - .values(fault.clone()) - .execute_async(&*conn) - .await?; + for node_id in incomplete_nodes { + let Some(node) = started_nodes + .iter() + .find(|node| node.node_id.0 == node_id.into()) + else { + bail!("could not find node?"); + }; + + result.push(node); + } + + result + }; + + // For each incomplete node, find the current SEC, and ping it to ensure + // that the Nexus is down. + if !args.bypass_sec_check { + for node in &incomplete_nodes { + let saga: Saga = { + use db::schema::saga::dsl; + dsl::saga + .filter(dsl::id.eq(node.saga_id)) + .first_async(&*conn) + .await? + }; + + let Some(current_sec) = saga.current_sec else { + // If there's no current SEC, then we don't need to check if + // it's up. Would we see this if the saga was Requested but not + // started? + continue; + }; + + let resolver = omdb.dns_resolver(opctx.log.clone()).await?; + let srv = resolver.lookup_srv(ServiceName::Nexus).await?; + + let Some((target, port)) = srv + .iter() + .find(|(name, _)| name.contains(¤t_sec.to_string())) + else { + bail!("dns lookup for {current_sec} found nothing"); + }; + + let Some(addr) = resolver.ipv6_lookup(&target).await? 
else { + bail!("dns lookup for {target} found nothing"); + }; + + let client = nexus_client::Client::new( + &format!("http://[{addr}]:{port}/"), + opctx.log.clone(), + ); + + match client.ping().await { + Ok(_) => { + bail!("{current_sec} answered a ping"); + } + + Err(e) => match e { + nexus_client::Error::InvalidRequest(_) + | nexus_client::Error::InvalidUpgrade(_) + | nexus_client::Error::ErrorResponse(_) + | nexus_client::Error::ResponseBodyError(_) + | nexus_client::Error::InvalidResponsePayload(_, _) + | nexus_client::Error::UnexpectedResponse(_) + | nexus_client::Error::PreHookError(_) + | nexus_client::Error::PostHookError(_) => { + bail!("{current_sec} failed a ping with {e}"); + } + + nexus_client::Error::CommunicationError(_) => { + // Assume communication error means that it could not be + // contacted. + // + // Note: this could be seen if Nexus is up but + // unreachable from where omdb is run! + } + }, + } + } + } + + // Inject an error for those nodes, which will cause the saga to unwind + for node in incomplete_nodes { + let action_error = ActionError::action_failed(String::from( + "error injected with omdb", + )); + + let fault = SagaNodeEvent { + saga_id: node.saga_id, + node_id: node.node_id, + event_type: SagaNodeEventType::Failed(action_error.clone()) + .label() + .to_string(), + data: Some(serde_json::to_value(action_error)?), + event_time: chrono::Utc::now(), + creator: crate::OMDB_UUID.into(), + }; + + eprintln!( + "injecting error for saga {:?} node {:?}", + node.saga_id, node.node_id, + ); + + { + use db::schema::saga_node_event::dsl; + + diesel::insert_into(dsl::saga_node_event) + .values(fault.clone()) + .execute_async(&*conn) + .await?; + } } Ok(()) @@ -153,7 +293,7 @@ async fn cmd_sagas_fault( async fn get_all_sagas_in_state( conn: &DataStoreConnection, - state: steno::SagaCachedState, + state: SagaCachedState, ) -> Result, anyhow::Error> { let mut sagas = Vec::new(); let mut paginator = Paginator::new(SQL_BATCH_SIZE); diff --git 
a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 2228a78db3a..1949866b968 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -46,6 +46,7 @@ use omicron_common::address::Ipv6Subnet; use std::net::SocketAddr; use std::net::SocketAddrV6; use tokio::net::TcpSocket; +use uuid::Uuid; mod crucible_agent; mod db; @@ -57,6 +58,8 @@ mod oxql; mod reconfigurator; mod sled_agent; +const OMDB_UUID: Uuid = Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128); + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let args = Omdb::parse(); diff --git a/internal-dns/resolver/src/resolver.rs b/internal-dns/resolver/src/resolver.rs index 72dd84edd37..a1201fb8de9 100644 --- a/internal-dns/resolver/src/resolver.rs +++ b/internal-dns/resolver/src/resolver.rs @@ -385,6 +385,19 @@ impl Resolver { }) .flatten() } + + pub async fn ipv6_lookup( + &self, + query: &str, + ) -> Result, ResolveError> { + Ok(self + .resolver + .ipv6_lookup(query) + .await? + .into_iter() + .next() + .map(move |aaaa| aaaa.into())) + } } #[cfg(test)] diff --git a/nexus/internal-api/src/lib.rs b/nexus/internal-api/src/lib.rs index bcf84fd5079..a7ac6b22a29 100644 --- a/nexus/internal-api/src/lib.rs +++ b/nexus/internal-api/src/lib.rs @@ -17,6 +17,8 @@ use nexus_types::{ external_api::{ params::{PhysicalDiskPath, SledSelector, UninitializedSledId}, shared::{ProbeInfo, UninitializedSled}, + views::Ping, + views::PingStatus, views::SledPolicy, }, internal_api::{ @@ -49,6 +51,19 @@ const RACK_INITIALIZATION_REQUEST_MAX_BYTES: usize = 10 * 1024 * 1024; pub trait NexusInternalApi { type Context; + /// Ping API + /// + /// Always responds with Ok if it responds at all. 
+ #[endpoint { + method = GET, + path = "/v1/ping", + }] + async fn ping( + _rqctx: RequestContext, + ) -> Result, HttpError> { + Ok(HttpResponseOk(Ping { status: PingStatus::Ok })) + } + /// Return information about the given sled agent #[endpoint { method = GET, diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 75daa8f5d8e..bd58f4031f6 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -1427,6 +1427,31 @@ } } }, + "/v1/ping": { + "get": { + "summary": "Ping API", + "description": "Always responds with Ok if it responds at all.", + "operationId": "ping", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Ping" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/vmms/{propolis_id}": { "put": { "summary": "Report updated state for a VMM.", @@ -4698,6 +4723,28 @@ "vendor" ] }, + "Ping": { + "type": "object", + "properties": { + "status": { + "description": "Whether the external API is reachable. 
Will always be Ok if the endpoint returns anything at all.", + "allOf": [ + { + "$ref": "#/components/schemas/PingStatus" + } + ] + } + }, + "required": [ + "status" + ] + }, + "PingStatus": { + "type": "string", + "enum": [ + "ok" + ] + }, "PortConfigV2": { "type": "object", "properties": { From f4c82d8ed94f29e53de5c46680c30286dab44b5b Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 11 Mar 2025 18:29:03 +0000 Subject: [PATCH 3/8] update omdb expectorate output --- dev-tools/omdb/tests/usage_errors.out | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index c09414c9a8f..8d7ef9a0385 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -386,9 +386,9 @@ Commands for querying and interacting with sagas Usage: omdb db saga [OPTIONS] Commands: - running List running sagas - fault Inject an error into a saga's currently running node - help Print this message or the help of the given subcommand(s) + running List running sagas + inject-error Inject an error into a saga's currently running node(s) + help Print this message or the help of the given subcommand(s) Options: --log-level log level filter [env: LOG_LEVEL=] [default: warn] From ab8169954cc020246773ea0b1fca108c5e1701dc Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 13 Mar 2025 01:31:33 +0000 Subject: [PATCH 4/8] check for SEC being up before looking at saga nodes, and only do it once! 
--- dev-tools/omdb/src/bin/omdb/db/saga.rs | 135 +++++++++++++------------ 1 file changed, 68 insertions(+), 67 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index fa3811d2063..18ef4a5000b 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -126,6 +126,74 @@ async fn cmd_sagas_inject_error( ) -> Result<(), anyhow::Error> { let conn = datastore.pool_connection_for_tests().await?; + // Before doing anything: find the current SEC for the saga, and ping it to + // ensure that the Nexus is down. + if !args.bypass_sec_check { + let saga: Saga = { + use db::schema::saga::dsl; + dsl::saga + .filter(dsl::id.eq(args.saga_id)) + .first_async(&*conn) + .await? + }; + + match saga.current_sec { + None => { + // If there's no current SEC, then we don't need to check if + // it's up. Would we see this if the saga was Requested but not + // started? + } + + Some(current_sec) => { + let resolver = omdb.dns_resolver(opctx.log.clone()).await?; + let srv = resolver.lookup_srv(ServiceName::Nexus).await?; + + let Some((target, port)) = srv + .iter() + .find(|(name, _)| name.contains(¤t_sec.to_string())) + else { + bail!("dns lookup for {current_sec} found nothing"); + }; + + let Some(addr) = resolver.ipv6_lookup(&target).await? 
else { + bail!("dns lookup for {target} found nothing"); + }; + + let client = nexus_client::Client::new( + &format!("http://[{addr}]:{port}/"), + opctx.log.clone(), + ); + + match client.ping().await { + Ok(_) => { + bail!("{current_sec} answered a ping"); + } + + Err(e) => match e { + nexus_client::Error::InvalidRequest(_) + | nexus_client::Error::InvalidUpgrade(_) + | nexus_client::Error::ErrorResponse(_) + | nexus_client::Error::ResponseBodyError(_) + | nexus_client::Error::InvalidResponsePayload(_, _) + | nexus_client::Error::UnexpectedResponse(_) + | nexus_client::Error::PreHookError(_) + | nexus_client::Error::PostHookError(_) => { + bail!("{current_sec} failed a ping with {e}"); + } + + nexus_client::Error::CommunicationError(_) => { + // Assume communication error means that it could + // not be contacted. + // + // Note: this could be seen if Nexus is up but + // unreachable from where omdb is run! + } + }, + } + } + } + } + // Find all the nodes where there is a started record but not a done record let started_nodes: Vec = { @@ -187,73 +255,6 @@ async fn cmd_sagas_inject_error( result }; - // For each incomplete node, find the current SEC, and ping it to ensure - // that the Nexus is down. - if !args.bypass_sec_check { - for node in &incomplete_nodes { - let saga: Saga = { - use db::schema::saga::dsl; - dsl::saga - .filter(dsl::id.eq(node.saga_id)) - .first_async(&*conn) - .await? - }; - - let Some(current_sec) = saga.current_sec else { - // If there's no current SEC, then we don't need to check if - // it's up. Would we see this if the saga was Requested but not - // started? - continue; - }; - - let resolver = omdb.dns_resolver(opctx.log.clone()).await?; - let srv = resolver.lookup_srv(ServiceName::Nexus).await?; - - let Some((target, port)) = srv - .iter() - .find(|(name, _)| name.contains(¤t_sec.to_string())) - else { - bail!("dns lookup for {current_sec} found nothing"); - }; - - let Some(addr) = resolver.ipv6_lookup(&target).await? 
else { - bail!("dns lookup for {target} found nothing"); - }; - - let client = nexus_client::Client::new( - &format!("http://[{addr}]:{port}/"), - opctx.log.clone(), - ); - - match client.ping().await { - Ok(_) => { - bail!("{current_sec} answered a ping"); - } - - Err(e) => match e { - nexus_client::Error::InvalidRequest(_) - | nexus_client::Error::InvalidUpgrade(_) - | nexus_client::Error::ErrorResponse(_) - | nexus_client::Error::ResponseBodyError(_) - | nexus_client::Error::InvalidResponsePayload(_, _) - | nexus_client::Error::UnexpectedResponse(_) - | nexus_client::Error::PreHookError(_) - | nexus_client::Error::PostHookError(_) => { - bail!("{current_sec} failed a ping with {e}"); - } - - nexus_client::Error::CommunicationError(_) => { - // Assume communication error means that it could not be - // contacted. - // - // Note: this could be seen if Nexus is up but - // unreachable from where omdb is run! - } - }, - } - } - } - // Inject an error for those nodes, which will cause the saga to unwind for node in incomplete_nodes { let action_error = ActionError::action_failed(String::from( From a6e1d380e3b1ad3f2a0cfb5d3cb48770397c0767 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 24 Mar 2025 21:32:15 +0000 Subject: [PATCH 5/8] scope the OMDB_SEC_UUID to only the saga.rs code, and document it --- dev-tools/omdb/src/bin/omdb/db/saga.rs | 9 ++++++++- dev-tools/omdb/src/bin/omdb/main.rs | 3 --- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index 18ef4a5000b..74da223057d 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -32,6 +32,13 @@ use steno::ActionError; use steno::SagaCachedState; use steno::SagaNodeEventType; +/// OMDB's SEC id, used when inserting errors into running sagas. 
There should +/// be no way that regular V4 UUID creation collides with this, as the first +/// hexadecimal digit in the third group is always a 4 for that +/// format. +const OMDB_SEC_UUID: Uuid = + Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128); + /// `omdb db saga` subcommand #[derive(Debug, Args, Clone)] pub struct SagaArgs { @@ -269,7 +276,7 @@ async fn cmd_sagas_inject_error( .to_string(), data: Some(serde_json::to_value(action_error)?), event_time: chrono::Utc::now(), - creator: crate::OMDB_UUID.into(), + creator: OMDB_SEC_UUID.into(), }; eprintln!( diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 1949866b968..2228a78db3a 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -46,7 +46,6 @@ use omicron_common::address::Ipv6Subnet; use std::net::SocketAddr; use std::net::SocketAddrV6; use tokio::net::TcpSocket; -use uuid::Uuid; mod crucible_agent; mod db; @@ -58,8 +57,6 @@ mod oxql; mod reconfigurator; mod sled_agent; -const OMDB_UUID: Uuid = Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128); - #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let args = Omdb::parse(); From 8d2de0914073744edd97702f4d842d70b1ac9a27 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Mon, 24 Mar 2025 21:40:25 +0000 Subject: [PATCH 6/8] disclaimer on ipv6_lookup --- internal-dns/resolver/src/resolver.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal-dns/resolver/src/resolver.rs b/internal-dns/resolver/src/resolver.rs index a1201fb8de9..b1cb35745ae 100644 --- a/internal-dns/resolver/src/resolver.rs +++ b/internal-dns/resolver/src/resolver.rs @@ -386,6 +386,14 @@ impl Resolver { .flatten() } + /// Lookup a specific record's IPv6 address + /// + /// In general, callers should _not_ be using this function, and instead + /// using the other functions in this struct that query based on a service + /// name.
This function is currently only used by omdb because it has to ask + /// questions of a _specific_ Nexus, but most (if not all) control plane + /// software should not care about talking to a specific instance of + /// something by name. pub async fn ipv6_lookup( &self, query: &str, From 6cde273650d8878a0fd7b6a95d9f1f3af07e11da Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 25 Mar 2025 21:55:20 +0000 Subject: [PATCH 7/8] many more descriptive errors --- Cargo.lock | 1 + dev-tools/omdb/Cargo.toml | 1 + dev-tools/omdb/src/bin/omdb/db/saga.rs | 88 +++++++++++++++++++++++++- dev-tools/omdb/src/bin/omdb/main.rs | 37 +++++++++++ dev-tools/omdb/src/bin/omdb/nexus.rs | 37 +---------- 5 files changed, 126 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55f3600bb94..db1707d22f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7160,6 +7160,7 @@ dependencies = [ "omicron-test-utils", "omicron-uuid-kinds", "omicron-workspace-hack", + "owo-colors", "oximeter-client", "oximeter-db", "pq-sys", diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 1809da19142..fa93ce0e50e 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -66,6 +66,7 @@ ipnetwork.workspace = true omicron-workspace-hack.workspace = true multimap.workspace = true indicatif.workspace = true +owo-colors.workspace = true [dev-dependencies] camino-tempfile.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index 74da223057d..2dc43e58c1e 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -4,9 +4,11 @@ //! 
`omdb db saga` subcommands +use crate::ConfirmationPrompt; use crate::Omdb; use crate::check_allow_destructive::DestructiveOperationToken; use crate::db::datetime_rfc3339_concise; +use crate::helpers::should_colorize; use anyhow::Context; use anyhow::bail; use async_bb8_diesel::AsyncRunQueryDsl; @@ -23,6 +25,7 @@ use nexus_db_queries::db::datastore::DataStoreConnection; use nexus_db_queries::db::datastore::SQL_BATCH_SIZE; use nexus_db_queries::db::pagination::Paginator; use nexus_db_queries::db::pagination::paginated; +use owo_colors::OwoColorize; use std::collections::HashSet; use std::sync::Arc; use tabled::Tabled; @@ -131,6 +134,8 @@ async fn cmd_sagas_inject_error( args: &SagaInjectErrorArgs, _destruction_token: DestructiveOperationToken, ) -> Result<(), anyhow::Error> { + let should_print_color = + should_colorize(omdb.output.color, supports_color::Stream::Stdout); let conn = datastore.pool_connection_for_tests().await?; // Before doing anything: find the current SEC for the saga, and ping it to @@ -149,6 +154,14 @@ async fn cmd_sagas_inject_error( // If there's no current SEC, then we don't need to check if // it's up. Would we see this if the saga was Requested but not // started? + let text = "warning: saga has no assigned SEC, so cannot \ + verify that the saga is not still running!"; + + if should_print_color { + println!("{}", text.yellow().bold()); + } else { + println!("{text}"); + } } Some(current_sec) => { @@ -159,10 +172,34 @@ async fn cmd_sagas_inject_error( .iter() .find(|(name, _)| name.contains(¤t_sec.to_string())) else { + let text = format!( + "Cannot proceed: no SRV record for Nexus with id \ + {current_sec}, so cannot verify that it is not still \ + running!" + ); + + if should_print_color { + println!("{}", text.red().bold()); + } else { + println!("{text}"); + } + bail!("dns lookup for {current_sec} found nothing"); }; let Some(addr) = resolver.ipv6_lookup(&target).await? 
else { + let text = format!( + "Cannot proceed: no AAAA record for Nexus with id \ + {current_sec}, so cannot verify that it is not still \ + running!" + ); + + if should_print_color { + println!("{}", text.red().bold()); + } else { + println!("{text}"); + } + bail!("dns lookup for {target} found nothing"); }; @@ -173,6 +210,20 @@ async fn cmd_sagas_inject_error( match client.ping().await { Ok(_) => { + let text = format!( + "Cannot proceed: Nexus with id matching current \ + SEC responded ok to a ping, meaning it is still \ + running. Injecting errors into running sagas is \ + not safe. Please ensure the Nexus with id \ + {current_sec} is stopped before proceeding." + ); + + if should_print_color { + println!("{}", text.red().bold()); + } else { + println!("{text}"); + } + bail!("{current_sec} answered a ping"); } @@ -185,20 +236,53 @@ async fn cmd_sagas_inject_error( | nexus_client::Error::UnexpectedResponse(_) | nexus_client::Error::PreHookError(_) | nexus_client::Error::PostHookError(_) => { + let text = format!( + "Cannot proceed: Nexus with id matching \ + current SEC responded with an error to a ping, \ + meaning it is still running. Injecting errors \ + into running sagas is not safe. Please ensure \ + the Nexus with id {current_sec} is stopped \ + before proceeding." + ); + + if should_print_color { + println!("{}", text.red().bold()); + } else { + println!("{text}"); + } + bail!("{current_sec} failed a ping with {e}"); } - nexus_client::Error::CommunicationError(_) => { + nexus_client::Error::CommunicationError(e) => { // Assume communication error means that it could // not be contacted. // // Note: this could be seen if Nexus is up but // unreachable from where omdb is run! + + println!( + "saw {e} when trying to ping Nexus with id \ + {current_sec}. Proceed?" 
+ ); + + let mut prompt = ConfirmationPrompt::new(); + prompt.read_and_validate("y/N", "y")?; } }, } } } + } else { + let text = "Skipping check of whether the Nexus assigned to this saga \ + is running. If this Nexus is running, the control plane state managed \ + by this saga may become corrupted!"; + + if should_print_color { + println!("{}", text.red().bold()); + } else { + println!("{text}"); + } } // Find all the nodes where there is a started record but not a done record @@ -279,7 +363,7 @@ async fn cmd_sagas_inject_error( creator: OMDB_SEC_UUID.into(), }; - eprintln!( + println!( "injecting error for saga {:?} node {:?}", node.saga_id, node.node_id, ); diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 2228a78db3a..904f2e2c0dc 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -35,6 +35,7 @@ use anyhow::Context; use anyhow::anyhow; +use anyhow::bail; use anyhow::ensure; use clap::Args; use clap::ColorChoice; @@ -43,6 +44,9 @@ use clap::Subcommand; use futures::StreamExt; use internal_dns_types::names::ServiceName; use omicron_common::address::Ipv6Subnet; +use reedline::DefaultPrompt; +use reedline::DefaultPromptSegment; +use reedline::Reedline; use std::net::SocketAddr; use std::net::SocketAddrV6; use tokio::net::TcpSocket; @@ -57,6 +61,39 @@ mod oxql; mod reconfigurator; mod sled_agent; +struct ConfirmationPrompt(Reedline); + +impl ConfirmationPrompt { + fn new() -> Self { + Self(Reedline::create()) + } + + fn read(&mut self, message: &str) -> Result { + let prompt = DefaultPrompt::new( + DefaultPromptSegment::Basic(message.to_string()), + DefaultPromptSegment::Empty, + ); + if let Ok(reedline::Signal::Success(input)) = self.0.read_line(&prompt) + { + Ok(input) + } else { + bail!("operation aborted") + } + } + + fn read_and_validate( + &mut self, + message: &str, + expected: &str, + ) -> Result<(), anyhow::Error> { + let input = self.read(message)?; + if input != expected 
{ + bail!("Aborting, input did not match expected value"); + } + Ok(()) + } +} + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let args = Omdb::parse(); diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index ec903d6de23..21c313223c6 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -4,6 +4,7 @@ //! omdb commands that query or update specific Nexus instances +use crate::ConfirmationPrompt; use crate::Omdb; use crate::check_allow_destructive::DestructiveOperationToken; use crate::db::DbUrlOptions; @@ -66,9 +67,6 @@ use omicron_uuid_kinds::GenericUuid; use omicron_uuid_kinds::ParseError; use omicron_uuid_kinds::PhysicalDiskUuid; use omicron_uuid_kinds::SledUuid; -use reedline::DefaultPrompt; -use reedline::DefaultPromptSegment; -use reedline::Reedline; use serde::Deserialize; use slog_error_chain::InlineErrorChain; use std::collections::BTreeMap; @@ -2994,39 +2992,6 @@ async fn cmd_nexus_sled_add( Ok(()) } -struct ConfirmationPrompt(Reedline); - -impl ConfirmationPrompt { - fn new() -> Self { - Self(Reedline::create()) - } - - fn read(&mut self, message: &str) -> Result { - let prompt = DefaultPrompt::new( - DefaultPromptSegment::Basic(message.to_string()), - DefaultPromptSegment::Empty, - ); - if let Ok(reedline::Signal::Success(input)) = self.0.read_line(&prompt) - { - Ok(input) - } else { - bail!("operation aborted") - } - } - - fn read_and_validate( - &mut self, - message: &str, - expected: &str, - ) -> Result<(), anyhow::Error> { - let input = self.read(message)?; - if input != expected { - bail!("Aborting, input did not match expected value"); - } - Ok(()) - } -} - /// Runs `omdb nexus sleds expunge` async fn cmd_nexus_sled_expunge( client: &nexus_client::Client, From 2df69c2e8cdf77da5ef398b7072b72195dfdb520 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Tue, 25 Mar 2025 22:03:26 +0000 Subject: [PATCH 8/8] unwrap from started_nodes --- 
dev-tools/omdb/src/bin/omdb/db/saga.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs index 2dc43e58c1e..5b2d9b1f72f 100644 --- a/dev-tools/omdb/src/bin/omdb/db/saga.rs +++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs @@ -333,12 +333,12 @@ async fn cmd_sagas_inject_error( let mut result = vec![]; for node_id in incomplete_nodes { - let Some(node) = started_nodes + // SAFETY: this unwrap is ok because incomplete_nodes will always + // contain a subset of entries from started_nodes. + let node = started_nodes .iter() .find(|node| node.node_id.0 == node_id.into()) - else { - bail!("could not find node?"); - }; + .unwrap(); result.push(node); }