Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Tasks trait from PayloadJobGenerator and PayloadJob #437

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions crates/op-rbuilder/src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use futures_util::Future;
use futures_util::FutureExt;
use reth::providers::BlockReaderIdExt;
use reth::providers::StateProviderFactory;
use reth::{
builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext},
payload::PayloadBuilderHandle,
providers::CanonStateSubscriptions,
transaction_pool::TransactionPool,
};
use reth::{providers::StateProviderFactory, tasks::TaskSpawner};
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, PayloadConfig};
use reth_node_api::NodeTypesWithEngine;
use reth_node_api::PayloadBuilderAttributes;
Expand Down Expand Up @@ -78,7 +78,6 @@ where
let payload_generator = BlockPayloadJobGenerator::with_builder(
ctx.provider().clone(),
pool,
ctx.task_executor().clone(),
payload_job_config,
self.builder,
false,
Expand Down Expand Up @@ -131,13 +130,11 @@ pub trait PayloadBuilder<Pool, Client>: Send + Sync + Clone {

/// The generator type that creates new jobs that builds empty blocks.
#[derive(Debug)]
pub struct BlockPayloadJobGenerator<Client, Pool, Tasks, Builder> {
pub struct BlockPayloadJobGenerator<Client, Pool, Builder> {
/// The client that can interact with the chain.
client: Client,
/// txpool
pool: Pool,
/// How to spawn building tasks
executor: Tasks,
/// The configuration for the job generator.
_config: BasicPayloadJobGeneratorConfig,
/// The type responsible for building payloads.
Expand All @@ -152,21 +149,19 @@ pub struct BlockPayloadJobGenerator<Client, Pool, Tasks, Builder> {

// === impl EmptyBlockPayloadJobGenerator ===

impl<Client, Pool, Tasks, Builder> BlockPayloadJobGenerator<Client, Pool, Tasks, Builder> {
impl<Client, Pool, Builder> BlockPayloadJobGenerator<Client, Pool, Builder> {
/// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom
/// [PayloadBuilder]
pub fn with_builder(
client: Client,
pool: Pool,
executor: Tasks,
config: BasicPayloadJobGeneratorConfig,
builder: Builder,
ensure_only_one_payload: bool,
) -> Self {
Self {
client,
pool,
executor,
_config: config,
builder,
ensure_only_one_payload,
Expand All @@ -175,21 +170,19 @@ impl<Client, Pool, Tasks, Builder> BlockPayloadJobGenerator<Client, Pool, Tasks,
}
}

impl<Client, Pool, Tasks, Builder> PayloadJobGenerator
for BlockPayloadJobGenerator<Client, Pool, Tasks, Builder>
impl<Client, Pool, Builder> PayloadJobGenerator for BlockPayloadJobGenerator<Client, Pool, Builder>
where
Client: StateProviderFactory
+ BlockReaderIdExt<Header = alloy_consensus::Header>
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + Unpin + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
type Job = BlockPayloadJob<Client, Pool, Tasks, Builder>;
type Job = BlockPayloadJob<Client, Pool, Builder>;

/// This is invoked when the node receives payload attributes from the beacon node via
/// `engine_forkchoiceUpdatedV1`
Expand Down Expand Up @@ -243,7 +236,6 @@ where
let mut job = BlockPayloadJob {
client: self.client.clone(),
pool: self.pool.clone(),
executor: self.executor.clone(),
builder: self.builder.clone(),
config,
cell: BlockCell::new(),
Expand All @@ -264,7 +256,7 @@ use std::{
};

/// A [PayloadJob] that builds empty blocks.
pub struct BlockPayloadJob<Client, Pool, Tasks, Builder>
pub struct BlockPayloadJob<Client, Pool, Builder>
where
Builder: PayloadBuilder<Pool, Client>,
{
Expand All @@ -274,8 +266,6 @@ where
pub(crate) client: Client,
/// The transaction pool.
pub(crate) pool: Pool,
/// How to spawn building tasks
pub(crate) executor: Tasks,
/// The type responsible for building payloads.
///
/// See [PayloadBuilder]
Expand All @@ -288,11 +278,10 @@ where
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
}

impl<Client, Pool, Tasks, Builder> PayloadJob for BlockPayloadJob<Client, Pool, Tasks, Builder>
impl<Client, Pool, Builder> PayloadJob for BlockPayloadJob<Client, Pool, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -339,11 +328,10 @@ pub struct BuildArguments<Pool, Client, Attributes> {
}

/// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService`
impl<Client, Pool, Tasks, Builder> BlockPayloadJob<Client, Pool, Tasks, Builder>
impl<Client, Pool, Builder> BlockPayloadJob<Client, Pool, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
Expand All @@ -359,27 +347,28 @@ where
let (tx, rx) = oneshot::channel();
self.build_complete = Some(rx);

self.executor.spawn_blocking(Box::pin(async move {
let args = BuildArguments {
client,
pool,
cached_reads: Default::default(),
config: payload_config,
cancel,
};

let result = builder.try_build(args, cell);
let _ = tx.send(result);
}));
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
let args = BuildArguments {
client,
pool,
cached_reads: Default::default(),
config: payload_config,
cancel,
};

let result = builder.try_build(args, cell);
let _ = tx.send(result);
})
});
}
}

/// A [PayloadJob] is a a future that's being polled by the `PayloadBuilderService`
impl<Client, Pool, Tasks, Builder> Future for BlockPayloadJob<Client, Pool, Tasks, Builder>
impl<Client, Pool, Builder> Future for BlockPayloadJob<Client, Pool, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -718,7 +707,6 @@ mod tests {
let generator = BlockPayloadJobGenerator::with_builder(
client.clone(),
pool,
executor,
config,
builder.clone(),
false,
Expand Down