Skip to content

Commit

Permalink
feat: improved monitor adtaper interface and mock
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Jan 29, 2025
1 parent a43cc5e commit b1d2064
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
futures = "0.3.31"
serde_json = "1.0.138"
async-stream = "0.3.6"
4 changes: 2 additions & 2 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{path::Path, sync::Arc};
use std::sync::Arc;

use anyhow::Result;
use monitor::file::FileMonitorAdapter;
Expand All @@ -13,7 +13,7 @@ pub async fn run(config: Config, tx_storage: Arc<SqliteTransaction>) -> Result<(
let ingest = ingest::Stage::new(tx_storage.clone());
let fanout = fanout::Stage::new(tx_storage.clone(), config.peer_manager);

let adapter = FileMonitorAdapter::new(Path::new("test/blocks"))?;
let adapter = FileMonitorAdapter::try_new()?;
let monitor = monitor::Stage::new(Box::new(adapter));

let policy: gasket::runtime::Policy = Default::default();
Expand Down
23 changes: 15 additions & 8 deletions src/pipeline/monitor/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,37 @@ use std::{
io::{self, BufRead},
path::Path,
pin::Pin,
time::Duration,
};

use futures::stream;
use async_stream::stream;
use tokio::time::sleep;

use super::{Event, MonitorAdapter};

pub struct FileMonitorAdapter {
values: Vec<String>,
}
impl FileMonitorAdapter {
pub fn new(file_path: &Path) -> anyhow::Result<Self> {
let file = fs::File::open(file_path)?;
pub fn try_new() -> anyhow::Result<Self> {
let file = fs::File::open(Path::new("test/blocks"))?;
let reader = io::BufReader::new(file);
let values = reader.lines().collect::<io::Result<Vec<String>>>()?;

Ok(Self { values })
}
}
impl MonitorAdapter for FileMonitorAdapter {
fn stream(&mut self) -> Pin<Box<dyn futures::Stream<Item = anyhow::Result<Event>>>> {
let stream = stream::iter(self.values.clone().into_iter().map(|v| {
let bytes = hex::decode(&v)?;
Ok(Event::RollForward(bytes))
}));
fn stream(&self) -> Pin<Box<dyn futures::Stream<Item = anyhow::Result<Event>>>> {
let values = self.values.clone();

let stream = stream! {
for item in values.into_iter() {
let bytes = hex::decode(&item)?;
yield Ok(Event::RollForward(bytes));
sleep(Duration::from_secs(20)).await;
}
};

Box::pin(stream)
}
Expand Down
16 changes: 9 additions & 7 deletions src/pipeline/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub enum Event {
}

pub trait MonitorAdapter {
fn stream(&mut self) -> Pin<Box<dyn Stream<Item = anyhow::Result<Event>>>>;
fn stream(&self) -> Pin<Box<dyn Stream<Item = anyhow::Result<Event>>>>;
}

#[derive(Stage)]
Expand All @@ -26,17 +26,19 @@ impl Stage {
}
}

pub struct Worker;
pub struct Worker {
stream: Pin<Box<dyn Stream<Item = anyhow::Result<Event>>>>,
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> {
Ok(Self)
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
let stream = stage.adapter.stream();
Ok(Self { stream })
}

async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<Event>, WorkerError> {
info!("monitor waiting next event");
if let Some(e) = stage.adapter.stream().try_next().await.or_restart()? {
async fn schedule(&mut self, _stage: &mut Stage) -> Result<WorkSchedule<Event>, WorkerError> {
if let Some(e) = self.stream.try_next().await.or_restart()? {
return Ok(WorkSchedule::Unit(e));
}

Expand Down
3 changes: 3 additions & 0 deletions test/blocks

Large diffs are not rendered by default.

0 comments on commit b1d2064

Please sign in to comment.