[1/N] omdb db sagas: list running and inject fault #7732

Open
wants to merge 9 commits into
base: main
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions dev-tools/omdb/Cargo.toml
@@ -52,6 +52,7 @@ serde_json.workspace = true
sled-agent-client.workspace = true
slog.workspace = true
slog-error-chain.workspace = true
steno.workspace = true
strum.workspace = true
supports-color.workspace = true
tabled.workspace = true
7 changes: 7 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db.rs
@@ -151,6 +151,8 @@ use strum::IntoEnumIterator;
use tabled::Tabled;
use uuid::Uuid;

mod saga;

const NO_ACTIVE_PROPOLIS_MSG: &str = "<no active Propolis>";
const NOT_ON_SLED_MSG: &str = "<not on any sled>";

@@ -335,6 +337,8 @@ enum DbCommands {
/// Query for information about region snapshot replacements, optionally
/// manually triggering one.
RegionSnapshotReplacement(RegionSnapshotReplacementArgs),
/// Commands for querying and interacting with sagas
Saga(saga::SagaArgs),
/// Print information about sleds
Sleds(SledsArgs),
/// Print information about customer instances.
@@ -1023,6 +1027,9 @@ impl DbArgs {
)
.await
}
DbCommands::Saga(args) => {
args.exec(&omdb, &opctx, &datastore).await
}
DbCommands::Sleds(args) => {
cmd_db_sleds(&opctx, &datastore, &fetch_opts, args).await
}
182 changes: 182 additions & 0 deletions dev-tools/omdb/src/bin/omdb/db/saga.rs
@@ -0,0 +1,182 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! `omdb db saga` subcommands

use crate::Omdb;
use crate::check_allow_destructive::DestructiveOperationToken;
use crate::db::datetime_rfc3339_concise;
use anyhow::Context;
use async_bb8_diesel::AsyncRunQueryDsl;
use clap::Args;
use clap::Subcommand;
use diesel::prelude::*;
use nexus_db_model::Saga;
use nexus_db_model::SagaNodeEvent;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::db::datastore::DataStoreConnection;
use nexus_db_queries::db::datastore::SQL_BATCH_SIZE;
use nexus_db_queries::db::pagination::Paginator;
use nexus_db_queries::db::pagination::paginated;
use tabled::Tabled;
use uuid::Uuid;

/// `omdb db saga` subcommand
#[derive(Debug, Args, Clone)]
pub struct SagaArgs {
    #[command(subcommand)]
    command: SagaCommands,
}

#[derive(Debug, Subcommand, Clone)]
enum SagaCommands {
    /// List running sagas
    Running,

    /// Inject an error into a saga's currently running node
    Fault(SagaFaultArgs),
Collaborator

I'd suggest calling this inject-error, only because fault feels like it could mean a bunch of different specific things. I think inject-error is less ambiguous?

Contributor Author

done in b1108a3

}

#[derive(Clone, Debug, Args)]
struct SagaFaultArgs {
    saga_id: Uuid,
}

impl SagaArgs {
    pub async fn exec(
        &self,
        omdb: &Omdb,
        opctx: &OpContext,
        datastore: &DataStore,
    ) -> Result<(), anyhow::Error> {
        match &self.command {
            SagaCommands::Running => cmd_sagas_running(opctx, datastore).await,

            SagaCommands::Fault(args) => {
                let token = omdb.check_allow_destructive()?;
                cmd_sagas_fault(opctx, datastore, args, token).await
            }
        }
    }
}

async fn cmd_sagas_running(
    _opctx: &OpContext,
    datastore: &DataStore,
) -> Result<(), anyhow::Error> {
    let conn = datastore.pool_connection_for_tests().await?;

    let sagas =
        get_all_sagas_in_state(&conn, steno::SagaCachedState::Running).await?;

    #[derive(Tabled)]
    struct SagaRow {
        id: Uuid,
        creator_id: Uuid,
Collaborator

I'd suggest putting the current SEC here instead of the creator. That's almost always more relevant, I think.

Contributor Author

done in b1108a3

        time_created: String,
        name: String,
        state: String,
    }

    let rows: Vec<_> = sagas
        .into_iter()
        .map(|saga: Saga| SagaRow {
            id: saga.id.0.into(),
            creator_id: saga.creator.0,
            time_created: datetime_rfc3339_concise(&saga.time_created),
            name: saga.name,
            state: format!("{:?}", saga.saga_state),
        })
        .collect();

    let table = tabled::Table::new(rows)
        .with(tabled::settings::Style::psql())
        .to_string();

    println!("{}", table);

    Ok(())
}

async fn cmd_sagas_fault(
    _opctx: &OpContext,
    datastore: &DataStore,
    args: &SagaFaultArgs,
    _destruction_token: DestructiveOperationToken,
) -> Result<(), anyhow::Error> {
    let conn = datastore.pool_connection_for_tests().await?;

Collaborator

I know it's a bunch more work but I think it's worth adding a safety check here (overrideable, I guess) that tries to contact the Nexus that we think is running this saga and fails if that succeeds.
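
A minimal sketch of the kind of guard suggested here, under stated assumptions: `check_nexus_not_reachable` and its `bypass` parameter are hypothetical names, and a plain TCP connect stands in for whatever request omdb would actually make to the Nexus believed to be running the saga. It is not the check added in b1108a3.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Hypothetical safety check: refuse to inject an error while the Nexus
/// that we think is running the saga still answers on its address.
///
/// `bypass` models an operator override. A bare TCP connect is an
/// assumption made for this sketch; a real check would likely call a
/// Nexus API endpoint instead.
fn check_nexus_not_reachable(
    nexus_addr: SocketAddr,
    bypass: bool,
) -> Result<(), String> {
    if bypass {
        // The operator explicitly asked to skip the check.
        return Ok(());
    }

    match TcpStream::connect_timeout(&nexus_addr, Duration::from_secs(2)) {
        // Something answered: the saga may still be executing there, so
        // writing a failure record behind its back is unsafe.
        Ok(_) => Err(format!(
            "Nexus at {nexus_addr} is reachable; refusing to inject an error"
        )),
        // Nothing answered: treat that Nexus as stopped and proceed.
        Err(_) => Ok(()),
    }
}
```

The check is inverted relative to a usual health probe: a successful connection is the failure case, because the injected event is only safe to write while no live Nexus is driving the saga.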

Contributor Author

done in b1108a3

    // Find the most recent node for a given saga
    let most_recent_node: SagaNodeEvent = {
        use db::schema::saga_node_event::dsl;

        dsl::saga_node_event
            .filter(dsl::saga_id.eq(args.saga_id))
            .order(dsl::event_time.desc())
            .limit(1)
            .first_async(&*conn)
            .await?
    };
Collaborator

I think you want to enumerate all nodes for which you have an ActionStarted but no ActionDone or ActionFailed.

I say this because:

  • There might be more than one of them.
  • They might not be the most recent ones.
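
A minimal sketch of that enumeration, assuming the events have already been loaded into memory. `NodeEvent` is a simplified, hypothetical stand-in for the real `SagaNodeEvent` model, and the event type labels are the ones shown in the event table later in this thread (`started`, `succeeded`, `failed`). It is not the query from b1108a3.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for a `saga_node_event` row (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
struct NodeEvent {
    node_id: u32,
    event_type: &'static str,
}

/// Return the ids of every node that has a "started" event but no
/// "succeeded" or "failed" event, regardless of event order.
fn unfinished_nodes(events: &[NodeEvent]) -> BTreeSet<u32> {
    let mut started = BTreeSet::new();
    let mut finished = BTreeSet::new();

    for event in events {
        match event.event_type {
            "started" => {
                started.insert(event.node_id);
            }
            "succeeded" | "failed" => {
                finished.insert(event.node_id);
            }
            // Undo events and anything else do not affect this check.
            _ => {}
        }
    }

    // Nodes that started and never recorded a result.
    started.difference(&finished).copied().collect()
}
```

Because the result is a set difference rather than the single latest row, it covers both cases raised above: several nodes may be running at once, and a running node need not own the most recent event.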

Contributor Author

done in b1108a3


    // Inject a fault for that node, which will cause the saga to unwind
    let action_error = steno::ActionError::action_failed(String::from(
        "error injected with omdb",
    ));
Collaborator

Have you tested that Nexus doesn't choke on this? Like does it have expectations about what the contents of an error from the saga should look like?

Contributor Author

Yeah - my testing procedure was to

  • launch a debug saga
  • stop Nexus
  • inject an error
  • restart Nexus
  • ensure that the saga unwound correctly

Contributor Author

                             saga id | event time               | sub saga | node id             | event type    | data
------------------------------------ | ------------------------ | -------- | ------------------- | ------------- | ---
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:38:51.337Z |          |   1: start          | started       | 
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:38:51.342Z |          |   1: start          | succeeded     | 
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:38:51.346Z |          |   0: demo.demo_wait | started       | 
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:39:39.077Z |          |   0: demo.demo_wait | failed        | "demo_wait" => {"ActionFailed":{"source_error":"error injected with omdb"}}
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:39:53.753Z |          |   1: start          | undo_started  | 
02589f1f-b612-46b0-a460-d77442b57345 | 2025-03-11T17:39:53.767Z |          |   1: start          | undo_finished | 


    let fault = SagaNodeEvent {
        saga_id: most_recent_node.saga_id,
        node_id: most_recent_node.node_id,
        event_type: steno::SagaNodeEventType::Failed(action_error.clone())
            .label()
            .to_string(),
        data: Some(serde_json::to_value(action_error)?),
        event_time: chrono::Utc::now(),
        creator: most_recent_node.creator,
Collaborator

I wonder if we should make a specific well-known uuid for omdb and use that here.

Contributor Author

done in b1108a3 (let me know if you think the value is appropriate :))

    };

Collaborator

Can we print out exactly what it's doing? Something like:

injecting fault into running node ${node_id}

Contributor Author

done in b1108a3

    {
        use db::schema::saga_node_event::dsl;

        diesel::insert_into(dsl::saga_node_event)
            .values(fault.clone())
            .execute_async(&*conn)
            .await?;
    }

    Ok(())
}

// helper functions

async fn get_all_sagas_in_state(
    conn: &DataStoreConnection,
    state: steno::SagaCachedState,
) -> Result<Vec<Saga>, anyhow::Error> {
    let mut sagas = Vec::new();
    let mut paginator = Paginator::new(SQL_BATCH_SIZE);
    while let Some(p) = paginator.next() {
        use db::schema::saga::dsl;
        let records_batch =
            paginated(dsl::saga, dsl::id, &p.current_pagparams())
                .filter(
                    dsl::saga_state.eq(nexus_db_model::SagaCachedState(state)),
                )
                .select(Saga::as_select())
                .load_async(&**conn)
                .await
                .context("fetching sagas")?;

        paginator = p.found_batch(&records_batch, &|s: &Saga| s.id);

        sagas.extend(records_batch);
    }

    // Sort them by creation time (equivalently: how long they've been running)
    sagas.sort_by_key(|s| s.time_created);
    sagas.reverse();

    Ok(sagas)
}
1 change: 1 addition & 0 deletions dev-tools/omdb/tests/test_all_output.rs
@@ -80,6 +80,7 @@ async fn test_omdb_usage_errors() {
&["db", "dns", "diff"],
&["db", "dns", "names"],
&["db", "sleds", "--help"],
&["db", "saga"],
&["db", "snapshots"],
&["db", "network"],
&["mgs"],
35 changes: 35 additions & 0 deletions dev-tools/omdb/tests/usage_errors.out
@@ -125,6 +125,7 @@ Commands:
triggering one
region-snapshot-replacement Query for information about region snapshot replacements, optionally
manually triggering one
saga Commands for querying and interacting with sagas
sleds Print information about sleds
instance Print information about customer instances
instances Alias to `omdb instance list`
@@ -177,6 +178,7 @@ Commands:
triggering one
region-snapshot-replacement Query for information about region snapshot replacements, optionally
manually triggering one
saga Commands for querying and interacting with sagas
sleds Print information about sleds
instance Print information about customer instances
instances Alias to `omdb instance list`
@@ -373,6 +375,39 @@ Safety Options:
---------------------------------------------
stderr:
=============================================
EXECUTING COMMAND: omdb ["db", "saga"]
termination: Exited(2)
---------------------------------------------
stdout:
---------------------------------------------
stderr:
Commands for querying and interacting with sagas

Usage: omdb db saga [OPTIONS] <COMMAND>

Commands:
running List running sagas
fault Inject an error into a saga's currently running node
help Print this message or the help of the given subcommand(s)

Options:
--log-level <LOG_LEVEL> log level filter [env: LOG_LEVEL=] [default: warn]
--color <COLOR> Color output [default: auto] [possible values: auto, always, never]
-h, --help Print help

Connection Options:
--db-url <DB_URL> URL of the database SQL interface [env: OMDB_DB_URL=]
--dns-server <DNS_SERVER> [env: OMDB_DNS_SERVER=]

Database Options:
--fetch-limit <FETCH_LIMIT> limit to apply to queries that fetch rows [env:
OMDB_FETCH_LIMIT=] [default: 500]
--include-deleted whether to include soft-deleted records when enumerating objects
that can be soft-deleted

Safety Options:
-w, --destructive Allow potentially-destructive subcommands
=============================================
EXECUTING COMMAND: omdb ["db", "snapshots"]
termination: Exited(2)
---------------------------------------------