Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Wrap the network code into a struct #42

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aardvark-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ serde_json = "1.0.128"
tokio = { version = "1.42.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
sourceview = { package = "sourceview5", version = "0.9" }
p2panda-core = { version = "0.2.0", default-features = false }

[dependencies.adw]
package = "libadwaita"
Expand Down
16 changes: 11 additions & 5 deletions aardvark-app/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use adw::prelude::*;
use adw::subclass::prelude::*;
use gettextrs::gettext;
use gtk::{gio, glib};
use tokio::sync::{mpsc, oneshot};
use p2panda_core::{Hash, PrivateKey};
use tokio::sync::mpsc;
use automerge::PatchAction;

use crate::config::VERSION;
Expand All @@ -36,14 +37,13 @@ use crate::{AardvarkTextBuffer, AardvarkWindow};
mod imp {
use super::*;

#[derive(Debug)]
pub struct AardvarkApplication {
pub window: OnceCell<AardvarkWindow>,
pub document: Document,
pub tx: mpsc::Sender<Vec<u8>>,
pub rx: RefCell<Option<mpsc::Receiver<Vec<u8>>>>,
#[allow(dead_code)]
backend_shutdown: oneshot::Sender<()>,
network: network::Network,
}

impl AardvarkApplication {
Expand All @@ -57,11 +57,17 @@ mod imp {

fn new() -> Self {
let document = Document::default();
let (backend_shutdown, tx, rx) = network::run().expect("running p2p backend");
let private_key = PrivateKey::new();
let public_key = private_key.public_key();
let network = network::Network::new();
println!("The public key used: {}", public_key);

network.run(private_key, Hash::new(b"aardvark <3"));
let (tx, rx) = network.get_or_create_document(Hash::new(b"some document"));

AardvarkApplication {
document,
backend_shutdown,
network,
tx,
rx: RefCell::new(Some(rx)),
window: OnceCell::new(),
Expand Down
1 change: 1 addition & 0 deletions aardvark-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ p2panda-sync = { version = "0.2.0", features = ["log-sync"] }
serde = { version = "1.0.215", features = ["derive"] }
tokio = { version = "1.42.0", features = ["full"] }
tokio-stream = "0.1.17"
tracing = "0.1"
255 changes: 154 additions & 101 deletions aardvark-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ use p2panda_net::config::GossipConfig;
use p2panda_net::{FromNetwork, NetworkBuilder, SyncConfiguration, ToNetwork, TopicId};
use p2panda_store::MemoryStore;
use p2panda_stream::{DecodeExt, IngestExt};
use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
use p2panda_sync::TopicQuery;
use p2panda_sync::log_sync::{LogSyncProtocol, TopicLogMap};
use serde::{Deserialize, Serialize};
use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::runtime::{Builder, Runtime};
use tokio::sync::OnceCell;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, debug};

use crate::operation::{
create_operation, decode_gossip_message, encode_gossip_operation, AardvarkExtensions,
AardvarkExtensions, create_operation, decode_gossip_message, encode_gossip_operation,
};

#[derive(Clone, Debug, PartialEq, Eq, StdHash, Serialize, Deserialize)]
Expand Down Expand Up @@ -79,119 +81,166 @@ impl TopicLogMap<TextDocument, LogId> for TextDocumentStore {
}
}

#[allow(clippy::type_complexity)]
pub fn run() -> Result<(
oneshot::Sender<()>,
mpsc::Sender<Vec<u8>>,
mpsc::Receiver<Vec<u8>>,
)> {
let (to_network, mut from_app) = mpsc::channel::<Vec<u8>>(512);
let (to_app, from_network) = mpsc::channel(512);
pub struct Network {
inner: Arc<NetworkInner>,
}

let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
struct NetworkInner {
runtime: Runtime,
operations_store: MemoryStore<LogId, AardvarkExtensions>,
documents_store: TextDocumentStore,
network: OnceCell<p2panda_net::Network<TextDocument>>,
private_key: OnceCell<PrivateKey>,
}

std::thread::spawn(move || {
let runtime = Builder::new_current_thread()
impl Default for Network {
fn default() -> Self {
Network::new()
}
}

impl Network {
pub fn new() -> Self {
let operations_store = MemoryStore::<LogId, AardvarkExtensions>::new();
let documents_store = TextDocumentStore::new();

let runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.expect("backend runtime ready to spawn tasks");

runtime.block_on(async move {
let network_id = Hash::new(b"aardvark <3");
let document_id = TextDocument(Hash::new(b"my first doc <3").into());

let private_key = PrivateKey::new();
println!("my public key: {}", private_key.public_key());

let mut operations_store = MemoryStore::<LogId, AardvarkExtensions>::new();
let documents_store = TextDocumentStore::new();
documents_store
.write()
.authors
.insert(private_key.public_key(), vec![document_id.clone()]);

let sync = LogSyncProtocol::new(documents_store.clone(), operations_store.clone());
let sync_config = SyncConfiguration::<TextDocument>::new(sync);

let network = NetworkBuilder::new(network_id.into())
.private_key(private_key.clone())
.discovery(LocalDiscovery::new())
.gossip(GossipConfig {
// @TODO(adz): This is a temporary workaround to account for Automerge giving
// us surprisingly fairly large payloads which break the default gossip message
// size limit given by iroh-gossip (4092 bytes).
//
// This especially happens if another peer edits a document for the first time
// which already contains some text, even if it's just adding one single
// character. It's also surprising that the 4kb limit is reached even if the
// text itself is less than ca. 100 characters long.
//
// I believe we can fix this by understanding better how Automerge's "diffs"
// are made and possibily using more low-level methods of their library to
// really only send the actual changed text.
//
// Related issue: https://github.com/p2panda/aardvark/issues/11
max_message_size: 512_000,
..Default::default()
.expect("Could not start tokio runtime");

Network {
inner: Arc::new(NetworkInner {
operations_store,
documents_store,
network: OnceCell::new(),
private_key: OnceCell::new(),
runtime,
}),
}
}

pub fn run(&self, private_key: PrivateKey, network_id: Hash) {
let sync = LogSyncProtocol::new(
self.inner.documents_store.clone(),
self.inner.operations_store.clone(),
);
let sync_config = SyncConfiguration::<TextDocument>::new(sync);

self.inner
.private_key
.set(private_key.clone())
.expect("network can be run only once");

let network_inner_clone = self.inner.clone();
self.inner.runtime.spawn(async move {
network_inner_clone
.network
.get_or_init(|| async {
NetworkBuilder::new(network_id.into())
.private_key(private_key)
.discovery(LocalDiscovery::new())
.gossip(GossipConfig {
// @TODO(adz): This is a temporary workaround to account for Automerge giving
// us surprisingly fairly large payloads which break the default gossip message
// size limit given by iroh-gossip (4092 bytes).
//
// This especially happens if another peer edits a document for the first time
// which already contains some text, even if it's just adding one single
// character. It's also surprising that the 4kb limit is reached even if the
// text itself is less than ca. 100 characters long.
//
// I believe we can fix this by understanding better how Automerge's "diffs"
// are made and possibily using more low-level methods of their library to
// really only send the actual changed text.
//
// Related issue: https://github.com/p2panda/aardvark/issues/11
max_message_size: 512_000,
..Default::default()
})
.sync(sync_config)
.build()
.await
.expect("network spawning")
})
.sync(sync_config)
.build()
.await
.expect("network spawning");
.await;
});
}

pub fn shutdown(&self) {
let network = self.inner.network.get().expect("network running").clone();
self.inner.runtime.block_on(async move {
network.shutdown().await.expect("network to shutdown");
});
}

pub fn get_or_create_document(
&self,
document_id: Hash,
) -> (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) {
let document_id = TextDocument(document_id.into());

let (to_network, mut from_app) = mpsc::channel::<Vec<u8>>(512);
let (to_app, from_network) = mpsc::channel(512);

let network_inner = self.inner.clone();
self.inner.runtime.spawn(async move {
// Wait for the network to be started
let network = network_inner
.network
.get_or_init(|| async { unreachable!("network not running") })
.await;
let (topic_tx, topic_rx, ready) = network
.subscribe(document_id.clone())
.await
.expect("subscribe to topic");

tokio::task::spawn(async move {
let _ = ready.await;
println!("network joined!");
debug!("network joined!");
});

// Task for handling operations arriving from the network.
let operations_store_clone = operations_store.clone();
let document_id_clone = document_id.clone();
let _result: JoinHandle<Result<()>> = tokio::task::spawn(async move {
let stream = ReceiverStream::new(topic_rx);
let stream = ReceiverStream::new(topic_rx);
let stream = stream.filter_map(|event| match event {
FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes) {
Ok(result) => Some(result),
Err(err) => {
error!("could not decode gossip message: {err}");
None
}
},
FromNetwork::SyncMessage {
header, payload, ..
} => Some((header, payload)),
});

let stream = stream.filter_map(|event| match event {
FromNetwork::GossipMessage { bytes, .. } => match decode_gossip_message(&bytes)
{
Ok(result) => Some(result),
Err(err) => {
eprintln!("could not decode gossip message: {err}");
None
}
},
FromNetwork::SyncMessage {
header, payload, ..
} => Some((header, payload)),
// Decode and ingest the p2panda operations.
let mut stream = stream
.decode()
.filter_map(|result| match result {
Ok(operation) => Some(operation),
Err(err) => {
error!("decode operation error: {err}");
None
}
})
.ingest(network_inner.operations_store.clone(), 128)
.filter_map(|result| match result {
Ok(operation) => Some(operation),
Err(err) => {
error!("ingest operation error: {err}");
None
}
});

// Decode and ingest the p2panda operations.
let mut stream = stream
.decode()
.filter_map(|result| match result {
Ok(operation) => Some(operation),
Err(err) => {
eprintln!("decode operation error: {err}");
None
}
})
.ingest(operations_store_clone, 128)
.filter_map(|result| match result {
Ok(operation) => Some(operation),
Err(err) => {
eprintln!("ingest operation error: {err}");
None
}
});

let documents_store = network_inner.documents_store.clone();
let _result: JoinHandle<Result<()>> = tokio::task::spawn(async move {
// Process the operations and forward application messages to app layer.
while let Some(operation) = stream.next().await {
let prune_flag: PruneFlag = operation.header.extract().unwrap_or_default();
println!(
debug!(
"received operation from {}, seq_num={}, prune_flag={}",
operation.header.public_key,
operation.header.seq_num,
Expand Down Expand Up @@ -226,6 +275,12 @@ pub fn run() -> Result<(
Ok(())
});

let mut operations_store = network_inner.operations_store.clone();
let private_key = network_inner
.private_key
.get()
.expect("no private key set")
.clone();
// Task for handling events coming from the application layer.
let _result: JoinHandle<Result<()>> = tokio::task::spawn(async move {
while let Some(bytes) = from_app.recv().await {
Expand All @@ -242,7 +297,7 @@ pub fn run() -> Result<(
)
.await?;

println!(
debug!(
"created operation seq_num={}, prune_flag={}, payload_size={}",
header.seq_num,
prune_flag,
Expand All @@ -261,10 +316,8 @@ pub fn run() -> Result<(

Ok(())
});

shutdown_rx.await.unwrap();
});
});

Ok((shutdown_tx, to_network, from_network))
(to_network, from_network)
}
}
Loading