Skip to content

Commit

Permalink
use oneshot channel in verify
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 18, 2024
1 parent 3c0d6cf commit 29bbcc1
Showing 1 changed file with 15 additions and 18 deletions.
33 changes: 15 additions & 18 deletions script/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::RwLock;
use std::sync::{Arc, Mutex};
use tokio::sync::{
mpsc,
oneshot,
watch::{self, Receiver},
};

Expand Down Expand Up @@ -1392,17 +1392,14 @@ async fn run_vms_with_signal(
}

let mut pause = machines[0].pause();
let (finished_send, mut finished_recv) =
mpsc::unbounded_channel::<(Result<i8, ckb_vm::Error>, u64)>();
let (finish_tx, mut finish_rx) = oneshot::channel::<(Result<i8, ckb_vm::Error>, u64)>();

// send initial `Resume` command to child
// it's maybe useful to set initial command to `signal.borrow().to_owned()`
// so that we can control the initial state of child, which is useful for testing purpose
let (child_sender, child_recv) = watch::channel(ChunkCommand::Resume);
let (child_tx, child_rx) = watch::channel(ChunkCommand::Resume);
let jh =
tokio::spawn(
async move { run_vms_child(machines, child_recv, finished_send, context).await },
);
tokio::spawn(async move { run_vms_child(machines, child_rx, finish_tx, context).await });

loop {
tokio::select! {
Expand All @@ -1414,11 +1411,11 @@ async fn run_vms_with_signal(
}
ChunkCommand::Resume | ChunkCommand::Stop => {
pause.free();
let _res = child_sender.send(command);
let _res = child_tx.send(command);
}
}
}
Some(res) = finished_recv.recv() => {
Ok(res) = &mut finish_rx => {
let _ = jh.await;
match res {
(Ok(0), cycles) => {
Expand All @@ -1442,23 +1439,23 @@ async fn run_vms_with_signal(

async fn run_vms_child(
mut machines: Vec<ResumableMachine>,
mut child_recv: watch::Receiver<ChunkCommand>,
finished_send: mpsc::UnboundedSender<(Result<i8, ckb_vm::Error>, u64)>,
mut child_rx: watch::Receiver<ChunkCommand>,
finish_tx: oneshot::Sender<(Result<i8, ckb_vm::Error>, u64)>,
context: Arc<Mutex<MachineContext>>,
) {
let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None);
// mark changed to make sure we can receive initial command
// and start to run immediately
child_recv.mark_changed();
child_rx.mark_changed();
loop {
let _ = child_recv.changed().await;
match *child_recv.borrow() {
let _ = child_rx.changed().await;
match *child_rx.borrow() {
ChunkCommand::Stop => {
let exit = (
Err(ckb_vm::Error::Unexpected("stopped".to_string())),
cycles,
);
let _ = finished_send.send(exit);
let _ = finish_tx.send(exit);
return;
}
ChunkCommand::Suspend => {
Expand All @@ -1467,7 +1464,7 @@ async fn run_vms_child(
ChunkCommand::Resume => {}
}
if machines.is_empty() {
finished_send
finish_tx
.send((Ok(exit_code), cycles))
.expect("send finished");
return;
Expand Down Expand Up @@ -1495,7 +1492,7 @@ async fn run_vms_child(
spawn_data = None;
}
if machines.is_empty() {
finished_send.send((Ok(exit_code), cycles)).unwrap();
finish_tx.send((Ok(exit_code), cycles)).unwrap();
return;
}
}
Expand All @@ -1520,7 +1517,7 @@ async fn run_vms_child(
_ => {
// other error happened here, for example CyclesExceeded,
// we need to return as verification failed
finished_send
finish_tx
.send((res, machine.cycles()))
.expect("send finished");
return;
Expand Down

0 comments on commit 29bbcc1

Please sign in to comment.