diff --git a/Cargo.lock b/Cargo.lock index e82fd14..7a35237 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3413,6 +3413,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "wallet" +version = "0.1.0" +dependencies = [ + "balius-sdk", + "hex", + "pallas-codec 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pallas-primitives 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", + "pallas-traverse 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde", + "serde_json", + "serde_with", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 7db1ba5..e24de40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ members = [ # "examples/sundae-stop-loss-strategy", # "examples/ticket-vending-machine", "examples/minter/offchain", + "examples/wallet/offchain", "balius-sdk", "balius-runtime", - "balius-macros", "baliusd", + "balius-macros", + "baliusd", ] diff --git a/balius-runtime/src/drivers/chainsync.rs b/balius-runtime/src/drivers/chainsync.rs new file mode 100644 index 0000000..aee5737 --- /dev/null +++ b/balius-runtime/src/drivers/chainsync.rs @@ -0,0 +1,70 @@ +use pallas::codec::minicbor::decode::info; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; +use utxorpc::CardanoSyncClient; + +use crate::{ChainPoint, Error, Runtime}; + +impl From for utxorpc::spec::sync::BlockRef { + fn from(point: ChainPoint) -> Self { + utxorpc::spec::sync::BlockRef { + index: point.0, + hash: point.1.to_vec().into(), + } + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Config { + pub endpoint_url: String, + pub api_key: String, +} + +pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) -> Result<(), Error> { + let mut sync = utxorpc::ClientBuilder::new() + .uri(&config.endpoint_url)? + .metadata("dmtr-api-key", config.api_key)? + .build::() + .await; + + let cursor = runtime + .chain_cursor() + .await? + .map(Into::into) + .into_iter() + .collect(); + + // TODO: handle disconnections and retry logic + + let mut tip = sync.follow_tip(cursor).await?; + + info!("starting follow-tip loop"); + + loop { + select! { + _ = cancel.cancelled() => { + warn!("chainsync driver cancelled"); + break Ok(()) + }, + event = tip.event() => { + match event { + Ok(utxorpc::TipEvent::Apply(block)) => { + let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap(); + runtime.apply_block(&block).await?; + } + Ok(utxorpc::TipEvent::Undo(block)) => { + let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap(); + runtime.undo_block(&block).await?; + } + Ok(utxorpc::TipEvent::Reset(point)) => { + warn!(slot=point.index, "TODO: handle reset"); + continue; + }, + Err(_) => todo!(), + } + } + } + } +} diff --git a/balius-runtime/src/drivers/jsonrpc.rs b/balius-runtime/src/drivers/jsonrpc.rs index bceff71..f99bbfd 100644 --- a/balius-runtime/src/drivers/jsonrpc.rs +++ b/balius-runtime/src/drivers/jsonrpc.rs @@ -1,10 +1,23 @@ +//! Driver to serve JSON-RPC requests. +//! +//! This driver implements an HTTP server that listens for JSON-RPC requests +//! and funnels them into the Runtime. The path of the request is used as the +//! key to identify the worker that should handle the request. The JSON-RPC +//! method field is used as the key to identify the particular Balius request +//! for the worker. JSON-RPC params are mapped directly into Balius request +//! params. +//! +//! The JSON-RPC server is implemented as a Warp application and adheres to +//! the JSON-RPC 2.0 spec. + use serde::{Deserialize, Serialize}; +use serde_json::json; use std::net::SocketAddr; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; use warp::Filter as _; -use crate::{Error, Runtime}; +use crate::{wit, Error, Runtime}; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Config { @@ -32,6 +45,51 @@ fn parse_request(body: serde_json::Value) -> Result { } } +pub async fn handle_request( + runtime: Runtime, + worker: String, + body: serde_json::Value, +) -> warp::reply::Json { + let request = match parse_request(body) { + Ok(x) => x, + Err(err) => return warp::reply::json(&err), + }; + + debug!( + worker, + id = request.id, + method = request.method, + "handling request" + ); + + let params = serde_json::to_vec(&request.params).unwrap(); + + let reply = runtime + .handle_request(&worker, &request.method, params) + .await; + + match reply { + Ok(x) => { + debug!(worker, id = request.id, "request successful"); + + let x = match x { + wit::Response::Acknowledge => json!({}), + wit::Response::Json(x) => serde_json::from_slice(&x).unwrap(), + wit::Response::Cbor(x) => json!({ "cbor": x }), + wit::Response::PartialTx(x) => json!({ "tx": x }), + }; + + warp::reply::json(&x) + } + Err(err) => { + error!(worker, id = request.id, "request failed"); + warp::reply::json(&ErrorResponse { + error: err.to_string(), + }) + } + } +} + pub async fn serve( config: Config, runtime: Runtime, @@ -42,38 +100,7 @@ pub async fn serve( .and(warp::path::param()) .and(warp::post()) .and(warp::body::json()) - .then( - |runtime: Runtime, worker: String, body: serde_json::Value| async move { - let request = match parse_request(body) { - Ok(x) => x, - Err(err) => return warp::reply::json(&err), - }; - - debug!( - worker, - id = request.id, - method = request.method, - "handling request" - ); - - let reply = runtime - .handle_request(&worker, &request.method, request.params) - .await; - - match reply { - Ok(x) => { - debug!(worker, id = request.id, "request successful"); - warp::reply::json(&x) - } - Err(err) => { - error!(worker, id = request.id, "request failed"); - warp::reply::json(&ErrorResponse { - error: err.to_string(), - }) - } - } - }, - ); + .then(handle_request); let address: SocketAddr = config .listen_address diff --git a/balius-runtime/src/drivers/mod.rs b/balius-runtime/src/drivers/mod.rs index ebba793..0c66415 100644 --- a/balius-runtime/src/drivers/mod.rs +++ b/balius-runtime/src/drivers/mod.rs @@ -1 +1,11 @@ +//! Control-flow components on top of the Runtime. +//! +//! Drivers are responsible for implementing the control-flows that +//! handle external interactions (e.g. handling requests, syncing from the +//! blockchain, etc) and funnels them into the Runtime. +//! +//! Each of these drivers has a way to trigger a forever-loop that should be +//! spawn as an independent tokio task running on the background. + +pub mod chainsync; pub mod jsonrpc; diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index 55b5e65..378cb9b 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -6,8 +6,11 @@ use std::{ path::Path, sync::Arc, }; +use store::AtomicUpdate; use thiserror::Error; use tokio::sync::Mutex; +use tracing::{debug, info, warn}; +use utxorpc::ChainBlock; mod wit { wasmtime::component::bindgen!({ @@ -29,6 +32,8 @@ pub mod submit; pub use store::Store; pub type WorkerId = String; +// pub type Block = utxorpc::ChainBlock; +pub type Block<'a> = pallas::ledger::traverse::MultiEraBlock<'a>; #[derive(Error, Debug)] pub enum Error { @@ -138,6 +143,7 @@ impl wit::balius::app::driver::Host for WorkerState { struct LoadedWorker { wasm_store: wasmtime::Store, instance: wit::Worker, + cursor: Option, } impl LoadedWorker { @@ -171,11 +177,7 @@ impl LoadedWorker { Ok(()) } - async fn apply_block( - &mut self, - block: &MultiEraBlock<'_>, - log_seq: LogSeq, - ) -> Result<(), Error> { + async fn apply_block(&mut self, block: &Block<'_>) -> Result<(), Error> { for tx in block.txs() { for (_, utxo) in tx.produces() { let event = wit::Event::Utxo(utxo.encode()); @@ -210,9 +212,22 @@ impl Runtime { RuntimeBuilder::new(store) } - pub fn chain_cursor(&self) -> Result, Error> { - // TODO: iterate over all workers and find the lowest cursor - todo!() + pub async fn chain_cursor(&self) -> Result, Error> { + let lowest_seq = self + .loaded + .lock() + .await + .values() + .map(|w| w.cursor) + .flatten() + .min(); + + if let Some(seq) = lowest_seq { + //TODO: map seq to chain point by searching the wal + warn!(seq, "TODO: map seq to chain point by searching the wal"); + } + + Ok(None) } pub async fn register_worker( @@ -240,27 +255,33 @@ impl Runtime { let config = serde_json::to_vec(&config).unwrap(); instance.call_init(&mut wasm_store, &config).await?; + let cursor = self.store.get_worker_cursor(id)?; + debug!(cursor, id, "found cursor for worker"); + self.loaded.lock().await.insert( id.to_owned(), LoadedWorker { wasm_store, instance, + cursor, }, ); Ok(()) } - pub async fn apply_block(&self, block: &MultiEraBlock<'_>) -> Result<(), Error> { + pub async fn apply_block(&self, block: &Block<'_>) -> Result<(), Error> { + info!(slot = block.slot(), "applying block"); + let log_seq = self.store.write_ahead(block)?; let mut lock = self.loaded.lock().await; - let mut atomic_update = self.store.start_atomic_update()?; + let mut atomic_update = self.store.start_atomic_update(log_seq)?; for (_, worker) in lock.iter_mut() { - worker.apply_block(block, log_seq).await?; - atomic_update.set_worker_cursor(&worker.wasm_store.data().worker_id, log_seq)?; + worker.apply_block(block).await?; + atomic_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?; } atomic_update.commit()?; @@ -269,7 +290,7 @@ impl Runtime { } // TODO: implement undo once we have "apply" working - pub async fn undo_block(&self, block: &MultiEraBlock<'_>) -> Result<(), Error> { + pub async fn undo_block(&self, block: &Block<'_>) -> Result<(), Error> { Ok(()) } diff --git a/balius-runtime/src/router.rs b/balius-runtime/src/router.rs index c31ce9b..4a935de 100644 --- a/balius-runtime/src/router.rs +++ b/balius-runtime/src/router.rs @@ -5,7 +5,7 @@ use std::{ use pallas::ledger::traverse::MultiEraOutput; -use crate::wit::balius::app::driver::{Event, EventPattern}; +use crate::wit::balius::app::driver::{Event, EventPattern, UtxoPattern}; type WorkerId = String; type ChannelId = u32; @@ -15,13 +15,18 @@ type AddressBytes = Vec; #[derive(Clone, Debug, PartialEq, Eq, Hash)] enum MatchKey { RequestMethod(Method), + EveryUtxo, UtxoAddress(AddressBytes), } fn infer_match_keys(pattern: &EventPattern) -> Vec { match pattern { EventPattern::Request(x) => vec![MatchKey::RequestMethod(x.to_owned())], - EventPattern::Utxo(_) => todo!(), + EventPattern::Utxo(UtxoPattern { address, token }) => match (address, token) { + (None, None) => vec![MatchKey::EveryUtxo], + (Some(address), None) => vec![MatchKey::UtxoAddress(address.to_vec())], + _ => todo!(), + }, EventPattern::UtxoUndo(_) => todo!(), EventPattern::Timer(_) => todo!(), EventPattern::Message(_) => todo!(), diff --git a/balius-runtime/src/store.rs b/balius-runtime/src/store.rs index db1b8f3..40c547f 100644 --- a/balius-runtime/src/store.rs +++ b/balius-runtime/src/store.rs @@ -1,7 +1,6 @@ use std::{path::Path, sync::Arc}; use itertools::Itertools; -use pallas::ledger::traverse::MultiEraBlock; use redb::{ReadableTable as _, TableDefinition, WriteTransaction}; use tracing::warn; @@ -9,6 +8,8 @@ use crate::Error; pub type WorkerId = String; pub type LogSeq = u64; +// pub type Block = utxorpc::ChainBlock; +pub type Block<'a> = pallas::ledger::traverse::MultiEraBlock<'a>; const CURSORS: TableDefinition = TableDefinition::new("cursors"); @@ -16,12 +17,13 @@ const DEFAULT_CACHE_SIZE_MB: usize = 50; pub struct AtomicUpdate { wx: WriteTransaction, + log_seq: LogSeq, } impl AtomicUpdate { - pub fn set_worker_cursor(&mut self, id: &str, cursor: LogSeq) -> Result<(), super::Error> { + pub fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { let mut table = self.wx.open_table(CURSORS)?; - table.insert(id.to_owned(), cursor)?; + table.insert(id.to_owned(), self.log_seq)?; Ok(()) } @@ -53,7 +55,7 @@ impl Store { Ok(out) } - pub fn write_ahead(&self, block: &MultiEraBlock<'_>) -> Result { + pub fn write_ahead(&self, block: &Block<'_>) -> Result { // TODO: write event to WAL table and return log sequence Ok(0) } @@ -72,9 +74,9 @@ impl Store { Ok(cursor.map(|x| x.value())) } - pub fn start_atomic_update(&self) -> Result { + pub fn start_atomic_update(&self, log_seq: LogSeq) -> Result { let wx = self.db.begin_write()?; - Ok(AtomicUpdate { wx }) + Ok(AtomicUpdate { wx, log_seq }) } // TODO: I don't think we need this since we're going to load each cursor as diff --git a/balius-runtime/tests/u5c-chainsync.rs b/balius-runtime/tests/u5c-chainsync.rs new file mode 100644 index 0000000..a1b0548 --- /dev/null +++ b/balius-runtime/tests/u5c-chainsync.rs @@ -0,0 +1,31 @@ +#![cfg(test)] +#![cfg(feature = "utxorpc")] + +use balius_runtime::{drivers, Runtime, Store}; +use serde_json::json; +use tokio_util::sync::CancellationToken; + +#[tokio::test] +async fn wallet_balance() { + let store = Store::open("tests/balius.db", None).unwrap(); + + let mut runtime = Runtime::builder(store).build().unwrap(); + + let config = json!({ + "address": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" + }); + + runtime + .register_worker("wallet", "tests/wallet.wasm", config) + .await + .unwrap(); + + let chainsync_config = drivers::chainsync::Config { + endpoint_url: "https://mainnet.utxorpc-v0.demeter.run".to_string(), + api_key: "dmtr_utxorpc1wgnnj0qcfj32zxsz2uc8d4g7uclm2s2w".to_string(), + }; + + drivers::chainsync::run(chainsync_config, runtime, CancellationToken::new()) + .await + .unwrap(); +} diff --git a/balius-runtime/tests/u5c.rs b/balius-runtime/tests/u5c-ledger.rs similarity index 100% rename from balius-runtime/tests/u5c.rs rename to balius-runtime/tests/u5c-ledger.rs diff --git a/balius-runtime/tests/wallet.wasm b/balius-runtime/tests/wallet.wasm new file mode 100644 index 0000000..3b99186 Binary files /dev/null and b/balius-runtime/tests/wallet.wasm differ diff --git a/balius-sdk/src/qol.rs b/balius-sdk/src/qol.rs index a2e4a16..c495514 100644 --- a/balius-sdk/src/qol.rs +++ b/balius-sdk/src/qol.rs @@ -1,13 +1,23 @@ use std::marker::PhantomData; +use thiserror::Error; + use crate::_internal::Handler; use crate::wit; -#[derive(Debug)] +#[derive(Debug, Error)] pub enum Error { + #[error("internal error: {0}")] Internal(String), + #[error("bad config")] BadConfig, + #[error("bad params")] BadParams, + #[error("bad utxo")] + BadUtxo, + #[error("event mismatch, expected {0}")] + EventMismatch(String), + #[error("ledger error: {0}")] Ledger(wit::balius::app::ledger::LedgerError), } @@ -30,6 +40,14 @@ impl From for wit::HandleError { code: 4, message: err.to_string(), }, + Error::BadUtxo => wit::HandleError { + code: 5, + message: "bad utxo".to_owned(), + }, + Error::EventMismatch(x) => wit::HandleError { + code: 6, + message: format!("event mismatch, expected {}", x), + }, } } } @@ -99,6 +117,16 @@ where } } +pub struct Ack; + +impl TryFrom for wit::Response { + type Error = Error; + + fn try_from(_: Ack) -> Result { + Ok(wit::Response::Acknowledge) + } +} + pub struct Config(pub T); impl TryFrom for Config @@ -179,6 +207,30 @@ impl std::ops::Deref for Json { } } +pub struct Utxo { + pub utxo: pallas_traverse::MultiEraOutput<'static>, + pub datum: Option, +} + +impl TryFrom for Utxo { + type Error = Error; + + fn try_from(value: wit::Event) -> Result { + let bytes = match value { + wit::Event::Utxo(x) => x, + _ => return Err(Error::EventMismatch("utxo".to_owned())), + }; + + // TODO: remove this once we have a way to keep the bytes around + let bytes: &'static [u8] = bytes.leak(); + + let utxo = pallas_traverse::MultiEraOutput::decode(pallas_traverse::Era::Conway, bytes) + .map_err(|_| Self::Error::BadUtxo)?; + + Ok(Utxo { utxo, datum: None }) + } +} + pub struct NewTx(pub Box); impl TryInto for NewTx { @@ -216,4 +268,20 @@ impl crate::_internal::Worker { self } + + pub fn with_utxo_handler( + mut self, + pattern: wit::balius::app::driver::UtxoPattern, + handler: impl Handler + Send + Sync + 'static, + ) -> Self { + self.channels.insert( + self.channels.len() as u32, + crate::_internal::Channel { + handler: Box::new(handler), + pattern: wit::balius::app::driver::EventPattern::Utxo(pattern), + }, + ); + + self + } } diff --git a/baliusd/example/baliusd.toml b/baliusd/example/baliusd.toml index cb44f75..8797dae 100644 --- a/baliusd/example/baliusd.toml +++ b/baliusd/example/baliusd.toml @@ -9,7 +9,16 @@ include_tokio = true endpoint_url = "https://mainnet.utxorpc-v0.demeter.run" api_key = "dmtr_utxorpc1wgnnj0qcfj32zxsz2uc8d4g7uclm2s2w" +[chainsync] +endpoint_url = "https://mainnet.utxorpc-v0.demeter.run" +api_key = "dmtr_utxorpc1wgnnj0qcfj32zxsz2uc8d4g7uclm2s2w" + [[workers]] name = "faucet" module = "faucet.wasm" config = "faucet.json" + +[[workers]] +name = "wallet" +module = "wallet.wasm" +config = "wallet.json" diff --git a/baliusd/example/wallet.json b/baliusd/example/wallet.json new file mode 100644 index 0000000..8651f24 --- /dev/null +++ b/baliusd/example/wallet.json @@ -0,0 +1,3 @@ +{ + "address": "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" +} diff --git a/baliusd/example/wallet.wasm b/baliusd/example/wallet.wasm new file mode 100644 index 0000000..3b99186 Binary files /dev/null and b/baliusd/example/wallet.wasm differ diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 8bc3373..51f3e04 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -5,6 +5,7 @@ use miette::{Context as _, IntoDiagnostic as _}; use serde::{Deserialize, Serialize}; use serde_json::json; use serde_with::{serde_as, DisplayFromStr}; +use tokio_util::sync::CancellationToken; use tracing::info; mod boilerplate; @@ -32,6 +33,7 @@ pub struct WorkerConfig { pub struct Config { pub rpc: drivers::jsonrpc::Config, pub ledger: ledgers::u5c::Config, + pub chainsync: drivers::chainsync::Config, pub workers: Vec, pub logging: LoggingConfig, } @@ -88,10 +90,19 @@ async fn main() -> miette::Result<()> { let cancel = boilerplate::hook_exit_token(); - balius_runtime::drivers::jsonrpc::serve(config.rpc, runtime, cancel) - .await - .into_diagnostic() - .context("serving json-rpc requests")?; + let jsonrpc_server = tokio::spawn(balius_runtime::drivers::jsonrpc::serve( + config.rpc, + runtime.clone(), + cancel.clone(), + )); + + let chainsync_driver = tokio::spawn(drivers::chainsync::run( + config.chainsync, + runtime.clone(), + cancel.clone(), + )); + + let tasks = tokio::try_join!(jsonrpc_server, chainsync_driver); Ok(()) } diff --git a/examples/wallet/offchain/Cargo.toml b/examples/wallet/offchain/Cargo.toml new file mode 100644 index 0000000..cc0f951 --- /dev/null +++ b/examples/wallet/offchain/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "wallet" +version = "0.1.0" +edition = "2021" + +[dependencies] +balius-sdk = { path = "../../../balius-sdk" } +serde = { version = "1.0.204", features = ["derive"] } +serde_with = "3.9.0" + +[lib] +crate-type = ["cdylib"] + +[dev-dependencies] +hex = "0.4.3" +serde_json = "1.0.128" +pallas-traverse = "0.30.2" +pallas-codec = "0.30.2" +pallas-primitives = "0.30.2" diff --git a/examples/wallet/offchain/src/lib.rs b/examples/wallet/offchain/src/lib.rs new file mode 100644 index 0000000..be6952d --- /dev/null +++ b/examples/wallet/offchain/src/lib.rs @@ -0,0 +1,64 @@ +use balius_sdk::{Ack, WorkerResult}; +use balius_sdk::{Config, FnHandler, Params, Utxo, Worker}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone)] +struct WalletConfig { + address: String, +} + +struct BalanceRequest {} + +#[derive(Serialize, Deserialize, Clone)] +struct Datum {} + +fn handle_utxo(config: Config, utxo: Utxo) -> WorkerResult { + Ok(Ack) +} + +#[balius_sdk::main] +fn main() -> Worker { + Worker::new().with_utxo_handler( + balius_sdk::wit::balius::app::driver::UtxoPattern { + address: None, + token: None, + }, + FnHandler::from(handle_utxo), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + use balius_sdk::txbuilder::{primitives, Address, Hash, UtxoSet}; + + use std::{collections::HashMap, str::FromStr as _}; + + #[test] + fn test_happy_path() { + let output = primitives::MintedTransactionOutput::PostAlonzo(primitives::MintedPostAlonzoTransactionOutput { + address: Address::from_bech32("addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x").unwrap().to_vec().into(), + value: primitives::Value::Coin(5_000_000), + datum_option: None, + script_ref: None, + }); + + let cbor = pallas_codec::minicbor::to_vec(&output).unwrap(); + + let test_utxos: HashMap<_, _> = vec![( + "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f#0" + .parse() + .unwrap(), + cbor, + )] + .into_iter() + .collect(); + + let config = WalletConfig { + address: "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x".into(), + }; + + handle_utxo(config, utxo).unwrap(); + } +}