Skip to content

Commit

Permalink
Implement sorting algorithm (#25)
Browse files Browse the repository at this point in the history
* feat: updated priority to queue

* feat: implemented priority config

* feat: added priority module

* chore: adjusted lint

* feat: implemented priority quote

* feat: implemented storage query to be used in priority module

* test: implemented priority test

* chore: removed dbg

* chore: added validation when the state is empty

* docs: added priority docs

* chore: updated config structure

* docs: updated docs
  • Loading branch information
paulobressan authored Feb 19, 2025
1 parent 99003bf commit 67ea5ae
Show file tree
Hide file tree
Showing 16 changed files with 503 additions and 141 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/app/_meta.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export default {
index: 'Introduction',
installation: 'Installation',
configuration: 'Configuration',
priority: 'Priority Queues',
design: 'Design',
};
17 changes: 17 additions & 0 deletions docs/app/configuration/page.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ uri = "https://mainnet.utxorpc-v0.demeter.run"
# metadata is optional
[u5c.metadata]
"dmtr-api-key" = "your key"

# optional config
[[queues]]
name = "banana"
weight = 2
```

### `storage` section
Expand Down Expand Up @@ -102,6 +107,18 @@ The `u5c.metadata` section is optional and defines metadata for U5C connection,

- `key`: a string value.

### `queues` section

The `queues` section defines the options to create queues.

| property | type | example |
| -------- | ------- | ---------- |
| name | string | queue_name |
| weight | number | 2 |

- `name`: queue name that can be used in sending transactions.
- `weight`: the weight of the queue. A queue with a bigger weight will have more priority to process transactions.

## Tips

For U5C, the UTxO RPC from [Demeter](https://demeter.run/ports/cardano-utxorpc) can be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ We chose **Queue** because it allows the user to define a configurable queue, an

**Rationale**
- Supports configurable queues.
- Quote of transactions.
- Quota of transactions.
- Handles when a transaction can be moved to the fanout stage.

3 changes: 2 additions & 1 deletion docs/app/design/page.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ erDiagram
TEXT id PK
BLOB raw
TEXT status
INTEGER priority
INTEGER slot
TEXT queue
DATETIME created_at
DATETIME updated_at
}
Expand Down
16 changes: 16 additions & 0 deletions docs/app/priority/page.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
title: Priority Queues
sidebarTitle: Priority Queues
---

## Priority Queues

With the Priority module, Boros allows users to configure queues to prioritise transactions. Each queue has a weight that Boros uses to calculate how many slots of the quota a queue can use. For transactions where there's no queue, a default queue is set. When a queue is removed from the file and Boros still has transactions pending for the queue removed, Boros uses the default queue weight to process unknown queues.

Follow the [configuration](../configuration) to configure the priority queues.

### Example
The quota is 10, so the fanout stage has a backpressure of 10, meaning only 10 transactions can be waiting for propagation. The quota is released when transactions are sent, allowing new transactions to wait. Furthermore, two queues are configured: Queue A with a weight of 3 and Queue B with a weight of 2. Queue A has higher priority than Queue B, so Queue A receives 6 slots and Queue B receives 4 slots from the quota.

### Default
Priority configurations are not required. If not configured, the default queue is used. The default queue has a weight of 1 but can be replaced by setting a configuration with the name default. Transactions with unknown queues are processed using the default weight.
5 changes: 5 additions & 0 deletions examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ uri = "https://mainnet.utxorpc-v0.demeter.run"
# metadata is optional
[u5c.metadata]
"dmtr-api-key" = "your key"

[[queues]]
name = "banana"
weight = 2

11 changes: 9 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{env, error::Error, path, sync::Arc};
use std::{collections::HashSet, env, error::Error, path, sync::Arc};

use anyhow::Result;
use dotenv::dotenv;
use priority::DEFAULT_QUEUE;
use serde::Deserialize;
use storage::sqlite::{SqliteCursor, SqliteStorage, SqliteTransaction};
use tokio::try_join;
Expand All @@ -10,6 +11,7 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, Env

mod ledger;
mod pipeline;
mod priority;
mod server;
mod storage;

Expand Down Expand Up @@ -49,12 +51,14 @@ struct Config {
storage: storage::Config,
peer_manager: pipeline::fanout::PeerManagerConfig,
monitor: pipeline::monitor::Config,
#[serde(default)]
queues: HashSet<priority::QueueConfig>,
u5c: ledger::u5c::Config,
}

impl Config {
pub fn new() -> Result<Self, Box<dyn Error>> {
let config = config::Config::builder()
let mut config: Config = config::Config::builder()
.add_source(
config::File::with_name(&env::var("BOROS_CONFIG").unwrap_or("boros.toml".into()))
.required(false),
Expand All @@ -64,6 +68,9 @@ impl Config {
.build()?
.try_deserialize()?;

(!config.queues.iter().any(|q| q.name == DEFAULT_QUEUE))
.then(|| config.queues.insert(Default::default()));

Ok(config)
}
}
36 changes: 21 additions & 15 deletions src/pipeline/fanout/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tracing::info;

use crate::{
ledger::{relay::RelayDataAdapter, u5c::U5cDataAdapter},
priority::Priority,
storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus},
};

Expand All @@ -31,7 +32,7 @@ pub enum FanoutError {
}

pub enum FanoutUnit {
Transaction(Transaction),
Transactions(Vec<Transaction>),
PeerDiscovery(String),
}

Expand All @@ -42,6 +43,7 @@ pub struct Stage {
relay_adapter: Arc<dyn RelayDataAdapter + Send + Sync>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
storage: Arc<SqliteTransaction>,
priority: Arc<Priority>,
}

impl Stage {
Expand All @@ -50,12 +52,14 @@ impl Stage {
relay_adapter: Arc<dyn RelayDataAdapter + Send + Sync>,
u5c_adapter: Arc<dyn U5cDataAdapter>,
storage: Arc<SqliteTransaction>,
priority: Arc<Priority>,
) -> Self {
Self {
config,
relay_adapter,
u5c_adapter,
storage,
priority,
}
}
}
Expand Down Expand Up @@ -94,14 +98,14 @@ impl gasket::framework::Worker<Stage> for Worker {
let additional_peers_required =
desired_count as usize - self.peer_manager.connected_peers_count().await;

if let Some(tx) = stage
.storage
let transactions = stage
.priority
.next(TransactionStatus::Validated)
.await
.or_retry()?
{
info!("Found Transaction: {}", tx.id);
return Ok(WorkSchedule::Unit(FanoutUnit::Transaction(tx)));
.or_retry()?;

if !transactions.is_empty() {
return Ok(WorkSchedule::Unit(FanoutUnit::Transactions(transactions)));
}

if self.peer_discovery_queue < additional_peers_required as u8 {
Expand Down Expand Up @@ -135,18 +139,20 @@ impl gasket::framework::Worker<Stage> for Worker {

async fn execute(&mut self, unit: &FanoutUnit, stage: &mut Stage) -> Result<(), WorkerError> {
match unit {
FanoutUnit::Transaction(tx) => {
let mut transaction = tx.clone();
info!("Propagating Transaction: {}", transaction.id);
FanoutUnit::Transactions(transactions) => {
for transaction in transactions {
let mut transaction = transaction.clone();
info!("Propagating Transaction: {}", transaction.id);

let tip = stage.u5c_adapter.fetch_tip().await.or_retry()?;
let tip = stage.u5c_adapter.fetch_tip().await.or_retry()?;

self.peer_manager.add_tx(transaction.raw.clone()).await;
self.peer_manager.add_tx(transaction.raw.clone()).await;

transaction.status = TransactionStatus::InFlight;
transaction.slot = Some(tip.0);
transaction.status = TransactionStatus::InFlight;
transaction.slot = Some(tip.0);

stage.storage.update(&transaction).await.or_retry()?;
stage.storage.update(&transaction).await.or_retry()?;
}
}
FanoutUnit::PeerDiscovery(peer_addr) => {
info!("Connecting to peer: {}", peer_addr);
Expand Down
45 changes: 30 additions & 15 deletions src/pipeline/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ use gasket::framework::*;
use tokio::time::sleep;
use tracing::info;

use crate::storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus};
use crate::{
priority::Priority,
storage::{sqlite::SqliteTransaction, Transaction, TransactionStatus},
};

#[derive(Stage)]
#[stage(name = "ingest", unit = "Transaction", worker = "Worker")]
#[stage(name = "ingest", unit = "Vec<Transaction>", worker = "Worker")]
pub struct Stage {
storage: Arc<SqliteTransaction>,
priority: Arc<Priority>,
}

impl Stage {
pub fn new(storage: Arc<SqliteTransaction>) -> Self {
Self { storage }
pub fn new(storage: Arc<SqliteTransaction>, priority: Arc<Priority>) -> Self {
Self { storage, priority }
}
}

Expand All @@ -29,27 +33,38 @@ impl gasket::framework::Worker<Stage> for Worker {
async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<Transaction>, WorkerError> {
if let Some(tx) = stage
.storage
) -> Result<WorkSchedule<Vec<Transaction>>, WorkerError> {
let transactions = stage
.priority
.next(TransactionStatus::Pending)
.await
.or_retry()?
{
return Ok(WorkSchedule::Unit(tx));
.or_retry()?;

if !transactions.is_empty() {
return Ok(WorkSchedule::Unit(transactions));
}

sleep(Duration::from_secs(1)).await;
Ok(WorkSchedule::Idle)
}

async fn execute(&mut self, unit: &Transaction, stage: &mut Stage) -> Result<(), WorkerError> {
let mut transaction = unit.clone();
async fn execute(
&mut self,
unit: &Vec<Transaction>,
stage: &mut Stage,
) -> Result<(), WorkerError> {
info!("validating {} transactions", unit.len());

info!("ingest {}", transaction.id);
let transactions = unit
.iter()
.map(|tx| {
let mut tx = tx.clone();
tx.status = TransactionStatus::Validated;
tx
})
.collect();

transaction.status = TransactionStatus::Validated;
stage.storage.update(&transaction).await.or_retry()?;
stage.storage.update_batch(&transactions).await.or_retry()?;

Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
relay::{MockRelayDataAdapter, RelayDataAdapter},
u5c::{Point, U5cDataAdapterImpl},
},
priority::Priority,
storage::{
sqlite::{SqliteCursor, SqliteTransaction},
Cursor,
Expand All @@ -28,12 +29,15 @@ pub async fn run(
Arc::new(MockRelayDataAdapter::new());
let u5c_data_adapter = Arc::new(U5cDataAdapterImpl::try_new(config.u5c, cursor).await?);

let ingest = ingest::Stage::new(tx_storage.clone());
let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues));

let ingest = ingest::Stage::new(tx_storage.clone(), priority.clone());
let fanout = fanout::Stage::new(
config.peer_manager,
relay_adapter.clone(),
u5c_data_adapter.clone(),
tx_storage.clone(),
priority.clone(),
);

let monitor = monitor::Stage::new(
Expand Down
Loading

0 comments on commit 67ea5ae

Please sign in to comment.