Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix batch queue handling in Optimism derivation #73

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 97 additions & 76 deletions lib/src/optimism/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RISC Zero, Inc.
// Copyright 2024 RISC Zero, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,14 +13,11 @@
// limitations under the License.

use core::cmp::Ordering;
use std::{
cmp::Reverse,
collections::{BinaryHeap, VecDeque},
};
use std::collections::{BTreeMap, VecDeque};

use anyhow::{ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use zeth_primitives::{
batch::Batch,
batch::{Batch, BatchEssence},
transactions::{
ethereum::EthereumTxEssence,
optimism::{OptimismTxEssence, OPTIMISM_DEPOSITED_TX_TYPE},
Expand All @@ -34,9 +31,16 @@ use super::{
};

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct BlockInfo {
pub struct BlockId {
pub hash: B256,
pub number: BlockNumber,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct L2BlockInfo {
pub hash: B256,
pub timestamp: u64,
pub l1_origin: BlockId,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
Expand All @@ -52,7 +56,7 @@ pub struct Epoch {
pub struct State {
pub current_l1_block_number: BlockNumber,
pub current_l1_block_hash: BlockHash,
pub safe_head: BlockInfo,
pub safe_head: L2BlockInfo,
pub epoch: Epoch,
pub op_epoch_queue: VecDeque<Epoch>,
pub next_epoch: Option<Epoch>,
Expand All @@ -62,7 +66,7 @@ impl State {
pub fn new(
current_l1_block_number: BlockNumber,
current_l1_block_hash: BlockHash,
safe_head: BlockInfo,
safe_head: L2BlockInfo,
epoch: Epoch,
) -> Self {
State {
Expand All @@ -75,19 +79,19 @@ impl State {
}
}

pub fn do_next_epoch(&mut self) -> anyhow::Result<()> {
self.epoch = self.next_epoch.take().expect("No next epoch!");
pub fn do_next_epoch(&mut self) -> Result<()> {
self.epoch = self.next_epoch.take().context("no next epoch!")?;
self.deque_next_epoch_if_none()?;
Ok(())
}

pub fn push_epoch(&mut self, epoch: Epoch) -> anyhow::Result<()> {
pub fn push_epoch(&mut self, epoch: Epoch) -> Result<()> {
self.op_epoch_queue.push_back(epoch);
self.deque_next_epoch_if_none()?;
Ok(())
}

fn deque_next_epoch_if_none(&mut self) -> anyhow::Result<()> {
fn deque_next_epoch_if_none(&mut self) -> Result<()> {
if self.next_epoch.is_none() {
while let Some(next_epoch) = self.op_epoch_queue.pop_front() {
if next_epoch.number <= self.epoch.number {
Expand All @@ -96,7 +100,7 @@ impl State {
self.next_epoch = Some(next_epoch);
break;
} else {
anyhow::bail!("Epoch gap!");
bail!("Epoch gap!");
}
}
}
Expand All @@ -112,8 +116,15 @@ enum BatchStatus {
Future,
}

/// A [Batch] with inclusion information.
pub struct BatchWithInclusion {
pub essence: BatchEssence,
pub inclusion_block_number: BlockNumber,
}

pub struct Batcher {
batches: BinaryHeap<Reverse<Batch>>,
/// Multimap of batches, keyed by timestamp
batches: BTreeMap<u64, VecDeque<BatchWithInclusion>>,
batcher_channel: BatcherChannels,
pub state: State,
pub config: ChainConfig,
Expand All @@ -122,7 +133,7 @@ pub struct Batcher {
impl Batcher {
pub fn new(
config: ChainConfig,
op_head: BlockInfo,
op_head: L2BlockInfo,
eth_block: &BlockInput<EthereumTxEssence>,
) -> Result<Batcher> {
let eth_block_hash = eth_block.block_header.hash();
Expand All @@ -143,7 +154,7 @@ impl Batcher {
);

Ok(Batcher {
batches: BinaryHeap::new(),
batches: BTreeMap::new(),
batcher_channel,
state,
config,
Expand Down Expand Up @@ -198,7 +209,10 @@ impl Batcher {
batch.essence.parent_hash,
batch.essence.epoch_num
);
self.batches.push(Reverse(batch));
self.batches
.entry(batch.essence.timestamp)
.or_default()
.push_back(batch);
});
}

Expand All @@ -209,78 +223,85 @@ impl Batcher {
}

pub fn read_batch(&mut self) -> Result<Option<Batch>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice

let mut out = None;
let epoch = &self.state.epoch;
let safe_l2_head = self.state.safe_head;

ensure!(
safe_l2_head.l1_origin.hash == epoch.hash
|| safe_l2_head.l1_origin.number == epoch.number - 1,
"buffered L1 chain epoch does not match safe head origin"
);

let mut next_batch = None;

// Grab the first accepted batch. From the spec:
// "The batches are processed in order of the inclusion on L1: if multiple batches can be
// accept-ed the first is applied. An implementation can defer future batches a later
// derivation step to reduce validation work."
while let Some(Reverse(batch)) = self.batches.pop() {
match self.batch_status(&batch) {
BatchStatus::Accept => {
out = Some(batch);
break;
}
BatchStatus::Drop => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Dropping batch");
}
BatchStatus::Future => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Encountered future batch");

self.batches.push(Reverse(batch));
break;
}
BatchStatus::Undecided => {
#[cfg(not(target_os = "zkvm"))]
log::debug!("Encountered undecided batch");

self.batches.push(Reverse(batch));
break;
'outer: while let Some((ts, mut batches)) = self.batches.pop_first() {
// iterate over all batches, in order of inclusion and find the first accepted batch
// retain batches that may be processed in the future, or those we are undecided on
while let Some(batch) = batches.pop_front() {
match self.batch_status(&batch) {
BatchStatus::Accept => {
next_batch = Some(batch);
// if there are still batches left, insert them back into the map
if !batches.is_empty() {
self.batches.insert(ts, batches);
}
break 'outer;
}
BatchStatus::Drop => {}
BatchStatus::Future | BatchStatus::Undecided => {
batches.push_front(batch);
self.batches.insert(ts, batches);
break 'outer;
}
}
}
}

if let Some(batch) = next_batch {
return Ok(Some(Batch(batch.essence)));
}

// If there are no accepted batches, attempt to generate the default batch. From the spec:
// "If no batch can be accept-ed, and the stage has completed buffering of all batches
// that can fully be read from the L1 block at height epoch.number +
// sequence_window_size, and the next_epoch is available, then an empty batch can
// be derived."
if out.is_none() {
let current_l1_block = self.state.current_l1_block_number;
let safe_head = self.state.safe_head;
let current_epoch = &self.state.epoch;
let next_epoch = &self.state.next_epoch;
let seq_window_size = self.config.seq_window_size;

if let Some(next_epoch) = next_epoch {
if current_l1_block > current_epoch.number + seq_window_size {
let next_timestamp = safe_head.timestamp + self.config.blocktime;
let epoch = if next_timestamp < next_epoch.timestamp {
// From the spec:
// "If next_timestamp < next_epoch.time: the current L1 origin is repeated,
// to preserve the L2 time invariant."
current_epoch
} else {
next_epoch
};

out = Some(Batch::new(
current_l1_block,
safe_head.hash,
epoch.number,
epoch.hash,
next_timestamp,
))
}
// sequence_window_size, and the next_epoch is available, then an empty batch can be
// derived."
let current_l1_block = self.state.current_l1_block_number;
let sequence_window_size = self.config.seq_window_size;
let first_of_epoch = epoch.number == safe_l2_head.l1_origin.number + 1;

if current_l1_block > epoch.number + sequence_window_size {
if let Some(next_epoch) = &self.state.next_epoch {
let next_timestamp = safe_l2_head.timestamp + self.config.blocktime;
let batch_epoch = if next_timestamp < next_epoch.timestamp || first_of_epoch {
// From the spec:
// "If next_timestamp < next_epoch.time: the current L1 origin is repeated,
// to preserve the L2 time invariant."
// "If the batch is the first batch of the epoch, that epoch is used instead
// of advancing the epoch to ensure that there is at least one L2 block per
// epoch."
epoch
} else {
next_epoch
};

return Ok(Some(Batch::new(
safe_l2_head.hash,
batch_epoch.number,
batch_epoch.hash,
next_timestamp,
)));
}
}

Ok(out)
Ok(None)
}

fn batch_status(&self, batch: &Batch) -> BatchStatus {
fn batch_status(&self, batch: &BatchWithInclusion) -> BatchStatus {
// Apply the batch status rules. The spec describes a precise order for these checks.

let epoch = &self.state.epoch;
Expand All @@ -295,7 +316,7 @@ impl Batcher {
Ordering::Greater => {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"Future batch: {} = batch.essence.timestamp > next_timestamp = {}",
"Future batch: {} = batch.timestamp > next_timestamp = {}",
&batch.essence.timestamp,
&next_timestamp
);
Expand All @@ -304,7 +325,7 @@ impl Batcher {
Ordering::Less => {
#[cfg(not(target_os = "zkvm"))]
log::debug!(
"Batch too old: {} = batch.essence.timestamp < next_timestamp = {}",
"Batch too old: {} = batch.timestamp < next_timestamp = {}",
&batch.essence.timestamp,
&next_timestamp
);
Expand Down
24 changes: 15 additions & 9 deletions lib/src/optimism/batcher_channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RISC Zero, Inc.
// Copyright 2024 RISC Zero, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,7 @@ use zeth_primitives::{
Address, BlockNumber,
};

use super::config::ChainConfig;
use super::{batcher::BatchWithInclusion, config::ChainConfig};
use crate::utils::MultiReader;

pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;
Expand All @@ -37,7 +37,7 @@ pub struct BatcherChannels {
max_channel_bank_size: u64,
channel_timeout: u64,
channels: VecDeque<Channel>,
batches: VecDeque<Vec<Batch>>,
batches: VecDeque<Vec<BatchWithInclusion>>,
}

impl BatcherChannels {
Expand Down Expand Up @@ -126,7 +126,7 @@ impl BatcherChannels {
Ok(())
}

pub fn read_batches(&mut self) -> Option<Vec<Batch>> {
pub fn read_batches(&mut self) -> Option<Vec<BatchWithInclusion>> {
self.batches.pop_front()
}

Expand Down Expand Up @@ -282,7 +282,7 @@ impl Channel {

/// Reads all batches from an ready channel. If there is an invalid batch, the rest of
/// the channel is skipped, but previous batches are returned.
fn read_batches(&self, block_number: BlockNumber) -> Vec<Batch> {
fn read_batches(&self, block_number: BlockNumber) -> Vec<BatchWithInclusion> {
debug_assert!(self.is_ready());

let mut batches = Vec::new();
Expand All @@ -297,18 +297,24 @@ impl Channel {
batches
}

fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec<Batch>) -> Result<()> {
fn decode_batches(
&self,
block_number: BlockNumber,
batches: &mut Vec<BatchWithInclusion>,
) -> Result<()> {
let decompressed = self
.decompress()
.context("failed to decompress channel data")?;

let mut channel_data = decompressed.as_slice();
while !channel_data.is_empty() {
let mut batch = Batch::decode(&mut channel_data)
let batch = Batch::decode(&mut channel_data)
.with_context(|| format!("failed to decode batch {}", batches.len()))?;
batch.inclusion_block_number = block_number;

batches.push(batch);
batches.push(BatchWithInclusion {
essence: batch.0,
inclusion_block_number: block_number,
});
}

Ok(())
Expand Down
Loading