Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus notifications have different importance #1621

Merged
merged 6 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pantry/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ async fn bulk_write(

Ok(HttpResponseUpdatedNoContent())
}

#[derive(Deserialize, JsonSchema)]
struct BulkReadRequest {
pub offset: u64,
Expand Down
203 changes: 145 additions & 58 deletions upstairs/src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ use nexus_client::types::{
};
use omicron_uuid_kinds::{GenericUuid, TypedUuid};

#[derive(Debug)]
pub(crate) enum NotifyQos {
High,
Low,
}

#[derive(Debug)]
pub(crate) enum NotifyRequest {
ClientTaskStopped {
Expand Down Expand Up @@ -65,42 +71,111 @@ pub(crate) enum NotifyRequest {
},
}

impl NotifyRequest {
pub(crate) fn qos(&self) -> NotifyQos {
match &self {
NotifyRequest::LiveRepairStart { .. }
| NotifyRequest::LiveRepairFinish { .. }
| NotifyRequest::ReconcileStart { .. }
| NotifyRequest::ReconcileFinish { .. } => NotifyQos::High,

NotifyRequest::ClientTaskStopped { .. }
| NotifyRequest::LiveRepairProgress { .. }
| NotifyRequest::ReconcileProgress { .. } => NotifyQos::Low,
}
}
}

pub(crate) struct NotifyQueue {
tx: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
tx_high: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
tx_low: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
log: Logger,
}

impl NotifyQueue {
/// Insert a time-stamped request into the queue
pub fn send(&self, r: NotifyRequest) {
let now = Utc::now();
if let Err(r) = self.tx.try_send((now, r)) {
warn!(self.log, "could not send notify {r:?}; queue is full");
let qos = r.qos();
let queue = match &qos {
NotifyQos::High => &self.tx_high,
NotifyQos::Low => &self.tx_low,
};

if let Err(e) = queue.try_send((now, r)) {
warn!(self.log, "could not send {qos:?} notify: {e}",);
}
}
}

pub(crate) fn spawn_notify_task(addr: Ipv6Addr, log: &Logger) -> NotifyQueue {
let (tx, rx) = mpsc::channel(128);
let (tx_high, rx_high) = mpsc::channel(128);
let (tx_low, rx_low) = mpsc::channel(128);
let task_log = log.new(slog::o!("job" => "notify"));
tokio::spawn(async move { notify_task_nexus(addr, rx, task_log).await });

tokio::spawn(async move {
notify_task_nexus(addr, rx_high, rx_low, task_log).await
});

NotifyQueue {
tx,
tx_high,
tx_low,
log: log.new(o!("job" => "notify_queue")),
}
}

struct Notification {
message: (DateTime<Utc>, NotifyRequest),
qos: NotifyQos,
retries: usize,
}

async fn notify_task_nexus(
addr: Ipv6Addr,
mut rx: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
mut rx_high: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
mut rx_low: mpsc::Receiver<(DateTime<Utc>, NotifyRequest)>,
log: Logger,
) {
info!(log, "notify_task started");

// Store high QoS messages if they can't be sent
let mut stored_notification: Option<Notification> = None;

let reqwest_client = reqwest::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(15))
.timeout(std::time::Duration::from_secs(15))
.build()
.unwrap();
while let Some((time, m)) = rx.recv().await {

loop {
let r = tokio::select! {
biased;

Some(n) = async { stored_notification.take() } => Some(n),

i = rx_high.recv() => i.map(|message| Notification {
message,
qos: NotifyQos::High,
retries: 0,
}),

i = rx_low.recv() => i.map(|message| Notification {
message,
qos: NotifyQos::Low,
retries: 0,
}),
};

let Some(Notification {
message: (time, m),
qos,
retries,
}) = r
else {
error!(log, "one of the notify channels was closed!");
break;
};

debug!(log, "notify {m:?}");
let client = reqwest_client.clone();
let Some(nexus_client) = get_nexus_client(&log, client, addr).await
Expand All @@ -114,21 +189,23 @@ async fn notify_task_nexus(
);
continue;
};
let (r, s) = match m {

let (r, s) = match &m {
NotifyRequest::ClientTaskStopped {
upstairs_id,
downstairs_id,
reason,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let downstairs_id = TypedUuid::from_untyped_uuid(downstairs_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let downstairs_id =
TypedUuid::from_untyped_uuid(*downstairs_id);
let reason = match reason {
ClientRunResult::ConnectionTimeout => {
DownstairsClientStoppedReason::ConnectionTimeout
}
ClientRunResult::ConnectionFailed(_) => {
// skip this notification, it's too noisy during connection
// retries
// skip this notification, it's too noisy during
// connection retries
//DownstairsClientStoppedReason::ConnectionFailed
continue;
}
Expand Down Expand Up @@ -159,16 +236,13 @@ async fn notify_task_nexus(
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_downstairs_client_stopped(
&upstairs_id,
&downstairs_id,
&DownstairsClientStopped { time, reason },
)
.await
})
.await,
nexus_client
.cpapi_downstairs_client_stopped(
&upstairs_id,
&downstairs_id,
&DownstairsClientStopped { time, reason },
)
.await,
"client stopped",
)
}
Expand All @@ -184,7 +258,7 @@ async fn notify_task_nexus(
session_id,
ref repairs,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let (description, repair_type) =
if matches!(m, NotifyRequest::LiveRepairStart { .. }) {
("live repair start", UpstairsRepairType::Live)
Expand All @@ -193,9 +267,9 @@ async fn notify_task_nexus(
};
let info = RepairStartInfo {
time,
repair_id: TypedUuid::from_untyped_uuid(repair_id),
repair_id: TypedUuid::from_untyped_uuid(*repair_id),
repair_type,
session_id: TypedUuid::from_untyped_uuid(session_id),
session_id: TypedUuid::from_untyped_uuid(*session_id),
repairs: repairs
.iter()
.map(|(region_uuid, target_addr)| {
Expand All @@ -208,12 +282,9 @@ async fn notify_task_nexus(
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_upstairs_repair_start(&upstairs_id, &info)
.await
})
.await,
nexus_client
.cpapi_upstairs_repair_start(&upstairs_id, &info)
.await,
description,
)
}
Expand All @@ -229,8 +300,8 @@ async fn notify_task_nexus(
current_item,
total_items,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let repair_id = TypedUuid::from_untyped_uuid(repair_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let repair_id = TypedUuid::from_untyped_uuid(*repair_id);
let description =
if matches!(m, NotifyRequest::LiveRepairProgress { .. }) {
"live repair progress"
Expand All @@ -239,20 +310,17 @@ async fn notify_task_nexus(
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_upstairs_repair_progress(
&upstairs_id,
&repair_id,
&RepairProgress {
current_item,
total_items,
time,
},
)
.await
})
.await,
nexus_client
.cpapi_upstairs_repair_progress(
&upstairs_id,
&repair_id,
&RepairProgress {
current_item: *current_item,
total_items: *total_items,
time,
},
)
.await,
description,
)
}
Expand All @@ -270,7 +338,7 @@ async fn notify_task_nexus(
aborted,
ref repairs,
} => {
let upstairs_id = TypedUuid::from_untyped_uuid(upstairs_id);
let upstairs_id = TypedUuid::from_untyped_uuid(*upstairs_id);
let (description, repair_type) =
if matches!(m, NotifyRequest::LiveRepairFinish { .. }) {
("live repair finish", UpstairsRepairType::Live)
Expand All @@ -279,9 +347,9 @@ async fn notify_task_nexus(
};
let info = RepairFinishInfo {
time,
repair_id: TypedUuid::from_untyped_uuid(repair_id),
repair_id: TypedUuid::from_untyped_uuid(*repair_id),
repair_type,
session_id: TypedUuid::from_untyped_uuid(session_id),
session_id: TypedUuid::from_untyped_uuid(*session_id),
repairs: repairs
.iter()
.map(|(region_uuid, target_addr)| {
Expand All @@ -291,30 +359,49 @@ async fn notify_task_nexus(
}
})
.collect(),
aborted,
aborted: *aborted,
};

(
omicron_common::retry_until_known_result(&log, || async {
nexus_client
.cpapi_upstairs_repair_finish(&upstairs_id, &info)
.await
})
.await,
nexus_client
.cpapi_upstairs_repair_finish(&upstairs_id, &info)
.await,
description,
)
}
};

match r {
Ok(_) => {
info!(log, "notified Nexus of {s}");
}

Err(e) => {
error!(log, "failed to notify Nexus of {s}: {e}");

// If there's a problem notifying Nexus, it could be due to
// Nexus being gone before the DNS was updated. If this is the
// case, then retrying should eventually pick a different Nexus
// and succeed. Store high priority messages so they can be
// resent.
if matches!(qos, NotifyQos::High) {
// If we've retried too many times, then drop this message.
// Unfortunately if this is true then other notifications
// will also likely fail.
if retries > 3 {
warn!(log, "retries > 3, dropping {m:?}");
} else {
stored_notification = Some(Notification {
message: (time, m),
qos,
retries: retries + 1,
});
}
}
}
}
}

info!(log, "notify_task exiting");
}

Expand Down