Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supervisor transaction validation #448

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 238 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = [
default-members = ["crates/rbuilder", "crates/reth-rbuilder", "crates/test-relay"]
resolver = "2"


# Like release, but with full debug symbols. Useful for e.g. `perf`.
[profile.debug-fast]
inherits = "release"
Expand All @@ -33,6 +34,8 @@ codegen-units = 1
incremental = false

[workspace.dependencies]
kona-rpc = {git="https://github.com/op-rs/kona/", features=["interop","client","jsonrpsee"]}
kona-interop = {git="https://github.com/op-rs/kona/", features=["interop"]}
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.2.0" }
reth-chain-state = { git = "https://github.com/paradigmxyz/reth", tag = "v1.2.0" }
reth-beacon-consensus = { git = "https://github.com/paradigmxyz/reth", tag = "v1.2.0" }
Expand Down Expand Up @@ -110,6 +113,7 @@ ethereum_ssz_derive = "0.8"
ethereum_ssz = "0.8"

alloy-primitives = { version = "0.8.15", default-features = false }
alloy-sol-types = { version = "0.8.15" }
alloy-rlp = "0.3.10"
alloy-chains = "0.1.33"
alloy-provider = { version = "0.11.1", features = ["ipc", "pubsub"] }
Expand Down Expand Up @@ -142,8 +146,8 @@ clap = { version = "4.4.3", features = ["derive", "env"] }
clap_builder = { version = "4.5.19" }
thiserror = { version = "1.0.64" }
eyre = { version = "0.6.12" }
jsonrpsee = { version = "0.24.4" }
jsonrpsee-types = { version = "0.24.4" }
jsonrpsee = { version = "0.24.8" }
jsonrpsee-types = { version = "0.24.8" }
parking_lot = { version = "0.12.3" }
tokio = { version = "1.40.0" }
auto_impl = { version = "1.2.0" }
Expand Down
14 changes: 3 additions & 11 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,14 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=$SCCACHE_DIR,sharing=locked \
cargo build --release --features="$FEATURES" --package=${RBUILDER_BIN}

FROM builder as test-relay
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/usr/local/cargo/git \
--mount=type=cache,target=$SCCACHE_DIR,sharing=locked \
cargo build --release --features="$FEATURES" --package=test-relay



# Runtime container for rbuilder
FROM gcr.io/distroless/cc-debian12 as rbuilder-runtime
ARG RBUILDER_BIN="rbuilder"
WORKDIR /app
COPY --from=rbuilder /app/target/release/rbuilder /app/rbuilder
COPY --from=rbuilder /app/target/release/${RBUILDER_BIN} /app/rbuilder
ENTRYPOINT ["/app/rbuilder"]

# Runtime container for test-relay
FROM gcr.io/distroless/cc-debian12 as test-relay-runtime
WORKDIR /app
COPY --from=test-relay /app/target/release/test-relay /app/test-relay
ENTRYPOINT ["/app/test-relay"]

4 changes: 4 additions & 0 deletions crates/op-rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ homepage.workspace = true
repository.workspace = true

[dependencies]
kona-interop.workspace = true
kona-rpc.workspace = true
reth.workspace = true
reth-optimism-node.workspace = true
reth-optimism-cli.workspace = true
Expand Down Expand Up @@ -47,6 +49,7 @@ alloy-rpc-types-beacon.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-transport-http.workspace = true
alloy-rpc-types-eth.workspace = true
alloy-sol-types.workspace = true
alloy-rpc-client.workspace = true
alloy-transport.workspace = true
alloy-network.workspace = true
Expand All @@ -63,6 +66,7 @@ futures-util = "0.3.31"
eyre.workspace = true
alloy-provider.workspace = true
tower = "0.4"
thiserror.workspace = true
serde_with.workspace = true
serde.workspace = true
secp256k1.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/op-rbuilder/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use reth_optimism_node::args::RollupArgs;

use crate::tx_signer::Signer;
use alloy_transport_http::reqwest::Url;

/// Parameters for rollup configuration
#[derive(Debug, Clone, Default, PartialEq, Eq, clap::Args)]
Expand All @@ -17,4 +18,7 @@ pub struct OpRbuilderArgs {
/// Builder secret key for signing last transaction in block
#[arg(long = "rollup.builder-secret-key", env = "BUILDER_SECRET_KEY")]
pub builder_signer: Option<Signer>,
/// Supervisor URL
#[arg(long = "rollup.supervisor-url", env = "SUPERVISOR_URL")]
pub supervisor_url: Option<Url>,
}
11 changes: 6 additions & 5 deletions crates/op-rbuilder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod payload_builder_vanilla;
mod tester;
mod tx_signer;

mod supervisor;

fn main() {
Cli::<OpChainSpecParser, args::OpRbuilderArgs>::parse()
.run(|builder, builder_args| async move {
Expand All @@ -28,11 +30,10 @@ fn main() {
let op_node = OpNode::new(rollup_args.clone());
let handle = builder
.with_types::<OpNode>()
.with_components(
op_node
.components()
.payload(CustomOpPayloadBuilder::new(builder_args.builder_signer)),
)
.with_components(op_node.components().payload(CustomOpPayloadBuilder::new(
builder_args.builder_signer,
builder_args.supervisor_url,
)))
.with_add_ons(
OpAddOnsBuilder::default()
.with_sequencer(rollup_args.sequencer_http.clone())
Expand Down
63 changes: 60 additions & 3 deletions crates/op-rbuilder/src/payload_builder_vanilla.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::generator::BlockPayloadJobGenerator;
use crate::generator::BuildArguments;
use crate::supervisor::{ExecutingMessageValidator, SupervisorValidator};
use crate::{
generator::{BlockCell, PayloadBuilder},
metrics::OpRBuilderMetrics,
Expand All @@ -13,8 +14,11 @@ use alloy_consensus::{
use alloy_eips::merge::BEACON_NONCE;
use alloy_primitives::private::alloy_rlp::Encodable;
use alloy_primitives::{Address, Bytes, TxKind, B256, U256};
use alloy_rpc_client::ReqwestClient;
use alloy_rpc_types_engine::PayloadId;
use alloy_rpc_types_eth::Withdrawals;
use alloy_transport_http::reqwest::Url;
use kona_interop::{ExecutingMessage, SafetyLevel, SupervisorClient};
use op_alloy_consensus::{OpDepositReceipt, OpTypedTransaction};
use reth::builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext};
use reth::core::primitives::InMemorySize;
Expand Down Expand Up @@ -76,15 +80,19 @@ use std::{fmt::Display, sync::Arc, time::Instant};
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct CustomOpPayloadBuilder {
builder_signer: Option<Signer>,
supervisor_url: Option<Url>,
}

impl CustomOpPayloadBuilder {
pub fn new(builder_signer: Option<Signer>) -> Self {
Self { builder_signer }
pub fn new(builder_signer: Option<Signer>, supervisor_url: Option<Url>) -> Self {
Self {
builder_signer,
supervisor_url,
}
}
}

Expand Down Expand Up @@ -114,6 +122,9 @@ where
pool,
ctx.provider().clone(),
Arc::new(BasicOpReceiptBuilder::default()),
self.supervisor_url
.clone()
.expect("supervisor url is required"),
))
}

Expand Down Expand Up @@ -195,6 +206,8 @@ pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, T
pub metrics: OpRBuilderMetrics,
/// Node primitive types.
pub receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
/// Client to execute supervisor validation
pub supervisor_client: SupervisorClient,
}

impl<Pool, Client, EvmConfig, N: NodePrimitives>
Expand All @@ -207,13 +220,15 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
supervisor_url: Url,
) -> Self {
Self::with_builder_config(
evm_config,
builder_signer,
pool,
client,
receipt_builder,
supervisor_url,
Default::default(),
)
}
Expand All @@ -224,8 +239,10 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
supervisor_url: Url,
config: OpBuilderConfig,
) -> Self {
let supervisor_client = SupervisorClient::new(ReqwestClient::new_http(supervisor_url));
Self {
pool,
client,
Expand All @@ -235,6 +252,7 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
best_transactions: (),
metrics: Default::default(),
builder_signer,
supervisor_client,
}
}
}
Expand Down Expand Up @@ -333,6 +351,7 @@ where
receipt_builder: self.receipt_builder.clone(),
builder_signer: self.builder_signer,
metrics: Default::default(),
supervisor_client: self.supervisor_client.clone(),
};

