From 6a486294b3d1559095bc3d821109cd8fa110a56f Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 18:57:02 +0100 Subject: [PATCH 1/6] node: Set the private key from the app The app will need the to store the private key locally, therefore let the app create the key. --- Cargo.lock | 1 + aardvark-app/Cargo.toml | 1 + aardvark-app/src/application.rs | 6 +++++- aardvark-node/src/network.rs | 7 +++---- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f03be8..60652d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,7 @@ dependencies = [ "gettext-rs", "gtk4", "libadwaita", + "p2panda-core", "serde", "serde_json", "sourceview5", diff --git a/aardvark-app/Cargo.toml b/aardvark-app/Cargo.toml index 550f1e6..351ef5c 100644 --- a/aardvark-app/Cargo.toml +++ b/aardvark-app/Cargo.toml @@ -14,6 +14,7 @@ serde_json = "1.0.128" tokio = { version = "1.42.0", features = ["full"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } sourceview = { package = "sourceview5", version = "0.9" } +p2panda-core = { version = "0.2.0", default-features = false } [dependencies.adw] package = "libadwaita" diff --git a/aardvark-app/src/application.rs b/aardvark-app/src/application.rs index a36f658..9959fee 100644 --- a/aardvark-app/src/application.rs +++ b/aardvark-app/src/application.rs @@ -25,6 +25,7 @@ use adw::prelude::*; use adw::subclass::prelude::*; use gettextrs::gettext; use gtk::{gio, glib}; +use p2panda_core::PrivateKey; use tokio::sync::{mpsc, oneshot}; use automerge::PatchAction; @@ -57,7 +58,10 @@ mod imp { fn new() -> Self { let document = Document::default(); - let (backend_shutdown, tx, rx) = network::run().expect("running p2p backend"); + let private_key = PrivateKey::new(); + let public_key = private_key.public_key(); + + let (backend_shutdown, tx, rx) = network::run(private_key).expect("running p2p backend"); AardvarkApplication { document, diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 538c124..dbed677 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -80,7 +80,9 @@ impl TopicLogMap for TextDocumentStore { } #[allow(clippy::type_complexity)] -pub fn run() -> Result<( +pub fn run( + private_key: PrivateKey, +) -> Result<( oneshot::Sender<()>, mpsc::Sender>, mpsc::Receiver>, @@ -100,9 +102,6 @@ pub fn run() -> Result<( let network_id = Hash::new(b"aardvark <3"); let document_id = TextDocument(Hash::new(b"my first doc <3").into()); - let private_key = PrivateKey::new(); - println!("my public key: {}", private_key.public_key()); - let mut operations_store = MemoryStore::::new(); let documents_store = TextDocumentStore::new(); documents_store From 7df5a00db6391913b14e525420fcb10ea7b02cf3 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 19:05:33 +0100 Subject: [PATCH 2/6] chore: Run rust fmt on some files --- aardvark-node/src/network.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index dbed677..79544c5 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -10,17 +10,17 @@ use p2panda_net::config::GossipConfig; use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork, TopicId}; use p2panda_store::MemoryStore; use p2panda_stream::{DecodeExt, IngestExt}; -use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap}; use p2panda_sync::TopicQuery; +use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap}; use serde::{Deserialize, Serialize}; use tokio::runtime::Builder; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; +use tokio_stream::wrappers::ReceiverStream; use crate::operation::{ - create_operation, decode_gossip_message, encode_gossip_operation, AardvarkExtensions, + AardvarkExtensions, create_operation, decode_gossip_message, encode_gossip_operation, }; #[derive(Clone, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)] From 14cbdd1d7ce4d05dccaf633929ce0f91288db277 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 14:09:36 +0100 Subject: [PATCH 3/6] node: Split network setup from document creation --- aardvark-app/src/application.rs | 14 +- aardvark-node/src/network.rs | 241 ++++++++++++++++++++------------ 2 files changed, 158 insertions(+), 97 deletions(-) diff --git a/aardvark-app/src/application.rs b/aardvark-app/src/application.rs index 9959fee..f052a7c 100644 --- a/aardvark-app/src/application.rs +++ b/aardvark-app/src/application.rs @@ -25,8 +25,8 @@ use adw::prelude::*; use adw::subclass::prelude::*; use gettextrs::gettext; use gtk::{gio, glib}; -use p2panda_core::PrivateKey; -use tokio::sync::{mpsc, oneshot}; +use p2panda_core::{Hash, PrivateKey}; +use tokio::sync::mpsc; use automerge::PatchAction; use crate::config::VERSION; @@ -37,14 +37,13 @@ use crate::{AardvarkTextBuffer, AardvarkWindow}; mod imp { use super::*; - #[derive(Debug)] pub struct AardvarkApplication { pub window: OnceCell, pub document: Document, pub tx: mpsc::Sender>, pub rx: RefCell>>>, #[allow(dead_code)] - backend_shutdown: oneshot::Sender<()>, + network: network::Network, } impl AardvarkApplication { @@ -60,12 +59,15 @@ mod imp { let document = Document::default(); let private_key = PrivateKey::new(); let public_key = private_key.public_key(); + let network = network::Network::new(); + println!("The public key used: {}", public_key); - let (backend_shutdown, tx, rx) = network::run(private_key).expect("running p2p backend"); + network.run(private_key, Hash::new(b"aardvark <3")); + let (tx, rx) = network.get_or_create_document(Hash::new(b"some document")); AardvarkApplication { document, - backend_shutdown, + network, tx, rx: RefCell::new(Some(rx)), window: OnceCell::new(), diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 79544c5..e227b87 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -13,7 +13,8 @@ use p2panda_stream::{DecodeExt, IngestExt}; use p2panda_sync::TopicQuery; use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap}; use serde::{Deserialize, Serialize}; -use tokio::runtime::Builder; +use tokio::runtime::{Builder, Runtime}; +use tokio::sync::OnceCell; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -79,65 +80,122 @@ impl TopicLogMap for TextDocumentStore { } } -#[allow(clippy::type_complexity)] -pub fn run( - private_key: PrivateKey, -) -> Result<( - oneshot::Sender<()>, - mpsc::Sender>, - mpsc::Receiver>, -)> { - let (to_network, mut from_app) = mpsc::channel::>(512); - let (to_app, from_network) = mpsc::channel(512); +pub struct Network { + inner: Arc, +} + +struct NetworkInner { + runtime: Runtime, + #[allow(dead_code)] + shutdown_tx: OnceCell>, + operations_store: MemoryStore, + documents_store: TextDocumentStore, + network: OnceCell>, + private_key: OnceCell, +} - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); +impl Default for Network { + fn default() -> Self { + Network::new() + } +} + +impl Network { + pub fn new() -> Self { + let operations_store = MemoryStore::::new(); + let documents_store = TextDocumentStore::new(); - std::thread::spawn(move || { let runtime = Builder::new_current_thread() .enable_all() .build() .expect("backend runtime ready to spawn tasks"); - runtime.block_on(async move { - let network_id = Hash::new(b"aardvark <3"); - let document_id = TextDocument(Hash::new(b"my first doc <3").into()); - - let mut operations_store = MemoryStore::::new(); - let documents_store = TextDocumentStore::new(); - documents_store - .write() - .authors - .insert(private_key.public_key(), vec![document_id.clone()]); - - let sync = LogSyncProtocol::new(documents_store.clone(), operations_store.clone()); - let sync_config = SyncConfiguration::::new(sync); - - let network = NetworkBuilder::new(network_id.into()) - .private_key(private_key.clone()) - .discovery(LocalDiscovery::new()) - .gossip(GossipConfig { - // @TODO(adz): This is a temporary workaround to account for Automerge giving - // us surprisingly fairly large payloads which break the default gossip message - // size limit given by iroh-gossip (4092 bytes). - // - // This especially happens if another peer edits a document for the first time - // which already contains some text, even if it's just adding one single - // character. It's also surprising that the 4kb limit is reached even if the - // text itself is less than ca. 100 characters long. - // - // I believe we can fix this by understanding better how Automerge's "diffs" - // are made and possibily using more low-level methods of their library to - // really only send the actual changed text. - // - // Related issue: https://github.com/p2panda/aardvark/issues/11 - max_message_size: 512_000, - ..Default::default() - }) - .sync(sync_config) - .build() - .await - .expect("network spawning"); + Network { + inner: Arc::new(NetworkInner { + operations_store, + documents_store, + network: OnceCell::new(), + private_key: OnceCell::new(), + shutdown_tx: OnceCell::new(), + runtime, + }), + } + } + + pub fn run(&self, private_key: PrivateKey, network_id: Hash) { + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let sync = LogSyncProtocol::new( + self.inner.documents_store.clone(), + self.inner.operations_store.clone(), + ); + let sync_config = SyncConfiguration::::new(sync); + + self.inner + .private_key + .set(private_key.clone()) + .expect("network can be run only once"); + + self.inner + .shutdown_tx + .set(shutdown_tx) + .expect("network can be run only once"); + + let network_inner_clone = self.inner.clone(); + std::thread::spawn(move || { + let network_inner_clone2 = network_inner_clone.clone(); + network_inner_clone.runtime.block_on(async move { + network_inner_clone2 + .network + .get_or_init(|| async { + NetworkBuilder::new(network_id.into()) + .private_key(private_key) + .discovery(LocalDiscovery::new()) + .gossip(GossipConfig { + // @TODO(adz): This is a temporary workaround to account for Automerge giving + // us surprisingly fairly large payloads which break the default gossip message + // size limit given by iroh-gossip (4092 bytes). + // + // This especially happens if another peer edits a document for the first time + // which already contains some text, even if it's just adding one single + // character. It's also surprising that the 4kb limit is reached even if the + // text itself is less than ca. 100 characters long. + // + // I believe we can fix this by understanding better how Automerge's "diffs" + // are made and possibily using more low-level methods of their library to + // really only send the actual changed text. + // + // Related issue: https://github.com/p2panda/aardvark/issues/11 + max_message_size: 512_000, + ..Default::default() + }) + .sync(sync_config) + .build() + .await + .expect("network spawning") + }) + .await; + shutdown_rx.await.unwrap(); + }); + }); + } + + pub fn get_or_create_document( + &self, + document_id: Hash, + ) -> (mpsc::Sender>, mpsc::Receiver>) { + let document_id = TextDocument(document_id.into()); + + let (to_network, mut from_app) = mpsc::channel::>(512); + let (to_app, from_network) = mpsc::channel(512); + + let network_inner = self.inner.clone(); + self.inner.runtime.spawn(async move { + // Wait for the network to be started + let network = network_inner + .network + .get_or_init(|| async { unreachable!("network not running") }) + .await; let (topic_tx, topic_rx, ready) = network .subscribe(document_id.clone()) .await @@ -148,45 +206,42 @@ pub fn run( println!("network joined!"); }); - // Task for handling operations arriving from the network. - let operations_store_clone = operations_store.clone(); let document_id_clone = document_id.clone(); - let _result: JoinHandle> = tokio::task::spawn(async move { - let stream = ReceiverStream::new(topic_rx); + let stream = ReceiverStream::new(topic_rx); + let stream = stream.filter_map(|event| match event { + FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes) { + Ok(result) => Some(result), + Err(err) => { + eprintln!("could not decode gossip message: {err}"); + None + } + }, + FromNetwork::SyncMessage { + header, payload, .. + } => Some((header, payload)), + }); - let stream = stream.filter_map(|event| match event { - FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes) - { - Ok(result) => Some(result), - Err(err) => { - eprintln!("could not decode gossip message: {err}"); - None - } - }, - FromNetwork::SyncMessage { - header, payload, .. - } => Some((header, payload)), + // Decode and ingest the p2panda operations. + let mut stream = stream + .decode() + .filter_map(|result| match result { + Ok(operation) => Some(operation), + Err(err) => { + eprintln!("decode operation error: {err}"); + None + } + }) + .ingest(network_inner.operations_store.clone(), 128) + .filter_map(|result| match result { + Ok(operation) => Some(operation), + Err(err) => { + eprintln!("ingest operation error: {err}"); + None + } }); - // Decode and ingest the p2panda operations. - let mut stream = stream - .decode() - .filter_map(|result| match result { - Ok(operation) => Some(operation), - Err(err) => { - eprintln!("decode operation error: {err}"); - None - } - }) - .ingest(operations_store_clone, 128) - .filter_map(|result| match result { - Ok(operation) => Some(operation), - Err(err) => { - eprintln!("ingest operation error: {err}"); - None - } - }); - + let documents_store = network_inner.documents_store.clone(); + let _result: JoinHandle> = tokio::task::spawn(async move { // Process the operations and forward application messages to app layer. while let Some(operation) = stream.next().await { let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default(); @@ -225,6 +280,12 @@ pub fn run( Ok(()) }); + let mut operations_store = network_inner.operations_store.clone(); + let private_key = network_inner + .private_key + .get() + .expect("no private key set") + .clone(); // Task for handling events coming from the application layer. let _result: JoinHandle> = tokio::task::spawn(async move { while let Some(bytes) = from_app.recv().await { @@ -260,10 +321,8 @@ pub fn run( Ok(()) }); - - shutdown_rx.await.unwrap(); }); - }); - Ok((shutdown_tx, to_network, from_network)) + (to_network, from_network) + } } From 9dbacd1b84bc5f4cab8ee70547ac569ac4a492d9 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 14:36:02 +0100 Subject: [PATCH 4/6] node: use multi-thread tokio runtime There isn't much point in spawning our own thread to run a local thread runtime, so just use multi-thread tokio runtime directly. --- aardvark-node/src/network.rs | 83 +++++++++++++++--------------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index e227b87..9d4ee14 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -15,7 +15,7 @@ use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap}; use serde::{Deserialize, Serialize}; use tokio::runtime::{Builder, Runtime}; use tokio::sync::OnceCell; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; @@ -86,8 +86,6 @@ pub struct Network { struct NetworkInner { runtime: Runtime, - #[allow(dead_code)] - shutdown_tx: OnceCell>, operations_store: MemoryStore, documents_store: TextDocumentStore, network: OnceCell>, @@ -105,10 +103,11 @@ impl Network { let operations_store = MemoryStore::::new(); let documents_store = TextDocumentStore::new(); - let runtime = Builder::new_current_thread() + let runtime = Builder::new_multi_thread() + .worker_threads(1) .enable_all() .build() - .expect("backend runtime ready to spawn tasks"); + .expect("Could not start tokio runtime"); Network { inner: Arc::new(NetworkInner { @@ -116,14 +115,12 @@ impl Network { documents_store, network: OnceCell::new(), private_key: OnceCell::new(), - shutdown_tx: OnceCell::new(), runtime, }), } } pub fn run(&self, private_key: PrivateKey, network_id: Hash) { - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let sync = LogSyncProtocol::new( self.inner.documents_store.clone(), self.inner.operations_store.clone(), @@ -135,48 +132,38 @@ impl Network { .set(private_key.clone()) .expect("network can be run only once"); - self.inner - .shutdown_tx - .set(shutdown_tx) - .expect("network can be run only once"); - let network_inner_clone = self.inner.clone(); - std::thread::spawn(move || { - let network_inner_clone2 = network_inner_clone.clone(); - network_inner_clone.runtime.block_on(async move { - network_inner_clone2 - .network - .get_or_init(|| async { - NetworkBuilder::new(network_id.into()) - .private_key(private_key) - .discovery(LocalDiscovery::new()) - .gossip(GossipConfig { - // @TODO(adz): This is a temporary workaround to account for Automerge giving - // us surprisingly fairly large payloads which break the default gossip message - // size limit given by iroh-gossip (4092 bytes). - // - // This especially happens if another peer edits a document for the first time - // which already contains some text, even if it's just adding one single - // character. It's also surprising that the 4kb limit is reached even if the - // text itself is less than ca. 100 characters long. - // - // I believe we can fix this by understanding better how Automerge's "diffs" - // are made and possibily using more low-level methods of their library to - // really only send the actual changed text. - // - // Related issue: https://github.com/p2panda/aardvark/issues/11 - max_message_size: 512_000, - ..Default::default() - }) - .sync(sync_config) - .build() - .await - .expect("network spawning") - }) - .await; - - shutdown_rx.await.unwrap(); - }); + self.inner.runtime.spawn(async move { + network_inner_clone + .network + .get_or_init(|| async { + NetworkBuilder::new(network_id.into()) + .private_key(private_key) + .discovery(LocalDiscovery::new()) + .gossip(GossipConfig { + // @TODO(adz): This is a temporary workaround to account for Automerge giving + // us surprisingly fairly large payloads which break the default gossip message + // size limit given by iroh-gossip (4092 bytes). + // + // This especially happens if another peer edits a document for the first time + // which already contains some text, even if it's just adding one single + // character. It's also surprising that the 4kb limit is reached even if the + // text itself is less than ca. 100 characters long. + // + // I believe we can fix this by understanding better how Automerge's "diffs" + // are made and possibily using more low-level methods of their library to + // really only send the actual changed text. + // + // Related issue: https://github.com/p2panda/aardvark/issues/11 + max_message_size: 512_000, + ..Default::default() + }) + .sync(sync_config) + .build() + .await + .expect("network spawning") + }) + .await; }); } From 353c4faf5684866cd271f1786814600fbb59896f Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 14:48:38 +0100 Subject: [PATCH 5/6] node: Add shutdown method to network --- aardvark-node/src/network.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index 9d4ee14..f29fe04 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -167,6 +167,13 @@ impl Network { }); } + pub fn shutdown(&self) { + let network = self.inner.network.get().expect("network running").clone(); + self.inner.runtime.block_on(async move { + network.shutdown().await.expect("network to shutdown"); + }); + } + pub fn get_or_create_document( &self, document_id: Hash, From 3893b059409b44d3776bda3a81c27136f4a8c9b9 Mon Sep 17 00:00:00 2001 From: Julian Sparber Date: Tue, 4 Feb 2025 18:12:31 +0100 Subject: [PATCH 6/6] node: Use tracing for logging --- Cargo.lock | 1 + aardvark-node/Cargo.toml | 1 + aardvark-node/src/network.rs | 13 +++++++------ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60652d7..7c3c061 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,7 @@ dependencies = [ "serde", "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/aardvark-node/Cargo.toml b/aardvark-node/Cargo.toml index 146c794..d382c96 100644 --- a/aardvark-node/Cargo.toml +++ b/aardvark-node/Cargo.toml @@ -16,3 +16,4 @@ p2panda-sync = { version = "0.2.0", features = ["log-sync"] } serde = { version = "1.0.215", features = ["derive"] } tokio = { version = "1.42.0", features = ["full"] } tokio-stream = "0.1.17" +tracing = "0.1" \ No newline at end of file diff --git a/aardvark-node/src/network.rs b/aardvark-node/src/network.rs index f29fe04..1743eb4 100644 --- a/aardvark-node/src/network.rs +++ b/aardvark-node/src/network.rs @@ -19,6 +19,7 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, debug}; use crate::operation::{ AardvarkExtensions, create_operation, decode_gossip_message, encode_gossip_operation, @@ -197,7 +198,7 @@ impl Network { tokio::task::spawn(async move { let _ = ready.await; - println!("network joined!"); + debug!("network joined!"); }); let document_id_clone = document_id.clone(); @@ -206,7 +207,7 @@ impl Network { FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes) { Ok(result) => Some(result), Err(err) => { - eprintln!("could not decode gossip message: {err}"); + error!("could not decode gossip message: {err}"); None } }, @@ -221,7 +222,7 @@ impl Network { .filter_map(|result| match result { Ok(operation) => Some(operation), Err(err) => { - eprintln!("decode operation error: {err}"); + error!("decode operation error: {err}"); None } }) @@ -229,7 +230,7 @@ impl Network { .filter_map(|result| match result { Ok(operation) => Some(operation), Err(err) => { - eprintln!("ingest operation error: {err}"); + error!("ingest operation error: {err}"); None } }); @@ -239,7 +240,7 @@ impl Network { // Process the operations and forward application messages to app layer. while let Some(operation) = stream.next().await { let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default(); - println!( + debug!( "received operation from {}, seq_num={}, prune_flag={}", operation.header.public_key, operation.header.seq_num, @@ -296,7 +297,7 @@ impl Network { ) .await?; - println!( + debug!( "created operation seq_num={}, prune_flag={}, payload_size={}", header.seq_num, prune_flag,