diff --git a/Cargo.lock b/Cargo.lock
index dff1bec8cf3..12add38a68f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7202,6 +7202,7 @@ dependencies = [
  "omicron-test-utils",
  "omicron-uuid-kinds",
  "omicron-workspace-hack",
+ "owo-colors",
  "oximeter-client",
  "oximeter-db",
  "oxnet",
@@ -7214,6 +7215,7 @@ dependencies = [
  "sled-agent-client",
  "slog",
  "slog-error-chain",
+ "steno",
  "strum",
  "subprocess",
  "supports-color",
diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml
index 73fb7b2815f..e62bd16c160 100644
--- a/dev-tools/omdb/Cargo.toml
+++ b/dev-tools/omdb/Cargo.toml
@@ -52,6 +52,7 @@ serde_json.workspace = true
 sled-agent-client.workspace = true
 slog.workspace = true
 slog-error-chain.workspace = true
+steno.workspace = true
 strum.workspace = true
 supports-color.workspace = true
 tabled.workspace = true
@@ -67,6 +68,7 @@ multimap.workspace = true
 indicatif.workspace = true
 petgraph.workspace = true
 oxnet.workspace = true
+owo-colors.workspace = true

 [dev-dependencies]
 camino-tempfile.workspace = true
diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs
index cd4b579583f..49473b05416 100644
--- a/dev-tools/omdb/src/bin/omdb/db.rs
+++ b/dev-tools/omdb/src/bin/omdb/db.rs
@@ -159,6 +159,8 @@ use strum::IntoEnumIterator;
 use tabled::Tabled;
 use uuid::Uuid;

+mod saga;
+
 const NO_ACTIVE_PROPOLIS_MSG: &str = "<no active Propolis>";
 const NOT_ON_SLED_MSG: &str = "<not on a sled>";

@@ -343,6 +345,8 @@ enum DbCommands {
     /// Query for information about region snapshot replacements, optionally
     /// manually triggering one.
     RegionSnapshotReplacement(RegionSnapshotReplacementArgs),
+    /// Commands for querying and interacting with sagas
+    Saga(saga::SagaArgs),
     /// Print information about sleds
     Sleds(SledsArgs),
     /// Print information about customer instances.
@@ -1064,6 +1068,9 @@ impl DbArgs {
                 )
                 .await
             }
+            DbCommands::Saga(args) => {
+                args.exec(&omdb, &opctx, &datastore).await
+            }
             DbCommands::Sleds(args) => {
                 cmd_db_sleds(&opctx, &datastore, &fetch_opts, args).await
             }
diff --git a/dev-tools/omdb/src/bin/omdb/db/saga.rs b/dev-tools/omdb/src/bin/omdb/db/saga.rs
new file mode 100644
index 00000000000..5b2d9b1f72f
--- /dev/null
+++ b/dev-tools/omdb/src/bin/omdb/db/saga.rs
@@ -0,0 +1,414 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! `omdb db saga` subcommands
+
+use crate::ConfirmationPrompt;
+use crate::Omdb;
+use crate::check_allow_destructive::DestructiveOperationToken;
+use crate::db::datetime_rfc3339_concise;
+use crate::helpers::should_colorize;
+use anyhow::Context;
+use anyhow::bail;
+use async_bb8_diesel::AsyncRunQueryDsl;
+use clap::Args;
+use clap::Subcommand;
+use diesel::prelude::*;
+use internal_dns_types::names::ServiceName;
+use nexus_db_model::Saga;
+use nexus_db_model::SagaNodeEvent;
+use nexus_db_queries::context::OpContext;
+use nexus_db_queries::db;
+use nexus_db_queries::db::DataStore;
+use nexus_db_queries::db::datastore::DataStoreConnection;
+use nexus_db_queries::db::datastore::SQL_BATCH_SIZE;
+use nexus_db_queries::db::pagination::Paginator;
+use nexus_db_queries::db::pagination::paginated;
+use owo_colors::OwoColorize;
+use std::collections::HashSet;
+use std::sync::Arc;
+use tabled::Tabled;
+use uuid::Uuid;
+
+use steno::ActionError;
+use steno::SagaCachedState;
+use steno::SagaNodeEventType;
+
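+// For context (illustrative): a v4 UUID always has the form
+// xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx, where y is one of 8, 9, a, or b,
+// so an all-`a` value like the constant below can never be produced by
+// `Uuid::new_v4()`.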
+/// OMDB's SEC id, used when inserting errors into running sagas. There
+/// should be no way for regular V4 UUID creation to collide with this, as
+/// the first hexadecimal digit in the third group is always 4 for that
+/// format.
+const OMDB_SEC_UUID: Uuid =
+    Uuid::from_u128(0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAu128);
+
+/// `omdb db saga` subcommand
+#[derive(Debug, Args, Clone)]
+pub struct SagaArgs {
+    #[command(subcommand)]
+    command: SagaCommands,
+}
+
+#[derive(Debug, Subcommand, Clone)]
+enum SagaCommands {
+    /// List running sagas
+    Running,
+
+    /// Inject an error into a saga's currently running node(s)
+    InjectError(SagaInjectErrorArgs),
+}
+
+#[derive(Clone, Debug, Args)]
+struct SagaInjectErrorArgs {
+    saga_id: Uuid,
+
+    /// Skip checking if the SEC is up
+    #[clap(long, default_value_t = false)]
+    bypass_sec_check: bool,
+}
+
+impl SagaArgs {
+    pub async fn exec(
+        &self,
+        omdb: &Omdb,
+        opctx: &OpContext,
+        datastore: &DataStore,
+    ) -> Result<(), anyhow::Error> {
+        match &self.command {
+            SagaCommands::Running => cmd_sagas_running(opctx, datastore).await,
+
+            SagaCommands::InjectError(args) => {
+                let token = omdb.check_allow_destructive()?;
+                cmd_sagas_inject_error(omdb, opctx, datastore, args, token)
+                    .await
+            }
+        }
+    }
+}
+
+async fn cmd_sagas_running(
+    _opctx: &OpContext,
+    datastore: &DataStore,
+) -> Result<(), anyhow::Error> {
+    let conn = datastore.pool_connection_for_tests().await?;
+
+    let sagas = get_all_sagas_in_state(&conn, SagaCachedState::Running).await?;
+
+    #[derive(Tabled)]
+    struct SagaRow {
+        id: Uuid,
+        current_sec: String,
+        time_created: String,
+        name: String,
+        state: String,
+    }
+
+    let rows: Vec<_> = sagas
+        .into_iter()
+        .map(|saga: Saga| SagaRow {
+            id: saga.id.0.into(),
+            current_sec: if let Some(current_sec) = saga.current_sec {
+                current_sec.0.to_string()
+            } else {
+                String::from("-")
+            },
+            time_created: datetime_rfc3339_concise(&saga.time_created),
+            name: saga.name,
+            state: format!("{:?}", saga.saga_state),
+        })
+        .collect();
+
+    let table = tabled::Table::new(rows)
+        .with(tabled::settings::Style::psql())
+        .to_string();
+
+    println!("{}", table);
+
+    Ok(())
+}
+
+async fn cmd_sagas_inject_error(
+    omdb: &Omdb,
+    opctx: &OpContext,
+    datastore: &DataStore,
+    args: &SagaInjectErrorArgs,
+    _destruction_token: DestructiveOperationToken,
+) -> Result<(), anyhow::Error> {
+    let should_print_color =
+        should_colorize(omdb.output.color, supports_color::Stream::Stdout);
+    let conn = datastore.pool_connection_for_tests().await?;
+
+    // Before doing anything: find the current SEC for the saga, and ping it
+    // to check that the Nexus in question is down.
+    if !args.bypass_sec_check {
+        let saga: Saga = {
+            use db::schema::saga::dsl;
+            dsl::saga
+                .filter(dsl::id.eq(args.saga_id))
+                .first_async(&*conn)
+                .await?
+        };
+
+        match saga.current_sec {
+            None => {
+                // If there's no current SEC, then we don't need to check if
+                // it's up. Would we see this if the saga was Requested but
+                // not started?
+                let text = "warning: saga has no assigned SEC, so cannot \
+                    verify that the saga is not still running!";
+
+                if should_print_color {
+                    println!("{}", text.yellow().bold());
+                } else {
+                    println!("{text}");
+                }
+            }
+
+            Some(current_sec) => {
+                let resolver = omdb.dns_resolver(opctx.log.clone()).await?;
+                let srv = resolver.lookup_srv(ServiceName::Nexus).await?;
+
+                let Some((target, port)) = srv
+                    .iter()
+                    .find(|(name, _)| name.contains(&current_sec.to_string()))
+                else {
+                    let text = format!(
+                        "Cannot proceed: no SRV record for Nexus with id \
+                        {current_sec}, so cannot verify that it is not still \
+                        running!"
+                    );
+
+                    if should_print_color {
+                        println!("{}", text.red().bold());
+                    } else {
+                        println!("{text}");
+                    }
+
+                    bail!("dns lookup for {current_sec} found nothing");
+                };
+
+                let Some(addr) = resolver.ipv6_lookup(&target).await? else {
+                    let text = format!(
+                        "Cannot proceed: no AAAA record for Nexus with id \
+                        {current_sec}, so cannot verify that it is not still \
+                        running!"
+                    );
+
+                    if should_print_color {
+                        println!("{}", text.red().bold());
+                    } else {
+                        println!("{text}");
+                    }
+
+                    bail!("dns lookup for {target} found nothing");
+                };
+
+                let client = nexus_client::Client::new(
+                    &format!("http://[{addr}]:{port}/"),
+                    opctx.log.clone(),
+                );
+
+                match client.ping().await {
+                    Ok(_) => {
+                        let text = format!(
+                            "Cannot proceed: Nexus with id matching current \
+                            SEC responded ok to a ping, meaning it is still \
+                            running. Injecting errors into running sagas is \
+                            not safe. Please ensure the Nexus with id \
+                            {current_sec} is stopped before proceeding."
+                        );
+
+                        if should_print_color {
+                            println!("{}", text.red().bold());
+                        } else {
+                            println!("{text}");
+                        }
+
+                        bail!("{current_sec} answered a ping");
+                    }
+
+                    Err(e) => match e {
+                        nexus_client::Error::InvalidRequest(_)
+                        | nexus_client::Error::InvalidUpgrade(_)
+                        | nexus_client::Error::ErrorResponse(_)
+                        | nexus_client::Error::ResponseBodyError(_)
+                        | nexus_client::Error::InvalidResponsePayload(_, _)
+                        | nexus_client::Error::UnexpectedResponse(_)
+                        | nexus_client::Error::PreHookError(_)
+                        | nexus_client::Error::PostHookError(_) => {
+                            let text = format!(
+                                "Cannot proceed: Nexus with id matching \
+                                current SEC responded with an error to a ping, \
+                                meaning it is still running. Injecting errors \
+                                into running sagas is not safe. Please ensure \
+                                the Nexus with id {current_sec} is stopped \
+                                before proceeding."
+                            );
+
+                            if should_print_color {
+                                println!("{}", text.red().bold());
+                            } else {
+                                println!("{text}");
+                            }
+
+                            bail!("{current_sec} failed a ping with {e}");
+                        }
+
+                        nexus_client::Error::CommunicationError(e) => {
+                            // Assume a communication error means that it
+                            // could not be contacted.
+                            //
+                            // Note: this could be seen if Nexus is up but
+                            // unreachable from where omdb is run!
+                            println!(
+                                "saw {e} when trying to ping Nexus with id \
+                                {current_sec}. Proceed?"
+                            );
+
+                            let mut prompt = ConfirmationPrompt::new();
+                            prompt.read_and_validate("y/N", "y")?;
+                        }
+                    },
+                }
+            }
+        }
+    } else {
+        let text = "Skipping check of whether the Nexus assigned to this \
+            saga is running. If this Nexus is running, the control plane \
+            state managed by this saga may become corrupted!";
+
+        if should_print_color {
+            println!("{}", text.red().bold());
+        } else {
+            println!("{text}");
+        }
+    }
+
+    // Find all the nodes where there is a started record but not a done
+    // record.
+
+    let started_nodes: Vec<SagaNodeEvent> = {
+        use db::schema::saga_node_event::dsl;
+
+        dsl::saga_node_event
+            .filter(dsl::saga_id.eq(args.saga_id))
+            .filter(dsl::event_type.eq(SagaNodeEventType::Started.label()))
+            .load_async(&*conn)
+            .await?
+    };
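+    // Context: steno records a `started` event in the saga log when a node
+    // begins executing and a terminal `succeeded` or `failed` event when it
+    // finishes, so the set difference (started - complete) computed below
+    // yields the nodes still in flight.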
+
+    let complete_nodes: Vec<SagaNodeEvent> = {
+        use db::schema::saga_node_event::dsl;
+
+        // Note the actual enum contents don't matter in either case; they
+        // won't affect the label string.
+        let succeeded_label =
+            SagaNodeEventType::Succeeded(Arc::new(serde_json::Value::Null))
+                .label();
+
+        let failed_label =
+            SagaNodeEventType::Failed(ActionError::InjectedError).label();
+
+        dsl::saga_node_event
+            .filter(dsl::saga_id.eq(args.saga_id))
+            .filter(
+                dsl::event_type
+                    .eq(succeeded_label)
+                    .or(dsl::event_type.eq(failed_label)),
+            )
+            .load_async(&*conn)
+            .await?
+    };
+
+    let incomplete_nodes: HashSet<u32> = {
+        let started_node_ids: HashSet<u32> =
+            started_nodes.iter().map(|node| node.node_id.0.into()).collect();
+        let complete_node_ids: HashSet<u32> =
+            complete_nodes.iter().map(|node| node.node_id.0.into()).collect();
+
+        started_node_ids.difference(&complete_node_ids).cloned().collect()
+    };
+
+    let incomplete_nodes: Vec<&SagaNodeEvent> = {
+        let mut result = vec![];
+
+        for node_id in incomplete_nodes {
+            // SAFETY: this unwrap is ok because incomplete_nodes will always
+            // contain a subset of entries from started_nodes.
+            let node = started_nodes
+                .iter()
+                .find(|node| node.node_id.0 == node_id.into())
+                .unwrap();
+
+            result.push(node);
+        }
+
+        result
+    };
+
+    // Inject an error for those nodes, which will cause the saga to unwind.
+    for node in incomplete_nodes {
+        let action_error = ActionError::action_failed(String::from(
+            "error injected with omdb",
+        ));
+
+        let fault = SagaNodeEvent {
+            saga_id: node.saga_id,
+            node_id: node.node_id,
+            event_type: SagaNodeEventType::Failed(action_error.clone())
+                .label()
+                .to_string(),
+            data: Some(serde_json::to_value(action_error)?),
+            event_time: chrono::Utc::now(),
+            creator: OMDB_SEC_UUID.into(),
+        };
+
+        println!(
+            "injecting error for saga {:?} node {:?}",
+            node.saga_id, node.node_id,
+        );
+
+        {
+            use db::schema::saga_node_event::dsl;
+
+            diesel::insert_into(dsl::saga_node_event)
+                .values(fault.clone())
+                .execute_async(&*conn)
+                .await?;
+        }
+    }
+
+    Ok(())
+}
+
+// helper functions
+
+async fn get_all_sagas_in_state(
+    conn: &DataStoreConnection,
+    state: SagaCachedState,
+) -> Result<Vec<Saga>, anyhow::Error> {
+    let mut sagas = Vec::new();
+    let mut paginator = Paginator::new(SQL_BATCH_SIZE);
+    while let Some(p) = paginator.next() {
+        use db::schema::saga::dsl;
+        let records_batch =
+            paginated(dsl::saga, dsl::id, &p.current_pagparams())
+                .filter(
+                    dsl::saga_state.eq(nexus_db_model::SagaCachedState(state)),
+                )
+                .select(Saga::as_select())
+                .load_async(&**conn)
+                .await
+                .context("fetching sagas")?;
+
+        paginator = p.found_batch(&records_batch, &|s: &Saga| s.id);
+
+        sagas.extend(records_batch);
+    }
+
+    // Sort them by creation time (equivalently: how long they've been
+    // running)
+    sagas.sort_by_key(|s| s.time_created);
+    sagas.reverse();
+
+    Ok(sagas)
+}
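
Example invocations of the new subcommands (sketch; `$SAGA_ID` stands in for
a real saga id, and `-w` is required because `inject-error` is destructive):

    omdb db saga running
    omdb -w db saga inject-error $SAGA_ID
    omdb -w db saga inject-error $SAGA_ID --bypass-sec-check
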
diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs
index b623ebf11ee..9fe7f17daa5 100644
--- a/dev-tools/omdb/src/bin/omdb/main.rs
+++ b/dev-tools/omdb/src/bin/omdb/main.rs
@@ -35,6 +35,7 @@
 use anyhow::Context;
 use anyhow::anyhow;
+use anyhow::bail;
 use anyhow::ensure;
 use clap::Args;
 use clap::ColorChoice;
@@ -43,6 +44,9 @@ use clap::Subcommand;
 use futures::StreamExt;
 use internal_dns_types::names::ServiceName;
 use omicron_common::address::Ipv6Subnet;
+use reedline::DefaultPrompt;
+use reedline::DefaultPromptSegment;
+use reedline::Reedline;
 use std::net::SocketAddr;
 use std::net::SocketAddrV6;
 use tokio::net::TcpSocket;
@@ -57,6 +61,39 @@ mod oxql;
 mod reconfigurator;
 mod sled_agent;

+struct ConfirmationPrompt(Reedline);
+
+impl ConfirmationPrompt {
+    fn new() -> Self {
+        Self(Reedline::create())
+    }
+
+    fn read(&mut self, message: &str) -> Result<String, anyhow::Error> {
+        let prompt = DefaultPrompt::new(
+            DefaultPromptSegment::Basic(message.to_string()),
+            DefaultPromptSegment::Empty,
+        );
+        if let Ok(reedline::Signal::Success(input)) = self.0.read_line(&prompt)
+        {
+            Ok(input)
+        } else {
+            bail!("operation aborted")
+        }
+    }
+
+    fn read_and_validate(
+        &mut self,
+        message: &str,
+        expected: &str,
+    ) -> Result<(), anyhow::Error> {
+        let input = self.read(message)?;
+        if input != expected {
+            bail!("Aborting, input did not match expected value");
+        }
+        Ok(())
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
     let args = Omdb::parse();
diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs
index 5e40011412c..f92d756608b 100644
--- a/dev-tools/omdb/src/bin/omdb/nexus.rs
+++ b/dev-tools/omdb/src/bin/omdb/nexus.rs
@@ -4,6 +4,7 @@

 //! omdb commands that query or update specific Nexus instances

+use crate::ConfirmationPrompt;
 use crate::Omdb;
 use crate::check_allow_destructive::DestructiveOperationToken;
 use crate::db::DbUrlOptions;
@@ -66,9 +67,6 @@ use omicron_uuid_kinds::GenericUuid;
 use omicron_uuid_kinds::ParseError;
 use omicron_uuid_kinds::PhysicalDiskUuid;
 use omicron_uuid_kinds::SledUuid;
-use reedline::DefaultPrompt;
-use reedline::DefaultPromptSegment;
-use reedline::Reedline;
 use serde::Deserialize;
 use slog_error_chain::InlineErrorChain;
 use std::collections::BTreeMap;
@@ -2994,39 +2992,6 @@ async fn cmd_nexus_sled_add(
     Ok(())
 }

-struct ConfirmationPrompt(Reedline);
-
-impl ConfirmationPrompt {
-    fn new() -> Self {
-        Self(Reedline::create())
-    }
-
-    fn read(&mut self, message: &str) -> Result<String, anyhow::Error> {
-        let prompt = DefaultPrompt::new(
-            DefaultPromptSegment::Basic(message.to_string()),
-            DefaultPromptSegment::Empty,
-        );
-        if let Ok(reedline::Signal::Success(input)) = self.0.read_line(&prompt)
-        {
-            Ok(input)
-        } else {
-            bail!("operation aborted")
-        }
-    }
-
-    fn read_and_validate(
-        &mut self,
-        message: &str,
-        expected: &str,
-    ) -> Result<(), anyhow::Error> {
-        let input = self.read(message)?;
-        if input != expected {
-            bail!("Aborting, input did not match expected value");
-        }
-        Ok(())
-    }
-}
-
 /// Runs `omdb nexus sleds expunge`
 async fn cmd_nexus_sled_expunge(
     client: &nexus_client::Client,
diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs
index ce0a2da4379..d7ade6aa8ec 100644
--- a/dev-tools/omdb/tests/test_all_output.rs
+++ b/dev-tools/omdb/tests/test_all_output.rs
@@ -80,6 +80,7 @@ async fn test_omdb_usage_errors() {
         &["db", "dns", "diff"],
         &["db", "dns", "names"],
         &["db", "sleds", "--help"],
+        &["db", "saga"],
         &["db", "snapshots"],
         &["db", "network"],
         &["mgs"],
diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out
index 5a61dfd7716..43cefccf0a4 100644
--- a/dev-tools/omdb/tests/usage_errors.out
+++ b/dev-tools/omdb/tests/usage_errors.out
@@ -125,6 +125,7 @@ Commands:
                                 triggering one
   region-snapshot-replacement   Query for information about region snapshot replacements,
                                 optionally manually triggering one
+  saga                          Commands for querying and interacting with sagas
   sleds                         Print information about sleds
   instance                      Print information about customer instances
   instances                     Alias to `omdb instance list`
@@ -178,6 +179,7 @@ Commands:
                                 triggering one
   region-snapshot-replacement   Query for information about region snapshot replacements,
                                optionally manually triggering one
+  saga                         Commands for querying and interacting with sagas
   sleds                        Print information about sleds
   instance                     Print information about customer instances
   instances                    Alias to `omdb instance list`
@@ -375,6 +377,39 @@ Safety Options:
 ---------------------------------------------
 stderr:
 =============================================
+EXECUTING COMMAND: omdb ["db", "saga"]
+termination: Exited(2)
+---------------------------------------------
+stdout:
+---------------------------------------------
+stderr:
+Commands for querying and interacting with sagas
+
+Usage: omdb db saga [OPTIONS] <COMMAND>
+
+Commands:
+  running       List running sagas
+  inject-error  Inject an error into a saga's currently running node(s)
+  help          Print this message or the help of the given subcommand(s)
+
+Options:
+      --log-level <LOG_LEVEL>  log level filter [env: LOG_LEVEL=] [default: warn]
+      --color <COLOR>          Color output [default: auto] [possible values: auto, always, never]
+  -h, --help                   Print help
+
+Connection Options:
+      --db-url <DB_URL>          URL of the database SQL interface [env: OMDB_DB_URL=]
+      --dns-server <DNS_SERVER>  [env: OMDB_DNS_SERVER=]
+
+Database Options:
+      --fetch-limit <FETCH_LIMIT>  limit to apply to queries that fetch rows [env:
+                                   OMDB_FETCH_LIMIT=] [default: 500]
+      --include-deleted            whether to include soft-deleted records when enumerating objects
+                                   that can be soft-deleted
+
+Safety Options:
+  -w, --destructive  Allow potentially-destructive subcommands
+=============================================
 EXECUTING COMMAND: omdb ["db", "snapshots"]
 termination: Exited(2)
 ---------------------------------------------
diff --git a/internal-dns/resolver/src/resolver.rs b/internal-dns/resolver/src/resolver.rs
index 72dd84edd37..b1cb35745ae 100644
--- a/internal-dns/resolver/src/resolver.rs
+++ b/internal-dns/resolver/src/resolver.rs
@@ -385,6 +385,27 @@ impl Resolver {
             })
             .flatten()
     }
+
+    /// Lookup a specific record's IPv6 address
+    ///
+    /// In general, callers should _not_ use this function; they should
+    /// instead use the other functions in this struct that query based on a
+    /// service name. This function is currently only used by omdb because it
+    /// has to ask questions of a _specific_ Nexus, but most (if not all)
+    /// control plane software should not care about talking to a specific
+    /// instance of something by name.
+    pub async fn ipv6_lookup(
+        &self,
+        query: &str,
+    ) -> Result<Option<Ipv6Addr>, ResolveError> {
+        Ok(self
+            .resolver
+            .ipv6_lookup(query)
+            .await?
+            .into_iter()
+            .next()
+            .map(move |aaaa| aaaa.into()))
+    }
 }

 #[cfg(test)]
diff --git a/nexus/internal-api/src/lib.rs b/nexus/internal-api/src/lib.rs
index bcf84fd5079..a7ac6b22a29 100644
--- a/nexus/internal-api/src/lib.rs
+++ b/nexus/internal-api/src/lib.rs
@@ -17,6 +17,8 @@ use nexus_types::{
     external_api::{
         params::{PhysicalDiskPath, SledSelector, UninitializedSledId},
         shared::{ProbeInfo, UninitializedSled},
+        views::Ping,
+        views::PingStatus,
         views::SledPolicy,
     },
     internal_api::{
@@ -49,6 +51,19 @@ const RACK_INITIALIZATION_REQUEST_MAX_BYTES: usize = 10 * 1024 * 1024;

 pub trait NexusInternalApi {
     type Context;

+    /// Ping API
+    ///
+    /// Always responds with Ok if it responds at all.
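+    ///
+    /// A manual check against a running Nexus might look like the following
+    /// (sketch; the internal API address and port are assumptions):
+    ///
+    ///   $ curl http://[fd00:1122:3344:101::a]:12221/v1/ping
+    ///   {"status":"ok"}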
+    #[endpoint {
+        method = GET,
+        path = "/v1/ping",
+    }]
+    async fn ping(
+        _rqctx: RequestContext<Self::Context>,
+    ) -> Result<HttpResponseOk<Ping>, HttpError> {
+        Ok(HttpResponseOk(Ping { status: PingStatus::Ok }))
+    }
+
     /// Return information about the given sled agent
     #[endpoint {
         method = GET,
diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json
index ac4c4167951..12d6007bbdb 100644
--- a/openapi/nexus-internal.json
+++ b/openapi/nexus-internal.json
@@ -1427,6 +1427,31 @@
       }
     }
   },
+    "/v1/ping": {
+      "get": {
+        "summary": "Ping API",
+        "description": "Always responds with Ok if it responds at all.",
+        "operationId": "ping",
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "$ref": "#/components/schemas/Ping"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/vmms/{propolis_id}": {
       "put": {
         "summary": "Report updated state for a VMM.",
@@ -4653,6 +4678,28 @@
           "vendor"
         ]
       },
+      "Ping": {
+        "type": "object",
+        "properties": {
+          "status": {
+            "description": "Whether the external API is reachable. Will always be Ok if the endpoint returns anything at all.",
+            "allOf": [
+              {
+                "$ref": "#/components/schemas/PingStatus"
+              }
+            ]
+          }
+        },
+        "required": [
+          "status"
+        ]
+      },
+      "PingStatus": {
+        "type": "string",
+        "enum": [
+          "ok"
+        ]
+      },
       "PortConfigV2": {
         "type": "object",
         "properties": {