let builder = OpBuilder::new(best);
Expand Down Expand Up @@ -779,6 +798,8 @@ pub struct OpPayloadBuilderCtx<EvmConfig: ConfigureEvmEnv, ChainSpec, N: NodePri
pub builder_signer: Option<Signer>,
/// The metrics for the builder
pub metrics: OpRBuilderMetrics,
/// Client to execute supervisor validation
pub supervisor_client: SupervisorClient,
}

impl<EvmConfig, ChainSpec, N> OpPayloadBuilderCtx<EvmConfig, ChainSpec, N>
Expand Down Expand Up @@ -1069,6 +1090,42 @@ where
return Err(PayloadBuilderError::EvmExecutionError(Box::new(err)));
}
};
// op-supervisor validation
let logs = result.clone().into_logs();
let executing_messages = SupervisorValidator::parse_messages(logs.as_slice())
.flatten()
.collect::<Vec<ExecutingMessage>>();
if !executing_messages.is_empty() {
info!("ExecutingMessage number {}", executing_messages.len());
let (tx, rx) = std::sync::mpsc::channel();
tokio::task::block_in_place(move || {
let res = tokio::runtime::Handle::current().block_on(async {
SupervisorValidator::validate_messages(
&self.supervisor_client,
executing_messages.as_slice(),
SafetyLevel::Finalized,
None,
)
.await
});
let _ = tx.send(res);
});
let res = rx.recv();

match res {
Ok(res) => match res {
Ok(()) => (),
Err(err) => {
trace!(target: "payload_builder", %err, "Error in supervisor validation, skipping.");
continue;
}
},
Err(err) => {
warn!("Channel closed during supervisor validation.");
return Err(PayloadBuilderError::Other(Box::new(err)));
}
}
}

