Skip to content

Commit 167f3c6

Browse files
committed
[1/N] omdb db sagas: list running and inject fault
Breaking apart #4378 and copying the structure of #7695, add `omdb db saga` as a command and implement the following sub-commands:

    Usage: omdb db saga [OPTIONS] <COMMAND>

    Commands:
      running  List running sagas
      fault    Inject an error into a saga's currently running node

This addresses part of the minimum functionality required during a release deployment:

1. After quiescing (#6804), omdb can query whether there are any running sagas.
2. If those running sagas are stuck in a loop and cannot be drained (#7623), and the release contains a change to the DAG that causes Nexus to panic after an upgrade (#7730), then omdb can inject a fault into the database that would cause that saga to unwind when the affected Nexus is restarted.

Note for 2: unwinding a saga that is stuck in this way may not be valid if there were significant changes between releases.
1 parent 7a6144b commit 167f3c6

File tree

6 files changed

+227
-0
lines changed

6 files changed

+227
-0
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dev-tools/omdb/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ serde_json.workspace = true
5252
sled-agent-client.workspace = true
5353
slog.workspace = true
5454
slog-error-chain.workspace = true
55+
steno.workspace = true
5556
strum.workspace = true
5657
supports-color.workspace = true
5758
tabled.workspace = true

dev-tools/omdb/src/bin/omdb/db.rs

+7
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ use strum::IntoEnumIterator;
151151
use tabled::Tabled;
152152
use uuid::Uuid;
153153

154+
mod saga;
155+
154156
const NO_ACTIVE_PROPOLIS_MSG: &str = "<no active Propolis>";
155157
const NOT_ON_SLED_MSG: &str = "<not on any sled>";
156158

@@ -335,6 +337,8 @@ enum DbCommands {
335337
/// Query for information about region snapshot replacements, optionally
336338
/// manually triggering one.
337339
RegionSnapshotReplacement(RegionSnapshotReplacementArgs),
340+
/// Commands for querying and interacting with sagas
341+
Saga(saga::SagaArgs),
338342
/// Print information about sleds
339343
Sleds(SledsArgs),
340344
/// Print information about customer instances.
@@ -1023,6 +1027,9 @@ impl DbArgs {
10231027
)
10241028
.await
10251029
}
1030+
DbCommands::Saga(args) => {
1031+
args.exec(&omdb, &opctx, &datastore).await
1032+
}
10261033
DbCommands::Sleds(args) => {
10271034
cmd_db_sleds(&opctx, &datastore, &fetch_opts, args).await
10281035
}
+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
//! `omdb db saga` subcommands
6+
7+
use crate::Omdb;
8+
use crate::check_allow_destructive::DestructiveOperationToken;
9+
use crate::db::datetime_rfc3339_concise;
10+
use anyhow::Context;
11+
use async_bb8_diesel::AsyncRunQueryDsl;
12+
use clap::Args;
13+
use clap::Subcommand;
14+
use diesel::prelude::*;
15+
use nexus_db_model::Saga;
16+
use nexus_db_model::SagaNodeEvent;
17+
use nexus_db_queries::context::OpContext;
18+
use nexus_db_queries::db;
19+
use nexus_db_queries::db::DataStore;
20+
use nexus_db_queries::db::datastore::DataStoreConnection;
21+
use nexus_db_queries::db::datastore::SQL_BATCH_SIZE;
22+
use nexus_db_queries::db::pagination::Paginator;
23+
use nexus_db_queries::db::pagination::paginated;
24+
use tabled::Tabled;
25+
use uuid::Uuid;
26+
27+
/// `omdb db saga` subcommand
28+
#[derive(Debug, Args, Clone)]
29+
pub struct SagaArgs {
30+
#[command(subcommand)]
31+
command: SagaCommands,
32+
}
33+
34+
#[derive(Debug, Subcommand, Clone)]
35+
enum SagaCommands {
36+
/// List running sagas
37+
Running,
38+
39+
/// Inject an error into a saga's currently running node
40+
Fault(SagaFaultArgs),
41+
}
42+
43+
#[derive(Clone, Debug, Args)]
44+
struct SagaFaultArgs {
45+
saga_id: Uuid,
46+
}
47+
48+
impl SagaArgs {
49+
pub async fn exec(
50+
&self,
51+
omdb: &Omdb,
52+
opctx: &OpContext,
53+
datastore: &DataStore,
54+
) -> Result<(), anyhow::Error> {
55+
match &self.command {
56+
SagaCommands::Running => cmd_sagas_running(opctx, datastore).await,
57+
58+
SagaCommands::Fault(args) => {
59+
let token = omdb.check_allow_destructive()?;
60+
cmd_sagas_fault(opctx, datastore, args, token).await
61+
}
62+
}
63+
}
64+
}
65+
66+
async fn cmd_sagas_running(
67+
_opctx: &OpContext,
68+
datastore: &DataStore,
69+
) -> Result<(), anyhow::Error> {
70+
let conn = datastore.pool_connection_for_tests().await?;
71+
72+
let sagas =
73+
get_all_sagas_in_state(&conn, steno::SagaCachedState::Running).await?;
74+
75+
#[derive(Tabled)]
76+
struct SagaRow {
77+
id: Uuid,
78+
creator_id: Uuid,
79+
time_created: String,
80+
name: String,
81+
state: String,
82+
}
83+
84+
let rows: Vec<_> = sagas
85+
.into_iter()
86+
.map(|saga: Saga| SagaRow {
87+
id: saga.id.0.into(),
88+
creator_id: saga.creator.0,
89+
time_created: datetime_rfc3339_concise(&saga.time_created),
90+
name: saga.name,
91+
state: format!("{:?}", saga.saga_state),
92+
})
93+
.collect();
94+
95+
let table = tabled::Table::new(rows)
96+
.with(tabled::settings::Style::psql())
97+
.to_string();
98+
99+
println!("{}", table);
100+
101+
Ok(())
102+
}
103+
104+
async fn cmd_sagas_fault(
105+
_opctx: &OpContext,
106+
datastore: &DataStore,
107+
args: &SagaFaultArgs,
108+
_destruction_token: DestructiveOperationToken,
109+
) -> Result<(), anyhow::Error> {
110+
let conn = datastore.pool_connection_for_tests().await?;
111+
112+
// Find the most recent node for a given saga
113+
let most_recent_node: SagaNodeEvent = {
114+
use db::schema::saga_node_event::dsl;
115+
116+
dsl::saga_node_event
117+
.filter(dsl::saga_id.eq(args.saga_id))
118+
.order(dsl::event_time.desc())
119+
.limit(1)
120+
.first_async(&*conn)
121+
.await?
122+
};
123+
124+
// Inject a fault for that node, which will cause the saga to unwind
125+
let action_error = steno::ActionError::action_failed(String::from(
126+
"error injected with omdb",
127+
));
128+
129+
let fault = SagaNodeEvent {
130+
saga_id: most_recent_node.saga_id,
131+
node_id: most_recent_node.node_id,
132+
event_type: steno::SagaNodeEventType::Failed(action_error.clone())
133+
.label()
134+
.to_string(),
135+
data: Some(serde_json::to_value(action_error)?),
136+
event_time: chrono::Utc::now(),
137+
creator: most_recent_node.creator,
138+
};
139+
140+
{
141+
use db::schema::saga_node_event::dsl;
142+
143+
diesel::insert_into(dsl::saga_node_event)
144+
.values(fault.clone())
145+
.execute_async(&*conn)
146+
.await?;
147+
}
148+
149+
Ok(())
150+
}
151+
152+
// helper functions
153+
154+
async fn get_all_sagas_in_state(
155+
conn: &DataStoreConnection,
156+
state: steno::SagaCachedState,
157+
) -> Result<Vec<Saga>, anyhow::Error> {
158+
let mut sagas = Vec::new();
159+
let mut paginator = Paginator::new(SQL_BATCH_SIZE);
160+
while let Some(p) = paginator.next() {
161+
use db::schema::saga::dsl;
162+
let records_batch =
163+
paginated(dsl::saga, dsl::id, &p.current_pagparams())
164+
.filter(
165+
dsl::saga_state.eq(nexus_db_model::SagaCachedState(state)),
166+
)
167+
.select(Saga::as_select())
168+
.load_async(&**conn)
169+
.await
170+
.context("fetching sagas")?;
171+
172+
paginator = p.found_batch(&records_batch, &|s: &Saga| s.id);
173+
174+
sagas.extend(records_batch);
175+
}
176+
177+
// Sort them by creation time (equivalently: how long they've been running)
178+
sagas.sort_by_key(|s| s.time_created);
179+
sagas.reverse();
180+
181+
Ok(sagas)
182+
}

dev-tools/omdb/tests/test_all_output.rs

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ async fn test_omdb_usage_errors() {
8080
&["db", "dns", "diff"],
8181
&["db", "dns", "names"],
8282
&["db", "sleds", "--help"],
83+
&["db", "saga"],
8384
&["db", "snapshots"],
8485
&["db", "network"],
8586
&["mgs"],

dev-tools/omdb/tests/usage_errors.out

+35
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ Commands:
125125
triggering one
126126
region-snapshot-replacement Query for information about region snapshot replacements, optionally
127127
manually triggering one
128+
saga Commands for querying and interacting with sagas
128129
sleds Print information about sleds
129130
instance Print information about customer instances
130131
instances Alias to `omdb instance list`
@@ -177,6 +178,7 @@ Commands:
177178
triggering one
178179
region-snapshot-replacement Query for information about region snapshot replacements, optionally
179180
manually triggering one
181+
saga Commands for querying and interacting with sagas
180182
sleds Print information about sleds
181183
instance Print information about customer instances
182184
instances Alias to `omdb instance list`
@@ -373,6 +375,39 @@ Safety Options:
373375
---------------------------------------------
374376
stderr:
375377
=============================================
378+
EXECUTING COMMAND: omdb ["db", "saga"]
379+
termination: Exited(2)
380+
---------------------------------------------
381+
stdout:
382+
---------------------------------------------
383+
stderr:
384+
Commands for querying and interacting with sagas
385+
386+
Usage: omdb db saga [OPTIONS] <COMMAND>
387+
388+
Commands:
389+
running List running sagas
390+
fault Inject an error into a saga's currently running node
391+
help Print this message or the help of the given subcommand(s)
392+
393+
Options:
394+
--log-level <LOG_LEVEL> log level filter [env: LOG_LEVEL=] [default: warn]
395+
--color <COLOR> Color output [default: auto] [possible values: auto, always, never]
396+
-h, --help Print help
397+
398+
Connection Options:
399+
--db-url <DB_URL> URL of the database SQL interface [env: OMDB_DB_URL=]
400+
--dns-server <DNS_SERVER> [env: OMDB_DNS_SERVER=]
401+
402+
Database Options:
403+
--fetch-limit <FETCH_LIMIT> limit to apply to queries that fetch rows [env:
404+
OMDB_FETCH_LIMIT=] [default: 500]
405+
--include-deleted whether to include soft-deleted records when enumerating objects
406+
that can be soft-deleted
407+
408+
Safety Options:
409+
-w, --destructive Allow potentially-destructive subcommands
410+
=============================================
376411
EXECUTING COMMAND: omdb ["db", "snapshots"]
377412
termination: Exited(2)
378413
---------------------------------------------

0 commit comments

Comments
 (0)