Skip to content

Commit

Permalink
feat: implemented monitor adapter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Jan 28, 2025
1 parent 44515f0 commit a43cc5e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 23 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ tonic = { version = "0.12.3", features = ["tls"] }
tonic-reflection = "0.12.3"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
futures = "0.3.31"
serde_json = "1.0.138"
26 changes: 13 additions & 13 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::{path::Path, sync::Arc};

use anyhow::Result;
use monitor::file::FileMonitorAdapter;

use crate::{storage::sqlite::SqliteTransaction, Config};

Expand All @@ -9,21 +10,20 @@ pub mod ingest;
pub mod monitor;

pub async fn run(config: Config, tx_storage: Arc<SqliteTransaction>) -> Result<()> {
tokio::spawn(async move {
let ingest = ingest::Stage::new(tx_storage.clone());
let fanout = fanout::Stage::new(tx_storage.clone(), config.peer_manager);
let monitor = monitor::Stage {};
let ingest = ingest::Stage::new(tx_storage.clone());
let fanout = fanout::Stage::new(tx_storage.clone(), config.peer_manager);

let policy: gasket::runtime::Policy = Default::default();
let adapter = FileMonitorAdapter::new(Path::new("test/blocks"))?;
let monitor = monitor::Stage::new(Box::new(adapter));

let ingest = gasket::runtime::spawn_stage(ingest, policy.clone());
let fanout = gasket::runtime::spawn_stage(fanout, policy.clone());
let monitor = gasket::runtime::spawn_stage(monitor, policy.clone());
let policy: gasket::runtime::Policy = Default::default();

let daemon = gasket::daemon::Daemon::new(vec![ingest, fanout, monitor]);
daemon.block();
})
.await?;
let ingest = gasket::runtime::spawn_stage(ingest, policy.clone());
let fanout = gasket::runtime::spawn_stage(fanout, policy.clone());
let monitor = gasket::runtime::spawn_stage(monitor, policy.clone());

let daemon = gasket::daemon::Daemon::new(vec![ingest, fanout, monitor]);
daemon.block();

Ok(())
}
33 changes: 33 additions & 0 deletions src/pipeline/monitor/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::{
fs,
io::{self, BufRead},
path::Path,
pin::Pin,
};

use futures::stream;

use super::{Event, MonitorAdapter};

pub struct FileMonitorAdapter {
values: Vec<String>,
}
impl FileMonitorAdapter {
pub fn new(file_path: &Path) -> anyhow::Result<Self> {
let file = fs::File::open(file_path)?;
let reader = io::BufReader::new(file);
let values = reader.lines().collect::<io::Result<Vec<String>>>()?;

Ok(Self { values })
}
}
impl MonitorAdapter for FileMonitorAdapter {
fn stream(&mut self) -> Pin<Box<dyn futures::Stream<Item = anyhow::Result<Event>>>> {
let stream = stream::iter(self.values.clone().into_iter().map(|v| {
let bytes = hex::decode(&v)?;
Ok(Event::RollForward(bytes))
}));

Box::pin(stream)
}
}
43 changes: 33 additions & 10 deletions src/pipeline/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
use std::time::Duration;
use std::pin::Pin;

use futures::{Stream, TryStreamExt};
use gasket::framework::*;
use tokio::time::sleep;
use tracing::info;

pub struct Block;
pub mod file;

#[derive(Debug)]
pub enum Event {
RollForward(Vec<u8>),
}

pub trait MonitorAdapter {
fn stream(&mut self) -> Pin<Box<dyn Stream<Item = anyhow::Result<Event>>>>;
}

#[derive(Stage)]
#[stage(name = "monitor", unit = "Block", worker = "Worker")]
pub struct Stage {}
#[stage(name = "monitor", unit = "Event", worker = "Worker")]
pub struct Stage {
adapter: Box<dyn MonitorAdapter + Send>,
}
impl Stage {
pub fn new(adapter: Box<dyn MonitorAdapter + Send>) -> Self {
Self { adapter }
}
}

pub struct Worker;

Expand All @@ -18,15 +34,22 @@ impl gasket::framework::Worker<Stage> for Worker {
Ok(Self)
}

async fn schedule(&mut self, _stage: &mut Stage) -> Result<WorkSchedule<Block>, WorkerError> {
// TODO: fetch data from network
sleep(Duration::from_secs(30)).await;
Ok(WorkSchedule::Unit(Block {}))
async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<Event>, WorkerError> {
info!("monitor waiting next event");
if let Some(e) = stage.adapter.stream().try_next().await.or_restart()? {
return Ok(WorkSchedule::Unit(e));
}

Ok(WorkSchedule::Idle)
}

async fn execute(&mut self, _unit: &Block, _stage: &mut Stage) -> Result<(), WorkerError> {
async fn execute(&mut self, unit: &Event, _stage: &mut Stage) -> Result<(), WorkerError> {
info!("monitor");

match unit {
Event::RollForward(v) => info!("RollForward {}", hex::encode(v)),
};

Ok(())
}
}
Empty file added test/blocks
Empty file.

0 comments on commit a43cc5e

Please sign in to comment.