Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reconfigurator-execution] switch to async closures #7811

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 25 additions & 32 deletions nexus/reconfigurator/execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ fn register_zone_external_networking_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Ensure external networking resources",
move |_cx| async move {
async move |_cx| {
datastore
.blueprint_ensure_external_networking_resources(
opctx, blueprint,
Expand All @@ -252,7 +252,7 @@ fn register_support_bundle_failure_step<'a>(
ExecutionStepId::Cleanup,
"Mark support bundles as failed if they rely on \
an expunged disk or sled",
move |_cx| async move {
async move |_cx| {
let res = match datastore
.support_bundle_fail_expunged(opctx, blueprint, nexus_id)
.await
Expand All @@ -276,26 +276,19 @@ fn register_sled_list_step<'a>(
datastore: &'a DataStore,
) -> StepHandle<Arc<BTreeMap<SledUuid, Sled>>> {
registrar
.new_step(
ExecutionStepId::Fetch,
"Fetch sled list",
move |_cx| async move {
let sleds_by_id: BTreeMap<SledUuid, _> = datastore
.sled_list_all_batched(opctx, SledFilter::InService)
.await
.context("listing all sleds")?
.into_iter()
.map(|db_sled| {
(
SledUuid::from_untyped_uuid(db_sled.id()),
db_sled.into(),
)
})
.collect();

StepSuccess::new(Arc::new(sleds_by_id)).into()
},
)
.new_step(ExecutionStepId::Fetch, "Fetch sled list", async move |_cx| {
let sleds_by_id: BTreeMap<SledUuid, _> = datastore
.sled_list_all_batched(opctx, SledFilter::InService)
.await
.context("listing all sleds")?
.into_iter()
.map(|db_sled| {
(SledUuid::from_untyped_uuid(db_sled.id()), db_sled.into())
})
.collect();

StepSuccess::new(Arc::new(sleds_by_id)).into()
})
.register()
}

Expand All @@ -309,7 +302,7 @@ fn register_deploy_sled_configs_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy sled configs",
move |cx| async move {
async move |cx| {
let sleds_by_id = sleds.into_value(cx.token()).await;
let res = omicron_sled_config::deploy_sled_configs(
opctx,
Expand Down Expand Up @@ -340,7 +333,7 @@ fn register_plumb_firewall_rules_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Plumb service firewall rules",
move |_cx| async move {
async move |_cx| {
let res = nexus_networking::plumb_service_firewall_rules(
datastore,
opctx,
Expand Down Expand Up @@ -369,7 +362,7 @@ fn register_dns_records_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy DNS records",
move |cx| async move {
async move |cx| {
let sleds_by_id = sleds.into_value(cx.token()).await;

let res = dns::deploy_dns(
Expand Down Expand Up @@ -399,7 +392,7 @@ fn register_cleanup_expunged_zones_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Cleanup expunged zones",
move |_cx| async move {
async move |_cx| {
let res = omicron_zones::clean_up_expunged_zones(
opctx, datastore, resolver, blueprint,
)
Expand All @@ -421,7 +414,7 @@ fn register_decommission_sleds_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Decommission sleds",
move |_cx| async move {
async move |_cx| {
let res =
sled_state::decommission_sleds(opctx, datastore, blueprint)
.await
Expand All @@ -442,7 +435,7 @@ fn register_decommission_disks_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Decommission expunged disks",
move |_cx| async move {
async move |_cx| {
let res = omicron_physical_disks::decommission_expunged_disks(
opctx, datastore, blueprint,
)
Expand All @@ -463,7 +456,7 @@ fn register_deploy_clickhouse_cluster_nodes_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy clickhouse cluster nodes",
move |_cx| async move {
async move |_cx| {
if let Some(clickhouse_cluster_config) =
&blueprint.clickhouse_cluster_config
{
Expand Down Expand Up @@ -492,7 +485,7 @@ fn register_deploy_clickhouse_single_node_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Deploy single-node clickhouse cluster",
move |_cx| async move {
async move |_cx| {
let res =
clickhouse::deploy_single_node(opctx, blueprint).await;
Ok(map_err_to_step_warning(res))
Expand All @@ -513,7 +506,7 @@ fn register_reassign_sagas_step<'a>(
.new_step(
ExecutionStepId::Cleanup,
"Reassign sagas",
move |_cx| async move {
async move |_cx| {
// For any expunged Nexus zones, re-assign in-progress sagas to
// some other Nexus. If this fails for some reason, it doesn't
// affect anything else.
Expand Down Expand Up @@ -546,7 +539,7 @@ fn register_cockroachdb_settings_step<'a>(
.new_step(
ExecutionStepId::Ensure,
"Ensure CockroachDB settings",
move |_cx| async move {
async move |_cx| {
let res =
cockroachdb::ensure_settings(opctx, datastore, blueprint)
.await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn decommission_expunged_disks_impl(
expunged_disks: impl Iterator<Item = (SledUuid, PhysicalDiskUuid)>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<anyhow::Error> = stream::iter(expunged_disks)
.filter_map(|(sled_id, disk_id)| async move {
.filter_map(async |(sled_id, disk_id)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"disk_id" => disk_id.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion nexus/reconfigurator/execution/src/omicron_sled_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn deploy_sled_configs(
sled_configs: &BTreeMap<SledUuid, BlueprintSledConfig>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<_> = stream::iter(sled_configs)
.filter_map(|(sled_id, config)| async move {
.filter_map(async |(sled_id, config)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"generation" => i64::from(&config.sled_agent_generation),
Expand Down
2 changes: 1 addition & 1 deletion nexus/reconfigurator/execution/src/omicron_zones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn clean_up_expunged_zones_impl<R: CleanupResolver>(
zones_to_clean_up: impl Iterator<Item = (SledUuid, &BlueprintZoneConfig)>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<anyhow::Error> = stream::iter(zones_to_clean_up)
.filter_map(|(sled_id, config)| async move {
.filter_map(async |(sled_id, config)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"zone_id" => config.id.to_string(),
Expand Down
Loading