Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: grpc server with u5c submit service #5

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,5 @@ jobs:
uses: actions/checkout@v3

- name: Clippy check lints
# run: cargo clippy -- -D warnings
run: cargo clippy
run: cargo clippy -- -D warnings

4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ Cargo.lock
**/*.rs.bk

dev.db*
.env*
.env*

boros.config.toml
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ anyhow = "1.0.95"
async-trait = "0.1.85"
chrono = "0.4.39"
config = { version = "0.15.4", features = ["toml"] }
dotenv = "0.15.0"
futures-core = "0.3.31"
gasket = { git = "https://github.com/construkts/gasket-rs.git", features = ["derive"] }
pallas = "0.32.0"
protoc-wkt = "1.0.0"
serde = { version = "1.0.217", features = ["derive"] }
sqlx = { version = "0.8.3", features = ["runtime-tokio-rustls", "sqlite", "chrono"] }
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
Expand Down
11 changes: 0 additions & 11 deletions boros.toml

This file was deleted.

22 changes: 22 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Run boros binary

For Boros configuration, one option is to create a config file named `boros.toml` in the root where the binary is executing, or set the env `BOROS_CONFIG` with the path of the config file.

File example [config](config.toml)

## Submit Tx

With Boros running, it's possible to submit a request to boros using the command line below, but change the `YOUR_TX_HEX` for your hex tx.

> [!NOTE]
> Example command tested using Linux(Fedora 41)

Command
```sh
grpcurl -plaintext -d "{\"tx\": [{\"raw\": \"$(echo YOUR_TX_HEX | xxd -r -p | base64 | tr -d '\n')\"}]}" localhost:50052 utxorpc.v1alpha.submit.SubmitService.SubmitTx
```

Your command should be something like this
```sh
grpcurl -plaintext -d "{\"tx\": [{\"raw\": \"$(echo 84a300d9010281825820cdc219e7abe938a35ca074d4bd02d6ccc3c2fc25d1462af07b6c1e8f40933af200018282581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a3b9aca0082581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a77c0bd2f021a0002990da100d90102818258205d4b008e92a42846add4d060e49d7427700ced0ab8eb73e559acc14d228ca5475840f3f12cbfd551e5e51f9eb32fcf695c3a63ec3dfb7329108f45b441cafc7a706659d06238665327779e32415c91b6190e0cd00096aee41f6e405be59d69462708f5f6 | xxd -r -p | base64 | tr -d '\n')\"}]}" localhost:50052 utxorpc.v1alpha.submit.SubmitService.SubmitTx
```
2 changes: 2 additions & 0 deletions examples/client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target/
Cargo.lock
10 changes: 10 additions & 0 deletions examples/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "grcp_client"
version = "0.1.0"
edition = "2021"

[dependencies]
hex = "0.4.3"
pallas = "0.32.0"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
tonic = "0.12.3"
24 changes: 24 additions & 0 deletions examples/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use pallas::interop::utxorpc::spec::submit::{self, any_chain_tx::Type, AnyChainTx};
use tonic::transport::{Channel, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let uri: Uri = "http://[::1]:50052".parse()?;
let channel = Channel::builder(uri).connect().await?;

let mut client = submit::submit_service_client::SubmitServiceClient::new(channel);

let tx = "84a300d9010281825820cdc219e7abe938a35ca074d4bd02d6ccc3c2fc25d1462af07b6c1e8f40933af200018282581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a3b9aca0082581d603f79e7eab3ab95c1f78824872ac6fd65f79d120868057f2bd19306f81a77c0bd2f021a0002990da100d90102818258205d4b008e92a42846add4d060e49d7427700ced0ab8eb73e559acc14d228ca5475840f3f12cbfd551e5e51f9eb32fcf695c3a63ec3dfb7329108f45b441cafc7a706659d06238665327779e32415c91b6190e0cd00096aee41f6e405be59d69462708f5f6";

let bytes = hex::decode(tx)?;

let request = tonic::Request::new(submit::SubmitTxRequest {
tx: vec![AnyChainTx {
r#type: Some(Type::Raw(bytes.into())),
}],
});

client.submit_tx(request).await?;

Ok(())
}
6 changes: 6 additions & 0 deletions examples/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[storage]
db_path = "dev.db"

[server]
addr="[::1]:50052"

21 changes: 17 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{env, error::Error};
use std::{env, error::Error, path, sync::Arc};

use anyhow::Result;
use dotenv::dotenv;
use serde::Deserialize;
use storage::sqlite::{SqliteStorage, SqliteTransaction};
use tokio::try_join;
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand All @@ -12,6 +14,8 @@ mod storage;

#[tokio::main]
async fn main() -> Result<()> {
dotenv().ok();

let env_filter = EnvFilter::builder()
.with_default_directive(Level::INFO.into())
.with_env_var("RUST_LOG")
Expand All @@ -22,18 +26,27 @@ async fn main() -> Result<()> {
.with(env_filter)
.init();

let _config = Config::new().expect("invalid config file");
let config = Config::new().expect("invalid config file");

let storage = SqliteStorage::new(path::Path::new(&config.storage.db_path)).await?;
storage.migrate().await?;

let tx_storage = Arc::new(SqliteTransaction::new(storage));

let pipeline = pipeline::run();
let server = server::run();
let server = server::run(config.server, tx_storage.clone());

try_join!(pipeline, server)?;

Ok(())
}

#[derive(Deserialize)]
struct Config {}
struct Config {
server: server::Config,
storage: storage::Config,
}

impl Config {
pub fn new() -> Result<Self, Box<dyn Error>> {
let config = config::Config::builder()
Expand Down
43 changes: 41 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,46 @@
use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use pallas::interop::utxorpc::spec as u5c;
use serde::Deserialize;
use tonic::transport::Server;
use tracing::{error, info};

use crate::storage::sqlite::SqliteTransaction;

mod utxorpc;

pub async fn run(config: Config, tx_storage: Arc<SqliteTransaction>) -> Result<()> {
tokio::spawn(async move {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(u5c::submit::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(u5c::cardano::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET)
.build_v1alpha()
.unwrap();

let submit_service = utxorpc::SubmitServiceImpl::new(tx_storage);
let submit_service =
u5c::submit::submit_service_server::SubmitServiceServer::new(submit_service);

pub mod utxorpc;
info!(address = config.addr.to_string(), "GRPC server running");

let result = Server::builder()
.add_service(reflection)
.add_service(submit_service)
.serve(config.addr)
.await;

if let Err(error) = result {
error!(?error);
std::process::exit(1);
}
});

pub async fn run() -> Result<()> {
Ok(())
}

#[derive(Deserialize)]
pub struct Config {
pub addr: SocketAddr,
}
92 changes: 92 additions & 0 deletions src/server/utxorpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{pin::Pin, sync::Arc};

use futures_core::Stream;
use pallas::{
interop::utxorpc::spec::submit::{WaitForTxResponse, *},
ledger::traverse::MultiEraTx,
};
use tonic::{Request, Response, Status};
use tracing::error;

use crate::storage::{sqlite::SqliteTransaction, Transaction};

pub struct SubmitServiceImpl {
tx_storage: Arc<SqliteTransaction>,
}

impl SubmitServiceImpl {
pub fn new(tx_storage: Arc<SqliteTransaction>) -> Self {
Self { tx_storage }
}
}

#[async_trait::async_trait]
impl submit_service_server::SubmitService for SubmitServiceImpl {
type WaitForTxStream =
Pin<Box<dyn Stream<Item = Result<WaitForTxResponse, tonic::Status>> + Send + 'static>>;

type WatchMempoolStream =
Pin<Box<dyn Stream<Item = Result<WatchMempoolResponse, tonic::Status>> + Send + 'static>>;

async fn submit_tx(
&self,
request: Request<SubmitTxRequest>,
) -> Result<Response<SubmitTxResponse>, Status> {
let message = request.into_inner();

// TODO: validate a better structure to have this code.

let mut txs: Vec<Transaction> = Vec::default();
let mut hashes = vec![];

for (idx, tx_bytes) in message.tx.into_iter().flat_map(|x| x.r#type).enumerate() {
match tx_bytes {
any_chain_tx::Type::Raw(bytes) => {
let tx = MultiEraTx::decode(&bytes).map_err(|error| {
error!(?error);
Status::failed_precondition(format!("invalid tx at index {idx}"))
})?;
let hash = tx.hash();

hashes.push(hash.to_vec().into());
txs.push(Transaction::new(hash.to_string(), bytes.to_vec()))
}
}
}

self.tx_storage.create(&txs).await.map_err(|error| {
error!(?error);
Status::internal("internal error")
})?;

Ok(Response::new(SubmitTxResponse { r#ref: hashes }))
}

async fn wait_for_tx(
&self,
_request: Request<WaitForTxRequest>,
) -> Result<Response<Self::WaitForTxStream>, Status> {
todo!()
}

async fn read_mempool(
&self,
_request: tonic::Request<ReadMempoolRequest>,
) -> Result<tonic::Response<ReadMempoolResponse>, tonic::Status> {
Err(Status::unimplemented("read_mempool is not yet available"))
}

async fn watch_mempool(
&self,
_request: tonic::Request<WatchMempoolRequest>,
) -> Result<tonic::Response<Self::WatchMempoolStream>, tonic::Status> {
todo!()
}

async fn eval_tx(
&self,
_request: tonic::Request<EvalTxRequest>,
) -> Result<tonic::Response<EvalTxResponse>, Status> {
todo!()
}
}
Loading
Loading