Skip to content

Commit

Permalink
Don't fail mempool sync on missing transactions
Browse files Browse the repository at this point in the history
Otherwise, RBF may cause current sync implementation to fail.
  • Loading branch information
romanz committed Jan 27, 2024
1 parent f1be85f commit 36ba12d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 41 deletions.
100 changes: 61 additions & 39 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bitcoin::{Amount, BlockHash, Transaction, Txid};
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
use crossbeam_channel::Receiver;
use parking_lot::Mutex;
use serde::Serialize;
use serde_json::{json, Value};

use std::fs::File;
Expand Down Expand Up @@ -223,54 +224,49 @@ impl Daemon {
pub(crate) fn get_mempool_entries(
&self,
txids: &[Txid],
) -> Result<Vec<Result<json::GetMempoolEntryResult>>> {
let client = self.rpc.get_jsonrpc_client();
debug!("getting {} mempool entries", txids.len());
let args: Vec<_> = txids
.iter()
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
.collect();
let reqs: Vec<_> = args
.iter()
.map(|a| client.build_request("getmempoolentry", a))
.collect();
let res = client.send_batch(&reqs).context("batch request failed")?;
debug!("got {} mempool entries", res.len());
Ok(res
) -> Result<Vec<Option<json::GetMempoolEntryResult>>> {
let results = batch_request(self.rpc.get_jsonrpc_client(), "getmempoolentry", txids)?;
Ok(results
.into_iter()
.map(|r| {
r.context("missing response")?
.result::<json::GetMempoolEntryResult>()
.context("invalid response")
.map(|r| match r?.result::<json::GetMempoolEntryResult>() {
Ok(entry) => Some(entry),
Err(err) => {
debug!("failed to get mempool entry: {}", err); // probably due to RBF
None
}
})
.collect())
}

pub(crate) fn get_mempool_transactions(
&self,
txids: &[Txid],
) -> Result<Vec<Result<Transaction>>> {
let client = self.rpc.get_jsonrpc_client();
debug!("getting {} transactions", txids.len());
let args: Vec<_> = txids
.iter()
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
.collect();
let reqs: Vec<_> = args
.iter()
.map(|a| client.build_request("getrawtransaction", a))
.collect();
let res = client.send_batch(&reqs).context("batch request failed")?;
debug!("got {} mempool transactions", res.len());
Ok(res
) -> Result<Vec<Option<Transaction>>> {
let results = batch_request(self.rpc.get_jsonrpc_client(), "getrawtransaction", txids)?;
Ok(results
.into_iter()
.map(|r| -> Result<Transaction> {
let tx_hex = r
.context("missing response")?
.result::<String>()
.context("invalid response")?;
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
deserialize(&tx_bytes).context("invalid transaction")
.map(|r| -> Option<Transaction> {
let tx_hex = match r?.result::<String>() {
Ok(tx_hex) => Some(tx_hex),
Err(err) => {
debug!("failed to get mempool tx: {}", err); // probably due to RBF
None
}
}?;
let tx_bytes = match Vec::from_hex(&tx_hex) {
Ok(tx_bytes) => Some(tx_bytes),
Err(err) => {
warn!("got non-hex transaction {}: {}", tx_hex, err);
None
}
}?;
match deserialize(&tx_bytes) {
Ok(tx) => Some(tx),
Err(err) => {
warn!("got invalid tx {}: {}", tx_hex, err);
None
}
}
})
.collect())
}
Expand Down Expand Up @@ -303,3 +299,29 @@ pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&Rp
_ => None,
}
}

fn batch_request<T>(
client: &jsonrpc::Client,
name: &str,
items: &[T],
) -> Result<Vec<Option<jsonrpc::Response>>>
where
T: Serialize,
{
debug!("calling {} on {} items", name, items.len());
let args: Vec<_> = items
.iter()
.map(|item| vec![serde_json::value::to_raw_value(item).unwrap()])
.collect();
let reqs: Vec<_> = args
.iter()
.map(|arg| client.build_request(name, arg))
.collect();
match client.send_batch(&reqs) {
Ok(values) => {
assert_eq!(items.len(), values.len());
Ok(values)
}
Err(err) => bail!("batch {} request failed: {}", name, err),
}
}
16 changes: 14 additions & 2 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,20 @@ impl MempoolSyncUpdate {
.iter()
.zip(entries.into_iter().zip(txs.into_iter()))
.filter_map(|(txid, (entry, tx))| {
let tx = tx.ok()?;
let entry = entry.ok()?;
let entry = match entry {
Some(entry) => entry,
None => {
warn!("missing mempool entry: {}", txid);
return None;
}
};
let tx = match tx {
Some(tx) => tx,
None => {
warn!("missing mempool tx: {}", txid);
return None;
}
};
Some(Entry {
txid: *txid,
tx,
Expand Down

0 comments on commit 36ba12d

Please sign in to comment.