// commit changes
evm.db_mut().commit(state);
Expand Down
67 changes: 67 additions & 0 deletions crates/op-rbuilder/src/supervisor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use alloy_primitives::Log;
use alloy_sol_types::SolEvent;
use async_trait::async_trait;
use core::time::Duration;
use kona_interop::{
ExecutingMessage, SafetyLevel, Supervisor, SupervisorClient, SupervisorError,
CROSS_L2_INBOX_ADDRESS,
};
use tokio::time::error::Elapsed;

pub struct SupervisorValidator;

impl ExecutingMessageValidator for SupervisorValidator {
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(100);
}

/// Failures occurring during validation of [`ExecutingMessage`]s.
#[derive(thiserror::Error, Debug)]
pub enum ExecutingMessageValidatorError {
/// Failure from the [`SupervisorApiClient`] when validating messages.
#[error("Supervisor determined messages are invalid: {0}")]
SupervisorRpcError(#[from] SupervisorError),

/// Message validation against the Supervisor took longer than allowed.
#[error("Message validation timed out: {0}")]
ValidationTimeout(#[from] Elapsed),
}

/// Interacts with a Supervisor to validate [`ExecutingMessage`]s.
#[async_trait]
pub trait ExecutingMessageValidator {
/// Default duration that message validation is not allowed to exceed.
const DEFAULT_TIMEOUT: Duration;

/// Extracts [`ExecutingMessage`]s from the [`Log`] if there are any.
fn parse_messages(logs: &[Log]) -> impl Iterator<Item = Option<ExecutingMessage>> {
logs.iter().map(|log| {
(log.address == CROSS_L2_INBOX_ADDRESS && log.topics().len() == 2)
.then(|| ExecutingMessage::decode_log_data(&log.data, true).ok())
.flatten()
})
}

/// Validates a list of [`ExecutingMessage`]s against a Supervisor.
async fn validate_messages(
supervisor: &SupervisorClient,
messages: &[ExecutingMessage],
safety: SafetyLevel,
timeout: Option<Duration>,
) -> Result<(), ExecutingMessageValidatorError> {
// Set timeout duration based on input if provided.
let timeout = timeout.map_or(Self::DEFAULT_TIMEOUT, |t| t);

// Construct the future to validate all messages using supervisor.
let fut = async {
supervisor
.check_messages(messages, safety)
.await
.map_err(ExecutingMessageValidatorError::SupervisorRpcError)
};

// Await the validation future with timeout.
tokio::time::timeout(timeout, fut)
.await
.map_err(ExecutingMessageValidatorError::ValidationTimeout)?
}
}
1 change: 0 additions & 1 deletion crates/op-rbuilder/src/tester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use reth_node_api::{EngineTypes, PayloadTypes};
use reth_optimism_node::OpEngineTypes;
use reth_payload_builder::PayloadId;
use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret};
use serde_json;
use serde_json::Value;
use std::str::FromStr;
use std::time::SystemTime;
Expand Down
Loading