Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: separate statuses of inbound and outbound migrations #661

Merged
merged 6 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions bin/mock-server/src/lib/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ impl InstanceContext {
watch::channel(api::InstanceStateMonitorResponse {
gen: 0,
state: api::InstanceState::Creating,
migration: None,
migration: api::InstanceMigrateStatusResponse {
migration_in: None,
migration_out: None,
},
});
let serial = serial::Serial::new(&properties.name);

Expand Down Expand Up @@ -99,7 +102,10 @@ impl InstanceContext {
.send(api::InstanceStateMonitorResponse {
gen: self.generation,
state: self.state,
migration: None,
migration: api::InstanceMigrateStatusResponse {
migration_in: None,
migration_out: None,
},
})
.map_err(|_| Error::TransitionSendFail)
}
Expand Down Expand Up @@ -258,7 +264,10 @@ async fn instance_state_monitor(
let response = api::InstanceStateMonitorResponse {
gen: last.gen,
state: last.state,
migration: None,
migration: api::InstanceMigrateStatusResponse {
migration_in: None,
migration_out: None,
},
};
return Ok(HttpResponseOk(response));
}
Expand Down
38 changes: 32 additions & 6 deletions bin/propolis-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,38 @@ async fn migrate_instance(
.map(|(role, client, id)| {
tokio::spawn(async move {
loop {
let state = client
.instance_migrate_status()
.migration_id(migration_id)
.send()
.await?
.state;
let state =
client.instance_migrate_status().send().await?.into_inner();

let migration = if role == "src" {
state.migration_out
} else {
state.migration_in
};

// The destination should start reporting migration status as
// soon as the ensure request completes. The source may not
// have a migration status yet because the request from the
// destination needs to arrive first.
let Some(migration) = migration else {
if role == "dst" {
anyhow::bail!("dst instance's migration ID wasn't set");
} else {
println!("src hasn't received migration request yet");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};

if migration.id != migration_id {
anyhow::bail!(
"{role} instance's migration ID is wrong: \
got {}, expected {migration_id}",
migration.id
);
}

let state = migration.state;
println!("{}({}) migration state={:?}", role, id, state);
if state == MigrationState::Finish {
return Ok::<_, anyhow::Error>(());
Expand Down
23 changes: 3 additions & 20 deletions bin/propolis-server/src/lib/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::net::SocketAddrV6;
use std::sync::Arc;
use std::{collections::BTreeMap, net::SocketAddr};

use crate::migrate::MigrateError;
use crate::serial::history_buffer::SerialHistoryOffset;
use crate::serial::SerialTaskControlMessage;
use dropshot::{
Expand Down Expand Up @@ -944,35 +943,19 @@ async fn instance_migrate_start(

#[endpoint {
method = GET,
path = "/instance/migrate/{migration_id}/status"
path = "/instance/migration-status"
}]
async fn instance_migrate_status(
rqctx: RequestContext<Arc<DropshotEndpointContext>>,
path_params: Path<api::InstanceMigrateStatusRequest>,
) -> Result<HttpResponseOk<api::InstanceMigrateStatusResponse>, HttpError> {
let migration_id = path_params.into_inner().migration_id;
let ctx = rqctx.context();
match &*ctx.services.vm.lock().await {
VmControllerState::NotCreated => Err(not_created_error()),
VmControllerState::Created(vm) => {
vm.migrate_status(migration_id).map_err(Into::into).map(|state| {
HttpResponseOk(api::InstanceMigrateStatusResponse {
migration_id,
state,
})
})
Ok(HttpResponseOk(vm.migrate_status()))
}
VmControllerState::Destroyed { state_watcher, .. } => {
let watcher = state_watcher.borrow();
match &watcher.migration {
None => Err((MigrateError::NoMigrationInProgress).into()),
Some(migration_status)
if migration_status.migration_id == migration_id =>
{
Ok(HttpResponseOk(migration_status.clone()))
}
Some(_) => Err((MigrateError::UuidMismatch).into()),
}
Ok(HttpResponseOk(state_watcher.borrow().migration.clone()))
}
}
}
Expand Down
72 changes: 43 additions & 29 deletions bin/propolis-server/src/lib/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//! components to raise events for the state driver to process (e.g. a request
//! from a VM's chipset to reboot or halt the VM).

use crate::migrate;

use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use std::{
collections::{BTreeMap, VecDeque},
Expand All @@ -48,7 +50,9 @@ use propolis::{
vmm::Machine,
};
use propolis_api_types::{
instance_spec::VersionedInstanceSpec, InstanceProperties,
instance_spec::VersionedInstanceSpec,
InstanceMigrateStatusResponse as ApiMigrateStatusResponse,
InstanceMigrationStatus as ApiMigrationStatus, InstanceProperties,
InstanceState as ApiInstanceState,
InstanceStateMonitorResponse as ApiMonitoredState,
InstanceStateRequested as ApiInstanceStateRequested,
Expand All @@ -65,7 +69,7 @@ use crate::{
initializer::{
build_instance, MachineInitializer, MachineInitializerState,
},
migrate::{self, MigrateError},
migrate::{MigrateError, MigrateRole},
serial::Serial,
server::{BlockBackendMap, CrucibleBackendMap, DeviceMap, StaticConfig},
vm::request_queue::ExternalRequest,
Expand Down Expand Up @@ -254,7 +258,7 @@ struct SharedVmStateInner {
/// who query migration state will observe that a live migration is in
/// progress even if the state driver has yet to pick up the live migration
/// tasks from its queue.
pending_migration_id: Option<Uuid>,
pending_migration_id: Option<(Uuid, MigrateRole)>,
}

impl SharedVmStateInner {
Expand Down Expand Up @@ -385,6 +389,11 @@ impl SharedVmState {
pub fn io_error_event(&self, vcpu_id: i32, error: std::io::Error) {
panic!("vCPU {}: Unhandled vCPU error: {}", vcpu_id, error);
}

pub fn clear_pending_migration(&self) {
let mut inner = self.inner.lock().unwrap();
inner.pending_migration_id = None;
}
}

/// Functions called by a Propolis chipset to notify another component that an
Expand Down Expand Up @@ -442,7 +451,10 @@ impl VmController {
tokio::sync::watch::channel(ApiMonitoredState {
gen: 0,
state: ApiInstanceState::Creating,
migration: None,
migration: ApiMigrateStatusResponse {
migration_in: None,
migration_out: None,
},
});

let worker_state = Arc::new(SharedVmState::new(&log));
Expand Down Expand Up @@ -821,34 +833,36 @@ impl VmController {
.map_err(Into::into)
}

pub fn migrate_status(
&self,
migration_id: Uuid,
) -> Result<ApiMigrationState, MigrateError> {
// If the state worker has published migration state with a matching ID,
// report the status from the worker. Note that this call to `borrow`
// takes a lock on the channel.
let published = self.vm_objects.monitor_rx.borrow();
if let Some(status) = &published.migration {
if status.migration_id == migration_id {
return Ok(status.state);
}
}
drop(published);

// Either the worker hasn't published any status or the IDs didn't
// match. See if there's a pending migration task that the worker hasn't
// picked up yet that has the correct ID and report its status if so.
pub fn migrate_status(&self) -> ApiMigrateStatusResponse {
let mut published =
self.vm_objects.monitor_rx.borrow().migration.clone();

// There's a window between the point where a request to migrate returns
// and the point where the state worker actually picks up the migration
// and publishes its state. To ensure that migrations are visible as
// soon as they're queued, pick up the queued migration (if there is
// one) and insert it into the output in the appropriate position. The
// state driver will consume the pending migration before actually
// executing it.
let inner = self.worker_state.inner.lock().unwrap();
if let Some(id) = inner.pending_migration_id {
if migration_id != id {
Err(MigrateError::UuidMismatch)
} else {
Ok(ApiMigrationState::Sync)
if let Some((id, role)) = inner.pending_migration_id {
match role {
MigrateRole::Destination => {
published.migration_in = Some(ApiMigrationStatus {
id,
state: ApiMigrationState::Sync,
});
}
MigrateRole::Source => {
published.migration_out = Some(ApiMigrationStatus {
id,
state: ApiMigrationState::Sync,
});
}
}
} else {
Err(MigrateError::NoMigrationInProgress)
}

published
}

pub(crate) fn for_each_device(
Expand Down
Loading
Loading